Error Handling

A stream’s failure type E is part of its signature, just like an Effect’s. When a stream fails, every downstream operator short-circuits and the failure surfaces wherever you run it. Effect gives you the same recovery vocabulary you use elsewhere — switch to a fallback stream, match on a specific tagged error, retry on a schedule, or run cleanup on failure — all expressed inside the stream so the result type reflects what can still go wrong.

import { Effect, Stream } from "effect"
// A stream that emits three values and then fails. The trailing `4, 5` are
// never emitted, because the failure short-circuits the rest of the stream.
const flaky = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("boom" as const)),
  Stream.concat(Stream.make(4, 5))
)
const fallback = Stream.make(10, 20, 30)
// `Stream.catch` is the catch-all: on any failure it switches to the stream you
// return. Values emitted *before* the failure are kept; the rest come from the
// fallback. The recovered stream's error channel is now `never`.
export const recovered = flaky.pipe(
  Stream.catch(() => fallback),
  Stream.runCollect
)
// => [1, 2, 3, 10, 20, 30]

When a stream can fail with several tagged errors, recover from each one individually with Stream.catchTag (or catchTags for several at once). This keeps the other error types in the signature so the compiler still forces you to handle them.

import { Effect, Schema, Stream } from "effect"
class NotFound extends Schema.TaggedErrorClass<NotFound>()("NotFound", {
  id: Schema.String
}) {}
class RateLimited extends Schema.TaggedErrorClass<RateLimited>()("RateLimited", {
  retryAfterMillis: Schema.Number
}) {}
declare const records: Stream.Stream<string, NotFound | RateLimited>
// Handle `NotFound` by substituting an empty stream; `RateLimited` still
// propagates, so the result type is `Stream<string, RateLimited>`.
export const handled = records.pipe(
  Stream.catchTag("NotFound", () => Stream.empty)
)
// Handle several tags at once with a record of handlers.
export const handledBoth = records.pipe(
  Stream.catchTags({
    NotFound: () => Stream.empty,
    RateLimited: (e) => Stream.fromEffect(
      Effect.as(Effect.sleep(e.retryAfterMillis), "retried-placeholder")
    )
  })
)

For richer matching there are also Stream.catchIf (recover only when a predicate or refinement matches the error) and Stream.catchCause (inspect the full Cause, including defects). See Error Management for how tagged errors, Cause, and these combinators relate across all of Effect.
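
As a rough illustration of predicate-based recovery, here is a plain-TypeScript model built on synchronous generators. It does not use Effect and is not how Stream.catchIf is implemented: `recoverIf`, `flaky`, `isNotFound`, and the `Failure` type are hypothetical names, and failures are thrown here only because plain generators have no typed error channel.

```typescript
// Plain-TypeScript model of "recover only when a predicate matches".
// NOT Effect's implementation; every name below is hypothetical.

type Failure =
  | { readonly _tag: "NotFound" }
  | { readonly _tag: "RateLimited" }

// Emits two values, then fails with the failure it was given.
function* flaky(failure: Failure): Generator<number> {
  yield 1
  yield 2
  throw failure
}

// Re-emits everything from `source`. If `source` fails with an error that
// `predicate` accepts, continues with `fallback()`; any other error is rethrown.
function* recoverIf<A>(
  source: Iterable<A>,
  predicate: (error: unknown) => boolean,
  fallback: () => Iterable<A>
): Generator<A> {
  try {
    yield* source
  } catch (error) {
    if (!predicate(error)) throw error
    yield* fallback()
  }
}

const isNotFound = (error: unknown): boolean =>
  typeof error === "object" &&
  error !== null &&
  (error as { _tag?: unknown })._tag === "NotFound"

// Matching failure: values emitted before the failure are kept, then the
// fallback takes over.
const recoveredValues = Array.from(
  recoverIf(flaky({ _tag: "NotFound" }), isNotFound, () => [10, 20])
)

// Non-matching failure: it still reaches whoever consumes the stream.
let propagated: unknown = undefined
try {
  Array.from(recoverIf(flaky({ _tag: "RateLimited" }), isNotFound, () => [10, 20]))
} catch (error) {
  propagated = error
}
```

In this model `recoveredValues` holds the two values emitted before the failure followed by the fallback's values, while the `RateLimited` failure ends up in `propagated`. With the real combinators the unhandled error stays in the stream's `E` type instead of being thrown.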

Transient failures — a flaky socket, a rate-limited API — are best handled by retrying rather than giving up. Stream.retry takes a Schedule and restarts the stream from the beginning each time it fails, according to the policy.

import { Effect, Schedule, Stream } from "effect"
const readSensor = Effect.fn("readSensor")(function*() {
  // Imagine an effect that occasionally fails to read a hardware sensor.
  return yield* Effect.fail("sensor unavailable" as const)
})
// Retry with exponential backoff, giving up after 5 retries (6 attempts in
// total, counting the initial run). `Schedule.both` combines the two policies
// with AND semantics: it keeps recurring only while *both* the exponential
// backoff and the `recurs(5)` cap still want to recur.
export const sensorStream = Stream.fromEffect(readSensor()).pipe(
  Stream.retry(Schedule.exponential("100 millis").pipe(
    Schedule.both(Schedule.recurs(5))
  ))
)
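
To make the timing of that policy concrete, the sketch below computes the waits it would produce. It is a plain-TypeScript model rather than Effect code: `backoffDelays` is a hypothetical helper, and it assumes the exponential schedule grows by a factor of 2 starting from the base delay, and that the `recurs` cap contributes no delay of its own.

```typescript
// Plain-TypeScript model of the delays between retries.
// NOT Effect's implementation; `backoffDelays` is a hypothetical helper.
// Assumption: each delay is base * factor^n, with n starting at 0.
function backoffDelays(
  baseMillis: number,
  maxRetries: number,
  factor: number = 2
): number[] {
  const delays: number[] = []
  for (let retry = 0; retry < maxRetries; retry++) {
    delays.push(baseMillis * Math.pow(factor, retry))
  }
  return delays
}

// Base delay of 100 ms, capped at 5 retries.
const delays = backoffDelays(100, 5)
// delays is [100, 200, 400, 800, 1600]

// One initial run plus one run per retry.
const totalAttempts = 1 + delays.length

// Total time spent waiting if every attempt fails.
const worstCaseWaitMillis = delays.reduce((sum, delay) => sum + delay, 0)
```

Under these assumptions the stream runs at most 6 times and waits 3100 ms in total before the final failure propagates.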

Stream.onError registers a finalizer that runs if (and only if) the stream fails, receiving the Cause. It does not change the failure — use it for cleanup, logging, or metrics, not recovery.

import { Cause, Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("oops" as const)),
  // Runs on failure; the original error still propagates afterwards.
  Stream.onError((cause) =>
    Effect.logError(`stream failed: ${Cause.pretty(cause)}`)
  )
)

Two operators guard against streams that stall rather than fail. Stream.timeout ends the stream if no value is produced within a duration; Stream.timeoutOrElse switches to a fallback stream instead.

import { Effect, Stream } from "effect"
const slow: Stream.Stream<number> = Stream.fromEffect(
  Effect.as(Effect.never, 0)
)
// End the stream (emit nothing more) if no element arrives within 2 seconds.
export const bounded = slow.pipe(Stream.timeout("2 seconds"))
// Or switch to a fallback stream after the timeout.
export const orFallback = slow.pipe(
  Stream.timeoutOrElse({
    duration: "2 seconds",
    orElse: () => Stream.make(1, 2, 3)
  })
)

Together these let a long-running pipeline degrade gracefully: retry the transient failures, time out the stalls, fall back where recovery makes sense, and clean up on the paths that cannot recover.