Streaming
A Stream<A, E, R> is an effectful, pull-based sequence of values. Where an
Effect<A, E, R> produces a single result (or fails with E), a Stream
produces zero, one, or many A values over time — and may also fail with E
or require services R. Streams can be finite (the lines of a file) or infinite
(a polling loop, a socket, a clock tick), and they evaluate lazily: nothing runs
until you run the stream with a destructor like Stream.runForEach or a
Sink.
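To make the laziness concrete, here is a minimal sketch. The `events` array and the names `described`, `eventsBeforeRun`, and `eventsAfterRun` are illustrative assumptions, not part of the library. It records a side effect for each element and checks that nothing is recorded until the stream is run.

```typescript
import { Effect, Stream } from "effect"

// Illustrative log of side effects, so we can observe when work happens.
const events: Array<string> = []

// Describing the stream performs no work: `Stream.tap` only attaches the effect.
const described = Stream.fromIterable([1, 2, 3]).pipe(
  Stream.tap((n) =>
    Effect.sync(() => {
      events.push(`pulled ${n}`)
    })
  )
)

// Still empty at this point, because the stream has only been described.
const eventsBeforeRun = events.length

// Running the stream is what pulls values through the pipeline.
Effect.runSync(Stream.runDrain(described))

const eventsAfterRun = [...events]
```

`Effect.runSync` is enough here only because every step is synchronous; a stream with asynchronous steps would need `Effect.runPromise` or `Effect.runFork` instead.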
Because streaming is pull-based, a Stream only does as much work as its
consumer demands. This gives you backpressure for free, bounded memory over huge
or unbounded sources, and the same composability you get from Effect —
typed errors, resource safety via Scope, and structured
concurrency.
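As a rough illustration of demand-driven evaluation, the sketch below takes five values from a source that never ends. The `naturals` generator is a hypothetical stand-in for an unbounded source. The library may read the source in chunks, so the generator can be advanced somewhat past the fifth element, but the pipeline still terminates once the consumer has what it asked for.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical infinite source: a generator that never finishes on its own.
function* naturals(): Generator<number> {
  let n = 1
  while (true) yield n++
}

// Only five squares are requested, so the pipeline stops after five results.
const firstFiveSquares = Stream.fromIterable(naturals()).pipe(
  Stream.map((n) => n * n),
  Stream.take(5),
  Stream.runCollect
)

// `Array.from` normalises whatever iterable collection `runCollect` returns.
const squares = Array.from(Effect.runSync(firstFiveSquares))
```

Without `Stream.take`, collecting this stream would never complete, which is why unbounded sources are usually paired with a bounding operator or a consumer such as `Stream.runForEach`.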
import { Effect, Stream } from "effect"

// A pipeline reads numbers, keeps the even ones, doubles them, and logs each.
// Nothing executes until `runForEach` pulls values through the pipeline.
const program = Stream.range(1, 10).pipe(
  Stream.filter((n) => n % 2 === 0),
  Stream.map((n) => n * 2),
  Stream.runForEach((n) => Effect.logInfo(`value: ${n}`))
)

Effect.runFork(program)

Under the hood a Stream is built on Channel, the lower-level primitive that
also powers Sink. You rarely touch Channel directly, but
it is what makes operators like Stream.pipeThroughChannel (used for
encoding) possible.
In this section
- Creating streams: build streams from iterables, effects, paginated APIs, callbacks, and Node readables.
- Consuming streams: run streams with runForEach, runCollect, runDrain, and runFold.
- Transforming streams: map, flatMap, filter, mapEffect, grouping, and concurrency.
- Error handling: recover, retry, and time out within streams.
- Encoding: decode and encode structured data with Ndjson and Msgpack.
- Sink: describe how a stream is consumed.
When to reach for a Stream
Use a Stream when a source naturally yields many values over time and you want
to process them incrementally instead of materialising everything in memory:
log lines, paginated HTTP responses, websocket messages, file chunks, database
cursors, or a polling loop. For a single asynchronous result, a plain
Effect is the right tool.
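The distinction can be sketched as follows, assuming a small in-memory array of hypothetical log lines in place of a real file or socket. The many-valued source is processed line by line as a Stream, while the single summary value is a plain Effect. The names `logLines`, `errorMessages`, `processLogs`, and `lineCount` are illustrative only.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical log lines; a real source would be a file, socket, or cursor.
const logLines = [
  "INFO service started",
  "ERROR connection refused",
  "INFO retrying",
  "ERROR timeout"
]

// Collects the messages handled so far, for illustration.
const errorMessages: Array<string> = []

// Many values over time: handle each line as it is pulled from the source.
const processLogs = Stream.fromIterable(logLines).pipe(
  Stream.filter((line) => line.startsWith("ERROR")),
  Stream.map((line) => line.slice("ERROR ".length)),
  Stream.runForEach((message) =>
    Effect.sync(() => {
      errorMessages.push(message)
    })
  )
)

// A single result: no stream needed, a plain Effect is enough.
const lineCount = Effect.succeed(logLines.length)

Effect.runSync(processLogs)
const totalLines = Effect.runSync(lineCount)
```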