Consuming Streams

A Stream is a description: it does nothing until you run it. The run* functions are the destructors that pull values through the pipeline and hand you back an Effect, which is itself only executed when you run it (for example with Effect.runPromise). Which one you choose depends on what you want out the other end: an action per element, all elements collected, an accumulated total, or just the side effects.

import { Effect, Stream } from "effect"

interface Order {
  readonly id: string
  readonly grandTotalCents: number
  readonly priority: "normal" | "high"
}

const orders: Stream.Stream<Order> = Stream.make(
  { id: "ord_1", grandTotalCents: 4_500, priority: "normal" },
  { id: "ord_2", grandTotalCents: 28_000, priority: "high" },
  { id: "ord_3", grandTotalCents: 9_900, priority: "normal" }
)

// `runForEach` runs an effectful consumer for *every* element and discards the
// elements. Use it when the value of running is the side effect (logging,
// writing, publishing). It returns Effect<void, E, R>.
export const logEach = orders.pipe(
  Stream.runForEach((order) =>
    Effect.logInfo(`order ${order.id}: $${(order.grandTotalCents / 100).toFixed(2)}`)
  )
)
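
Neither building a pipeline nor choosing a destructor performs any work; only executing the resulting Effect does. The following minimal sketch makes that observable. The `seen` array and the names `recorded`, `seenBeforeRun`, and `done` are illustrative and not part of the example above, and the sketch assumes `Stream.tap`, `Effect.sync`, and `Effect.runPromise` as exported by the `effect` package.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical side-effect target so we can observe when work happens.
const seen: Array<number> = []

// Building the pipeline and picking a destructor only describes the work.
const recorded = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) => Effect.sync(() => { seen.push(n) })),
  Stream.runDrain
)

// Captured before anything is executed: the stream has not been pulled yet.
const seenBeforeRun = seen.length

// Executing the Effect is what pulls the elements through `tap`.
const done = Effect.runPromise(recorded).then(() => seen)
```

Here `seenBeforeRun` is 0, and `done` resolves with the three recorded numbers only once the Effect has been executed.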

These three cover the bulk of consumption. Pick based on the shape of the result you need:

import { Effect, Stream } from "effect"

const totals: Stream.Stream<number> = Stream.make(4_500, 28_000, 9_900)

// `runCollect` materialises every element into an immutable array.
// Only use it when the stream is bounded and fits comfortably in memory.
export const collected: Effect.Effect<Array<number>> = Stream.runCollect(totals)

// `runDrain` runs the stream purely for its effects and throws the output away.
// It is the right choice for infinite or very large streams whose work happens
// in upstream `tap`/`mapEffect` operators.
export const drained: Effect.Effect<void> = Stream.runDrain(totals)

// `runFold` reduces the stream to a single accumulated value, pulling one
// element at a time, with constant memory regardless of stream length. The
// initial state is a thunk (`() => 0`) so it can be re-evaluated safely.
export const sum: Effect.Effect<number> = totals.pipe(
  Stream.runFold(() => 0, (acc, cents) => acc + cents)
)

runFold is the streaming equivalent of Array.reduce, but it never holds the whole stream in memory — it threads the accumulator through as each element arrives. For an effectful reducer (e.g. one that writes to a database per step), use Stream.runFoldEffect.
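
To make the comparison with Array.reduce concrete, here is a small side-by-side sketch. It reuses the thunk-style initial state from the example above; the names `arrayTotal`, `streamTotal`, `stats`, and `folded` are illustrative, and `Stream.fromIterable` and `Effect.all` are assumed to be available as in the published `effect` package.

```typescript
import { Effect, Stream } from "effect"

const cents = [4_500, 28_000, 9_900]

// Array.reduce: the whole array already exists in memory.
const arrayTotal = cents.reduce((acc, c) => acc + c, 0)

// Stream.runFold: the same reducer, but elements arrive one at a time.
// The thunk-style initial state follows the signature used above.
const streamTotal = Stream.fromIterable(cents).pipe(
  Stream.runFold(() => 0, (acc, c) => acc + c)
)

// The accumulator can be any value, e.g. running statistics in one pass.
const stats = Stream.fromIterable(cents).pipe(
  Stream.runFold(
    () => ({ count: 0, max: 0 }),
    (acc, c) => ({ count: acc.count + 1, max: Math.max(acc.max, c) })
  )
)

// Run both folds and gather the results into one record.
const folded = Effect.runPromise(Effect.all({ total: streamTotal, stats }))
```

Both approaches produce the same total; the difference is that the streaming version only ever holds the current accumulator and the current element.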

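When you only need the first element, the last element, or a count, there are dedicated destructors. runHead stops pulling once it has the first element; runLast and runCount still consume the whole stream, but never hold more than one element at a time: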

import { Effect, Option, Stream } from "effect"

const orders = Stream.make(
  { id: "ord_1", priority: "normal" as const },
  { id: "ord_2", priority: "high" as const },
  { id: "ord_3", priority: "high" as const }
)

// `runHead` pulls only the first element (then stops the stream) as an Option.
export const first: Effect.Effect<Option.Option<{ id: string }>> =
  Stream.runHead(orders)

// `runLast` returns the final element as an Option.
export const last = Stream.runLast(orders)

// `runCount` counts elements without retaining them.
export const howMany: Effect.Effect<number> = Stream.runCount(orders)

// `runForEachWhile` consumes until the predicate returns `false`, letting you
// stop early based on the data itself.
export const untilHigh = orders.pipe(
  Stream.runForEachWhile((order) =>
    Effect.succeed(order.priority !== "high")
  )
)
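
Because runHead and runLast return an Option, callers need to handle the empty-stream case. The sketch below shows one way to do that; `summarize`, `ids`, `noIds`, and the "(none)" placeholder are hypothetical names chosen for illustration, and it assumes `Option.getOrElse` and `Effect.gen` behave as in the published `effect` package.

```typescript
import { Effect, Option, Stream } from "effect"

const ids = Stream.make("ord_1", "ord_2", "ord_3")

const emptyIds: Array<string> = []
const noIds = Stream.fromIterable(emptyIds)

// Hypothetical helper: runs three destructors against the same description.
const summarize = (stream: Stream.Stream<string>) =>
  Effect.gen(function* () {
    const first = yield* Stream.runHead(stream)
    const last = yield* Stream.runLast(stream)
    const count = yield* Stream.runCount(stream)
    return {
      // Fall back to a placeholder when the stream had no elements.
      first: Option.getOrElse(first, () => "(none)"),
      last: Option.getOrElse(last, () => "(none)"),
      count
    }
  })

const summaries = Effect.runPromise(
  Effect.all({ full: summarize(ids), empty: summarize(noIds) })
)
```

Each destructor call starts the stream from the beginning, since the stream value is only a description. That is harmless for an in-memory source like this one, but worth keeping in mind when the source performs real I/O.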

For anything more elaborate than the built-in destructors — collecting the first N elements, summing while leaving leftovers, fanning out to a queue — describe the consumption as a Sink and run it with Stream.run:

import { Effect, Sink, Stream } from "effect"
const cents = Stream.make(4_500, 28_000, 9_900)
// `Stream.run` consumes the stream with any Sink. `Sink.sum` adds up the
// numbers; the result is an Effect<number>.
export const total: Effect.Effect<number> = cents.pipe(Stream.run(Sink.sum))

run is the most general destructor — runCollect, runFold, and the rest are really specialised sinks. Reach for an explicit Sink when you want to reuse a consumption strategy or combine several. The full vocabulary lives on the Sink page.
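
As a rough illustration of reusing a consumption strategy, the sketch below defines a sink once and applies it to two streams, then expresses the same result with runFold for comparison. The names `sumCents`, `webOrders`, `retailOrders`, and `totals` are hypothetical; the only Sink used is `Sink.sum`, as in the example above.

```typescript
import { Effect, Sink, Stream } from "effect"

// One consumption strategy, defined once and reused below.
const sumCents = Sink.sum

const webOrders = Stream.make(4_500, 28_000)
const retailOrders = Stream.make(9_900)

const totals = Effect.runPromise(
  Effect.all({
    web: webOrders.pipe(Stream.run(sumCents)),
    retail: retailOrders.pipe(Stream.run(sumCents)),
    // The same result expressed with the specialised destructor.
    retailViaFold: retailOrders.pipe(
      Stream.runFold(() => 0, (acc, c) => acc + c)
    )
  })
)
```

The sink is an ordinary value, so it can be passed around, stored in a module, or handed to any stream whose element type it accepts.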