Consuming Streams
A Stream is a description — it does nothing until you run it. The run*
family are the destructors that pull values through the pipeline and hand you
back an Effect. Which one you choose depends on what you want out the other
end: every element, all elements collected, an accumulated total, or just the
side effects.
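To make the "does nothing until you run it" point concrete, here is a minimal sketch. The `seen` array and the variable names are hypothetical, introduced only so the side effects can be observed. `Effect.runSync` is used on the assumption that every step in this small pipeline is synchronous; a real program would normally run the Effect at its edge with `Effect.runPromise` or similar.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical in-memory log, used only to make side effects observable.
const seen: Array<number> = []

// Building the pipeline records nothing: `tap` only describes the effect.
const recorded = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) =>
    Effect.sync(() => {
      seen.push(n)
    })
  )
)

// Still a description: `runDrain` returns an Effect, it does not execute it.
const program = Stream.runDrain(recorded)
const seenBeforeRun = seen.length

// Only executing the Effect pulls elements through the pipeline.
Effect.runSync(program)
const seenAfterRun = seen.length
```

Comparing `seenBeforeRun` with `seenAfterRun` shows that neither constructing the stream nor calling a `run*` destructor performs any work; the work happens when the resulting Effect is executed.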
import { Effect, Stream } from "effect"

interface Order {
  readonly id: string
  readonly grandTotalCents: number
  readonly priority: "normal" | "high"
}

const orders: Stream.Stream<Order> = Stream.make(
  { id: "ord_1", grandTotalCents: 4_500, priority: "normal" },
  { id: "ord_2", grandTotalCents: 28_000, priority: "high" },
  { id: "ord_3", grandTotalCents: 9_900, priority: "normal" }
)

// `runForEach` runs an effectful consumer for *every* element and discards the
// elements. Use it when the value of running is the side effect (logging,
// writing, publishing). It returns Effect<void, E, R>.
export const logEach = orders.pipe(
  Stream.runForEach((order) =>
    Effect.logInfo(`order ${order.id}: $${(order.grandTotalCents / 100).toFixed(2)}`)
  )
)

runCollect, runDrain, runFold
These three cover the bulk of consumption. Pick based on the shape of the result you need:
import { Effect, Stream } from "effect"

const totals: Stream.Stream<number> = Stream.make(4_500, 28_000, 9_900)

// `runCollect` materialises every element into an immutable array.
// Only use it when the stream is bounded and fits comfortably in memory.
export const collected: Effect.Effect<Array<number>> = Stream.runCollect(totals)

// `runDrain` runs the stream purely for its effects and throws the output away.
// It is the right choice for infinite or very large streams whose work happens
// in upstream `tap`/`mapEffect` operators.
export const drained: Effect.Effect<void> = Stream.runDrain(totals)

// `runFold` reduces the stream to a single accumulated value, pulling one
// element at a time, so memory stays constant regardless of stream length. The
// initial state is a thunk (`() => 0`) so it can be re-evaluated safely.
export const sum: Effect.Effect<number> = totals.pipe(
  Stream.runFold(() => 0, (acc, cents) => acc + cents)
)

runFold is the streaming equivalent of Array.reduce, but it never holds the
whole stream in memory — it threads the accumulator through as each element
arrives. For an effectful reducer (e.g. one that writes to a database per
step), use Stream.runFoldEffect.
Edge elements and counting
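When you only care about the first or last element, or a simple count, there are dedicated destructors. runHead stops pulling after the first element; runLast and runCount have to consume the whole stream, but they retain at most one element or a running count rather than the elements themselves: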
import { Effect, Option, Stream } from "effect"

const orders = Stream.make(
  { id: "ord_1", priority: "normal" as const },
  { id: "ord_2", priority: "high" as const },
  { id: "ord_3", priority: "high" as const }
)

// `runHead` pulls only the first element (then stops the stream) as an Option.
export const first: Effect.Effect<Option.Option<{ id: string }>> = Stream.runHead(orders)

// `runLast` returns the final element as an Option.
export const last = Stream.runLast(orders)

// `runCount` counts elements without retaining them.
export const howMany: Effect.Effect<number> = Stream.runCount(orders)

// `runForEachWhile` consumes until the predicate returns `false`, letting you
// stop early based on the data itself.
export const untilHigh = orders.pipe(
  Stream.runForEachWhile((order) =>
    Effect.succeed(order.priority !== "high")
  )
)

Consuming through a Sink
For anything more elaborate than the built-in destructors (collecting the first
N elements, summing while leaving leftovers, fanning out to a queue), describe
the consumption as a Sink and run it with Stream.run:
import { Effect, Sink, Stream } from "effect"

const cents = Stream.make(4_500, 28_000, 9_900)

// `Stream.run` consumes the stream with any Sink. `Sink.sum` adds up the
// numbers; the result is an Effect<number>.
export const total: Effect.Effect<number> = cents.pipe(Stream.run(Sink.sum))

run is the most general destructor: runCollect, runFold, and the rest are
really specialised sinks. Reach for an explicit Sink when you want to reuse a
consumption strategy or combine several. The full vocabulary lives on the
Sink page.
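As a small illustration of reusing a consumption strategy, the sketch below names a sink once and runs it against two streams. The sample data and the names `monday`, `tuesday`, and `totalCents` are hypothetical, and `Effect.runSync` is used on the assumption that these in-memory streams complete synchronously.

```typescript
import { Effect, Sink, Stream } from "effect"

// Hypothetical sample data: order totals in cents for two days.
const monday = Stream.make(4_500, 28_000, 9_900)
const tuesday = Stream.make(1_200, 300)

// A Sink is a plain value, so a consumption strategy can be named once
// and handed to `Stream.run` wherever it is needed.
const totalCents = Sink.sum

const mondayTotal = Effect.runSync(monday.pipe(Stream.run(totalCents)))
const tuesdayTotal = Effect.runSync(tuesday.pipe(Stream.run(totalCents)))
```

Because the sink is independent of any particular stream, the same definition could later be replaced by a richer one without touching the call sites.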