Channel
Channel is the low-level streaming primitive that both Stream
and Sink are built on. Most application code never touches it directly —
streams and sinks cover the common cases far more ergonomically. You reach for Channel when you
are building a new stream operator, adapting a pull-based source, or need fine-grained control
over how input, output, errors, the final value, and resources compose.
The mental model
A channel has seven type parameters:

import type { Channel } from "effect"

type Example = Channel.Channel<
  number, // OutElem: elements it emits downstream
  Error, // OutErr: error it can fail with
  string, // OutDone: terminal value when it finishes
  Uint8Array, // InElem: elements it reads from upstream
  Error, // InErr: error it may receive from upstream
  unknown, // InDone: upstream terminal value
  never // Env: services required while running
>

Think of a channel as a process with two sides and a final value:
- The output side (OutElem, OutErr, OutDone) is what the channel produces: a sequence of elements, an optional typed failure, and a single done value when it completes.
- The input side (InElem, InErr, InDone) is the upstream protocol it consumes when piped after another channel. A source channel ignores its input (the input params default to unknown).
- Env is the Effect environment needed while the channel is interpreted.
The done value is distinct from the emitted elements. Channel.fromArray([1, 2, 3]) emits
1, 2, 3 and completes with void; a channel can also emit nothing and complete with a value
(Channel.end("done")).
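One way to picture the split between emitted elements and the done value is a plain JavaScript generator, which yields a sequence and then returns one final value. The sketch below is only an analogy in plain TypeScript and does not use the Channel API; `source` and `collect` are hypothetical names introduced for illustration.

```typescript
// Analogy only: a generator yields elements and returns a final "done" value.
function* source(): Generator<number, string, undefined> {
  yield 1
  yield 2
  yield 3
  return "done"
}

// Hypothetical helper: drains a generator, keeping elements and the done value apart.
function collect<A, D>(gen: Generator<A, D, undefined>): { elements: A[]; done: D } {
  const elements: A[] = []
  let step = gen.next()
  while (!step.done) {
    elements.push(step.value)
    step = gen.next()
  }
  return { elements, done: step.value }
}

const result = collect(source())
console.log(result.elements) // [1, 2, 3]
console.log(result.done) // "done"
```

The elements and the final value travel on separate tracks, which is the same distinction the OutElem and OutDone parameters express.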
How Stream and Sink relate
- A Stream<A, E, R> is essentially a Channel<NonEmptyReadonlyArray<A>, E, void, unknown, unknown, unknown, R>, that is, a channel that ignores its input and emits chunks (non-empty arrays) of elements. Convert between them with Stream.toChannel / Stream.fromChannel.
- A Sink consumes a channel’s output and produces a summary value; it uses the input side. Use Sink.toChannel / Sink.fromChannel.

import { Channel, Effect, Stream } from "effect"

// A Stream is a Channel under the hood
const stream = Stream.make(1, 2, 3)
const asChannel = Stream.toChannel(stream)
// Channel<NonEmptyReadonlyArray<number>, never, void, ...>

const backToStream = Stream.fromChannel(asChannel)

A practical example: a custom decode channel
Channels shine when you build a reusable stream operator. Here we splice a custom byte-decoding
channel into a stream with Stream.pipeThroughChannel. The channel reads chunks of Uint8Array,
decodes them to text, and splits the text into lines.
import { Channel, Effect, Stream } from "effect"

// A channel that turns chunks of bytes into chunks of lines.
// InElem = NonEmptyReadonlyArray<Uint8Array> (what a byte Stream emits)
// OutElem = NonEmptyReadonlyArray<string> (what a line Stream wants)
const bytesToLines = Channel.decodeText<never, void>().pipe(
  Channel.pipeTo(Channel.splitLines())
)

const bytes = Stream.make(
  new TextEncoder().encode("hello\nwor"),
  new TextEncoder().encode("ld\nbye\n")
)

const lines = bytes.pipe(Stream.pipeThroughChannel(bytesToLines))

Effect.runPromise(Stream.runCollect(lines)).then(console.log)
// => [ "hello", "world", "bye" ]

A channel can also be run on its own with the run* family if you do not need a stream:

import { Channel, Effect } from "effect"

const program = Channel.fromArray([1, 2, 3]).pipe(
  Channel.map((n) => n * 2),
  Channel.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 2, 4, 6 ]

Reference
Every public export, grouped by purpose. Each entry has a short description and, in most cases, a short example. Reconcile against the source if you depend on exact variance.
Guards and constants

isChannel
Checks whether a value is a Channel.

import { Channel } from "effect"

Channel.isChannel(Channel.succeed(42)) // => true
Channel.isChannel("nope") // => false

TypeId
The string brand ("~effect/Channel") stored on channel values; used by isChannel.

DefaultChunkSize
The default chunk size used by array-batching constructors.

import { Channel } from "effect"

console.log(Channel.DefaultChunkSize) // => 4096

HaltStrategy
The string literal type for merge halting: "left" | "right" | "both" | "either".
Low-level constructors
These build channels directly from pull functions. You rarely need them unless implementing new primitives.

fromTransform
Builds a channel from a function that turns the upstream Pull into a new Pull inside a scope.
This is the foundational constructor; most others are defined in terms of it.

import { Channel, Effect } from "effect"

// Pass the upstream pull through unchanged (an identity channel)
const ch = Channel.fromTransform((upstream, _scope) => Effect.succeed(upstream))

fromTransformBracket
Like fromTransform, but also provides a forked scope that closes when the resulting channel
completes, which is useful for scoped resource lifecycles inside a custom channel.

fromPull
Builds a channel from an Effect that produces a Pull (a repeatedly evaluated effect that emits
elements or signals done).

import { Channel, Effect } from "effect"

const ch = Channel.fromPull(Effect.succeed(Effect.succeed(42)))

transformPull
Transforms an existing channel by rewriting its Pull implementation.

import { Channel, Effect } from "effect"

const doubled = Channel.transformPull(
  Channel.fromIterable([1, 2, 3]),
  (pull) => Effect.succeed(Effect.map(pull, (n) => n * 2))
)
// => emits 2, 4, 6

toTransform
The inverse of fromTransform: extracts a channel’s underlying transform function.

import { Channel } from "effect"

const transform = Channel.toTransform(Channel.succeed(42))

callback
Creates a channel that an imperative callback fills via a Queue. Optional bufferSize and
strategy ("sliding" | "dropping" | "suspend") control backpressure.

import { Channel, Effect, Queue } from "effect"

const ch = Channel.callback<number>((queue) =>
  Effect.gen(function*() {
    yield* Queue.offer(queue, 1)
    yield* Queue.offer(queue, 2)
  })
)
// => emits 1, 2

callbackArray
Like callback, but emits non-empty arrays of the queued elements.

import { Channel, Effect, Queue } from "effect"

const ch = Channel.callbackArray<number>((queue) =>
  Queue.offerAll(queue, [1, 2, 3]).pipe(Effect.asVoid)
)
// => emits [1, 2, 3]

suspend
Lazily defers channel construction until the channel is run.

import { Channel } from "effect"

const ch = Channel.suspend(() => Channel.succeed(Math.random()))

Value constructors
succeed
Emits a single value, then completes.

import { Channel } from "effect"

Channel.succeed(42) // => emits 42

sync
Emits a single lazily computed value.

import { Channel } from "effect"

Channel.sync(() => Date.now()) // => emits the time when run

end
Completes immediately with a done value, emitting nothing.

import { Channel } from "effect"

Channel.end("done") // => emits nothing, done = "done"

endSync
Like end, but the done value is computed lazily.

import { Channel } from "effect"

Channel.endSync(() => "done")

empty
A channel that emits no elements and completes with void.

import { Channel } from "effect"

Channel.empty // => emits nothing

never
A channel that never emits and never completes.

import { Channel } from "effect"

Channel.never // => hangs forever

identity
Forwards upstream input elements, errors, and the done value unchanged.

import { Channel } from "effect"

const id = Channel.identity<number, never, void>()
// pipe a channel into this and it passes through

fail
Fails immediately with the given error.

import { Channel } from "effect"

Channel.fail("boom") // => fails with "boom"

failSync
Fails with a lazily computed error.

import { Channel } from "effect"

Channel.failSync(() => new Error("late"))

failCause
Fails immediately with a Cause.

import { Cause, Channel } from "effect"

Channel.failCause(Cause.fail("boom"))

failCauseSync
Fails with a lazily computed Cause.

import { Cause, Channel } from "effect"

Channel.failCauseSync(() => Cause.die("defect"))

die
Fails with an unrecoverable defect.

import { Channel } from "effect"

Channel.die(new Error("bug")) // => dies

Constructors from sources
fromEffect
Runs an effect and emits its success as a single element.

import { Channel, Effect } from "effect"

Channel.fromEffect(Effect.succeed("hi")) // => emits "hi"

fromEffectDone
Runs an effect and uses its success as the done value, emitting no elements.

import { Channel, Effect } from "effect"

Channel.fromEffectDone(Effect.succeed(42)) // => emits nothing, done = 42

fromEffectDrain
Runs an effect and discards its result, emitting nothing.

import { Channel, Effect, Console } from "effect"

Channel.fromEffectDrain(Console.log("side effect"))

fromEffectTake
Builds a channel from an effect producing a Take: success emits a non-empty array, failure fails
the channel, and a done Take completes it.

import { Channel, Effect } from "effect"

// A `Take<A>` is a non-empty array of elements, or an `Exit` for done/failure.
Channel.fromEffectTake(Effect.succeed([1, 2, 3] as const))
// => emits [1, 2, 3]

fromIterator
Emits every value produced by an iterator; the iterator’s return becomes the done value.

import { Channel } from "effect"

Channel.fromIterator(() => [1, 2, 3][Symbol.iterator]())
// => emits 1, 2, 3

fromIteratorArray
Like fromIterator, but emits non-empty arrays batched by chunkSize (default DefaultChunkSize).
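The batching behaviour can be pictured with a small plain-TypeScript sketch. It illustrates grouping an iterator's values into non-empty arrays and is not the library's implementation; `batch` is a hypothetical helper, and it assumes `chunkSize` is at least 1.

```typescript
// Hypothetical helper: groups an iterator's values into arrays of at most `chunkSize`.
// Empty arrays are never produced, mirroring the "non-empty array" output shape.
function* batch<A>(iterator: Iterator<A>, chunkSize: number): Generator<A[], void, undefined> {
  let chunk: A[] = []
  let step = iterator.next()
  while (!step.done) {
    chunk.push(step.value)
    if (chunk.length === chunkSize) {
      yield chunk
      chunk = []
    }
    step = iterator.next()
  }
  // Flush a final partial chunk, if any.
  if (chunk.length > 0) yield chunk
}

const batches = Array.from(batch([1, 2, 3, 4, 5][Symbol.iterator](), 2))
console.log(batches) // [[1, 2], [3, 4], [5]]
```

The last batch may be shorter than `chunkSize`, but it is never empty.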
import { Channel } from "effect"

Channel.fromIteratorArray(() => [1, 2, 3, 4, 5][Symbol.iterator](), 2)
// => emits [1, 2], [3, 4], [5]

fromIterable
Emits every element of an iterable.

import { Channel } from "effect"

Channel.fromIterable(new Set([1, 2, 3])) // => emits 1, 2, 3

fromIterableArray
Like fromIterable, but emits non-empty arrays batched by chunkSize.

import { Channel } from "effect"

Channel.fromIterableArray([1, 2, 3, 4], 2)
// => emits [1, 2], [3, 4]

fromArray
Emits each element of an array, then completes.

import { Channel } from "effect"

Channel.fromArray([1, 2, 3]) // => emits 1, 2, 3

fromChunk
Emits each element of a Chunk.

import { Channel, Chunk } from "effect"

Channel.fromChunk(Chunk.make(1, 2, 3)) // => emits 1, 2, 3

fromQueue
Reads elements one at a time from a queue.

import { Channel, Effect, Queue } from "effect"

Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Queue.offerAll(queue, [1, 2, 3])
  return Channel.fromQueue(queue)
})

fromQueueArray
Reads available elements from a queue as non-empty arrays.

import { Channel, Effect, Queue } from "effect"

Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Queue.offerAll(queue, [1, 2, 3])
  return Channel.fromQueueArray(queue) // => emits [1, 2, 3]
})

fromSubscription
Emits values received from a PubSub subscription.

import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(16)
  const sub = yield* PubSub.subscribe(pubsub)
  return Channel.fromSubscription(sub)
})

fromSubscriptionArray
Like fromSubscription, but emits received values as non-empty arrays.

import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(16)
  const sub = yield* PubSub.subscribe(pubsub)
  return Channel.fromSubscriptionArray(sub)
})

fromPubSub
Subscribes to a PubSub and emits each published value.

import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  return Channel.fromPubSub(pubsub)
})

fromPubSubArray
Like fromPubSub, but emits published values as non-empty arrays.

import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  return Channel.fromPubSubArray(pubsub)
})

fromPubSubTake
Subscribes to a PubSub of Take values and emits the take outputs as non-empty arrays.

import { Channel, Effect, PubSub } from "effect"
import type { Take } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<Take.Take<number>>()
  return Channel.fromPubSubTake(pubsub)
})

fromSchedule
Emits each schedule output; the done value is the last output.

import { Channel, Schedule } from "effect"

Channel.fromSchedule(Schedule.spaced("1 second"))
// => emits the recurrence count on each tick

fromAsyncIterable
Pulls values from an AsyncIterable; the iterator return becomes the done value. The onError
function maps thrown/rejected errors to a typed channel error.

import { Channel } from "effect"

async function* gen() {
  yield 1
  yield 2
}

Channel.fromAsyncIterable(gen(), (e) => new Error(String(e)))
// => emits 1, 2

fromAsyncIterableArray
Like fromAsyncIterable, but emits each value as a single-element non-empty array.

import { Channel } from "effect"

async function* gen() {
  yield 1
}

Channel.fromAsyncIterableArray(gen(), (e) => new Error(String(e)))
// => emits [1]

Resource management
acquireUseRelease
Acquires a resource, uses it to build a channel, and runs release with the channel’s Exit when
it finishes, fails, or is interrupted. Acquisition is uninterruptible.

import { Channel, Effect } from "effect"

Channel.acquireUseRelease(
  Effect.succeed("conn"),
  (conn) => Channel.succeed(conn.toUpperCase()),
  (conn, _exit) => Effect.log(`closing ${conn}`)
)

acquireRelease
Acquires a resource, emits it as a single element, and registers release in the channel scope.

import { Channel, Effect } from "effect"

Channel.acquireRelease(
  Effect.succeed("conn"),
  (conn, _exit) => Effect.log(`closing ${conn}`)
)
// => emits "conn"

scoped
Runs a channel with a Scope provided for its lifetime, removing the Scope requirement.

import { Channel, Effect, Scope } from "effect"

declare const scopedChannel: Channel.Channel<number, never, void, unknown, unknown, unknown, Scope.Scope>
const ch = Channel.scoped(scopedChannel) // Scope removed from Env

ensuring
Attaches a finalizer that always runs once the channel begins, regardless of outcome.

import { Channel, Console } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.ensuring(Console.log("done"))
)

onExit
Attaches an exit-aware finalizer that receives the channel’s Exit.

import { Channel, Console, Exit } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.onExit((exit) =>
    Console.log(Exit.isSuccess(exit) ? "ok" : "failed")
  )
)

onError
Attaches a finalizer that runs only on failure, receiving the failure Cause.

import { Channel, Console } from "effect"

Channel.fail("boom").pipe(
  Channel.onError((cause) => Console.error(cause))
)

onStart
Runs an effect before the channel starts; its success is ignored, its failure aborts the channel.

import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.onStart(Console.log("starting"))
)

onFirst
Runs an effect the first time the channel emits an element (which is still emitted unchanged).

import { Channel, Console } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.onFirst((n) => Console.log(`first: ${n}`))
)

onEnd
Runs an effect when the channel completes successfully, before the done value propagates.

import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.onEnd(Console.log("finished"))
)

Transforming output
map
Maps each output element with a pure function (receives the element and its index).

import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(Channel.map((n) => n * 2))
// => emits 2, 4, 6

mapDone
Maps the channel’s done value.

import { Channel } from "effect"

Channel.end(3).pipe(Channel.mapDone((n) => n + 1))
// => done = 4

mapDoneEffect
Maps the done value with an effectful function.

import { Channel, Effect } from "effect"

Channel.end(3).pipe(
  Channel.mapDoneEffect((n) => Effect.succeed(n * 10))
)
// => done = 30

mapEffect
Maps each element with an effect. options.concurrency and options.unordered control parallelism.

import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.mapEffect((n) => Effect.succeed(n * 2), { concurrency: 2 })
)
// => emits 2, 4, 6

mapInput
Maps the channel’s input elements with an effectful function (contravariant side).

import { Channel, Effect } from "effect"

Channel.identity<number, never, void>().pipe(
  Channel.mapInput((s: string) => Effect.succeed(s.length))
)

mapInputError
Maps the channel’s input errors.

import { Channel, Effect } from "effect"

Channel.identity<number, Error, void>().pipe(
  Channel.mapInputError((s: string) => Effect.succeed(new Error(s)))
)

tap
Runs a side effect on each element, leaving the element unchanged.

import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.tap((n) => Console.log(n))
)
// => emits 1, 2 while logging each

flatMap
Maps each element to a channel and flattens the children. options.concurrency /
options.bufferSize control concurrency.

import { Channel } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.flatMap((n) => Channel.fromArray([n, n * 10]))
)
// => emits 1, 10, 2, 20

flatten
Flattens a channel whose elements are themselves channels.

import { Channel } from "effect"

Channel.flatten(
  Channel.fromArray([Channel.fromArray([1, 2]), Channel.fromArray([3])])
)
// => emits 1, 2, 3

flattenArray
Flattens a channel of arrays into individual elements.

import { Channel } from "effect"

Channel.flattenArray(Channel.fromArray([[1, 2], [3, 4]]))
// => emits 1, 2, 3, 4

flattenTake
Flattens a channel of Take values: successes become non-empty arrays, failures fail, done
completes.

import { Channel } from "effect"

// Each `Take` here is a non-empty array of elements.
Channel.flattenTake(Channel.fromArray([[1, 2], [3]] as const))
// => emits [1, 2], [3]

filter
Keeps only elements matching a predicate (refinements narrow the output type).

import { Channel } from "effect"

Channel.fromArray([1, 2, 3, 4]).pipe(Channel.filter((n) => n % 2 === 0))
// => emits 2, 4

filterMap
Keeps and maps elements with a Filter; rejected elements are dropped.

import { Channel, Filter } from "effect"

Channel.fromArray([1, 2, 3, 4]).pipe(
  Channel.filterMap(Filter.fromPredicate((n: number) => n % 2 === 0))
)
// => emits 2, 4

filterEffect
Filters elements with an effectful predicate.

import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.filterEffect((n) => Effect.succeed(n > 1))
)
// => emits 2, 3

filterMapEffect
Filters and maps elements with an effectful Filter.

import { Channel, Effect, Result } from "effect"

// A FilterEffect returns Effect<Result<Pass, Fail>>: success keeps & maps, failure drops.
Channel.fromArray([1, 2, 3]).pipe(
  Channel.filterMapEffect((n: number) =>
    Effect.succeed(n > 1 ? Result.succeed(n * 10) : Result.fail(n))
  )
)
// => emits 20, 30

filterArray
Filters each element inside emitted arrays, emitting only non-empty filtered arrays.

import { Channel } from "effect"

Channel.fromArray([[1, 2, 3], [4, 5]]).pipe(
  Channel.filterArray((n) => n % 2 === 1)
)
// => emits [1, 3], [5]

filterMapArray
Filters and maps each element inside emitted arrays with a Filter.

import { Channel, Filter } from "effect"

Channel.fromArray([[1, 2], [3, 4]]).pipe(
  Channel.filterMapArray(Filter.fromPredicate((n: number) => n % 2 === 0))
)
// => emits [2], [4]

filterArrayEffect
Filters each element inside emitted arrays with an effectful predicate.

import { Channel, Effect } from "effect"

Channel.fromArray([[1, 2], [3, 4]]).pipe(
  Channel.filterArrayEffect((n) => Effect.succeed(n > 2))
)
// => emits [3, 4]

filterMapArrayEffect
Filters and maps each element inside emitted arrays with an effectful Filter.

import { Channel, Effect, Result } from "effect"

Channel.fromArray([[1, 2], [3, 4]]).pipe(
  Channel.filterMapArrayEffect((n: number) =>
    Effect.succeed(n > 2 ? Result.succeed(n) : Result.fail(n))
  )
)
// => emits [3, 4]

mapAccum
Stateful map: each element updates an accumulator and yields zero or more output values. The
onHalt option can flush remaining state.
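To make "zero or more output values" and the flush-on-halt idea concrete, here is a plain-TypeScript model over an ordinary array. It is a sketch of the semantics described above, not the library's code; this local `mapAccum` and its positional `onHalt` parameter are assumptions made for illustration and may not match the real option shape.

```typescript
// Hypothetical model of a stateful map over a plain array.
// `step` returns the next state plus zero or more outputs; `onHalt` may flush leftover state.
function mapAccum<S, A, B>(
  input: ReadonlyArray<A>,
  initial: () => S,
  step: (state: S, element: A) => readonly [S, ReadonlyArray<B>],
  onHalt?: (state: S) => ReadonlyArray<B>
): B[] {
  let state = initial()
  const out: B[] = []
  for (const element of input) {
    const [next, emitted] = step(state, element)
    state = next
    out.push(...emitted)
  }
  if (onHalt) out.push(...onHalt(state))
  return out
}

// Running sum: every element emits the updated total.
const sums = mapAccum([1, 2, 3], () => 0, (sum, n: number) => [sum + n, [sum + n]] as const)
console.log(sums) // [1, 3, 6]

// Pairing: emit only once two elements are available, then flush the leftover on halt.
const pairs = mapAccum<number | null, number, number[]>(
  [1, 2, 3],
  () => null,
  (pending, n) => (pending === null ? [n, []] : [null, [[pending, n]]]),
  (pending) => (pending === null ? [] : [[pending]])
)
console.log(pairs) // [[1, 2], [3]]
```

The second call shows why a flush hook matters: without it, the unpaired trailing element would be lost when the input ends.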
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.mapAccum(() => 0, (sum, n) => [sum + n, [sum + n]] as const)
)
// => emits 1, 3, 6

scan
Emits the initial accumulator plus each intermediate result as it folds over the elements.

import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(Channel.scan(0, (sum, n) => sum + n))
// => emits 0, 1, 3, 6

scanEffect
Like scan, but the accumulator function is effectful.

import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.scanEffect(0, (sum, n) => Effect.succeed(sum + n))
)
// => emits 0, 1, 3, 6

switchMap
Maps each element to a channel, but starting a new child interrupts the previous one (default
concurrency 1).

import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.switchMap((n) => Channel.fromArray([`v-${n}`]))
)
// => emits "v-1", "v-2", "v-3"

embedInput
Runs an input handler against upstream while the wrapped channel runs without receiving input directly.

import { Channel, Effect } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.embedInput((upstream) =>
    upstream.pipe(Effect.forever, Effect.ignore)
  )
)

buffer
Buffers individual output elements in a queue so a fast producer runs ahead of a slow consumer.
strategy defaults to "suspend" (backpressure); "dropping"/"sliding" may discard.
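The difference between the three strategies shows up only when the buffer is full. The sketch below models a single offer to a bounded buffer in plain TypeScript, assuming the strategy names carry their usual bounded-queue meaning (dropping discards the incoming element, sliding discards the oldest). `offer` is a hypothetical function, not part of the Channel API.

```typescript
type Strategy = "suspend" | "dropping" | "sliding"

// Hypothetical model of offering one element to a bounded buffer.
// Returns the new buffer contents and whether the producer would have to wait.
function offer<A>(
  buffer: ReadonlyArray<A>,
  capacity: number,
  strategy: Strategy,
  element: A
): { buffer: A[]; producerWaits: boolean } {
  if (buffer.length < capacity) {
    return { buffer: [...buffer, element], producerWaits: false }
  }
  switch (strategy) {
    case "suspend":
      // Backpressure: nothing is lost, the producer waits for free space.
      return { buffer: [...buffer], producerWaits: true }
    case "dropping":
      // The incoming element is discarded.
      return { buffer: [...buffer], producerWaits: false }
    case "sliding":
      // The oldest element is discarded to make room.
      return { buffer: [...buffer.slice(1), element], producerWaits: false }
  }
}

console.log(offer([1, 2], 2, "dropping", 3).buffer) // [1, 2]
console.log(offer([1, 2], 2, "sliding", 3).buffer) // [2, 3]
console.log(offer([1, 2], 2, "suspend", 3).producerWaits) // true
```

Only "suspend" guarantees that every element reaches the consumer, at the cost of slowing the producer down.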
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.buffer({ capacity: 16, strategy: "suspend" })
)

bufferArray
Buffers array output elements; upstream array boundaries are not preserved.

import { Channel } from "effect"

Channel.fromArray([[1, 2], [3]]).pipe(
  Channel.bufferArray({ capacity: 16 })
)

Composition
concat
Runs the second channel after the first completes, concatenating their output.

import { Channel } from "effect"

Channel.concat(Channel.fromArray([1, 2]), Channel.fromArray([3, 4]))
// => emits 1, 2, 3, 4

concatWith
Like concat, but the second channel is built from the first channel’s done value.

import { Channel } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.concatWith(() => Channel.succeed(99))
)
// => emits 1, 2, 99

combine
Combines two channels with a stateful pull function that reads from both sides.

import { Channel, Effect } from "effect"

Channel.combine(
  Channel.fromArray([1, 2]),
  Channel.fromArray([10, 20]),
  () => 0,
  (s, left, right) =>
    Effect.map(Effect.zip(left, right), ([a, b]) => [a + b, s] as const)
)
// => emits 11, 22

orElseIfEmpty
Runs a fallback channel if the source completes without emitting any element.

import { Channel } from "effect"

Channel.empty.pipe(
  Channel.orElseIfEmpty(() => Channel.succeed("fallback"))
)
// => emits "fallback"

pipeTo
Pipes this channel’s output into another channel’s input. The result keeps this channel’s input type and the other’s output/done types.

import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.pipeTo(Channel.identity<number, never, void>())
)
// => emits 1, 2, 3

pipeToOrFail
Like pipeTo, but this channel’s failures are preserved and not observed by the downstream channel.

import { Channel } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.pipeToOrFail(Channel.identity<number, never, void>())
)

merge
Runs two channels concurrently, interleaving their output. haltStrategy controls when the merged
channel stops ("left" | "right" | "both" | "either", default "both").
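The four halt strategies can be read as a small decision table. The sketch below spells that table out in plain TypeScript, assuming each name means what it suggests (stop when the named side, both sides, or either side has completed). `shouldHalt` is a hypothetical function written for illustration and is not taken from the library.

```typescript
type HaltStrategy = "left" | "right" | "both" | "either"

// Hypothetical decision function: given which sides have completed,
// should the merged output stop?
function shouldHalt(strategy: HaltStrategy, leftDone: boolean, rightDone: boolean): boolean {
  switch (strategy) {
    case "left":
      return leftDone
    case "right":
      return rightDone
    case "both":
      return leftDone && rightDone
    case "either":
      return leftDone || rightDone
  }
}

// Only the left side has finished:
console.log(shouldHalt("left", true, false)) // true
console.log(shouldHalt("right", true, false)) // false
console.log(shouldHalt("both", true, false)) // false
console.log(shouldHalt("either", true, false)) // true
```

With the default "both", the merged channel keeps emitting from whichever side is still running.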
import { Channel } from "effect"

Channel.merge(Channel.fromArray([1, 2]), Channel.fromArray([3, 4]), {
  haltStrategy: "either"
})
// => emits 1, 2, 3, 4 (order may interleave)

mergeAll
Merges a channel of channels with bounded concurrency and bufferSize.

import { Channel } from "effect"

Channel.mergeAll({ concurrency: 2 })(
  Channel.fromArray([Channel.fromArray([1, 2]), Channel.fromArray([3, 4])])
)
// => emits 1, 2, 3, 4 (order may interleave)

mergeEffect
Runs an effect concurrently with a channel, emitting only the channel’s output and failing if the effect fails.

import { Channel, Effect, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.mergeEffect(Console.log("running alongside"))
)

Repetition

repeat
Repeats the channel according to a Schedule.

import { Channel, Schedule } from "effect"

Channel.fromArray([1, 2]).pipe(Channel.repeat(Schedule.recurs(1)))
// => emits 1, 2, 1, 2

forever
Repeats the channel forever (done type becomes never).

import { Channel } from "effect"

Channel.forever(Channel.fromArray([1, 2]))
// => emits 1, 2, 1, 2, ...

schedule
Runs a schedule step for each output element, applying delays between elements.

import { Channel, Schedule } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.schedule(Schedule.spaced("100 millis"))
)

drain
Consumes all output, emitting nothing and preserving only the done value.

import { Channel } from "effect"

Channel.drain(Channel.fromArray([1, 2, 3]))
// => emits nothing

Error handling
catchCause
Recovers from any failure Cause by building a replacement channel.

import { Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catchCause(() => Channel.succeed("recovered"))
)
// => emits "recovered"

catch (catch_)
Recovers from a typed error value by building a replacement channel.

import { Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catch((err) => Channel.succeed(`saw ${err}`))
)
// => emits "saw boom"

catchCauseIf
Recovers from a failure Cause only when a predicate over the cause matches.

import { Cause, Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catchCauseIf(Cause.hasFails, () => Channel.succeed("ok"))
)

catchCauseFilter
Recovers using a Filter over the Cause; the recovery function receives the selected value.

import { Cause, Channel } from "effect"

// `Cause.findError` is a Filter<Cause<E>, E, ...> selecting the first failure error.
Channel.fail("boom").pipe(
  Channel.catchCauseFilter(Cause.findError, (err) => Channel.succeed(`saw ${err}`))
)

catchIf
Recovers from typed errors that match a predicate or refinement; non-matching errors propagate.

import { Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catchIf((e): e is string => typeof e === "string", () => Channel.succeed("ok"))
)

catchFilter
Recovers from errors selected by a Filter; the recovery function receives the filtered value.

import { Channel, Filter } from "effect"

Channel.fail({ _tag: "Net" } as const).pipe(
  Channel.catchFilter(
    Filter.fromPredicate((e: { _tag: string }) => e._tag === "Net"),
    () => Channel.succeed("ok")
  )
)

catchTag
Recovers from a specific tagged error (or array of tags), with an optional orElse for the rest.

import { Channel, Data } from "effect"

class NetError extends Data.TaggedError("NetError")<{ url: string }> {}

Channel.fail(new NetError({ url: "/x" })).pipe(
  Channel.catchTag("NetError", (e) => Channel.succeed(`retry ${e.url}`))
)

catchReason
Recovers from a specific reason tag nested inside a tagged error.

import { Channel, Data } from "effect"

class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit }> {}

Channel.fail(new AiError({ reason: new RateLimit({ retryAfter: 60 }) })).pipe(
  Channel.catchReason("AiError", "RateLimit", (r) => Channel.succeed(`wait ${r.retryAfter}`))
)

catchReasons
Recovers from multiple nested reasons using an object of handlers keyed by reason tag.

import { Channel, Data } from "effect"

class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class Quota extends Data.TaggedError("Quota")<{ limit: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit | Quota }> {}

Channel.fail(new AiError({ reason: new Quota({ limit: 5 }) })).pipe(
  Channel.catchReasons("AiError", {
    RateLimit: (r) => Channel.succeed(`wait ${r.retryAfter}`),
    Quota: (r) => Channel.succeed(`limit ${r.limit}`)
  })
)

unwrapReason
Flattens a tagged error’s nested reason into the channel’s own error channel.

import { Channel, Data } from "effect"

class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit }> {}

Channel.fail(new AiError({ reason: new RateLimit({ retryAfter: 60 }) })).pipe(
  Channel.unwrapReason("AiError")
)
// now fails with RateLimit directly

tapCause
Runs an effect with the failure Cause, then re-raises the original failure.

import { Channel, Console } from "effect"

Channel.fail("boom").pipe(
  Channel.tapCause((cause) => Console.error(cause))
)

tapError
Runs an effect when the channel fails with a typed error, then preserves the failure.

import { Channel, Console } from "effect"

Channel.fail("boom").pipe(
  Channel.tapError((err) => Console.error(err))
)

mapError
Transforms the channel’s typed error.

import { Channel } from "effect"

Channel.fail("boom").pipe(Channel.mapError((s) => new Error(s)))

orDie
Converts typed errors into unrecoverable defects (error channel becomes never).

import { Channel } from "effect"

Channel.fail("boom").pipe(Channel.orDie)

ignore
Ignores typed errors, turning a failed channel into an empty one. Use { log: true } to log the
cause first.

import { Channel } from "effect"

Channel.fail("boom").pipe(Channel.ignore())
// => emits nothing, no failure

ignoreCause
Like ignore, but also swallows defects.

import { Channel } from "effect"

Channel.die("bug").pipe(Channel.ignoreCause())

retry
Retries the channel according to a Schedule whenever it fails.

import { Channel, Schedule } from "effect"

Channel.fail("boom").pipe(Channel.retry(Schedule.recurs(3)))

Interruption
Section titled “Interruption”interruptWhen
Section titled “interruptWhen”Races the channel against an effect; if the effect finishes first, its value becomes the done value.
import { Channel, Effect } from "effect"
Channel.never.pipe(
  Channel.interruptWhen(Effect.as(Effect.sleep("1 second"), "timeout"))
)
// => done = "timeout"

haltWhen
Stops the channel when an effect completes or fails: the effect’s success becomes the done value, and its failure fails the channel.
import { Channel, Effect } from "effect"
Channel.fromArray([1, 2, 3]).pipe(
  Channel.haltWhen(Effect.as(Effect.sleep("1 second"), "stop"))
)

Text codecs
These operate on chunks (NonEmptyReadonlyArray), matching the shape Stream uses.

splitLines
Splits incoming string chunks into lines, recognizing \n, \r\n, and standalone \r, even across
chunk boundaries.
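Handling terminators across chunk boundaries means the splitter has to carry state from one chunk to the next. A minimal sketch of that idea in plain TypeScript, not Effect's implementation; `splitLinesModel` is a hypothetical helper that works on an array of string chunks:

```typescript
// Hypothetical helper: split string chunks into lines, carrying state across chunks.
function splitLinesModel(chunks: ReadonlyArray<string>): Array<string> {
  const lines: Array<string> = []
  let current = ""  // text of the line being assembled
  let sawCR = false // true when the previous character was "\r"

  for (const chunk of chunks) {
    for (const ch of chunk) {
      if (sawCR) {
        sawCR = false
        if (ch === "\n") continue // "\r\n": the "\r" already ended the line
      }
      if (ch === "\r") {
        lines.push(current)
        current = ""
        sawCR = true
      } else if (ch === "\n") {
        lines.push(current)
        current = ""
      } else {
        current += ch
      }
    }
  }
  if (current.length > 0) lines.push(current) // trailing text without a terminator
  return lines
}

const sameChunk = splitLinesModel(["hel", "lo\r\nwor", "ld\n"])
const splitTerminator = splitLinesModel(["a\r", "\nb\rc"]) // "\r\n" split across two chunks
```

The `sawCR` flag is what lets a `\r` at the end of one chunk and a `\n` at the start of the next count as a single terminator.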
import { Channel, Effect, Stream } from "effect"
Effect.runPromise(
  Stream.runCollect(Stream.splitLines(Stream.make("hel", "lo\r\nwor", "ld\n")))
).then(console.log)
// => [ "hello", "world" ]

decodeText
Decodes incoming Uint8Array chunks into string chunks using TextDecoder with streaming enabled,
so multi-byte characters that span chunk boundaries decode correctly. Highlighted in the v4 channel improvements.
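To see why streaming mode matters, the sketch below uses the standard `TextDecoder` directly, independent of Channel. It assumes a runtime where `TextDecoder` is available as a global, such as a modern browser or Node.js.

```typescript
// "é" is two bytes in UTF-8 (0xC3 0xA9); split it across two chunks on purpose.
const byteChunks: ReadonlyArray<Uint8Array> = [
  new Uint8Array([0x68, 0xc3]), // "h" plus the first byte of "é"
  new Uint8Array([0xa9, 0x21])  // second byte of "é" plus "!"
]

// One decoder shared across chunks, with `stream: true`, buffers the incomplete sequence.
const streamingDecoder = new TextDecoder("utf-8")
const streamed = byteChunks.map((chunk) => streamingDecoder.decode(chunk, { stream: true }))
streamed.push(streamingDecoder.decode()) // flush any buffered bytes at the end

// Decoding each chunk independently cannot reassemble the split character.
const independent = byteChunks.map((chunk) => new TextDecoder("utf-8").decode(chunk))
```

With a shared streaming decoder the pieces join back into the original text, while the independent decoders produce replacement characters where the sequence was cut.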
import { Channel } from "effect"
const decode = Channel.decodeText<never, void>("utf-8")
// InElem: NonEmptyReadonlyArray<Uint8Array> -> OutElem: NonEmptyReadonlyArray<string>

encodeText
Encodes incoming string chunks into Uint8Array chunks using TextEncoder.
import { Channel } from "effect"
const encode = Channel.encodeText<never, void>()
// InElem: NonEmptyReadonlyArray<string> -> OutElem: NonEmptyReadonlyArray<Uint8Array>

Context and services

contextWith
Builds a channel from the surrounding Context.
import { Channel, Context } from "effect"
const Config = Context.Reference("Config", { defaultValue: () => ({ n: 1 }) })
Channel.contextWith((ctx: Context.Context<typeof Config>) =>
  Channel.succeed(Context.get(ctx, Config).n)
)

provideContext
Provides a Context, removing the service requirements it satisfies.
import { Channel, Context } from "effect"
declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, { svc: string }>
declare const ctx: Context.Context<{ svc: string }>
const provided = Channel.provideContext(ch, ctx)

provideService
Provides a single service by key.
import { Channel, Context } from "effect"
const Greeting = Context.Reference("Greeting", { defaultValue: () => "hi" })
Channel.contextWith((ctx: Context.Context<typeof Greeting>) =>
  Channel.succeed(Context.get(ctx, Greeting))
).pipe(Channel.provideService(Greeting, "hello"))

provideServiceEffect
Provides a service obtained from an effect (which may fail).
import { Channel, Context, Effect } from "effect"
const Token = Context.Reference("Token", { defaultValue: () => "" })
declare const ch: Channel.Channel<string, never, void, unknown, unknown, unknown, typeof Token>
Channel.provideServiceEffect(ch, Token, Effect.succeed("abc"))

provide
Provides a Layer or Context, removing the corresponding requirements. options.local builds a
fresh layer instance.
import { Channel, Layer } from "effect"
declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, { db: string }>
declare const DbLive: Layer.Layer<{ db: string }>
const provided = Channel.provide(ch, DbLive)

updateContext
Transforms the surrounding context before the channel runs.
import { Channel, Context } from "effect"
declare const ch: Channel.Channel<number, never, void>
Channel.updateContext(ch, (ctx: Context.Context<never>) => ctx)

updateService
Reads a service from context, transforms it, and provides the updated value.
import { Channel, Context } from "effect"
const Count = Context.Reference("Count", { defaultValue: () => 0 })
declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, typeof Count>
Channel.updateService(ch, Count, (n) => n + 1)

withSpan
Runs the channel inside a tracing span, ending the span with the channel’s exit.
import { Channel } from "effect"
Channel.fromArray([1, 2, 3]).pipe(Channel.withSpan("process-batch"))

Do notation

Do
The starting point for do notation: a channel that emits an empty object.
import { Channel } from "effect"
Channel.Do // => emits {}

bind
Adds a field produced by running another channel for each emitted object.
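One way to picture this is to model a channel's emitted elements as a plain array, so that binding becomes a nested loop. This is only an analogy in plain TypeScript; `Emissions`, `DoModel`, and `bindModel` are hypothetical names, not Effect API.

```typescript
// Model a channel's emitted elements as a plain array.
type Emissions<A> = ReadonlyArray<A>

// Counterpart of the starting point: a single empty object.
const DoModel: Emissions<{}> = [{}]

// Hypothetical helper: for each emitted object, run `f` and add each result under `key`.
function bindModel<N extends string, A extends object, B>(
  self: Emissions<A>,
  key: N,
  f: (a: A) => Emissions<B>
): Emissions<A & { readonly [K in N]: B }> {
  const out: Array<A & { readonly [K in N]: B }> = []
  for (const a of self) {
    for (const b of f(a)) {
      out.push({ ...a, [key]: b } as A & { readonly [K in N]: B })
    }
  }
  return out
}

const single = bindModel(bindModel(DoModel, "x", () => [1]), "y", () => [2])

// In this model, a binding that emits several elements yields one object per element,
// and later bindings can read the fields added by earlier ones.
const fanOut = bindModel(bindModel(DoModel, "x", () => [1, 2]), "y", ({ x }) => [x * 10])
```

Here `single` holds one object with both fields, and `fanOut` holds two objects because the first binding emitted two elements.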
import { Channel } from "effect"
Channel.Do.pipe(
  Channel.bind("x", () => Channel.succeed(1)),
  Channel.bind("y", () => Channel.succeed(2))
)
// => emits { x: 1, y: 2 }

bindTo
Wraps each emitted element in an object under the given field name.
import { Channel } from "effect"
Channel.succeed(42).pipe(Channel.bindTo("answer"))
// => emits { answer: 42 }

unwrap
Builds a channel from a (possibly scoped) effect that yields a channel.
import { Channel, Effect } from "effect"
Channel.unwrap(Effect.succeed(Channel.fromArray([1, 2, 3])))
// => emits 1, 2, 3

Running and converting
These interpret a channel that no longer needs upstream input, producing an Effect.
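The runners below differ mainly in what they keep: the emitted elements, the done value, or a summary of the elements. A plain-TypeScript analogy, not how Effect implements channels, models a source channel as a generator that yields elements and returns a done value; `sourceModel` and `runModel` are hypothetical names.

```typescript
// Model a source channel as a generator: it yields elements and returns a done value.
function* sourceModel(): Generator<number, string, undefined> {
  yield 1
  yield 2
  yield 3
  return "done"
}

// Hypothetical interpreter: pull until the generator returns, tracking both results.
function runModel<A, Done>(
  channel: Generator<A, Done, undefined>
): { elements: Array<A>; done: Done } {
  const elements: Array<A> = []
  let step = channel.next()
  while (!step.done) {
    elements.push(step.value)
    step = channel.next()
  }
  return { elements, done: step.value }
}

const collected = runModel(sourceModel()).elements // keeps the elements, like runCollect
const drained = runModel(sourceModel()).done       // keeps the done value, like runDrain
const folded = runModel(sourceModel()).elements.reduce((acc, n) => acc + n, 0) // like runFold
```

The emitted elements and the done value travel separately, which is why collecting and draining the same channel give different results.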
runCollect
Collects all output elements into an array.
import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runCollect(Channel.fromArray([1, 2, 3]))).then(console.log)
// => [ 1, 2, 3 ]

runDrain
Discards all output, returning the done value.
import { Channel, Effect } from "effect"
Effect.runPromise(
  Channel.fromArray([1, 2]).pipe(
    Channel.concatWith(() => Channel.end("done")),
    Channel.runDrain
  )
).then(console.log)
// => "done"

runCount
Counts the emitted elements.
import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runCount(Channel.fromArray([1, 2, 3]))).then(console.log)
// => 3

runForEach
Runs an effect for each output element, returning the done value.
import { Channel, Console, Effect } from "effect"
Effect.runPromise(
  Channel.runForEach(Channel.fromArray([1, 2]), (n) => Console.log(n))
)

runForEachWhile
Runs an effectful predicate per element, stopping early when it returns false.
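The early-stop rule is easiest to see in a synchronous model. The sketch below is plain TypeScript with a pure predicate instead of an effectful one; `forEachWhileModel` is a hypothetical helper that records which elements the predicate was run on.

```typescript
// Hypothetical helper: visit elements in order, stopping after the first `false`.
function forEachWhileModel<A>(
  elements: ReadonlyArray<A>,
  predicate: (a: A) => boolean
): Array<A> {
  const seen: Array<A> = []
  for (const element of elements) {
    seen.push(element)             // the predicate runs on this element
    if (!predicate(element)) break // a false result ends consumption early
  }
  return seen
}

const seenElements = forEachWhileModel([1, 2, 3, 4], (n) => n < 3)
```

In this model the element that makes the predicate return false is still visited, but nothing after it is.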
import { Channel, Effect } from "effect"
Channel.runForEachWhile(Channel.fromArray([1, 2, 3]), (n) => Effect.succeed(n < 3))
// stops at 3, the first element for which the predicate returns false

runFold
Folds over all output elements with a pure accumulator.
import { Channel, Effect } from "effect"
Effect.runPromise(
  Channel.runFold(Channel.fromArray([1, 2, 3]), () => 0, (acc, n) => acc + n)
).then(console.log)
// => 6

runFoldEffect
Folds over all output elements with an effectful accumulator.
import { Channel, Effect } from "effect"
Channel.runFoldEffect(
  Channel.fromArray([1, 2, 3]),
  () => 0,
  (acc, n) => Effect.succeed(acc + n)
)

runHead
Returns the first output element as an Option.
import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runHead(Channel.fromArray([1, 2, 3]))).then(console.log)
// => Option.some(1)

runLast
Returns the last output element as an Option.
import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runLast(Channel.fromArray([1, 2, 3]))).then(console.log)
// => Option.some(3)

runDone
Runs the channel and returns only its done value.
import { Channel, Effect } from "effect"
Effect.runPromise(Channel.runDone(Channel.end("done"))).then(console.log)
// => "done"

toPull
Converts the channel to a scoped Pull for manual, low-level consumption (pulls are serialized).
import { Channel, Effect } from "effect"
const pull = Effect.scoped(Channel.toPull(Channel.fromArray([1, 2, 3])))

toPullScoped
Converts the channel to a Pull within an existing scope you control.
import { Channel, Effect, Scope } from "effect"
Effect.gen(function*() {
  const scope = yield* Scope.make()
  const pull = yield* Channel.toPullScoped(Channel.fromArray([1, 2, 3]), scope)
  return pull
})

runIntoQueue
Runs the channel, offering each element into a queue; the queue is ended when the channel completes and failed when the channel fails.
import { Channel, Effect, Queue } from "effect"
Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Channel.runIntoQueue(Channel.fromArray([1, 2, 3]), queue)
})

runIntoQueueArray
Like runIntoQueue, for a channel emitting non-empty arrays (each element is offered individually).
import { Channel, Effect, Queue } from "effect"
Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Channel.runIntoQueueArray(Channel.fromArray([[1, 2], [3]]), queue)
})

toQueue
Creates a scoped queue and forks the channel to feed it for concurrent consumption.
import { Channel, Effect } from "effect"
Effect.scoped(
  Channel.toQueue(Channel.fromArray([1, 2, 3]), { capacity: 16 })
)

toQueueArray
Like toQueue, for an array-emitting channel (offers each element individually).
import { Channel, Effect } from "effect"
Effect.scoped(
  Channel.toQueueArray(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)

runIntoPubSub
Runs the channel, publishing each element to a PubSub. options.shutdownOnEnd shuts it down when
done.
import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  yield* Channel.runIntoPubSub(Channel.fromArray([1, 2, 3]), pubsub)
})

runIntoPubSubArray
Like runIntoPubSub, for an array-emitting channel (publishes each element individually).
import { Channel, Effect, PubSub } from "effect"
Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  yield* Channel.runIntoPubSubArray(Channel.fromArray([[1, 2], [3]]), pubsub)
})

toPubSub
Creates a scoped PubSub and forks the channel to feed it. shutdownOnEnd defaults to true.
import { Channel, Effect } from "effect"
Effect.scoped(
  Channel.toPubSub(Channel.fromArray([1, 2, 3]), { capacity: 16 })
)

toPubSubArray
Like toPubSub, for an array-emitting channel (publishes each element individually).
import { Channel, Effect } from "effect"
Effect.scoped(
  Channel.toPubSubArray(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)

toPubSubTake
Creates a scoped PubSub of Take values; the channel’s final Exit is published so subscribers
can observe completion or failure.
import { Channel, Effect } from "effect"
Effect.scoped(
  Channel.toPubSubTake(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)

See also
- Creating Streams: the high-level API most code should use.
- Sink: consuming streams; sinks are channels too.
- Transforming Streams: stream operators built on channels.