Creating Streams
Most real streams originate from some external source — an array already in
memory, an effect you want to repeat, a paginated HTTP endpoint, a callback-based
event API, or a Node.js readable. Stream ships a constructor for each shape so
you can lift the source into a typed, resource-safe pipeline. The constructors
below cover the cases you will reach for most often.
import { Effect, Schedule, Stream } from "effect"
// `Stream.make` / `Stream.fromIterable` turn values already in memory into a// stream. `make` takes varargs; `fromIterable` takes any iterable.export const numbers = Stream.fromIterable<number>([1, 2, 3, 4, 5])export const letters = Stream.make("a", "b", "c")
// `Stream.range` emits an inclusive range of integers.export const oneToTen = Stream.range(1, 10)
// `Stream.fromEffect` runs an effect once and emits its single result. Useful// for adapting an existing Effect into a (one-element) stream.export const config = Stream.fromEffect(Effect.succeed({ retries: 3 }))
// `Stream.fromEffectSchedule` turns a single effect into a *polling* stream by// re-running it on a Schedule. Great for metrics, health checks, and refresh// loops. Here we sample every 30 seconds, but only keep the first 3 samples.export const samples = Stream.fromEffectSchedule( Effect.succeed(3), Schedule.spaced("30 seconds")).pipe(Stream.take(3))Paginated sources
Section titled “Paginated sources”APIs that return one page at a time map cleanly onto Stream.paginate. You give
it a starting cursor and a function that, for each cursor, returns the page of
values plus an Option of the next cursor (Option.none() ends the stream).
import { Array, Effect, Option, Stream } from "effect"
// Read a paginated "jobs" API. Each call returns one page of results and the// next cursor; returning `Option.none()` stops the stream.export const fetchJobsPage = Stream.paginate( 0, // start at page 0 (the cursor) Effect.fn(function*(page) { // Simulate network latency for the page request. yield* Effect.sleep("50 millis")
const results = Array.range(0, 99).map((i) => `Job ${i + 1 + page * 100}`)
// Only fetch the first 10 pages, then end the stream. const nextPage = page <= 10 ? Option.some(page + 1) : Option.none()
return [results, nextPage] as const }))Stream.paginate flattens the per-page arrays for you: the resulting stream
emits individual string jobs, not arrays of jobs.
Async iterables and callback APIs
Section titled “Async iterables and callback APIs”JavaScript surfaces two common imperative shapes: async iterables
(async function*) and callback registrations (event listeners, subscriptions).
Both lift into a stream, and both let you convert thrown/raw errors into a typed
error so the failure shows up in the stream’s E channel.
import { Effect, Queue, Schema, Stream } from "effect"
// A typed error for whatever the async iterable might throw. `Schema.Defect`// captures an unknown cause without forcing you to model it.class LetterError extends Schema.TaggedErrorClass<LetterError>()("LetterError", { cause: Schema.Defect}) {}
async function* asyncLetters() { yield "a" yield "b" yield "c"}
// `Stream.fromAsyncIterable` drives the iterator and maps any thrown error// into our typed `LetterError`.export const letters = Stream.fromAsyncIterable( asyncLetters(), (cause) => new LetterError({ cause }))
const button = document.getElementById("my-button")!
// `Stream.fromEventListener` is the dedicated constructor for DOM events.export const clicks = Stream.fromEventListener<PointerEvent>(button, "click")
// `Stream.callback` adapts *any* callback-based API. You receive a `Queue` and// push values into it; the stream emits whatever you offer. Register the// listener with `Effect.acquireRelease` so it is torn down when the stream ends.export const clicksViaCallback = Stream.callback<PointerEvent>( Effect.fn(function*(queue) { function onEvent(event: PointerEvent) { // `offerUnsafe` enqueues without an effect — safe inside a sync callback. Queue.offerUnsafe(queue, event) }
yield* Effect.acquireRelease( Effect.sync(() => button.addEventListener("click", onEvent)), () => Effect.sync(() => button.removeEventListener("click", onEvent)) ) }))The acquireRelease finalizer is the key to leak-free streaming: when the
consumer stops pulling — whether the stream completes, fails, or is interrupted —
the listener is removed. See Resource Management for how
acquire/release and Scope work in general.
Node.js readable streams
Section titled “Node.js readable streams”On the Node platform, NodeStream.fromReadable bridges a node:stream readable
into an Effect Stream, converting low-level emitter errors into a typed error.
import { NodeStream } from "@effect/platform-node"import { Schema } from "effect"import { Readable } from "node:stream"
export class NodeStreamError extends Schema.TaggedErrorClass<NodeStreamError>()( "NodeStreamError", { cause: Schema.Defect }) {}
// Build the readable lazily (per subscription) and map its errors. `closeOnDone`// destroys the readable when the stream finishes (the default).export const fileChunks = NodeStream.fromReadable<string>({ evaluate: () => Readable.from(["Hello", " ", "world", "!"]), onError: (cause) => new NodeStreamError({ cause }), closeOnDone: true})With a stream in hand, the next step is to do something with it — see Consuming streams.