Sink
A Sink<A, In, L, E, R> is the consumer side of streaming. Where a Stream
describes how values are produced, a Sink describes how they are consumed: it
pulls In elements, may fail with E, requires services R, and produces a
result A once it is done — optionally returning L leftover elements it did
not use. You run a stream through a sink with Stream.run, and every run*
destructor you have already seen (runCollect, runFold, …) is really a
specialised sink underneath.
┌─── result produced by the Sink │ ┌─── elements consumed by the Sink │ │ ┌─── leftover elements │ │ │ ┌─── possible errors │ │ │ │ ┌─── required services ▼ ▼ ▼ ▼ ▼ Sink<A, In, L, E, R>import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// `Sink.take(2)` consumes exactly two elements and returns them as an array.// Its type is `Sink<Array<number>, number, number>` — it produces an array,// consumes numbers, and reports the unused elements ([3, 4, 5]) as leftovers.const firstTwo = Sink.take<number>(2)
// `Stream.run` feeds the stream into the sink and yields the sink's result.export const result: Effect.Effect<Array<number>> = numbers.pipe( Stream.run(firstTwo))// => [1, 2]Built-in sinks
Section titled “Built-in sinks”The Sink module ships the consumers you reach for most often. Each is a value
you can pass to Stream.run:
import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// `Sink.collect()` gathers every element into an array.export const all = numbers.pipe(Stream.run(Sink.collect<number>()))// => [1, 2, 3, 4, 5]
// `Sink.sum` and `Sink.count` aggregate without retaining elements.export const total = numbers.pipe(Stream.run(Sink.sum)) // => 15export const howMany = numbers.pipe(Stream.run(Sink.count)) // => 5
// `Sink.head()` / `Sink.last()` capture an edge element as an Option.export const first = numbers.pipe(Stream.run(Sink.head<number>()))
// `Sink.forEach` runs an effect per element and produces void — a streaming// equivalent of `Stream.runForEach`.export const logged = numbers.pipe( Stream.run(Sink.forEach((n) => Effect.logInfo(`got ${n}`))))
// `Sink.drain` consumes and discards everything, producing void.export const drained = numbers.pipe(Stream.run(Sink.drain))Folding and early termination
Section titled “Folding and early termination”Like a stream, a sink can fold elements into a single accumulated value — and,
crucially, it can stop early. Sink.fold takes an initial state, a while
predicate, and a step function; it keeps consuming only while the predicate holds.
Elements pulled past the stopping point come back as leftovers.
import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5, 6)
// Sum elements while the running total stays below 10. As soon as it reaches 10// the sink completes; remaining input becomes leftover.const sumUntilTen = Sink.fold( () => 0, // initial state (a thunk so it can be re-evaluated safely) (total) => total < 10, // keep going while this holds (total, n: number) => Effect.succeed(total + n) // effectful step)
export const partialSum = numbers.pipe(Stream.run(sumUntilTen))Sink.reduce is the simpler, non-effectful cousin for folding without a
continuation predicate, and Sink.takeWhile collects the longest prefix matching
a predicate. For batch-level folding over each pulled array there is
Sink.foldArray.
Transforming sinks
Section titled “Transforming sinks”Sinks compose. You can map their result, adapt their input, recover from failures, and even sequence one after another.
import { Effect, Sink, Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
// `Sink.map` transforms the *result* once the sink completes.const average = Sink.collect<number>().pipe( Sink.map((arr) => arr.length === 0 ? 0 : arr.reduce((a, b) => a + b, 0) / arr.length))
// `Sink.mapInput` adapts the *input* side, letting a `Sink<_, number>` consume// a stream of strings by parsing first.const sumOfParsed = Sink.sum.pipe( Sink.mapInput((s: string) => Number(s)))
// `Sink.flatMap` runs a second sink after the first, using the first result to// decide what to do next. Here we take 2 elements, then drain the rest.const takeThenDrain = Sink.take<number>(2).pipe( Sink.flatMap((firstTwo) => Sink.drain.pipe(Sink.map(() => firstTwo)) ))
export const avg = numbers.pipe(Stream.run(average))export const head2 = numbers.pipe(Stream.run(takeThenDrain))Sink.orElse switches to a fallback sink if the first one fails, and
Sink.catchCause recovers based on the full Cause — the same error-handling
shape you use for streams and
effects.
Building a custom sink
Section titled “Building a custom sink”When the built-ins do not fit, Sink.make<In>() lets you describe a consumer as
a pipeline over a Stream<In>. The final step must return an Effect; its
success value becomes the sink’s result. This is how you turn any stream-shaped
computation into a reusable consumer.
import { Effect, Sink, Stream } from "effect"
// A custom sink that collects elements and returns both the count and the sum,// expressed as a small stream pipeline ending in an Effect.const countAndSum = Sink.make<number>()((stream: Stream.Stream<number>) => stream.pipe( Stream.runFold( () => ({ count: 0, sum: 0 }), (acc, n) => ({ count: acc.count + 1, sum: acc.sum + n }) ) ))
export const summary = Stream.make(10, 20, 30).pipe(Stream.run(countAndSum))// => { count: 3, sum: 60 }Because a sink is a first-class value, you can name it, test it in isolation, and
reuse it across many streams — the same way you reuse an Effect. Reach for an
explicit sink whenever a consumption strategy is worth a name; for one-off
consumption, the run* destructors are the
shorter path.