Skip to content

Creating Streams

Most real streams originate from some external source — an array already in memory, an effect you want to repeat, a paginated HTTP endpoint, a callback-based event API, or a Node.js readable. Stream ships a constructor for each shape so you can lift the source into a typed, resource-safe pipeline. The constructors below cover the cases you will reach for most often.

import { Effect, Schedule, Stream } from "effect"
// `Stream.make` / `Stream.fromIterable` turn values already in memory into a
// stream. `make` takes varargs; `fromIterable` takes any iterable.
export const numbers = Stream.fromIterable<number>([1, 2, 3, 4, 5])
export const letters = Stream.make("a", "b", "c")
// `Stream.range` emits an inclusive range of integers.
export const oneToTen = Stream.range(1, 10)
// `Stream.fromEffect` runs an effect once and emits its single result. Useful
// for adapting an existing Effect into a (one-element) stream.
export const config = Stream.fromEffect(Effect.succeed({ retries: 3 }))
// `Stream.fromEffectSchedule` turns a single effect into a *polling* stream by
// re-running it on a Schedule. Great for metrics, health checks, and refresh
// loops. Here we sample every 30 seconds, but only keep the first 3 samples.
export const samples = Stream.fromEffectSchedule(
Effect.succeed(3),
Schedule.spaced("30 seconds")
).pipe(Stream.take(3))

APIs that return one page at a time map cleanly onto Stream.paginate. You give it a starting cursor and a function that, for each cursor, returns the page of values plus an Option of the next cursor (Option.none() ends the stream).

import { Array, Effect, Option, Stream } from "effect"
// Read a paginated "jobs" API. Each call returns one page of results and the
// next cursor; returning `Option.none()` stops the stream.
export const fetchJobsPage = Stream.paginate(
0, // start at page 0 (the cursor)
Effect.fn(function*(page) {
// Simulate network latency for the page request.
yield* Effect.sleep("50 millis")
const results = Array.range(0, 99).map((i) => `Job ${i + 1 + page * 100}`)
// Only fetch the first 10 pages, then end the stream.
const nextPage = page <= 10 ? Option.some(page + 1) : Option.none()
return [results, nextPage] as const
})
)

Stream.paginate flattens the per-page arrays for you: the resulting stream emits individual string jobs, not arrays of jobs.

JavaScript surfaces two common imperative shapes: async iterables (async function*) and callback registrations (event listeners, subscriptions). Both lift into a stream, and both let you convert thrown/raw errors into a typed error so the failure shows up in the stream’s E channel.

import { Effect, Queue, Schema, Stream } from "effect"
// A typed error for whatever the async iterable might throw. `Schema.Defect`
// captures an unknown cause without forcing you to model it.
class LetterError extends Schema.TaggedErrorClass<LetterError>()("LetterError", {
cause: Schema.Defect
}) {}
async function* asyncLetters() {
yield "a"
yield "b"
yield "c"
}
// `Stream.fromAsyncIterable` drives the iterator and maps any thrown error
// into our typed `LetterError`.
export const letters = Stream.fromAsyncIterable(
asyncLetters(),
(cause) => new LetterError({ cause })
)
const button = document.getElementById("my-button")!
// `Stream.fromEventListener` is the dedicated constructor for DOM events.
export const clicks = Stream.fromEventListener<PointerEvent>(button, "click")
// `Stream.callback` adapts *any* callback-based API. You receive a `Queue` and
// push values into it; the stream emits whatever you offer. Register the
// listener with `Effect.acquireRelease` so it is torn down when the stream ends.
export const clicksViaCallback = Stream.callback<PointerEvent>(
Effect.fn(function*(queue) {
function onEvent(event: PointerEvent) {
// `offerUnsafe` enqueues without an effect — safe inside a sync callback.
Queue.offerUnsafe(queue, event)
}
yield* Effect.acquireRelease(
Effect.sync(() => button.addEventListener("click", onEvent)),
() => Effect.sync(() => button.removeEventListener("click", onEvent))
)
})
)

The acquireRelease finalizer is the key to leak-free streaming: when the consumer stops pulling — whether the stream completes, fails, or is interrupted — the listener is removed. See Resource Management for how acquire/release and Scope work in general.

On the Node platform, NodeStream.fromReadable bridges a node:stream readable into an Effect Stream, converting low-level emitter errors into a typed error.

import { NodeStream } from "@effect/platform-node"
import { Schema } from "effect"
import { Readable } from "node:stream"
export class NodeStreamError extends Schema.TaggedErrorClass<NodeStreamError>()(
"NodeStreamError",
{ cause: Schema.Defect }
) {}
// Build the readable lazily (per subscription) and map its errors. `closeOnDone`
// destroys the readable when the stream finishes (the default).
export const fileChunks = NodeStream.fromReadable<string>({
evaluate: () => Readable.from(["Hello", " ", "world", "!"]),
onError: (cause) => new NodeStreamError({ cause }),
closeOnDone: true
})

With a stream in hand, the next step is to do something with it — see Consuming streams.