# Concurrency, Merging & Zipping

A single stream models one source of values over time. Real programs usually
have *several* sources — two WebSocket feeds, a poll loop plus a manual refresh
button, a producer fanning out to many consumers. The `Stream` module gives you
a vocabulary for combining streams, and the way it combines them controls
*timing*:

- **Merge** interleaves elements from both streams **as they arrive** — order is
  determined by which source emits first.
- **Concatenation** (`Stream.concat`) runs the first stream to completion, then
  the second — strictly sequential. See
  [Transforming Streams](https://effect.plants.sh/streaming/transforming-streams/) for `concat`.
- **Zip** advances both streams in **lockstep**, pairing the Nth element of one
  with the Nth element of the other and stopping when either runs dry.
- **Latest-wins zipping** (`zipLatest`) re-emits whenever *either* side produces
  a new value, pairing it with the most recent value seen from the other side.
- **Race** runs streams concurrently but keeps only the first one to emit.
- **Cross** is the cartesian product: every element of the left paired with
  every element of the right.

Anywhere a combinator runs multiple inner streams concurrently it accepts the
shared concurrency options shape (`{ concurrency, bufferSize }`); see
[Concurrency Options](https://effect.plants.sh/concurrency/concurrency-options/) for the full shape.

## Common case: merging two event sources

The most common multi-stream task is consuming two independent sources at once,
emitting from whichever fires first. `Stream.merge` runs both concurrently and
interleaves their output:

```ts
import { Console, Effect, Schedule, Stream } from "effect"

// Two independent sources producing at different rates.
const fast = Stream.make("a", "b", "c").pipe(
  Stream.schedule(Schedule.spaced("10 millis"))
)
const slow = Stream.make("X", "Y").pipe(
  Stream.schedule(Schedule.spaced("25 millis"))
)

const program = Effect.gen(function* () {
  const result = yield* Stream.merge(fast, slow).pipe(Stream.runCollect)
  yield* Console.log(result)
  // => ["a", "b", "X", "c", "Y"]  (interleaved by arrival time)
})

Effect.runPromise(program)
```

By default a merged stream ends only when **both** sides end. Pass a
`haltStrategy` to change that — `"left"` ends when the left side ends, `"right"`
when the right does, `"either"` as soon as one ends, `"both"` (the default) when
both have ended:

```ts
import { Effect, Stream } from "effect"

const left = Stream.make(1, 2, 3)
const right = Stream.make(10, 20, 30, 40, 50)

const program = Stream.merge(left, right, { haltStrategy: "left" }).pipe(
  Stream.runCollect
)
// stops once `left` is exhausted, dropping the rest of `right`
```

## Common case: fanning out with broadcast

The other everyday need is the inverse: one upstream, many consumers, each
seeing every element. `Stream.broadcastN` splits a stream into a fixed-size
tuple of N downstream streams that each replay the same elements. The upstream
only starts once all downstream streams are subscribed, and runs inside a scope:

```ts
import { Console, Effect, Stream } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    // Split one source into two independent downstream streams.
    const [forLogging, forMetrics] = yield* Stream.make(1, 2, 3, 4).pipe(
      Stream.broadcastN({ n: 2, capacity: 16 })
    )

    // Each consumer sees every element. Run them concurrently.
    const [logged, summed] = yield* Effect.all(
      [
        Stream.runCollect(forLogging),
        forMetrics.pipe(Stream.runFold(() => 0, (acc, n) => acc + n))
      ],
      { concurrency: "unbounded" }
    )

    yield* Console.log(logged) // => [1, 2, 3, 4]
    yield* Console.log(summed) // => 10
  })
)

Effect.runPromise(program)
```

With the default `"suspend"` strategy the upstream can only run `capacity` chunks
ahead of the slowest consumer, so a slow consumer applies backpressure to the
others. If a downstream stream is interrupted it unsubscribes and stops
contributing backpressure.

---

# Reference

## Merging

### merge

Merges two streams, emitting elements from both as they arrive. Ends when both
streams end unless you pass a `haltStrategy` (`"left" | "right" | "both" |
"either"`).

```ts
import { Effect, Stream } from "effect"

const fast = Stream.make(1, 2, 3)
const slow = Stream.fromEffect(Effect.delay(Effect.succeed(4), "50 millis"))

Stream.merge(fast, slow).pipe(Stream.runCollect)
// => [1, 2, 3, 4]
```

### mergeEffect

Merges a stream with a background `Effect`, keeping the stream's elements. The
effect runs concurrently, fails the stream if it fails, and is interrupted when
the stream completes.

```ts
import { Console, Effect, Stream } from "effect"

Stream.make(1, 2, 3).pipe(
  Stream.mergeEffect(Console.log("side task")),
  Stream.runCollect
)
// logs "side task"; collected => [1, 2, 3]
```

### mergeResult

Merges two streams into a stream of `Result`, tagging left values as
`Result.succeed` and right values as `Result.fail`. Useful for keeping track of
which source each element came from.

```ts
import { Effect, Result, Stream } from "effect"

const left = Stream.fromEffect(Effect.succeed("left"))
const right = Stream.fromEffect(Effect.delay(Effect.succeed("right"), "10 millis"))

left.pipe(
  Stream.mergeResult(right),
  Stream.map(
    Result.match({
      onSuccess: (value) => `left:${value}`,
      onFailure: (value) => `right:${value}`
    })
  )
)
// => ["left:left", "right:right"]
```

### mergeLeft

Merges two streams but emits only the **left** values; the right stream still
runs for its effects (and propagates failures). Completes when the left stream
completes, interrupting the right.

```ts
import { Effect, Stream } from "effect"

const left = Stream.make(1, 2)
const right = Stream.make("a", "b")

left.pipe(Stream.mergeLeft(right), Stream.runCollect)
// => [1, 2]
```

### mergeRight

The mirror of `mergeLeft`: emits only the **right** values while the left runs
for its effects. Completes when the right stream completes.

```ts
import { Effect, Stream } from "effect"

const left = Stream.make("left-1", "left-2")
const right = Stream.make(1, 2)

Stream.mergeRight(left, right).pipe(Stream.runCollect)
// => [1, 2]
```

### mergeAll

Merges an iterable of already-created streams, running up to `concurrency`
inner streams at once. `concurrency` is required (a number or `"unbounded"`);
`bufferSize` tunes buffering between inner streams.

```ts
import { Effect, Stream } from "effect"

const streams = [
  Stream.fromEffect(Effect.delay(Effect.succeed("A"), "20 millis")),
  Stream.fromEffect(Effect.delay(Effect.succeed("B"), "10 millis"))
]

Stream.mergeAll(streams, { concurrency: 2 }).pipe(Stream.runCollect)
// => ["B", "A"]  (emitted as they resolve)
```

:::note[Merge vs. concat]
`Stream.merge` interleaves by arrival; `Stream.concat` runs the first stream to
completion then the second, preserving strict order. Reach for `concat` when
sequencing matters and `merge` when you want concurrency. See
[Transforming Streams](https://effect.plants.sh/streaming/transforming-streams/) for `concat`.
:::

## Racing

### race

Runs both streams concurrently until one emits its **first** value, then mirrors
that winner and interrupts the loser. A side that fails or completes *before*
emitting does not win unless both do. After a winner is chosen, its later
failures propagate.

```ts
import { Effect, Schedule, Stream } from "effect"

const a = Stream.make(0, 1, 2)
const b = Stream.fromSchedule(Schedule.spaced("1 second"))

Stream.race(a, b).pipe(Stream.runCollect)
// => [0, 1, 2]  (`a` emits first, so `b` is interrupted)
```

### raceAll

Like `race` but variadic: runs all streams concurrently and mirrors the first to
emit, interrupting the rest.

```ts
import { Effect, Schedule, Stream } from "effect"

Stream.raceAll(
  Stream.fromSchedule(Schedule.spaced("1 second")),
  Stream.make(0, 1, 2)
).pipe(Stream.runCollect)
// => [0, 1, 2]
```

## Zipping (lockstep)

Zipping pairs elements point-wise: the Nth element of one stream with the Nth of
the other. The result ends when **either** stream ends, so the shorter stream
determines the length.

### zip

Zips two streams point-wise into tuples.

```ts
import { Effect, Stream } from "effect"

Stream.zip(Stream.make(1, 2, 3), Stream.make("a", "b", "c")).pipe(
  Stream.runCollect
)
// => [[1, "a"], [2, "b"], [3, "c"]]
```

### zipWith

Like `zip` but combines each pair with a function instead of building tuples.

```ts
import { Effect, Stream } from "effect"

Stream.zipWith(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c"),
  (n, s) => `${n}-${s}`
).pipe(Stream.runCollect)
// => ["1-a", "2-b", "3-c"]  (stops at the shorter side)
```

### zipWithArray

The chunk-level primitive behind `zipWith`. The combining function receives the
two non-empty chunks and returns `[output, leftoverLeft, leftoverRight]`; the
leftovers carry into the next pull. Use it when zipping logic depends on whole
chunks.

```ts
import { Array, Effect, Stream } from "effect"

const left = Stream.fromArrays([1, 2, 3], [4, 5])
const right = Stream.fromArrays(["a", "b"], ["c", "d", "e"])

Stream.zipWithArray(left, right, (l, r) => {
  const n = Math.min(l.length, r.length)
  const output = Array.makeBy(n, (i) => [l[i], r[i]] as const)
  return [output, l.slice(n), r.slice(n)]
}).pipe(Stream.runCollect)
// => [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]]
```

### zipLeft

Zips point-wise but keeps only the **left** values; the right side advances in
lockstep but its values are discarded. Ends when either side ends.

```ts
import { Effect, Stream } from "effect"

Stream.zipLeft(Stream.make(1, 2, 3, 4), Stream.make("a", "b")).pipe(
  Stream.runCollect
)
// => [1, 2]
```

### zipRight

Keeps only the **right** values while advancing both in lockstep.

```ts
import { Effect, Stream } from "effect"

Stream.zipRight(Stream.make(1, 2), Stream.make("a", "b", "c", "d")).pipe(
  Stream.runCollect
)
// => ["a", "b"]
```

### zipFlatten

Zips point-wise and flattens the left tuple, appending the right element. Handy
for accumulating wide tuples across several zips.

```ts
import { Effect, Stream } from "effect"

const s1 = Stream.make([1, "a"] as const, [2, "b"] as const, [3, "c"] as const)
const s2 = Stream.make("x", "y", "z")

Stream.zipFlatten(s1, s2).pipe(Stream.runCollect)
// => [[1, "a", "x"], [2, "b", "y"], [3, "c", "z"]]
```

### combine

Combines two streams element-by-element with a stateful pull function. You get
the current state and pull effects for each side, and return the next value plus
the next state. Use it for custom interleaving logic.

```ts
import { Effect, Stream } from "effect"

// Alternate strictly between left and right.
Stream.combine(
  Stream.make("A", "B", "C"),
  Stream.make(1, 2, 3),
  () => true, // initial state: take from left next
  (takeLeft, pullLeft, pullRight) =>
    takeLeft
      ? Effect.map(pullLeft, (value) => [`L:${value}`, false] as const)
      : Effect.map(pullRight, (value) => [`R:${value}`, true] as const)
).pipe(Stream.runCollect)
// => ["L:A", "R:1", "L:B", "R:2", "L:C", "R:3"]
```

### combineArray

The chunk-level version of `combine`: the pull functions yield non-empty chunks
and the function returns the next chunk plus state. Prefer it when each emitted
chunk depends on both sides and local state.

```ts
import { Effect, Stream } from "effect"

Stream.make(1, 2).pipe(
  Stream.combineArray(
    Stream.make(10, 20),
    () => true,
    (useLeft, pullLeft, pullRight) =>
      Effect.gen(function* () {
        const array = useLeft ? yield* pullLeft : yield* pullRight
        return [array, !useLeft] as const
      })
  ),
  Stream.runCollect
)
// => [1, 2, 10, 20]
```

## Latest-wins zipping

Unlike lockstep zipping, these emit whenever *either* side produces a new value,
pairing it with the latest value seen from the other side. Both sides must emit
at least once before the first output.

:::caution
Tracking the "latest" value is done per-chunk: only the *last* element of each
emitted chunk is used for zipping. Use `Stream.rechunk(1)` if you need every
element considered.
:::

### zipLatest

Pairs each new element with the most recent value from the other stream.

```ts
import { Effect, Stream } from "effect"

Stream.zipLatest(Stream.make(1), Stream.make("a")).pipe(Stream.runCollect)
// => [[1, "a"]]
```

### zipLatestWith

Like `zipLatest` but applies a combining function instead of producing tuples.

```ts
import { Effect, Stream } from "effect"

Stream.make(1, 2, 3).pipe(
  Stream.rechunk(1),
  Stream.zipLatestWith(Stream.make(10, 20).pipe(Stream.rechunk(1)), (n, m) => n + m),
  Stream.runCollect
)
// => [11, 12, 22, 23]
```

### zipLatestAll

Variadic latest-wins zip across any number of streams, producing tuples of the
latest value from each. The result type is the tuple of element types.

```ts
import { Effect, Stream } from "effect"

Stream.zipLatestAll(
  Stream.make(1).pipe(Stream.rechunk(1)),
  Stream.make("a").pipe(Stream.rechunk(1)),
  Stream.make(true).pipe(Stream.rechunk(1))
).pipe(Stream.runCollect)
// => [[1, "a", true]]
```

## Cartesian product

### cross

The cartesian product of two streams: the right stream is rerun for every left
element, producing tuples of all pairs.

```ts
import { Effect, Stream } from "effect"

Stream.cross(Stream.make(1, 2), Stream.make("a", "b")).pipe(Stream.runCollect)
// => [[1, "a"], [1, "b"], [2, "a"], [2, "b"]]
```

### crossWith

Like `cross` but combines each pair with a function.

```ts
import { Effect, Stream } from "effect"

Stream.crossWith(Stream.make(1, 2), Stream.make("a", "b"), (n, s) => `${n}-${s}`).pipe(
  Stream.runCollect
)
// => ["1-a", "1-b", "2-a", "2-b"]
```

## Fan-out / sharing

These split a single upstream so multiple consumers can each observe its
elements. All of them are scoped — run them inside `Effect.scoped` (or yield in a
scoped `Effect.gen`).

### broadcastN

Splits a stream into a fixed-size tuple of N downstream streams that each emit
the same elements. The upstream starts after all N are subscribed. `capacity`
bounds buffering; `strategy` (`"suspend" | "sliding" | "dropping"`, default
`"suspend"`) controls behavior when a consumer falls behind; `replay` re-emits
the last N elements to new subscribers.

```ts
import { Console, Effect, Stream } from "effect"

Effect.scoped(
  Effect.gen(function* () {
    const [left, right] = yield* Stream.make(1, 2, 3).pipe(
      Stream.broadcastN({ n: 2, capacity: 8 })
    )
    const values = yield* Effect.all(
      [Stream.runCollect(left), Stream.runCollect(right)],
      { concurrency: "unbounded" }
    )
    yield* Console.log(values) // => [[1, 2, 3], [1, 2, 3]]
  })
)
```

### broadcast

Returns a *single* PubSub-backed stream that multicasts the source to every
subscriber. Each time you consume the returned stream you get a fresh
subscription. `replay` lets late subscribers see recent elements.

```ts
import { Console, Effect, Stream } from "effect"

Effect.scoped(
  Effect.gen(function* () {
    const broadcasted = yield* Stream.broadcast(Stream.fromArray([1, 2, 3]), {
      capacity: 8,
      replay: 3
    })
    const [a, b] = yield* Effect.all(
      [Stream.runCollect(broadcasted), Stream.runCollect(broadcasted)],
      { concurrency: "unbounded" }
    )
    yield* Console.log([a, b]) // => [[1, 2, 3], [1, 2, 3]]
  })
)
```

### share

Multicasts a single upstream to multiple consumers, subscribing lazily when the
first consumer starts. The upstream keeps running while there is at least one
consumer and is finalized after the last one leaves. `idleTimeToLive` keeps the
upstream alive between consumers so a later subscriber continues from the next
element instead of restarting.

```ts
import { Console, Effect, Stream } from "effect"

Effect.scoped(
  Effect.gen(function* () {
    const shared = yield* Stream.make(1, 2, 3).pipe(Stream.share({ capacity: 16 }))

    const first = yield* shared.pipe(Stream.take(1), Stream.runCollect)
    const second = yield* shared.pipe(Stream.take(1), Stream.runCollect)

    yield* Console.log([first, second]) // => [[1], [1]]
  })
)
```

## Interruption coordination

These let one stream's lifetime be steered by an external signal or a background
stream — closely related to merging because they coordinate concurrent
activities.

### interruptWhen

Interrupts the stream **immediately** when the given effect completes, including
any in-progress pull from upstream. If the effect fails, that failure is emitted.

```ts
import { Console, Deferred, Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const interrupt = yield* Deferred.make<void>()
  const stream = Stream.make(1, 2, 3).pipe(
    Stream.tap((value) =>
      value === 2 ? Deferred.succeed(interrupt, void 0) : Effect.void
    ),
    Stream.interruptWhen(Deferred.await(interrupt))
  )
  yield* Console.log(yield* Stream.runCollect(stream))
})

Effect.runPromise(program)
// => [1, 2]
```

### haltWhen

Like `interruptWhen` but **graceful**: it stops *before the next pull* once the
effect completes, rather than interrupting an in-progress pull. The effect's
failure still fails the stream.

```ts
import { Console, Deferred, Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const halt = yield* Deferred.make<void>()
  const values = yield* Stream.fromArray([1, 2, 3]).pipe(
    Stream.tap((value) => (value === 2 ? Deferred.succeed(halt, void 0) : Effect.void)),
    Stream.haltWhen(Deferred.await(halt)),
    Stream.runCollect
  )
  yield* Console.log(values)
})

Effect.runPromise(program)
// => [1, 2]
```

### drainFork

Runs a background stream concurrently for its effects while the foreground
stream runs, interrupting the background when the foreground completes (and
failing if the background fails). Built on `mergeEffect` + `runDrain` — a handy
way to keep a side process alive only as long as the main stream.

```ts
import { Console, Effect, Stream } from "effect"

const foreground = Stream.make(1, 2)
const background = Stream.fromEffect(Console.log("background task"))

foreground.pipe(Stream.drainFork(background), Stream.runCollect)
// logs "background task"; collected => [1, 2]
```