Error Handling
A stream’s failure type E is part of its signature, just like an Effect’s.
When a stream fails, every downstream operator short-circuits and the failure
surfaces wherever you run it. Effect gives you the same recovery vocabulary you
use elsewhere — switch to a fallback stream, match on a specific tagged error,
retry on a schedule, or run cleanup on failure — all expressed inside the
stream so the result type reflects what can still go wrong.
import { Effect, Stream } from "effect"

// A stream that emits a couple of values and then fails.
const flaky = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("boom" as const)),
  Stream.concat(Stream.make(4, 5))
)

const fallback = Stream.make(10, 20, 30)

// `Stream.catch` is the catch-all: on any failure it switches to the stream you
// return. Values emitted *before* the failure are kept; the rest come from the
// fallback. The recovered stream's error channel is now `never`.
export const recovered = flaky.pipe(
  Stream.catch(() => fallback),
  Stream.runCollect
)
// => [1, 2, 3, 10, 20, 30]

Recovering from specific errors
When a stream can fail with several tagged errors, recover from each one
individually with Stream.catchTag (or catchTags for several at once). This
keeps the other error types in the signature so the compiler still forces you to
handle them.
import { Effect, Schema, Stream } from "effect"

class NotFound extends Schema.TaggedErrorClass<NotFound>()("NotFound", {
  id: Schema.String
}) {}

class RateLimited extends Schema.TaggedErrorClass<RateLimited>()("RateLimited", {
  retryAfterMillis: Schema.Number
}) {}

declare const records: Stream.Stream<string, NotFound | RateLimited>

// Handle `NotFound` by substituting an empty stream; `RateLimited` still
// propagates, so the result type is `Stream<string, RateLimited>`.
export const handled = records.pipe(
  Stream.catchTag("NotFound", () => Stream.empty)
)

// Handle several tags at once with a record of handlers.
export const handledBoth = records.pipe(
  Stream.catchTags({
    NotFound: () => Stream.empty,
    RateLimited: (e) =>
      Stream.fromEffect(
        Effect.as(Effect.sleep(e.retryAfterMillis), "retried-placeholder")
      )
  })
)

For richer matching there is also Stream.catchIf (recover when a predicate or
refinement matches) and Stream.catchCause (inspect the full Cause, including
defects). See Error Management for how tagged errors,
Cause, and these combinators relate across all of Effect.
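
The way tag-based recovery removes one error from the signature while leaving the others can be modelled without the library. The sketch below is not Effect's implementation: `Result`, `lookup`, and this stand-alone `catchTag` are hypothetical names used only to illustrate the type-level idea, and the error shapes are plain interfaces rather than Schema classes.

```typescript
// Library-free model: a computation succeeds with A or fails with a tagged error E.
type Result<A, E> =
  | { readonly ok: true; readonly value: A }
  | { readonly ok: false; readonly error: E }

interface NotFound { readonly _tag: "NotFound"; readonly id: string }
interface RateLimited { readonly _tag: "RateLimited"; readonly retryAfterMillis: number }

// Hypothetical helper (not Effect's API): handle one tag and keep the
// remaining error types in the result type via Exclude.
function catchTag<A, E extends { readonly _tag: string }, K extends E["_tag"], B>(
  result: Result<A, E>,
  tag: K,
  handler: (error: Extract<E, { _tag: K }>) => B
): Result<A | B, Exclude<E, { _tag: K }>> {
  if (result.ok) return result
  if (result.error._tag === tag) {
    return { ok: true, value: handler(result.error as Extract<E, { _tag: K }>) }
  }
  return { ok: false, error: result.error as Exclude<E, { _tag: K }> }
}

// Hypothetical lookup that can fail in two different ways.
function lookup(id: string): Result<string, NotFound | RateLimited> {
  if (id === "42") return { ok: false, error: { _tag: "NotFound", id } }
  if (id === "busy") return { ok: false, error: { _tag: "RateLimited", retryAfterMillis: 500 } }
  return { ok: true, value: `record-${id}` }
}

// Both results have type Result<string, RateLimited>: NotFound is gone from the type.
const recoveredLookup = catchTag(lookup("42"), "NotFound", (e) => `missing:${e.id}`)
const stillFailing = catchTag(lookup("busy"), "NotFound", (e) => `missing:${e.id}`)
```

In this model `recoveredLookup` is a success carrying the handler's value, while `stillFailing` keeps its `RateLimited` error, which the caller must still handle.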
Retrying a failing stream
Transient failures, such as a flaky socket or a rate-limited API, are best
handled by retrying rather than giving up. Stream.retry takes a Schedule and
restarts the stream from the beginning each time it fails, according to the
policy.
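
To get a feel for what a retry policy costs in wall-clock time, it can help to list the waits it would produce. The following is a plain TypeScript sketch, not Effect's scheduler: `backoffDelays` is a hypothetical helper, and it assumes the backoff doubles on each recurrence and that a cap of five recurrences means five retries after the initial attempt.

```typescript
// Hypothetical helper, not part of Effect: lists the waits produced by
// "exponential backoff AND at most `maxRecurrences` recurrences".
// Assumes the delay is multiplied by `factor` after every recurrence.
function backoffDelays(
  baseMillis: number,
  maxRecurrences: number,
  factor = 2
): number[] {
  const delays: number[] = []
  for (let n = 0; n < maxRecurrences; n++) {
    delays.push(baseMillis * Math.pow(factor, n))
  }
  return delays
}

// A 100 ms base capped at 5 recurrences.
const delays = backoffDelays(100, 5)
// delays is [100, 200, 400, 800, 1600]

// Worst case: the initial run plus one run per recurrence.
const totalRuns = delays.length + 1
// Total time spent waiting between runs, in milliseconds.
const totalWaitMillis = delays.reduce((sum, d) => sum + d, 0)
```

Under these assumptions a stream that never succeeds is run six times and spends about 3.1 seconds waiting before the failure finally propagates.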
import { Effect, Schedule, Stream } from "effect"

const readSensor = Effect.fn("readSensor")(function*() {
  // Imagine an effect that occasionally fails to read a hardware sensor.
  return yield* Effect.fail("sensor unavailable" as const)
})

// Retry with exponential backoff, giving up after 5 retries. `Schedule.both`
// combines the two policies with AND semantics: it keeps recurring only while
// *both* the exponential backoff and the `recurs(5)` cap still want to recur.
export const sensorStream = Stream.fromEffect(readSensor()).pipe(
  Stream.retry(
    Schedule.exponential("100 millis").pipe(
      Schedule.both(Schedule.recurs(5))
    )
  )
)

Cleanup and timeouts
Stream.onError registers a finalizer that runs if (and only if) the stream
fails, receiving the Cause. It does not change the failure — use it for
cleanup, logging, or metrics, not recovery.
import { Cause, Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("oops" as const)),
  // Runs on failure; the original error still propagates afterwards.
  Stream.onError((cause) =>
    Effect.logError(`stream failed: ${Cause.pretty(cause)}`)
  )
)

Two operators guard against streams that stall rather than fail. Stream.timeout
ends the stream if no value is produced within a duration; Stream.timeoutOrElse
switches to a fallback stream instead.
import { Effect, Stream } from "effect"

const slow: Stream.Stream<number> = Stream.fromEffect(
  Effect.as(Effect.never, 0)
)

// End the stream (emit nothing more) if no element arrives within 2 seconds.
export const bounded = slow.pipe(Stream.timeout("2 seconds"))

// Or switch to a fallback stream after the timeout.
export const orFallback = slow.pipe(
  Stream.timeoutOrElse({
    duration: "2 seconds",
    orElse: () => Stream.make(1, 2, 3)
  })
)

Together these let a long-running pipeline degrade gracefully: retry the
transient failures, time out the stalls, fall back where recovery makes sense,
and clean up on the paths that cannot recover.