Channel

Channel is the low-level streaming primitive that both Stream and Sink are built on. Most application code never touches it directly — streams and sinks cover the common cases far more ergonomically. You reach for Channel when you are building a new stream operator, adapting a pull-based source, or need fine-grained control over how input, output, errors, the final value, and resources compose.

A channel has seven type parameters:

import type { Channel } from "effect"
type Example = Channel.Channel<
number, // OutElem — elements it emits downstream
Error, // OutErr — error it can fail with
string, // OutDone — terminal value when it finishes
Uint8Array, // InElem — elements it reads from upstream
Error, // InErr — error it may receive from upstream
unknown, // InDone — upstream terminal value
never // Env — services required while running
>

Think of a channel as a process with two sides and a final value:

  • The output side (OutElem, OutErr, OutDone) is what the channel produces: a sequence of elements, an optional typed failure, and a single done value when it completes.
  • The input side (InElem, InErr, InDone) is the upstream protocol it consumes when piped after another channel. A source channel ignores its input (the input params default to unknown).
  • Env is the Effect environment needed while the channel is interpreted.

The done value is distinct from the emitted elements. Channel.fromArray([1, 2, 3]) emits 1, 2, 3 and completes with void; a channel can also emit nothing and complete with a value (Channel.end("done")).
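
The split between emitted elements and the done value resembles how a JavaScript generator separates `yield` from `return`. The sketch below is only an analogy in plain TypeScript and does not use the Channel API; `countdown` and `drain` are hypothetical names introduced for illustration.

```typescript
// Analogy only: a generator's yields play the role of OutElem,
// and its return value plays the role of OutDone.
function* countdown(from: number): Generator<number, string, undefined> {
  for (let n = from; n > 0; n--) {
    yield n // an emitted element
  }
  return "done" // the single terminal value
}

// Hypothetical helper: collect the elements and the done value separately.
function drain<A, D>(gen: Generator<A, D, undefined>): { elements: Array<A>; done: D } {
  const elements: Array<A> = []
  let step = gen.next()
  while (!step.done) {
    elements.push(step.value)
    step = gen.next()
  }
  return { elements, done: step.value }
}

const result = drain(countdown(3))
// result.elements holds 3, 2, 1 and result.done is "done"
```

A generator that returns immediately without yielding corresponds to a channel that emits nothing and completes with a value.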

Stream and Sink are both built on Channel:

  • A Stream<A, E, R> is essentially a Channel<NonEmptyReadonlyArray<A>, E, void, unknown, unknown, unknown, R>: a channel that ignores its input and emits chunks (non-empty arrays) of elements. Convert between them with Stream.toChannel / Stream.fromChannel.
  • A Sink consumes a channel’s output through its input side and produces a summary value. Convert between them with Sink.toChannel / Sink.fromChannel.
import { Stream } from "effect"
// A Stream is a Channel under the hood
const stream = Stream.make(1, 2, 3)
const asChannel = Stream.toChannel(stream)
// Channel<NonEmptyReadonlyArray<number>, never, void, ...>
const backToStream = Stream.fromChannel(asChannel)

A practical example: a custom decode channel

Channels shine when you build a reusable stream operator. Here we splice a custom byte-decoding channel into a stream with Stream.pipeThroughChannel. The channel reads chunks of Uint8Array, decodes them to text, and splits the text into lines.

import { Channel, Effect, Stream } from "effect"
// A channel that turns chunks of bytes into chunks of lines.
// InElem = NonEmptyReadonlyArray<Uint8Array> (what a byte Stream emits)
// OutElem = NonEmptyReadonlyArray<string> (what a line Stream wants)
const bytesToLines = Channel.decodeText<never, void>().pipe(
Channel.pipeTo(Channel.splitLines())
)
const bytes = Stream.make(
new TextEncoder().encode("hello\nwor"),
new TextEncoder().encode("ld\nbye\n")
)
const lines = bytes.pipe(Stream.pipeThroughChannel(bytesToLines))
Effect.runPromise(Stream.runCollect(lines)).then(console.log)
// => [ "hello", "world", "bye" ]

A channel can also be run directly with the run* family if you do not need a stream:

import { Channel, Effect } from "effect"
const program = Channel.fromArray([1, 2, 3]).pipe(
Channel.map((n) => n * 2),
Channel.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ 2, 4, 6 ]

Every public export, grouped by purpose. Most entries pair a short description with a runnable example. Check the source if you depend on exact variance.

Checks whether a value is a Channel.

import { Channel } from "effect"
Channel.isChannel(Channel.succeed(42)) // => true
Channel.isChannel("nope") // => false

The string brand ("~effect/Channel") stored on channel values; used by isChannel.

The default chunk size used by array-batching constructors.

import { Channel } from "effect"
console.log(Channel.DefaultChunkSize) // => 4096

The string literal type for merge halting: "left" | "right" | "both" | "either".

These build channels directly from pull functions. You rarely need them unless implementing new primitives.

Builds a channel from a function that turns the upstream Pull into a new Pull inside a scope. This is the foundational constructor — most others are defined in terms of it.

import { Channel, Effect } from "effect"
// Pass the upstream pull through unchanged (an identity channel)
const ch = Channel.fromTransform((upstream, _scope) => Effect.succeed(upstream))

Like fromTransform, but also provides a forked scope that closes when the resulting channel completes — useful for scoped resource lifecycles inside a custom channel.

Builds a channel from an Effect that produces a Pull (a repeatedly evaluated effect that emits elements or signals done).

import { Channel, Effect } from "effect"
const ch = Channel.fromPull(Effect.succeed(Effect.succeed(42)))

Transforms an existing channel by rewriting its Pull implementation.

import { Channel, Effect } from "effect"
const doubled = Channel.transformPull(
Channel.fromIterable([1, 2, 3]),
(pull) => Effect.succeed(Effect.map(pull, (n) => n * 2))
)
// => emits 2, 4, 6

The inverse of fromTransform: extracts a channel’s underlying transform function.

import { Channel } from "effect"
const transform = Channel.toTransform(Channel.succeed(42))

Creates a channel that an imperative callback fills via a Queue. Optional bufferSize and strategy ("sliding" | "dropping" | "suspend") control backpressure.

import { Channel, Effect, Queue } from "effect"
const ch = Channel.callback<number>((queue) =>
Effect.gen(function*() {
yield* Queue.offer(queue, 1)
yield* Queue.offer(queue, 2)
})
)
// => emits 1, 2

Like callback, but emits non-empty arrays of the queued elements.

import { Channel, Effect, Queue } from "effect"
const ch = Channel.callbackArray<number>((queue) =>
Queue.offerAll(queue, [1, 2, 3]).pipe(Effect.asVoid)
)
// => emits [1, 2, 3]

Lazily defers channel construction until the channel is run.

import { Channel } from "effect"
const ch = Channel.suspend(() => Channel.succeed(Math.random()))

Emits a single value, then completes.

import { Channel } from "effect"
Channel.succeed(42) // => emits 42

Emits a single lazily computed value.

import { Channel } from "effect"
Channel.sync(() => Date.now()) // => emits the time when run

Completes immediately with a done value, emitting nothing.

import { Channel } from "effect"
Channel.end("done") // => emits nothing, done = "done"

Like end, but the done value is computed lazily.

import { Channel } from "effect"
Channel.endSync(() => "done")

A channel that emits no elements and completes with void.

import { Channel } from "effect"
Channel.empty // => emits nothing

A channel that never emits and never completes.

import { Channel } from "effect"
Channel.never // => hangs forever

Forwards upstream input elements, errors, and the done value unchanged.

import { Channel } from "effect"
const id = Channel.identity<number, never, void>()
// pipe a channel into this and it passes through

Fails immediately with the given error.

import { Channel } from "effect"
Channel.fail("boom") // => fails with "boom"

Fails with a lazily computed error.

import { Channel } from "effect"
Channel.failSync(() => new Error("late"))

Fails immediately with a Cause.

import { Cause, Channel } from "effect"
Channel.failCause(Cause.fail("boom"))

Fails with a lazily computed Cause.

import { Cause, Channel } from "effect"
Channel.failCauseSync(() => Cause.die("defect"))

Fails with an unrecoverable defect.

import { Channel } from "effect"
Channel.die(new Error("bug")) // => dies

Runs an effect and emits its success as a single element.

import { Channel, Effect } from "effect"
Channel.fromEffect(Effect.succeed("hi")) // => emits "hi"

Runs an effect and uses its success as the done value, emitting no elements.

import { Channel, Effect } from "effect"
Channel.fromEffectDone(Effect.succeed(42)) // => emits nothing, done = 42

Runs an effect and discards its result, emitting nothing.

import { Channel, Console } from "effect"
Channel.fromEffectDrain(Console.log("side effect"))

Builds a channel from an effect producing a Take: success emits a non-empty array, failure fails the channel, and a done Take completes it.

import { Channel, Effect } from "effect"
// A `Take<A>` is a non-empty array of elements, or an `Exit` for done/failure.
Channel.fromEffectTake(Effect.succeed([1, 2, 3] as const))
// => emits [1, 2, 3]

Emits every value produced by an iterator; the iterator’s return becomes the done value.

import { Channel } from "effect"
Channel.fromIterator(() => [1, 2, 3][Symbol.iterator]())
// => emits 1, 2, 3

Like fromIterator, but emits non-empty arrays batched by chunkSize (default DefaultChunkSize).

import { Channel } from "effect"
Channel.fromIteratorArray(() => [1, 2, 3, 4, 5][Symbol.iterator](), 2)
// => emits [1, 2], [3, 4], [5]
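
The batching behaviour can be pictured with a small standalone function. This is a sketch in plain TypeScript, assuming batches are filled greedily up to chunkSize and the last batch carries the remainder, as the output above suggests. `batchIterator` is a hypothetical name and not part of the Channel API.

```typescript
// Hypothetical helper: group an iterator's values into arrays of at most `chunkSize`.
// Empty batches are never produced, in line with the "non-empty arrays" wording.
function* batchIterator<A>(iterator: Iterator<A>, chunkSize: number): Generator<Array<A>> {
  if (chunkSize < 1) throw new RangeError("chunkSize must be at least 1")
  let batch: Array<A> = []
  for (let step = iterator.next(); !step.done; step = iterator.next()) {
    batch.push(step.value)
    if (batch.length === chunkSize) {
      yield batch
      batch = []
    }
  }
  if (batch.length > 0) yield batch // remainder smaller than chunkSize
}

const batches = Array.from(batchIterator([1, 2, 3, 4, 5][Symbol.iterator](), 2))
// batches holds [1, 2], [3, 4], [5]
```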

Emits every element of an iterable.

import { Channel } from "effect"
Channel.fromIterable(new Set([1, 2, 3])) // => emits 1, 2, 3

Like fromIterable, but emits non-empty arrays batched by chunkSize.

import { Channel } from "effect"
Channel.fromIterableArray([1, 2, 3, 4], 2)
// => emits [1, 2], [3, 4]

Emits each element of an array, then completes.

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]) // => emits 1, 2, 3

Emits each element of a Chunk.

import { Channel, Chunk } from "effect"
Channel.fromChunk(Chunk.make(1, 2, 3)) // => emits 1, 2, 3

Reads elements one at a time from a queue.

import { Channel, Effect, Queue } from "effect"
Effect.gen(function*() {
const queue = yield* Queue.unbounded<number>()
yield* Queue.offerAll(queue, [1, 2, 3])
return Channel.fromQueue(queue)
})

Reads available elements from a queue as non-empty arrays.

import { Channel, Effect, Queue } from "effect"
Effect.gen(function*() {
const queue = yield* Queue.unbounded<number>()
yield* Queue.offerAll(queue, [1, 2, 3])
return Channel.fromQueueArray(queue) // => emits [1, 2, 3]
})

Emits values received from a PubSub subscription.

import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(16)
const sub = yield* PubSub.subscribe(pubsub)
return Channel.fromSubscription(sub)
})

Like fromSubscription, but emits received values as non-empty arrays.

import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(16)
const sub = yield* PubSub.subscribe(pubsub)
return Channel.fromSubscriptionArray(sub)
})

Subscribes to a PubSub and emits each published value.

import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<number>()
return Channel.fromPubSub(pubsub)
})

Like fromPubSub, but emits published values as non-empty arrays.

import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<number>()
return Channel.fromPubSubArray(pubsub)
})

Subscribes to a PubSub of Take values and emits the take outputs as non-empty arrays.

import { Channel, Effect, PubSub } from "effect"
import type { Take } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<Take.Take<number>>()
return Channel.fromPubSubTake(pubsub)
})

Emits each schedule output; the done value is the last output.

import { Channel, Schedule } from "effect"
Channel.fromSchedule(Schedule.spaced("1 second"))
// => emits the recurrence count on each tick

Pulls values from an AsyncIterable; the iterator return becomes the done value. The onError function maps thrown/rejected errors to a typed channel error.

import { Channel } from "effect"
async function* gen() {
yield 1
yield 2
}
Channel.fromAsyncIterable(gen(), (e) => new Error(String(e)))
// => emits 1, 2

Like fromAsyncIterable, but emits each value as a single-element non-empty array.

import { Channel } from "effect"
async function* gen() {
yield 1
}
Channel.fromAsyncIterableArray(gen(), (e) => new Error(String(e)))
// => emits [1]

Acquires a resource, uses it to build a channel, and runs release with the channel’s Exit when it finishes, fails, or is interrupted. Acquisition is uninterruptible.

import { Channel, Effect } from "effect"
Channel.acquireUseRelease(
Effect.succeed("conn"),
(conn) => Channel.succeed(conn.toUpperCase()),
(conn, _exit) => Effect.log(`closing ${conn}`)
)

Acquires a resource, emits it as a single element, and registers release in the channel scope.

import { Channel, Effect } from "effect"
Channel.acquireRelease(
Effect.succeed("conn"),
(conn, _exit) => Effect.log(`closing ${conn}`)
)
// => emits "conn"

Runs a channel with a Scope provided for its lifetime, removing the Scope requirement.

import { Channel, Scope } from "effect"
declare const scopedChannel: Channel.Channel<number, never, void, unknown, unknown, unknown, Scope.Scope>
const ch = Channel.scoped(scopedChannel) // Scope removed from Env

Attaches a finalizer that always runs once the channel begins, regardless of outcome.

import { Channel, Console } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.ensuring(Console.log("done"))
)

Attaches an exit-aware finalizer that receives the channel’s Exit.

import { Channel, Console, Exit } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.onExit((exit) =>
Console.log(Exit.isSuccess(exit) ? "ok" : "failed")
)
)

Attaches a finalizer that runs only on failure, receiving the failure Cause.

import { Channel, Console } from "effect"
Channel.fail("boom").pipe(
Channel.onError((cause) => Console.error(cause))
)

Runs an effect before the channel starts; its success is ignored, its failure aborts the channel.

import { Channel, Console } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.onStart(Console.log("starting"))
)

Runs an effect the first time the channel emits an element (which is still emitted unchanged).

import { Channel, Console } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.onFirst((n) => Console.log(`first: ${n}`))
)

Runs an effect when the channel completes successfully, before the done value propagates.

import { Channel, Console } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.onEnd(Console.log("finished"))
)

Maps each output element with a pure function (receives the element and its index).

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(Channel.map((n) => n * 2))
// => emits 2, 4, 6

Maps the channel’s done value.

import { Channel } from "effect"
Channel.end(3).pipe(Channel.mapDone((n) => n + 1))
// => done = 4

Maps the done value with an effectful function.

import { Channel, Effect } from "effect"
Channel.end(3).pipe(
Channel.mapDoneEffect((n) => Effect.succeed(n * 10))
)
// => done = 30

Maps each element with an effect. options.concurrency and options.unordered control parallelism.

import { Channel, Effect } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.mapEffect((n) => Effect.succeed(n * 2), { concurrency: 2 })
)
// => emits 2, 4, 6

Maps the channel’s input elements with an effectful function (contravariant side).

import { Channel, Effect } from "effect"
Channel.identity<number, never, void>().pipe(
Channel.mapInput((s: string) => Effect.succeed(s.length))
)

Maps the channel’s input errors.

import { Channel, Effect } from "effect"
Channel.identity<number, Error, void>().pipe(
Channel.mapInputError((s: string) => Effect.succeed(new Error(s)))
)

Runs a side effect on each element, leaving the element unchanged.

import { Channel, Console } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.tap((n) => Console.log(n))
)
// => emits 1, 2 while logging each

Maps each element to a channel and flattens the children. options.concurrency / options.bufferSize control concurrency.

import { Channel } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.flatMap((n) => Channel.fromArray([n, n * 10]))
)
// => emits 1, 10, 2, 20

Flattens a channel whose elements are themselves channels.

import { Channel } from "effect"
Channel.flatten(
Channel.fromArray([Channel.fromArray([1, 2]), Channel.fromArray([3])])
)
// => emits 1, 2, 3

Flattens a channel of arrays into individual elements.

import { Channel } from "effect"
Channel.flattenArray(Channel.fromArray([[1, 2], [3, 4]]))
// => emits 1, 2, 3, 4

Flattens a channel of Take values: successes become non-empty arrays, failures fail, done completes.

import { Channel } from "effect"
// Each `Take` here is a non-empty array of elements.
Channel.flattenTake(Channel.fromArray([[1, 2], [3]] as const))
// => emits [1, 2], [3]

Keeps only elements matching a predicate (refinements narrow the output type).

import { Channel } from "effect"
Channel.fromArray([1, 2, 3, 4]).pipe(Channel.filter((n) => n % 2 === 0))
// => emits 2, 4

Keeps and maps elements with a Filter; rejected elements are dropped.

import { Channel, Filter } from "effect"
Channel.fromArray([1, 2, 3, 4]).pipe(
Channel.filterMap(Filter.fromPredicate((n: number) => n % 2 === 0))
)
// => emits 2, 4

Filters elements with an effectful predicate.

import { Channel, Effect } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.filterEffect((n) => Effect.succeed(n > 1))
)
// => emits 2, 3

Filters and maps elements with an effectful Filter.

import { Channel, Effect, Result } from "effect"
// A FilterEffect returns Effect<Result<Pass, Fail>>: success keeps & maps, failure drops.
Channel.fromArray([1, 2, 3]).pipe(
Channel.filterMapEffect((n: number) =>
Effect.succeed(n > 1 ? Result.succeed(n * 10) : Result.fail(n))
)
)
// => emits 20, 30

Filters each element inside emitted arrays, emitting only non-empty filtered arrays.

import { Channel } from "effect"
Channel.fromArray([[1, 2, 3], [4, 5]]).pipe(
Channel.filterArray((n) => n % 2 === 1)
)
// => emits [1, 3], [5]

Filters and maps each element inside emitted arrays with a Filter.

import { Channel, Filter } from "effect"
Channel.fromArray([[1, 2], [3, 4]]).pipe(
Channel.filterMapArray(Filter.fromPredicate((n: number) => n % 2 === 0))
)
// => emits [2], [4]

Filters each element inside emitted arrays with an effectful predicate.

import { Channel, Effect } from "effect"
Channel.fromArray([[1, 2], [3, 4]]).pipe(
Channel.filterArrayEffect((n) => Effect.succeed(n > 2))
)
// => emits [3, 4]

Filters and maps each element inside emitted arrays with an effectful Filter.

import { Channel, Effect, Result } from "effect"
Channel.fromArray([[1, 2], [3, 4]]).pipe(
Channel.filterMapArrayEffect((n: number) =>
Effect.succeed(n > 2 ? Result.succeed(n) : Result.fail(n))
)
)
// => emits [3, 4]

Stateful map: each element updates an accumulator and yields zero or more output values. The onHalt option can flush remaining state.

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.mapAccum(() => 0, (sum, n) => [sum + n, [sum + n]] as const)
)
// => emits 1, 3, 6

Emits the initial accumulator plus each intermediate result as it folds over the elements.

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(Channel.scan(0, (sum, n) => sum + n))
// => emits 0, 1, 3, 6

Like scan, but the accumulator function is effectful.

import { Channel, Effect } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.scanEffect(0, (sum, n) => Effect.succeed(sum + n))
)
// => emits 0, 1, 3, 6

Maps each element to a channel, but starting a new child interrupts the previous one (default concurrency 1).

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.switchMap((n) => Channel.fromArray([`v-${n}`]))
)
// => emits "v-1", "v-2", "v-3"

Runs an input handler against upstream while the wrapped channel runs without receiving input directly.

import { Channel, Effect } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.embedInput((upstream) =>
upstream.pipe(Effect.forever, Effect.ignore)
)
)

Buffers individual output elements in a queue so a fast producer can run ahead of a slow consumer. strategy defaults to "suspend", which applies backpressure; "dropping" and "sliding" may discard elements when the buffer is full.

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.buffer({ capacity: 16, strategy: "suspend" })
)
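
To make the difference between the two lossy strategies concrete, here is a synchronous model in plain TypeScript. It assumes the usual bounded-queue meaning of the names: "dropping" discards the newly offered element when the buffer is full, and "sliding" evicts the oldest element to make room. `offer` and `LossyStrategy` are hypothetical names, and the sketch does not use the Channel API.

```typescript
type LossyStrategy = "dropping" | "sliding"

// Hypothetical synchronous model of a bounded buffer (capacity assumed to be at least 1).
// "suspend" is not modelled because it makes the producer wait instead of discarding.
function offer<A>(buffer: Array<A>, capacity: number, strategy: LossyStrategy, value: A): void {
  if (buffer.length < capacity) {
    buffer.push(value)
  } else if (strategy === "sliding") {
    buffer.shift() // evict the oldest element
    buffer.push(value)
  }
  // "dropping": the buffer is full, so the new value is discarded
}

const dropping: Array<number> = []
const sliding: Array<number> = []
for (const n of [1, 2, 3, 4]) {
  offer(dropping, 2, "dropping", n)
  offer(sliding, 2, "sliding", n)
}
// dropping holds 1, 2 (newest values lost); sliding holds 3, 4 (oldest values lost)
```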

Buffers array output elements; upstream array boundaries are not preserved.

import { Channel } from "effect"
Channel.fromArray([[1, 2], [3]]).pipe(
Channel.bufferArray({ capacity: 16 })
)

Runs the second channel after the first completes, concatenating their output.

import { Channel } from "effect"
Channel.concat(Channel.fromArray([1, 2]), Channel.fromArray([3, 4]))
// => emits 1, 2, 3, 4

Like concat, but the second channel is built from the first channel’s done value.

import { Channel } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.concatWith(() => Channel.succeed(99))
)
// => emits 1, 2, 99

Combines two channels with a stateful pull function that reads from both sides.

import { Channel, Effect } from "effect"
Channel.combine(
Channel.fromArray([1, 2]),
Channel.fromArray([10, 20]),
() => 0,
(s, left, right) =>
Effect.map(Effect.zip(left, right), ([a, b]) => [a + b, s] as const)
)
// => emits 11, 22

Runs a fallback channel if the source completes without emitting any element.

import { Channel } from "effect"
Channel.empty.pipe(
Channel.orElseIfEmpty(() => Channel.succeed("fallback"))
)
// => emits "fallback"

Pipes this channel’s output into another channel’s input. The result keeps this channel’s input type and the other’s output/done types.

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.pipeTo(Channel.identity<number, never, void>())
)
// => emits 1, 2, 3

Like pipeTo, but this channel’s failures are preserved and not observed by the downstream channel.

import { Channel } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.pipeToOrFail(Channel.identity<number, never, void>())
)

Runs two channels concurrently, interleaving their output. haltStrategy controls when the merged channel stops ("left" | "right" | "both" | "either", default "both").

import { Channel } from "effect"
Channel.merge(Channel.fromArray([1, 2]), Channel.fromArray([3, 4]), {
haltStrategy: "either"
})
// => emits an interleaving of 1, 2, 3, 4; with "either" the output can stop early once one side completes
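
The four halt strategies can be read as a simple rule over which sides have completed. The function below is a hypothetical model in plain TypeScript, assuming the conventional meaning of the names; `shouldHalt` is not part of the Channel API, and the type alias is redeclared locally so the sketch stands alone.

```typescript
type HaltStrategy = "left" | "right" | "both" | "either"

// Hypothetical pure model: given which sides have completed,
// should the merged output stop?
function shouldHalt(strategy: HaltStrategy, leftDone: boolean, rightDone: boolean): boolean {
  switch (strategy) {
    case "left":
      return leftDone
    case "right":
      return rightDone
    case "both":
      return leftDone && rightDone
    case "either":
      return leftDone || rightDone
  }
}

// Left has finished, right is still running:
const haltDecisions = {
  left: shouldHalt("left", true, false), // true
  right: shouldHalt("right", true, false), // false
  both: shouldHalt("both", true, false), // false
  either: shouldHalt("either", true, false) // true
}
```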

Merges a channel of channels with bounded concurrency and bufferSize.

import { Channel } from "effect"
Channel.mergeAll({ concurrency: 2 })(
Channel.fromArray([Channel.fromArray([1, 2]), Channel.fromArray([3, 4])])
)
// => emits 1, 2, 3, 4 (order may interleave)

Runs an effect concurrently with a channel, emitting only the channel’s output and failing if the effect fails.

import { Channel, Console } from "effect"
Channel.fromArray([1, 2]).pipe(
Channel.mergeEffect(Console.log("running alongside"))
)

Repeats the channel according to a Schedule.

import { Channel, Schedule } from "effect"
Channel.fromArray([1, 2]).pipe(Channel.repeat(Schedule.recurs(1)))
// => emits 1, 2, 1, 2

Repeats the channel forever (done type becomes never).

import { Channel } from "effect"
Channel.forever(Channel.fromArray([1, 2]))
// => emits 1, 2, 1, 2, ...

Runs a schedule step for each output element, applying delays between elements.

import { Channel, Schedule } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.schedule(Schedule.spaced("100 millis"))
)

Consumes all output, emitting nothing and preserving only the done value.

import { Channel } from "effect"
Channel.drain(Channel.fromArray([1, 2, 3]))
// => emits nothing

Recovers from any failure Cause by building a replacement channel.

import { Channel } from "effect"
Channel.fail("boom").pipe(
Channel.catchCause(() => Channel.succeed("recovered"))
)
// => emits "recovered"

Recovers from a typed error value by building a replacement channel.

import { Channel } from "effect"
Channel.fail("boom").pipe(
Channel.catch((err) => Channel.succeed(`saw ${err}`))
)
// => emits "saw boom"

Recovers from a failure Cause only when a predicate over the cause matches.

import { Cause, Channel } from "effect"
Channel.fail("boom").pipe(
Channel.catchCauseIf(Cause.hasFails, () => Channel.succeed("ok"))
)

Recovers using a Filter over the Cause; the recovery function receives the selected value.

import { Cause, Channel } from "effect"
// `Cause.findError` is a Filter<Cause<E>, E, ...> selecting the first failure error.
Channel.fail("boom").pipe(
Channel.catchCauseFilter(Cause.findError, (err) => Channel.succeed(`saw ${err}`))
)

Recovers from typed errors that match a predicate or refinement; non-matching errors propagate.

import { Channel } from "effect"
Channel.fail("boom").pipe(
Channel.catchIf((e): e is string => typeof e === "string", () => Channel.succeed("ok"))
)

Recovers from errors selected by a Filter; the recovery function receives the filtered value.

import { Channel, Filter } from "effect"
Channel.fail({ _tag: "Net" } as const).pipe(
Channel.catchFilter(
Filter.fromPredicate((e: { _tag: string }) => e._tag === "Net"),
() => Channel.succeed("ok")
)
)

Recovers from a specific tagged error (or array of tags), with an optional orElse for the rest.

import { Channel, Data } from "effect"
class NetError extends Data.TaggedError("NetError")<{ url: string }> {}
Channel.fail(new NetError({ url: "/x" })).pipe(
Channel.catchTag("NetError", (e) => Channel.succeed(`retry ${e.url}`))
)

Recovers from a specific reason tag nested inside a tagged error.

import { Channel, Data } from "effect"
class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit }> {}
Channel.fail(new AiError({ reason: new RateLimit({ retryAfter: 60 }) })).pipe(
Channel.catchReason("AiError", "RateLimit", (r) => Channel.succeed(`wait ${r.retryAfter}`))
)

Recovers from multiple nested reasons using an object of handlers keyed by reason tag.

import { Channel, Data } from "effect"
class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class Quota extends Data.TaggedError("Quota")<{ limit: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit | Quota }> {}
Channel.fail(new AiError({ reason: new Quota({ limit: 5 }) })).pipe(
Channel.catchReasons("AiError", {
RateLimit: (r) => Channel.succeed(`wait ${r.retryAfter}`),
Quota: (r) => Channel.succeed(`limit ${r.limit}`)
})
)

Flattens a tagged error’s nested reason into the channel’s own error channel.

import { Channel, Data } from "effect"
class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit }> {}
Channel.fail(new AiError({ reason: new RateLimit({ retryAfter: 60 }) })).pipe(
Channel.unwrapReason("AiError")
)
// now fails with RateLimit directly

Runs an effect with the failure Cause, then re-raises the original failure.

import { Channel, Console } from "effect"
Channel.fail("boom").pipe(
Channel.tapCause((cause) => Console.error(cause))
)

Runs an effect when the channel fails with a typed error, then preserves the failure.

import { Channel, Console } from "effect"
Channel.fail("boom").pipe(
Channel.tapError((err) => Console.error(err))
)

Transforms the channel’s typed error.

import { Channel } from "effect"
Channel.fail("boom").pipe(Channel.mapError((s) => new Error(s)))

Converts typed errors into unrecoverable defects (error channel becomes never).

import { Channel } from "effect"
Channel.fail("boom").pipe(Channel.orDie)

Ignores typed errors, turning a failed channel into an empty one. Use { log: true } to log the cause first.

import { Channel } from "effect"
Channel.fail("boom").pipe(Channel.ignore())
// => emits nothing, no failure

Like ignore, but also swallows defects.

import { Channel } from "effect"
Channel.die("bug").pipe(Channel.ignoreCause())

Retries the channel according to a Schedule whenever it fails.

import { Channel, Schedule } from "effect"
Channel.fail("boom").pipe(Channel.retry(Schedule.recurs(3)))

Races the channel against an effect; if the effect finishes first, its value becomes the done value.

import { Channel, Effect } from "effect"
Channel.never.pipe(
Channel.interruptWhen(Effect.as(Effect.sleep("1 second"), "timeout"))
)
// => done = "timeout"

Stops the channel when an effect completes or fails; the effect’s success becomes the done value, its failure fails the channel.

import { Channel, Effect } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
Channel.haltWhen(Effect.as(Effect.sleep("1 second"), "stop"))
)

These operate on chunks (NonEmptyReadonlyArray), matching the shape Stream uses.

Splits incoming string chunks into lines, recognizing \n, \r\n, and standalone \r, even across chunk boundaries.

import { Effect, Stream } from "effect"
Effect.runPromise(
Stream.runCollect(Stream.splitLines(Stream.make("hel", "lo\r\nwor", "ld\n")))
).then(console.log)
// => [ "hello", "world" ]
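
Splitting across chunk boundaries needs a little state: the unfinished line, and whether the previous chunk ended on a `\r` that may be completed by a leading `\n`. The sketch below shows one way to do this in plain TypeScript. It is not the library's implementation; `makeLineSplitter` is a hypothetical name, and emitting an unterminated trailing line on `flush` is an assumption.

```typescript
// Hypothetical stateful splitter, fed one string chunk at a time.
function makeLineSplitter() {
  let carry = "" // text of the current line, not yet terminated
  let skipLF = false // the last terminator was "\r"; a following "\n" belongs to it

  const push = (chunk: string): Array<string> => {
    const lines: Array<string> = []
    let start = 0
    for (let i = 0; i < chunk.length; i++) {
      const ch = chunk[i]
      if (skipLF) {
        skipLF = false
        if (ch === "\n") {
          start = i + 1 // second half of "\r\n", possibly split across chunks
          continue
        }
      }
      if (ch === "\n" || ch === "\r") {
        lines.push(carry + chunk.slice(start, i))
        carry = ""
        start = i + 1
        if (ch === "\r") skipLF = true
      }
    }
    carry += chunk.slice(start)
    return lines
  }

  // Assumption: a trailing line without a terminator is emitted at the end.
  const flush = (): Array<string> => {
    const rest = carry
    carry = ""
    return rest.length > 0 ? [rest] : []
  }

  return { push, flush }
}

const splitter = makeLineSplitter()
const splitOutput = ["hel", "lo\r\nwor", "ld\n"]
  .flatMap((chunk) => splitter.push(chunk))
  .concat(splitter.flush())
// splitOutput holds "hello", "world"
```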

Decodes incoming Uint8Array chunks into string chunks using TextDecoder with streaming enabled, so multi-byte characters that span chunk boundaries are decoded correctly. Highlighted in the v4 channel improvements.

import { Channel } from "effect"
const decode = Channel.decodeText<never, void>("utf-8")
// InElem: NonEmptyReadonlyArray<Uint8Array> -> OutElem: NonEmptyReadonlyArray<string>
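
The effect of the streaming flag can be seen with the standard TextDecoder alone. This sketch uses only the Web encoding API, not Channel, and splits the two-byte UTF-8 sequence for "é" across two chunks; the variable names are illustrative.

```typescript
// "café" is 5 bytes in UTF-8: c, a, f, then 0xC3 0xA9 for "é".
const encoded = new TextEncoder().encode("café")
const first = encoded.slice(0, 4) // "caf" plus the first byte of "é"
const second = encoded.slice(4) // the second byte of "é"

// With { stream: true } the decoder holds the incomplete sequence until the next call.
const streamingDecoder = new TextDecoder("utf-8")
const streamed =
  streamingDecoder.decode(first, { stream: true }) +
  streamingDecoder.decode(second, { stream: true })

// Decoding each chunk independently replaces the split character with U+FFFD.
const naive = new TextDecoder("utf-8").decode(first) + new TextDecoder("utf-8").decode(second)
```

Here `streamed` equals "café", while `naive` contains replacement characters in place of "é".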

Encodes incoming string chunks into Uint8Array chunks using TextEncoder.

import { Channel } from "effect"
const encode = Channel.encodeText<never, void>()
// InElem: NonEmptyReadonlyArray<string> -> OutElem: NonEmptyReadonlyArray<Uint8Array>

Builds a channel from the surrounding Context.

import { Channel, Context } from "effect"
const Config = Context.Reference("Config", { defaultValue: () => ({ n: 1 }) })
Channel.contextWith((ctx: Context.Context<typeof Config>) =>
Channel.succeed(Context.get(ctx, Config).n)
)

Provides a Context, removing those service requirements.

import { Channel, Context } from "effect"
declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, { svc: string }>
declare const ctx: Context.Context<{ svc: string }>
const provided = Channel.provideContext(ch, ctx)

Provides a single service by key.

import { Channel, Context } from "effect"
const Greeting = Context.Reference("Greeting", { defaultValue: () => "hi" })
Channel.contextWith((ctx: Context.Context<typeof Greeting>) =>
Channel.succeed(Context.get(ctx, Greeting))
).pipe(Channel.provideService(Greeting, "hello"))

Provides a service obtained from an effect (which may fail).

import { Channel, Context, Effect } from "effect"
const Token = Context.Reference("Token", { defaultValue: () => "" })
declare const ch: Channel.Channel<string, never, void, unknown, unknown, unknown, typeof Token>
Channel.provideServiceEffect(ch, Token, Effect.succeed("abc"))

Provides a Layer or Context, removing the corresponding requirements. options.local builds a fresh layer instance.

import { Channel, Layer } from "effect"
declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, { db: string }>
declare const DbLive: Layer.Layer<{ db: string }>
const provided = Channel.provide(ch, DbLive)

Transforms the surrounding context before the channel runs.

import { Channel, Context } from "effect"
declare const ch: Channel.Channel<number, never, void>
Channel.updateContext(ch, (ctx: Context.Context<never>) => ctx)

Reads a service from context, transforms it, and provides the updated value.

import { Channel, Context } from "effect"
const Count = Context.Reference("Count", { defaultValue: () => 0 })
declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, typeof Count>
Channel.updateService(ch, Count, (n) => n + 1)

Runs the channel inside a tracing span, ending the span with the channel’s exit.

import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(Channel.withSpan("process-batch"))

The starting channel emitting an empty object.

import { Channel } from "effect"
Channel.Do // => emits {}

Adds a field produced by running another channel for each emitted object.

import { Channel } from "effect"
Channel.Do.pipe(
Channel.bind("x", () => Channel.succeed(1)),
Channel.bind("y", () => Channel.succeed(2))
)
// => emits { x: 1, y: 2 }

Wraps each emitted element in an object under the given field name.

import { Channel } from "effect"
Channel.succeed(42).pipe(Channel.bindTo("answer"))
// => emits { answer: 42 }
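To make the shape of do-notation concrete, the sketch below models a channel as nothing more than the array of elements it would emit. It is a plain TypeScript analogy, not the Effect implementation: Emitted, Do, bind, and bindTo here are hypothetical stand-ins, and the model assumes bind behaves like a flatMap (one output object per element of the inner channel).

```typescript
// Model only: a "channel" is the array of elements it would emit.
type Emitted<A> = ReadonlyArray<A>

// Stand-in for Channel.Do: emits a single empty object.
const Do: Emitted<{}> = [{}]

// Stand-in for Channel.bind (assumed flatMap-like): for each emitted object,
// run `f` and emit one extended object per element that `f` produces.
function bind<N extends string, A extends object, B>(
  self: Emitted<A>,
  name: N,
  f: (a: A) => Emitted<B>
): Emitted<A & Record<N, B>> {
  const out: Array<A & Record<N, B>> = []
  for (const a of self) {
    for (const b of f(a)) {
      out.push({ ...a, [name]: b } as unknown as A & Record<N, B>)
    }
  }
  return out
}

// Stand-in for Channel.bindTo: wrap each element under `name`.
function bindTo<N extends string, A>(self: Emitted<A>, name: N): Emitted<Record<N, A>> {
  return self.map((a) => ({ [name]: a }) as unknown as Record<N, A>)
}

// Later binds can read fields added by earlier ones.
const pairs = bind(bind(Do, "x", () => [1]), "y", ({ x }) => [x + 1, x + 2])
// pairs: [{ x: 1, y: 2 }, { x: 1, y: 3 }]

const wrapped = bindTo([42], "answer")
// wrapped: [{ answer: 42 }]
```

In this model an inner channel that emits two elements yields two output objects, which is worth keeping in mind when binding anything other than a single-element channel.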

Builds a channel from a (possibly scoped) effect that yields a channel.

import { Channel, Effect } from "effect"
Channel.unwrap(Effect.succeed(Channel.fromArray([1, 2, 3])))
// => emits 1, 2, 3

The functions in this section run a channel that no longer needs upstream input, producing an Effect.

Collects all output elements into an array.

import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runCollect(Channel.fromArray([1, 2, 3]))).then(console.log)
// => [ 1, 2, 3 ]

Discards all output, returning the done value.

import { Channel, Effect } from "effect"
Effect.runPromise(
Channel.fromArray([1, 2]).pipe(
Channel.concatWith(() => Channel.end("done")),
Channel.runDrain
)
).then(console.log)
// => "done"
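The difference between emitted elements and the done value is easy to lose track of, so here is a small plain TypeScript model of it. ModelRun, fromArray, end, and concatWith are hypothetical stand-ins written for this sketch; they mirror the names used in the example but are not the Effect implementations.

```typescript
// Model only: a finished run is its emitted elements plus exactly one done value.
interface ModelRun<Elem, Done> {
  readonly elements: ReadonlyArray<Elem>
  readonly done: Done
}

// Stand-in for Channel.fromArray: emits each item, completes with void.
function fromArray<A>(items: ReadonlyArray<A>): ModelRun<A, void> {
  return { elements: items, done: undefined }
}

// Stand-in for Channel.end: emits nothing, completes with `done`.
function end<D>(done: D): ModelRun<never, D> {
  return { elements: [], done }
}

// Stand-in for Channel.concatWith: when `self` is done, continue with f(done).
// The combined run keeps all elements but only the second run's done value.
function concatWith<A, D1, B, D2>(
  self: ModelRun<A, D1>,
  f: (done: D1) => ModelRun<B, D2>
): ModelRun<A | B, D2> {
  const next = f(self.done)
  return { elements: [...self.elements, ...next.elements], done: next.done }
}

const run = concatWith(fromArray([1, 2]), () => end("done"))
const collected = run.elements // what a collecting runner would see: [1, 2]
const drained = run.done // what a draining runner would see: "done"
```

Collecting looks only at the elements and draining looks only at the done value, which is why the same channel can give `[1, 2]` to one runner and `"done"` to another.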

Counts the emitted elements.

import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runCount(Channel.fromArray([1, 2, 3]))).then(console.log)
// => 3

Runs an effect for each output element, returning the done value.

import { Channel, Console, Effect } from "effect"
Effect.runPromise(
Channel.runForEach(Channel.fromArray([1, 2]), (n) => Console.log(n))
)

Runs an effectful predicate per element, stopping early when it returns false.

import { Channel, Effect } from "effect"
Channel.runForEachWhile(Channel.fromArray([1, 2, 3]), (n) => Effect.succeed(n < 3))
// => visits 1 and 2, then stops at 3 because the predicate returns false
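Which elements the predicate actually sees can be checked with a synchronous model. This is a sketch of the stopping rule only, with a hypothetical runForEachWhileModel over a plain array; it says nothing about how the library pulls from upstream.

```typescript
// Model only: visit elements in order and stop at the first one
// for which the predicate returns false.
function runForEachWhileModel<A>(
  elements: ReadonlyArray<A>,
  predicate: (a: A) => boolean
): void {
  for (const a of elements) {
    if (!predicate(a)) return
  }
}

// Record every element the predicate is called with.
const visited: Array<number> = []
runForEachWhileModel([1, 2, 3, 4], (n) => {
  visited.push(n)
  return n < 3
})
// visited: [1, 2, 3]
```

The element that fails the predicate (3) is still passed to it; only the elements after it (4) are never visited.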

Folds over all output elements with a pure accumulator.

import { Channel, Effect } from "effect"
Effect.runPromise(
Channel.runFold(Channel.fromArray([1, 2, 3]), () => 0, (acc, n) => acc + n)
).then(console.log)
// => 6

Folds over all output elements with an effectful accumulator.

import { Channel, Effect } from "effect"
Channel.runFoldEffect(
Channel.fromArray([1, 2, 3]),
() => 0,
(acc, n) => Effect.succeed(acc + n)
)

Returns the first output element as an Option.

import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runHead(Channel.fromArray([1, 2, 3]))).then(console.log)
// => Option.some(1)

Returns the last output element as an Option.

import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runLast(Channel.fromArray([1, 2, 3]))).then(console.log)
// => Option.some(3)
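The fold, head, and last runners can be read as ordinary array operations over the emitted elements. The sketch below uses hypothetical names (ModelOption, runFoldModel, runHeadModel, runLastModel) and a minimal Option stand-in rather than Effect's Option module; it is meant to show the result shapes, including the empty case.

```typescript
// Minimal Option stand-in for this sketch (not Effect's Option module).
type ModelOption<A> =
  | { readonly _tag: "None" }
  | { readonly _tag: "Some"; readonly value: A }

// Fold: the initial value is a thunk, evaluated once when the run starts.
function runFoldModel<A, Z>(
  elements: ReadonlyArray<A>,
  initial: () => Z,
  f: (acc: Z, a: A) => Z
): Z {
  let acc = initial()
  for (const a of elements) acc = f(acc, a)
  return acc
}

// Head: the first element, or None when nothing was emitted.
function runHeadModel<A>(elements: ReadonlyArray<A>): ModelOption<A> {
  return elements.length > 0
    ? { _tag: "Some", value: elements[0] as A }
    : { _tag: "None" }
}

// Last: the final element, or None when nothing was emitted.
function runLastModel<A>(elements: ReadonlyArray<A>): ModelOption<A> {
  return elements.length > 0
    ? { _tag: "Some", value: elements[elements.length - 1] as A }
    : { _tag: "None" }
}

const sum = runFoldModel([1, 2, 3], () => 0, (acc, n) => acc + n) // 6
const head = runHeadModel([1, 2, 3]) // Some(1)
const last = runLastModel([1, 2, 3]) // Some(3)
const emptyHead = runHeadModel<number>([]) // None
```

The None case is the reason head and last return an Option: a channel that emits nothing has no first or last element.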

Runs the channel and returns only its done value.

import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runDone(Channel.end("done"))).then(console.log)
// => "done"

Converts the channel to a scoped Pull for manual, low-level consumption. Pulls are serialized, so only one pull runs at a time.

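import { Channel, Effect } from "effect"
// Use the pull inside Effect.scoped: it is only valid while the scope is open.
const first = Effect.scoped(Effect.flatMap(Channel.toPull(Channel.fromArray([1, 2, 3])), (pull) => pull))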

Converts the channel to a Pull within an existing scope you control.

import { Channel, Effect, Scope } from "effect"
Effect.gen(function*() {
const scope = yield* Scope.make()
const pull = yield* Channel.toPullScoped(Channel.fromArray([1, 2, 3]), scope)
return pull
})
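Pull-based consumption means the consumer asks for one element at a time until the source reports that it is done. The following plain TypeScript sketch models that loop with a hypothetical makePull helper and a tagged PullResult; the real Pull is an Effect and signals completion through its own mechanism, so treat this as an analogy for the control flow only.

```typescript
// Model only: each pull yields either the next element or the done value.
type PullResult<A, D> =
  | { readonly _tag: "Elem"; readonly value: A }
  | { readonly _tag: "Done"; readonly done: D }

// Hypothetical helper: build a pull function over a fixed list of elements.
function makePull<A, D>(elements: ReadonlyArray<A>, done: D): () => PullResult<A, D> {
  let index = 0
  return () =>
    index < elements.length
      ? { _tag: "Elem", value: elements[index++] as A }
      : { _tag: "Done", done }
}

const pull = makePull([1, 2, 3], "finished")
const seen: Array<number> = []
let finalValue: string | undefined

// The consumer drives the loop: nothing is produced until it pulls.
for (;;) {
  const result = pull()
  if (result._tag === "Done") {
    finalValue = result.done
    break
  }
  seen.push(result.value)
}
// seen: [1, 2, 3], finalValue: "finished"
```

Because the consumer controls when the next pull happens, it can stop early, interleave other work, or apply its own backpressure between pulls.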

Runs the channel, offering each element into a queue. The queue is ended when the channel completes and failed when the channel fails.

import { Channel, Effect, Queue } from "effect"
Effect.gen(function*() {
const queue = yield* Queue.unbounded<number>()
yield* Channel.runIntoQueue(Channel.fromArray([1, 2, 3]), queue)
})

Like runIntoQueue, for a channel emitting non-empty arrays (each element is offered individually).

import { Channel, Effect, Queue } from "effect"
Effect.gen(function*() {
const queue = yield* Queue.unbounded<number>()
yield* Channel.runIntoQueueArray(Channel.fromArray([[1, 2], [3]]), queue)
})
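The only difference between the plain and the Array variants is whether the channel's elements are offered as-is or unpacked first. A minimal model with ordinary arrays standing in for the queue (runIntoQueueModel and runIntoQueueArrayModel are hypothetical names for this sketch):

```typescript
// Model only: a plain array stands in for the queue.

// Plain variant: every emitted element is offered as-is.
function runIntoQueueModel<A>(elements: ReadonlyArray<A>, queue: Array<A>): void {
  for (const a of elements) queue.push(a)
}

// Array variant: every emitted chunk is unpacked, and its
// elements are offered one by one in order.
function runIntoQueueArrayModel<A>(
  chunks: ReadonlyArray<ReadonlyArray<A>>,
  queue: Array<A>
): void {
  for (const chunk of chunks) {
    for (const a of chunk) queue.push(a)
  }
}

const plainQueue: Array<number> = []
runIntoQueueModel([1, 2, 3], plainQueue)

const chunkedQueue: Array<number> = []
runIntoQueueArrayModel([[1, 2], [3]], chunkedQueue)
// Both queues end up holding 1, 2, 3.
```

In both cases the consumer of the queue sees individual elements, so the Array variant is the one to use when the channel emits chunks.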

Creates a scoped queue and forks the channel to feed it for concurrent consumption.

import { Channel, Effect } from "effect"
Effect.scoped(
Channel.toQueue(Channel.fromArray([1, 2, 3]), { capacity: 16 })
)

Like toQueue, for an array-emitting channel (offers each element individually).

import { Channel, Effect } from "effect"
Effect.scoped(
Channel.toQueueArray(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)

Runs the channel, publishing each element to a PubSub. Set options.shutdownOnEnd to shut the PubSub down when the channel finishes.

import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<number>()
yield* Channel.runIntoPubSub(Channel.fromArray([1, 2, 3]), pubsub)
})

Like runIntoPubSub, for an array-emitting channel (publishes each element individually).

import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<number>()
yield* Channel.runIntoPubSubArray(Channel.fromArray([[1, 2], [3]]), pubsub)
})

Creates a scoped PubSub and forks the channel to feed it. shutdownOnEnd defaults to true.

import { Channel, Effect } from "effect"
Effect.scoped(
Channel.toPubSub(Channel.fromArray([1, 2, 3]), { capacity: 16 })
)
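Because the channel is forked and starts publishing on its own, the moment a subscriber attaches matters. The sketch below is a synchronous model of broadcast semantics using a hypothetical ModelPubSub class; it assumes no replay buffer, so a subscriber only receives elements published after it subscribed.

```typescript
// Model only: a broadcast hub where each subscriber has its own inbox.
class ModelPubSub<A> {
  private readonly inboxes: Array<Array<A>> = []

  // Register a new subscriber and return its inbox.
  subscribe(): ReadonlyArray<A> {
    const inbox: Array<A> = []
    this.inboxes.push(inbox)
    return inbox
  }

  // Deliver the element to every subscriber registered so far.
  publish(a: A): void {
    for (const inbox of this.inboxes) inbox.push(a)
  }
}

const pubsub = new ModelPubSub<number>()
const early = pubsub.subscribe()
pubsub.publish(1)
pubsub.publish(2)
const late = pubsub.subscribe()
pubsub.publish(3)
// early: [1, 2, 3], late: [3]
```

Every subscriber gets its own copy of each element, but a late subscriber misses what was published before it attached, so subscribers are usually set up before the producer has a chance to run.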

Like toPubSub, for an array-emitting channel (publishes each element individually).

import { Channel, Effect } from "effect"
Effect.scoped(
Channel.toPubSubArray(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)

Creates a scoped PubSub of Take values; the channel’s final Exit is published so subscribers can observe completion or failure.

import { Channel, Effect } from "effect"
Effect.scoped(
Channel.toPubSubTake(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)