Skip to content

Sink

A Sink<A, In, L, E, R> is the consumer side of streaming. Where a Stream describes how values are produced, a Sink describes how they are consumed: it pulls In elements, may fail with E, requires services R, and produces a result A once it is done — optionally returning L leftover elements it did not use. You run a stream through a sink with Stream.run, and every run* destructor you have already seen (runCollect, runFold, …) is really a specialised sink underneath.

┌─── result produced by the Sink
│ ┌─── elements consumed by the Sink
│ │ ┌─── leftover elements
│ │ │ ┌─── possible errors
│ │ │ │ ┌─── required services
▼ ▼ ▼ ▼ ▼
Sink<A, In, L, E, R>
import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// `Sink.take(2)` consumes exactly two elements and returns them as an array.
// Its type is `Sink<Array<number>, number, number>` — it produces an array,
// consumes numbers, and reports the unused elements ([3, 4, 5]) as leftovers.
const firstTwo = Sink.take<number>(2)
// `Stream.run` feeds the stream into the sink and yields the sink's result.
export const result: Effect.Effect<Array<number>> = numbers.pipe(
Stream.run(firstTwo)
)
// => [1, 2]

The Sink module ships the consumers you reach for most often. Each is a value you can pass to Stream.run:

import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// `Sink.collect()` gathers every element into an array.
export const all = numbers.pipe(Stream.run(Sink.collect<number>()))
// => [1, 2, 3, 4, 5]
// `Sink.sum` and `Sink.count` aggregate without retaining elements.
export const total = numbers.pipe(Stream.run(Sink.sum)) // => 15
export const howMany = numbers.pipe(Stream.run(Sink.count)) // => 5
// `Sink.head()` / `Sink.last()` capture an edge element as an Option.
export const first = numbers.pipe(Stream.run(Sink.head<number>()))
// `Sink.forEach` runs an effect per element and produces void — a streaming
// equivalent of `Stream.runForEach`.
export const logged = numbers.pipe(
Stream.run(Sink.forEach((n) => Effect.logInfo(`got ${n}`)))
)
// `Sink.drain` consumes and discards everything, producing void.
export const drained = numbers.pipe(Stream.run(Sink.drain))

Like a stream, a sink can fold elements into a single accumulated value — and, crucially, it can stop early. Sink.fold takes an initial state, a while predicate, and a step function; it keeps consuming only while the predicate holds. Elements pulled past the stopping point come back as leftovers.

import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5, 6)
// Sum elements while the running total stays below 10. As soon as it reaches 10
// the sink completes; remaining input becomes leftover.
const sumUntilTen = Sink.fold(
() => 0, // initial state (a thunk so it can be re-evaluated safely)
(total) => total < 10, // keep going while this holds
(total, n: number) => Effect.succeed(total + n) // effectful step
)
export const partialSum = numbers.pipe(Stream.run(sumUntilTen))

Sink.reduce is the simpler, non-effectful cousin for folding without a continuation predicate, and Sink.takeWhile collects the longest prefix matching a predicate. For batch-level folding over each pulled array there is Sink.foldArray.

Sinks compose. You can map their result, adapt their input, recover from failures, and even sequence one after another.

import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// `Sink.map` transforms the *result* once the sink completes.
const average = Sink.collect<number>().pipe(
Sink.map((arr) => arr.length === 0 ? 0 : arr.reduce((a, b) => a + b, 0) / arr.length)
)
// `Sink.mapInput` adapts the *input* side, letting a `Sink<_, number>` consume
// a stream of strings by parsing first.
const sumOfParsed = Sink.sum.pipe(
Sink.mapInput((s: string) => Number(s))
)
// `Sink.flatMap` runs a second sink after the first, using the first result to
// decide what to do next. Here we take 2 elements, then drain the rest.
const takeThenDrain = Sink.take<number>(2).pipe(
Sink.flatMap((firstTwo) =>
Sink.drain.pipe(Sink.map(() => firstTwo))
)
)
export const avg = numbers.pipe(Stream.run(average))
export const head2 = numbers.pipe(Stream.run(takeThenDrain))

Sink.orElse switches to a fallback sink if the first one fails, and Sink.catchCause recovers based on the full Cause — the same error-handling shape you use for streams and effects.

When the built-ins do not fit, Sink.make<In>() lets you describe a consumer as a pipeline over a Stream<In>. The final step must return an Effect; its success value becomes the sink’s result. This is how you turn any stream-shaped computation into a reusable consumer.

import { Effect, Sink, Stream } from "effect"
// A custom sink that collects elements and returns both the count and the sum,
// expressed as a small stream pipeline ending in an Effect.
const countAndSum = Sink.make<number>()((stream: Stream.Stream<number>) =>
stream.pipe(
Stream.runFold(
() => ({ count: 0, sum: 0 }),
(acc, n) => ({ count: acc.count + 1, sum: acc.sum + n })
)
)
)
export const summary = Stream.make(10, 20, 30).pipe(Stream.run(countAndSum))
// => { count: 3, sum: 60 }

Because a sink is a first-class value, you can name it, test it in isolation, and reuse it across many streams — the same way you reuse an Effect. Reach for an explicit sink whenever a consumption strategy is worth a name; for one-off consumption, the run* destructors are the shorter path.