Concurrency, Merging & Zipping
A single stream models one source of values over time. Real programs usually
have several sources — two WebSocket feeds, a poll loop plus a manual refresh
button, a producer fanning out to many consumers. The Stream module gives you
a vocabulary for combining streams, and the way it combines them controls
timing:
- Merge interleaves elements from both streams as they arrive — order is determined by which source emits first.
- Concatenation (Stream.concat) runs the first stream to completion, then the second, so the result is strictly sequential. See Transforming Streams for concat.
- Zip advances both streams in lockstep, pairing the Nth element of one with the Nth element of the other and stopping when either runs dry.
- Latest-wins zipping (zipLatest) re-emits whenever either side produces a new value, pairing it with the most recent value seen from the other side.
- Race runs streams concurrently but keeps only the first one to emit.
- Cross is the cartesian product: every element of the left paired with every element of the right.
Anywhere a combinator runs multiple inner streams concurrently it accepts the
shared concurrency options shape ({ concurrency, bufferSize }); see
Concurrency Options for the full shape.
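
To make the timing difference between merging and concatenation concrete, here is a minimal, library-free sketch. It does not use the Stream module at all: each source is modelled as a list of timestamped emissions, and the names Emission, mergeByArrival and concatInOrder, as well as the timestamps, are assumptions made up for this illustration.

```typescript
// Library-free model (not the Stream API): each source is a list of
// emissions stamped with the time in ms at which the value arrives.
interface Emission<A> {
  readonly at: number
  readonly value: A
}

// Merge: both sources run together, so output order follows arrival time.
function mergeByArrival<A>(
  left: ReadonlyArray<Emission<A>>,
  right: ReadonlyArray<Emission<A>>
): Array<A> {
  return [...left, ...right]
    .sort((x, y) => x.at - y.at) // stable sort: on a tie, left stays first
    .map((emission) => emission.value)
}

// Concat: the right source is only consumed after the left one is done,
// so timestamps on the right side do not affect the order.
function concatInOrder<A>(
  left: ReadonlyArray<Emission<A>>,
  right: ReadonlyArray<Emission<A>>
): Array<A> {
  return [...left, ...right].map((emission) => emission.value)
}

// Hypothetical arrival times, chosen only for the illustration.
const fast = [
  { at: 10, value: "a" },
  { at: 20, value: "b" },
  { at: 30, value: "c" }
]
const slow = [
  { at: 25, value: "X" },
  { at: 50, value: "Y" }
]

console.log(mergeByArrival(fast, slow)) // ["a", "b", "X", "c", "Y"]
console.log(concatInOrder(fast, slow)) // ["a", "b", "c", "X", "Y"]
```

The model ignores concurrency, buffering and failures; it only shows why merged output depends on arrival order while concatenated output does not.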
Common case: merging two event sources
The most common multi-stream task is consuming two independent sources at once,
emitting from whichever fires first. Stream.merge runs both concurrently and
interleaves their output:
import { Console, Effect, Schedule, Stream } from "effect"

// Two independent sources producing at different rates.
const fast = Stream.make("a", "b", "c").pipe(
  Stream.schedule(Schedule.spaced("10 millis"))
)
const slow = Stream.make("X", "Y").pipe(
  Stream.schedule(Schedule.spaced("25 millis"))
)

const program = Effect.gen(function* () {
  const result = yield* Stream.merge(fast, slow).pipe(Stream.runCollect)
  yield* Console.log(result)
  // => ["a", "b", "X", "c", "Y"] (interleaved by arrival time)
})

Effect.runPromise(program)

By default a merged stream ends only when both sides end. Pass a
haltStrategy to change that — "left" ends when the left side ends, "right"
when the right does, "either" as soon as one ends, "both" (the default) when
both have ended:
import { Effect, Stream } from "effect"

const left = Stream.make(1, 2, 3)
const right = Stream.make(10, 20, 30, 40, 50)

const program = Stream.merge(left, right, { haltStrategy: "left" }).pipe(
  Stream.runCollect
)
// stops once `left` is exhausted, dropping the rest of `right`

Common case: fanning out with broadcast
The other everyday need is the inverse: one upstream, many consumers, each
seeing every element. Stream.broadcastN splits a stream into a fixed-size
tuple of N downstream streams that each replay the same elements. The upstream
only starts once all downstream streams are subscribed, and runs inside a scope:
import { Console, Effect, Stream } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    // Split one source into two independent downstream streams.
    const [forLogging, forMetrics] = yield* Stream.make(1, 2, 3, 4).pipe(
      Stream.broadcastN({ n: 2, capacity: 16 })
    )

    // Each consumer sees every element. Run them concurrently.
    const [logged, summed] = yield* Effect.all(
      [
        Stream.runCollect(forLogging),
        forMetrics.pipe(Stream.runFold(() => 0, (acc, n) => acc + n))
      ],
      { concurrency: "unbounded" }
    )

    yield* Console.log(logged) // => [1, 2, 3, 4]
    yield* Console.log(summed) // => 10
  })
)

Effect.runPromise(program)

With the default "suspend" strategy the upstream can only run capacity chunks
ahead of the slowest consumer, so a slow consumer applies backpressure to the
others. If a downstream stream is interrupted it unsubscribes and stops
contributing backpressure.
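
The effect of a full buffer can be sketched without the library. The model below is an assumption-laden illustration, not how broadcastN is implemented: it treats the buffer as a plain array with capacity of at least 1, and it assumes the "sliding" and "dropping" strategy names (listed in the broadcastN reference) carry their usual meanings of evicting the oldest element and discarding the newest one. The names offer, Strategy and Outcome are invented for the sketch.

```typescript
// Library-free model of what happens when a value is offered to a bounded
// buffer that a slow consumer has not drained. Not the real implementation.
type Strategy = "suspend" | "sliding" | "dropping"
type Outcome = "stored" | "producer-waits" | "discarded"

interface OfferResult<A> {
  readonly buffer: ReadonlyArray<A>
  readonly outcome: Outcome
}

function offer<A>(
  buffer: ReadonlyArray<A>,
  capacity: number,
  strategy: Strategy,
  value: A
): OfferResult<A> {
  // Room left: every strategy simply stores the value.
  if (buffer.length < capacity) {
    return { buffer: [...buffer, value], outcome: "stored" }
  }
  switch (strategy) {
    case "suspend":
      // Backpressure: nothing changes until the consumer makes room.
      return { buffer, outcome: "producer-waits" }
    case "sliding":
      // Assumed meaning: evict the oldest element to admit the new one.
      return { buffer: [...buffer.slice(1), value], outcome: "stored" }
    case "dropping":
      // Assumed meaning: keep the buffer and discard the new value.
      return { buffer, outcome: "discarded" }
  }
}

const full = [1, 2] // a buffer already at capacity 2

console.log(offer(full, 2, "suspend", 3)) // { buffer: [1, 2], outcome: "producer-waits" }
console.log(offer(full, 2, "sliding", 3)) // { buffer: [2, 3], outcome: "stored" }
console.log(offer(full, 2, "dropping", 3)) // { buffer: [1, 2], outcome: "discarded" }
```

Only "suspend" slows the producer down, which is why a slow consumer under that strategy holds back the others.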
Reference
Merging
merge
Merges two streams, emitting elements from both as they arrive. Ends when both
streams end unless you pass a haltStrategy ("left" | "right" | "both" | "either").
import { Effect, Stream } from "effect"

const fast = Stream.make(1, 2, 3)
const slow = Stream.fromEffect(Effect.delay(Effect.succeed(4), "50 millis"))

Stream.merge(fast, slow).pipe(Stream.runCollect)
// => [1, 2, 3, 4]

mergeEffect
Merges a stream with a background Effect, keeping the stream’s elements. The
effect runs concurrently, fails the stream if it fails, and is interrupted when
the stream completes.
import { Console, Effect, Stream } from "effect"

Stream.make(1, 2, 3).pipe(
  Stream.mergeEffect(Console.log("side task")),
  Stream.runCollect
)
// logs "side task"; collected => [1, 2, 3]

mergeResult
Merges two streams into a stream of Result, tagging left values as
Result.succeed and right values as Result.fail. Useful for keeping track of
which source each element came from.
import { Effect, Result, Stream } from "effect"

const left = Stream.fromEffect(Effect.succeed("left"))
const right = Stream.fromEffect(Effect.delay(Effect.succeed("right"), "10 millis"))

left.pipe(
  Stream.mergeResult(right),
  Stream.map(
    Result.match({
      onSuccess: (value) => `left:${value}`,
      onFailure: (value) => `right:${value}`
    })
  )
)
// => ["left:left", "right:right"]

mergeLeft
Merges two streams but emits only the left values; the right stream still runs for its effects (and propagates failures). Completes when the left stream completes, interrupting the right.
import { Effect, Stream } from "effect"

const left = Stream.make(1, 2)
const right = Stream.make("a", "b")

left.pipe(Stream.mergeLeft(right), Stream.runCollect)
// => [1, 2]

mergeRight
The mirror of mergeLeft: emits only the right values while the left runs
for its effects. Completes when the right stream completes.
import { Effect, Stream } from "effect"

const left = Stream.make("left-1", "left-2")
const right = Stream.make(1, 2)

Stream.mergeRight(left, right).pipe(Stream.runCollect)
// => [1, 2]

mergeAll
Merges an iterable of already-created streams, running up to concurrency
inner streams at once. concurrency is required (a number or "unbounded");
bufferSize tunes buffering between inner streams.
import { Effect, Stream } from "effect"

const streams = [
  Stream.fromEffect(Effect.delay(Effect.succeed("A"), "20 millis")),
  Stream.fromEffect(Effect.delay(Effect.succeed("B"), "10 millis"))
]

Stream.mergeAll(streams, { concurrency: 2 }).pipe(Stream.runCollect)
// => ["B", "A"] (emitted as they resolve)

Racing
race
Runs both streams concurrently until one emits its first value, then mirrors that winner and interrupts the loser. A side that fails or completes before emitting does not win unless both do. After a winner is chosen, its later failures propagate.
import { Effect, Schedule, Stream } from "effect"

const a = Stream.make(0, 1, 2)
const b = Stream.fromSchedule(Schedule.spaced("1 second"))

Stream.race(a, b).pipe(Stream.runCollect)
// => [0, 1, 2] (`a` emits first, so `b` is interrupted)

raceAll
Like race but variadic: runs all streams concurrently and mirrors the first to
emit, interrupting the rest.
import { Effect, Schedule, Stream } from "effect"

Stream.raceAll(
  Stream.fromSchedule(Schedule.spaced("1 second")),
  Stream.make(0, 1, 2)
).pipe(Stream.runCollect)
// => [0, 1, 2]

Zipping (lockstep)
Zipping pairs elements point-wise: the Nth element of one stream with the Nth of the other. The result ends when either stream ends, so the shorter stream determines the length.
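
The lockstep rule can be sketched in a few lines of plain TypeScript. This is a model over iterables rather than the Stream implementation, and zipWithModel is a name invented for the illustration: it pulls one element from each side per step and stops as soon as either side is exhausted.

```typescript
// Library-free model of lockstep zipping over plain iterables.
function zipWithModel<A, B, C>(
  left: Iterable<A>,
  right: Iterable<B>,
  combine: (a: A, b: B) => C
): Array<C> {
  const out: Array<C> = []
  const leftPull = left[Symbol.iterator]()
  const rightPull = right[Symbol.iterator]()
  while (true) {
    // One pull per side per step: this is what keeps the sides in lockstep.
    const l = leftPull.next()
    const r = rightPull.next()
    // Either side running dry ends the result, so the shorter side wins.
    if (l.done || r.done) return out
    out.push(combine(l.value, r.value))
  }
}

const zipped = zipWithModel(
  [1, 2, 3, 4, 5, 6],
  ["a", "b", "c"],
  (n, s) => `${n}-${s}`
)
console.log(zipped) // ["1-a", "2-b", "3-c"]
```

Elements 4, 5 and 6 are never paired because the right side has already ended.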
zip
Zips two streams point-wise into tuples.
import { Effect, Stream } from "effect"

Stream.zip(Stream.make(1, 2, 3), Stream.make("a", "b", "c")).pipe(
  Stream.runCollect
)
// => [[1, "a"], [2, "b"], [3, "c"]]

zipWith
Like zip but combines each pair with a function instead of building tuples.
import { Effect, Stream } from "effect"

Stream.zipWith(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c"),
  (n, s) => `${n}-${s}`
).pipe(Stream.runCollect)
// => ["1-a", "2-b", "3-c"] (stops at the shorter side)

zipWithArray
The chunk-level primitive behind zipWith. The combining function receives the
two non-empty chunks and returns [output, leftoverLeft, leftoverRight]; the
leftovers carry into the next pull. Use it when zipping logic depends on whole
chunks.
import { Array, Effect, Stream } from "effect"

const left = Stream.fromArrays([1, 2, 3], [4, 5])
const right = Stream.fromArrays(["a", "b"], ["c", "d", "e"])

Stream.zipWithArray(left, right, (l, r) => {
  // Pair as many elements as both chunks can supply; carry the rest over.
  const n = Math.min(l.length, r.length)
  const output = Array.makeBy(n, (i) => [l[i], r[i]] as const)
  return [output, l.slice(n), r.slice(n)]
}).pipe(Stream.runCollect)
// => [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]]

zipLeft
Zips point-wise but keeps only the left values; the right side advances in lockstep but its values are discarded. Ends when either side ends.
import { Effect, Stream } from "effect"

Stream.zipLeft(Stream.make(1, 2, 3, 4), Stream.make("a", "b")).pipe(
  Stream.runCollect
)
// => [1, 2]

zipRight
Keeps only the right values while advancing both in lockstep.
import { Effect, Stream } from "effect"

Stream.zipRight(Stream.make(1, 2), Stream.make("a", "b", "c", "d")).pipe(
  Stream.runCollect
)
// => ["a", "b"]

zipFlatten
Zips point-wise and flattens the left tuple, appending the right element. Handy for accumulating wide tuples across several zips.
import { Effect, Stream } from "effect"

const s1 = Stream.make([1, "a"] as const, [2, "b"] as const, [3, "c"] as const)
const s2 = Stream.make("x", "y", "z")

Stream.zipFlatten(s1, s2).pipe(Stream.runCollect)
// => [[1, "a", "x"], [2, "b", "y"], [3, "c", "z"]]

combine
Combines two streams element-by-element with a stateful pull function. You get the current state and pull effects for each side, and return the next value plus the next state. Use it for custom interleaving logic.
import { Effect, Stream } from "effect"

// Alternate strictly between left and right.
Stream.combine(
  Stream.make("A", "B", "C"),
  Stream.make(1, 2, 3),
  () => true, // initial state: take from left next
  (takeLeft, pullLeft, pullRight) =>
    takeLeft
      ? Effect.map(pullLeft, (value) => [`L:${value}`, false] as const)
      : Effect.map(pullRight, (value) => [`R:${value}`, true] as const)
).pipe(Stream.runCollect)
// => ["L:A", "R:1", "L:B", "R:2", "L:C", "R:3"]

combineArray
The chunk-level version of combine: the pull functions yield non-empty chunks
and the function returns the next chunk plus state. Prefer it when each emitted
chunk depends on both sides and local state.
import { Effect, Stream } from "effect"

Stream.make(1, 2).pipe(
  Stream.combineArray(
    Stream.make(10, 20),
    () => true,
    (useLeft, pullLeft, pullRight) =>
      Effect.gen(function* () {
        const array = useLeft ? yield* pullLeft : yield* pullRight
        return [array, !useLeft] as const
      })
  ),
  Stream.runCollect
)
// => [1, 2, 10, 20]

Latest-wins zipping
Unlike lockstep zipping, these emit whenever either side produces a new value, pairing it with the latest value seen from the other side. Both sides must emit at least once before the first output.
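
A small library-free model may help here. It replays a single, already-ordered timeline of events from the two sides and applies the latest-wins rule; SideEvent, zipLatestModel and the particular event order are assumptions made for the illustration, since real arrival order depends on how the two streams are scheduled.

```typescript
// Library-free model of latest-wins zipping over one ordered event timeline.
type SideEvent<A, B> =
  | { readonly side: "left"; readonly value: A }
  | { readonly side: "right"; readonly value: B }

function zipLatestModel<A, B>(
  events: ReadonlyArray<SideEvent<A, B>>
): Array<[A, B]> {
  const out: Array<[A, B]> = []
  // Wrapped in an object so "nothing seen yet" differs from a seen undefined.
  let latestLeft: { readonly value: A } | undefined
  let latestRight: { readonly value: B } | undefined
  for (const event of events) {
    if (event.side === "left") latestLeft = { value: event.value }
    else latestRight = { value: event.value }
    // No output until both sides have produced at least one value.
    if (latestLeft !== undefined && latestRight !== undefined) {
      out.push([latestLeft.value, latestRight.value])
    }
  }
  return out
}

// Assumed arrival order: left 1, right 10, left 2, right 20, left 3.
const timeline: Array<SideEvent<number, number>> = [
  { side: "left", value: 1 },
  { side: "right", value: 10 },
  { side: "left", value: 2 },
  { side: "right", value: 20 },
  { side: "left", value: 3 }
]

console.log(zipLatestModel(timeline))
// [[1, 10], [2, 10], [2, 20], [3, 20]]
```

The first left value produces no output on its own, and every later event reuses the most recent value from the opposite side.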
zipLatest
Pairs each new element with the most recent value from the other stream.
import { Effect, Stream } from "effect"

Stream.zipLatest(Stream.make(1), Stream.make("a")).pipe(Stream.runCollect)
// => [[1, "a"]]

zipLatestWith
Like zipLatest but applies a combining function instead of producing tuples.
import { Effect, Stream } from "effect"

Stream.make(1, 2, 3).pipe(
  Stream.rechunk(1),
  Stream.zipLatestWith(
    Stream.make(10, 20).pipe(Stream.rechunk(1)),
    (n, m) => n + m
  ),
  Stream.runCollect
)
// => [11, 12, 22, 23]

zipLatestAll
Variadic latest-wins zip across any number of streams, producing tuples of the latest value from each. The result type is the tuple of element types.
import { Effect, Stream } from "effect"

Stream.zipLatestAll(
  Stream.make(1).pipe(Stream.rechunk(1)),
  Stream.make("a").pipe(Stream.rechunk(1)),
  Stream.make(true).pipe(Stream.rechunk(1))
).pipe(Stream.runCollect)
// => [[1, "a", true]]

Cartesian product
cross
The cartesian product of two streams: the right stream is rerun for every left element, producing tuples of all pairs.
import { Effect, Stream } from "effect"

Stream.cross(Stream.make(1, 2), Stream.make("a", "b")).pipe(Stream.runCollect)
// => [[1, "a"], [1, "b"], [2, "a"], [2, "b"]]

crossWith
Like cross but combines each pair with a function.
import { Effect, Stream } from "effect"

Stream.crossWith(
  Stream.make(1, 2),
  Stream.make("a", "b"),
  (n, s) => `${n}-${s}`
).pipe(Stream.runCollect)
// => ["1-a", "1-b", "2-a", "2-b"]

Fan-out / sharing
These split a single upstream so multiple consumers can each observe its
elements. All of them are scoped — run them inside Effect.scoped (or yield in a
scoped Effect.gen).
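
As a rough mental model of fan-out with replay, consider the following library-free sketch. It is not how the combinators in this section are implemented (there is no scope, buffering or backpressure here), and makeHub, publish and subscribe are names invented for the illustration. It only shows the observable rule: every subscriber sees every element published after it subscribes, and a late subscriber first receives up to replay recent elements.

```typescript
// Library-free model of multicasting with a replay window.
function makeHub<A>(replay: number) {
  const recent: Array<A> = [] // the last `replay` published values
  const subscribers: Array<(value: A) => void> = []
  return {
    publish(value: A): void {
      // Every current subscriber observes the value.
      for (const notify of subscribers) notify(value)
      // Remember it for late subscribers, keeping at most `replay` values.
      recent.push(value)
      if (recent.length > replay) recent.shift()
    },
    subscribe(onValue: (value: A) => void): void {
      // A new subscriber first catches up on the replay window.
      for (const value of recent) onValue(value)
      subscribers.push(onValue)
    }
  }
}

const hub = makeHub<number>(2)
const early: Array<number> = []
const late: Array<number> = []

hub.subscribe((n) => early.push(n))
hub.publish(1)
hub.publish(2)
hub.publish(3)
hub.subscribe((n) => late.push(n)) // joins late, replays 2 and 3
hub.publish(4)

console.log(early) // [1, 2, 3, 4]
console.log(late) // [2, 3, 4]
```

With a replay of 0 the late subscriber would only see 4, which is the behavior to expect when no replay option is given.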
broadcastN
Splits a stream into a fixed-size tuple of N downstream streams that each emit
the same elements. The upstream starts after all N are subscribed. capacity
bounds buffering; strategy ("suspend" | "sliding" | "dropping", default
"suspend") controls behavior when a consumer falls behind; replay re-emits
the last N elements to new subscribers.
import { Console, Effect, Stream } from "effect"

Effect.scoped(
  Effect.gen(function* () {
    const [left, right] = yield* Stream.make(1, 2, 3).pipe(
      Stream.broadcastN({ n: 2, capacity: 8 })
    )
    const values = yield* Effect.all(
      [Stream.runCollect(left), Stream.runCollect(right)],
      { concurrency: "unbounded" }
    )
    yield* Console.log(values) // => [[1, 2, 3], [1, 2, 3]]
  })
)

broadcast
Returns a single PubSub-backed stream that multicasts the source to every
subscriber. Each time you consume the returned stream you get a fresh
subscription. replay lets late subscribers see recent elements.
import { Console, Effect, Stream } from "effect"

Effect.scoped(
  Effect.gen(function* () {
    const broadcasted = yield* Stream.broadcast(Stream.fromArray([1, 2, 3]), {
      capacity: 8,
      replay: 3
    })
    const [a, b] = yield* Effect.all(
      [Stream.runCollect(broadcasted), Stream.runCollect(broadcasted)],
      { concurrency: "unbounded" }
    )
    yield* Console.log([a, b]) // => [[1, 2, 3], [1, 2, 3]]
  })
)

share
Multicasts a single upstream to multiple consumers, subscribing lazily when the
first consumer starts. The upstream keeps running while there is at least one
consumer and is finalized after the last one leaves. idleTimeToLive keeps the
upstream alive between consumers so a later subscriber continues from the next
element instead of restarting.
import { Console, Effect, Stream } from "effect"

Effect.scoped(
  Effect.gen(function* () {
    const shared = yield* Stream.make(1, 2, 3).pipe(Stream.share({ capacity: 16 }))

    const first = yield* shared.pipe(Stream.take(1), Stream.runCollect)
    const second = yield* shared.pipe(Stream.take(1), Stream.runCollect)

    yield* Console.log([first, second]) // => [[1], [1]]
  })
)

Interruption coordination
These let one stream’s lifetime be steered by an external signal or a background stream. They are closely related to merging because they coordinate concurrent activities.
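
The difference between interrupting and halting (the two modes described for interruptWhen and haltWhen in this section) comes down to what happens to a pull that is already in progress when the signal fires. The sketch below is a deliberately simplified timeline model, not the library's implementation: each pull takes a fixed duration, the signal fires at a fixed time, and Pull, runUntil and the numbers are assumptions invented for the illustration.

```typescript
// Library-free timeline model: "interrupt" cuts off an in-progress pull,
// "halt" lets it finish and then stops before starting the next one.
interface Pull<A> {
  readonly value: A
  readonly duration: number // ms the pull takes to produce its value
}

function runUntil<A>(
  pulls: ReadonlyArray<Pull<A>>,
  signalAt: number,
  mode: "interrupt" | "halt"
): Array<A> {
  const out: Array<A> = []
  let now = 0
  for (const pull of pulls) {
    // Signal already fired: neither mode starts another pull.
    if (now >= signalAt) break
    const finishesAt = now + pull.duration
    // Interrupt mode abandons a pull that would finish after the signal.
    if (mode === "interrupt" && finishesAt > signalAt) break
    out.push(pull.value)
    now = finishesAt
  }
  return out
}

// Three pulls of 10 ms each; the signal fires at 15 ms, during the second.
const pulls = [
  { value: "a", duration: 10 },
  { value: "b", duration: 10 },
  { value: "c", duration: 10 }
]

console.log(runUntil(pulls, 15, "interrupt")) // ["a"]
console.log(runUntil(pulls, 15, "halt")) // ["a", "b"]
```

In this model a pull that finishes exactly when the signal fires counts as completed; the real combinators make no such promise, so treat that edge case as an artifact of the sketch.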
interruptWhen
Interrupts the stream immediately when the given effect completes, including any in-progress pull from upstream. If the effect fails, the stream fails with that error.
import { Console, Deferred, Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const interrupt = yield* Deferred.make<void>()
  const stream = Stream.make(1, 2, 3).pipe(
    Stream.tap((value) =>
      value === 2 ? Deferred.succeed(interrupt, void 0) : Effect.void
    ),
    Stream.interruptWhen(Deferred.await(interrupt))
  )
  yield* Console.log(yield* Stream.runCollect(stream))
})

Effect.runPromise(program)
// => [1, 2]

haltWhen
Like interruptWhen but graceful: it stops before the next pull once the
effect completes, rather than interrupting an in-progress pull. The effect’s
failure still fails the stream.
import { Console, Deferred, Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const halt = yield* Deferred.make<void>()
  const values = yield* Stream.fromArray([1, 2, 3]).pipe(
    Stream.tap((value) =>
      value === 2 ? Deferred.succeed(halt, void 0) : Effect.void
    ),
    Stream.haltWhen(Deferred.await(halt)),
    Stream.runCollect
  )
  yield* Console.log(values)
})

Effect.runPromise(program)
// => [1, 2]

drainFork
Runs a background stream concurrently for its effects while the foreground
stream runs, interrupting the background when the foreground completes (and
failing if the background fails). Built on mergeEffect + runDrain — a handy
way to keep a side process alive only as long as the main stream.
import { Console, Effect, Stream } from "effect"

const foreground = Stream.make(1, 2)
const background = Stream.fromEffect(Console.log("background task"))

foreground.pipe(Stream.drainFork(background), Stream.runCollect)
// logs "background task"; collected => [1, 2]