# Channel

`Channel` is the low-level streaming primitive that both [`Stream`](https://effect.plants.sh/streaming/creating-streams/)
and [`Sink`](https://effect.plants.sh/streaming/sink/) are built on. Most application code never touches it directly —
streams and sinks cover the common cases far more ergonomically. You reach for `Channel` when you
are building a **new** stream operator, adapting a pull-based source, or need fine-grained control
over how input, output, errors, the final value, and resources compose.

## The mental model

A channel has seven type parameters:

```ts
import type { Channel } from "effect"

type Example = Channel.Channel<
  number,      // OutElem  — elements it emits downstream
  Error,       // OutErr   — error it can fail with
  string,      // OutDone  — terminal value when it finishes
  Uint8Array,  // InElem   — elements it reads from upstream
  Error,       // InErr    — error it may receive from upstream
  unknown,     // InDone   — upstream terminal value
  never        // Env      — services required while running
>
```

Think of a channel as a process with **two sides** and a **final value**:

- The **output side** (`OutElem`, `OutErr`, `OutDone`) is what the channel produces: a sequence of
  elements, an optional typed failure, and a single done value when it completes.
- The **input side** (`InElem`, `InErr`, `InDone`) is the upstream protocol it consumes when piped
  after another channel. A source channel ignores its input (the input params default to `unknown`).
- `Env` is the Effect environment needed while the channel is interpreted.

The done value is **distinct** from the emitted elements. `Channel.fromArray([1, 2, 3])` emits
`1, 2, 3` and completes with `void`; a channel can also emit nothing and complete with a value
(`Channel.end("done")`).
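
One way to picture the split between emitted elements and the done value is a plain TypeScript generator, where `yield` plays the role of `OutElem` and `return` plays the role of `OutDone`. This is only an analogy and not how `Channel` is implemented; `countdown` and `runCollectModel` are made-up names for illustration.

```ts
// Analogy only: a generator yields elements (OutElem) and returns a final value (OutDone).
function* countdown(from: number): Generator<number, string, undefined> {
  for (let n = from; n > 0; n--) {
    yield n // an emitted element
  }
  return "liftoff" // the done value, separate from the elements
}

// Hypothetical helper: drains a generator, keeping elements and the done value apart.
function runCollectModel<A, Done>(
  gen: Generator<A, Done, undefined>
): { elements: Array<A>; done: Done } {
  const elements: Array<A> = []
  let step = gen.next()
  while (!step.done) {
    elements.push(step.value)
    step = gen.next()
  }
  return { elements, done: step.value }
}

const result = runCollectModel(countdown(3))
// result.elements is [3, 2, 1]; result.done is "liftoff"
```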

### How Stream and Sink relate

- A **`Stream<A, E, R>`** is essentially a `Channel<NonEmptyReadonlyArray<A>, E, void, unknown, unknown, unknown, R>`
  — a channel that ignores its input and emits **chunks** (non-empty arrays) of elements. Convert
  between them with `Stream.toChannel` / `Stream.fromChannel`.
- A **`Sink`** consumes a channel's output and produces a summary value; it uses the input side. Use
  `Sink.toChannel` / `Sink.fromChannel`.

**Streams are chunked:** Because `Stream` works in chunks, channels you splice into a stream pipeline operate on
`NonEmptyReadonlyArray<A>`, not individual elements. The text codecs below
(`Channel.decodeText`, `Channel.splitLines`, `Channel.encodeText`) all follow this chunked shape.

```ts
import { Stream } from "effect"

// A Stream is a Channel under the hood
const stream = Stream.make(1, 2, 3)
const asChannel = Stream.toChannel(stream)
// Channel<NonEmptyReadonlyArray<number>, never, void, ...>

const backToStream = Stream.fromChannel(asChannel)
```
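
To make the chunked shape concrete, the sketch below batches a flat list into non-empty arrays and then applies an element-wise function chunk by chunk, which is roughly what an operator spliced into a stream pipeline has to do. It is plain TypeScript and does not use the Effect API; `NonEmpty`, `toChunks`, and `mapChunks` are hypothetical names.

```ts
// A local stand-in for NonEmptyReadonlyArray<A>: at least one element.
type NonEmpty<A> = readonly [A, ...Array<A>]

// Hypothetical helper: group elements into non-empty chunks of at most `size`.
function toChunks<A>(items: ReadonlyArray<A>, size: number): Array<NonEmpty<A>> {
  if (size < 1) throw new RangeError("size must be at least 1")
  const chunks: Array<NonEmpty<A>> = []
  for (let i = 0; i < items.length; i += size) {
    // The slice always has at least one element because i < items.length.
    chunks.push(items.slice(i, i + size) as unknown as NonEmpty<A>)
  }
  return chunks
}

// An element-wise operator has to map inside each chunk.
function mapChunks<A, B>(
  chunks: ReadonlyArray<NonEmpty<A>>,
  f: (a: A) => B
): Array<NonEmpty<B>> {
  return chunks.map((chunk) => chunk.map((a) => f(a)) as unknown as NonEmpty<B>)
}

const chunked = toChunks([1, 2, 3, 4, 5], 2)
// [[1, 2], [3, 4], [5]]
const doubledChunks = mapChunks(chunked, (n) => n * 2)
// [[2, 4], [6, 8], [10]]
```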

## A practical example: a custom decode channel

Channels shine when you build a reusable stream operator. Here we splice a custom byte-decoding
channel into a stream with `Stream.pipeThroughChannel`. The channel reads chunks of `Uint8Array`,
decodes them to text, and splits the text into lines.

```ts
import { Channel, Effect, Stream } from "effect"

// A channel that turns chunks of bytes into chunks of lines.
// InElem  = NonEmptyReadonlyArray<Uint8Array>  (what a byte Stream emits)
// OutElem = NonEmptyReadonlyArray<string>      (what a line Stream wants)
const bytesToLines = Channel.decodeText<never, void>().pipe(
  Channel.pipeTo(Channel.splitLines())
)

const bytes = Stream.make(
  new TextEncoder().encode("hello\nwor"),
  new TextEncoder().encode("ld\nbye\n")
)

const lines = bytes.pipe(Stream.pipeThroughChannel(bytesToLines))

Effect.runPromise(Stream.runCollect(lines)).then(console.log)
// => [ "hello", "world", "bye" ]
```

A channel can also be run on its own with the `run*` family when you do not need a stream:

```ts
import { Channel, Effect } from "effect"

const program = Channel.fromArray([1, 2, 3]).pipe(
  Channel.map((n) => n * 2),
  Channel.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 2, 4, 6 ]
```

---

# Reference

Every public export, grouped by purpose. Each entry has a short description, and most include a short
example. Check the source if you depend on exact variance.

## Guards and constants

### `isChannel`

Checks whether a value is a `Channel`.

```ts
import { Channel } from "effect"

Channel.isChannel(Channel.succeed(42)) // => true
Channel.isChannel("nope") // => false
```

### `TypeId`

The string brand (`"~effect/Channel"`) stored on channel values; used by `isChannel`.

### `DefaultChunkSize`

The default chunk size used by array-batching constructors.

```ts
import { Channel } from "effect"

console.log(Channel.DefaultChunkSize) // => 4096
```

### `HaltStrategy`

The string literal type for `merge` halting: `"left" | "right" | "both" | "either"`.

## Low-level constructors

These build channels directly from pull functions. You rarely need them unless implementing new
primitives.

### `fromTransform`

Builds a channel from a function that turns the upstream `Pull` into a new `Pull` inside a scope.
This is the foundational constructor — most others are defined in terms of it.

```ts
import { Channel, Effect } from "effect"

// Pass the upstream pull through unchanged (an identity channel)
const ch = Channel.fromTransform((upstream, _scope) => Effect.succeed(upstream))
```

### `fromTransformBracket`

Like `fromTransform`, but also provides a forked scope that closes when the resulting channel
completes — useful for scoped resource lifecycles inside a custom channel.

### `fromPull`

Builds a channel from an `Effect` that produces a `Pull` (a repeatedly evaluated effect that emits
elements or signals done).

```ts
import { Channel, Effect } from "effect"

// The inner effect is the pull; it is re-evaluated on every pull, so this emits 42 repeatedly
const ch = Channel.fromPull(Effect.succeed(Effect.succeed(42)))
```

### `transformPull`

Transforms an existing channel by rewriting its `Pull` implementation.

```ts
import { Channel, Effect } from "effect"

const doubled = Channel.transformPull(
  Channel.fromIterable([1, 2, 3]),
  (pull) => Effect.succeed(Effect.map(pull, (n) => n * 2))
)
// => emits 2, 4, 6
```
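
The idea behind `fromTransform` and `transformPull`, turning one pull into another, can be sketched without the library. The model below is a simplified, synchronous stand-in: a pull is a function that returns either the next element or a done value. The real `Pull` is an `Effect` with typed errors and a scope, so treat this as a reading aid only; `PullModel`, `fromArrayModel`, `mapPull`, and `drainModel` are hypothetical names.

```ts
// Simplified model: each call either yields an element or signals completion.
type Step<A, Done> =
  | { readonly _tag: "Elem"; readonly value: A }
  | { readonly _tag: "Done"; readonly value: Done }
type PullModel<A, Done> = () => Step<A, Done>

// A source pull over an array that completes with `undefined`.
function fromArrayModel<A>(items: ReadonlyArray<A>): PullModel<A, undefined> {
  let index = 0
  return () =>
    index < items.length
      ? { _tag: "Elem", value: items[index++] as A }
      : { _tag: "Done", value: undefined }
}

// A transform wraps the upstream pull and returns a new pull.
function mapPull<A, B, Done>(
  upstream: PullModel<A, Done>,
  f: (a: A) => B
): PullModel<B, Done> {
  return () => {
    const step = upstream()
    return step._tag === "Elem" ? { _tag: "Elem", value: f(step.value) } : step
  }
}

// Pull repeatedly until the done signal arrives, collecting the elements.
function drainModel<A, Done>(pull: PullModel<A, Done>): Array<A> {
  const out: Array<A> = []
  while (true) {
    const step = pull()
    if (step._tag === "Done") return out
    out.push(step.value)
  }
}

const doubledModel = drainModel(mapPull(fromArrayModel([1, 2, 3]), (n) => n * 2))
// [2, 4, 6]
```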

### `toTransform`

The inverse of `fromTransform`: extracts a channel's underlying transform function.

```ts
import { Channel } from "effect"

const transform = Channel.toTransform(Channel.succeed(42))
```

### `callback`

Creates a channel that an imperative callback fills via a `Queue`. Optional `bufferSize` and
`strategy` (`"sliding" | "dropping" | "suspend"`) control backpressure.

```ts
import { Channel, Effect, Queue } from "effect"

const ch = Channel.callback<number>((queue) =>
  Effect.gen(function*() {
    yield* Queue.offer(queue, 1)
    yield* Queue.offer(queue, 2)
  })
)
// => emits 1, 2
```

### `callbackArray`

Like `callback`, but emits non-empty **arrays** of the queued elements.

```ts
import { Channel, Effect, Queue } from "effect"

const ch = Channel.callbackArray<number>((queue) =>
  Queue.offerAll(queue, [1, 2, 3]).pipe(Effect.asVoid)
)
// => emits [1, 2, 3]
```

### `suspend`

Lazily defers channel construction until the channel is run.

```ts
import { Channel } from "effect"

const ch = Channel.suspend(() => Channel.succeed(Math.random()))
```

## Value constructors

### `succeed`

Emits a single value, then completes.

```ts
import { Channel } from "effect"

Channel.succeed(42) // => emits 42
```

### `sync`

Emits a single lazily computed value.

```ts
import { Channel } from "effect"

Channel.sync(() => Date.now()) // => emits the time when run
```

### `end`

Completes immediately with a done value, emitting **nothing**.

```ts
import { Channel } from "effect"

Channel.end("done") // => emits nothing, done = "done"
```

### `endSync`

Like `end`, but the done value is computed lazily.

```ts
import { Channel } from "effect"

Channel.endSync(() => "done")
```

### `empty`

A channel that emits no elements and completes with `void`.

```ts
import { Channel } from "effect"

Channel.empty // => emits nothing
```

### `never`

A channel that never emits and never completes.

```ts
import { Channel } from "effect"

Channel.never // => hangs forever
```

### `identity`

Forwards upstream input elements, errors, and the done value unchanged.

```ts
import { Channel } from "effect"

const id = Channel.identity<number, never, void>()
// pipe a channel into this and it passes through
```

### `fail`

Fails immediately with the given error.

```ts
import { Channel } from "effect"

Channel.fail("boom") // => fails with "boom"
```

### `failSync`

Fails with a lazily computed error.

```ts
import { Channel } from "effect"

Channel.failSync(() => new Error("late"))
```

### `failCause`

Fails immediately with a `Cause`.

```ts
import { Cause, Channel } from "effect"

Channel.failCause(Cause.fail("boom"))
```

### `failCauseSync`

Fails with a lazily computed `Cause`.

```ts
import { Cause, Channel } from "effect"

Channel.failCauseSync(() => Cause.die("defect"))
```

### `die`

Fails with an unrecoverable defect.

```ts
import { Channel } from "effect"

Channel.die(new Error("bug")) // => dies
```

## Constructors from sources

### `fromEffect`

Runs an effect and emits its success as a single element.

```ts
import { Channel, Effect } from "effect"

Channel.fromEffect(Effect.succeed("hi")) // => emits "hi"
```

### `fromEffectDone`

Runs an effect and uses its success as the **done value**, emitting no elements.

```ts
import { Channel, Effect } from "effect"

Channel.fromEffectDone(Effect.succeed(42)) // => emits nothing, done = 42
```

### `fromEffectDrain`

Runs an effect and discards its result, emitting nothing.

```ts
import { Channel, Console } from "effect"

Channel.fromEffectDrain(Console.log("side effect"))
```

### `fromEffectTake`

Builds a channel from an effect producing a `Take`: success emits a non-empty array, failure fails
the channel, and a done `Take` completes it.

```ts
import { Channel, Effect } from "effect"

// A `Take<A>` is a non-empty array of elements, or an `Exit` for done/failure.
Channel.fromEffectTake(Effect.succeed([1, 2, 3] as const))
// => emits [1, 2, 3]
```

### `fromIterator`

Emits every value produced by an iterator; the iterator's return becomes the done value.

```ts
import { Channel } from "effect"

Channel.fromIterator(() => [1, 2, 3][Symbol.iterator]())
// => emits 1, 2, 3
```

### `fromIteratorArray`

Like `fromIterator`, but emits non-empty arrays batched by `chunkSize` (default `DefaultChunkSize`).

```ts
import { Channel } from "effect"

Channel.fromIteratorArray(() => [1, 2, 3, 4, 5][Symbol.iterator](), 2)
// => emits [1, 2], [3, 4], [5]
```

### `fromIterable`

Emits every element of an iterable.

```ts
import { Channel } from "effect"

Channel.fromIterable(new Set([1, 2, 3])) // => emits 1, 2, 3
```

### `fromIterableArray`

Like `fromIterable`, but emits non-empty arrays batched by `chunkSize`.

```ts
import { Channel } from "effect"

Channel.fromIterableArray([1, 2, 3, 4], 2)
// => emits [1, 2], [3, 4]
```

### `fromArray`

Emits each element of an array, then completes.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]) // => emits 1, 2, 3
```

### `fromChunk`

Emits each element of a `Chunk`.

```ts
import { Channel, Chunk } from "effect"

Channel.fromChunk(Chunk.make(1, 2, 3)) // => emits 1, 2, 3
```

### `fromQueue`

Reads elements one at a time from a queue.

```ts
import { Channel, Effect, Queue } from "effect"

Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Queue.offerAll(queue, [1, 2, 3])
  return Channel.fromQueue(queue)
})
```

### `fromQueueArray`

Reads available elements from a queue as non-empty arrays.

```ts
import { Channel, Effect, Queue } from "effect"

Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Queue.offerAll(queue, [1, 2, 3])
  return Channel.fromQueueArray(queue) // => emits [1, 2, 3]
})
```

### `fromSubscription`

Emits values received from a `PubSub` subscription.

```ts
import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(16)
  const sub = yield* PubSub.subscribe(pubsub)
  return Channel.fromSubscription(sub)
})
```

### `fromSubscriptionArray`

Like `fromSubscription`, but emits received values as non-empty arrays.

```ts
import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(16)
  const sub = yield* PubSub.subscribe(pubsub)
  return Channel.fromSubscriptionArray(sub)
})
```

### `fromPubSub`

Subscribes to a `PubSub` and emits each published value.

```ts
import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  return Channel.fromPubSub(pubsub)
})
```

### `fromPubSubArray`

Like `fromPubSub`, but emits published values as non-empty arrays.

```ts
import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  return Channel.fromPubSubArray(pubsub)
})
```

### `fromPubSubTake`

Subscribes to a `PubSub` of `Take` values and emits the take outputs as non-empty arrays.

```ts
import { Channel, Effect, PubSub } from "effect"
import type { Take } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<Take.Take<number>>()
  return Channel.fromPubSubTake(pubsub)
})
```

### `fromSchedule`

Emits each schedule output; the done value is the last output.

```ts
import { Channel, Schedule } from "effect"

Channel.fromSchedule(Schedule.spaced("1 second"))
// => emits the recurrence count on each tick
```

### `fromAsyncIterable`

Pulls values from an `AsyncIterable`; the iterator return becomes the done value. The `onError`
function maps thrown/rejected errors to a typed channel error.

```ts
import { Channel } from "effect"

async function* gen() {
  yield 1
  yield 2
}

Channel.fromAsyncIterable(gen(), (e) => new Error(String(e)))
// => emits 1, 2
```

### `fromAsyncIterableArray`

Like `fromAsyncIterable`, but emits each value as a single-element non-empty array.

```ts
import { Channel } from "effect"

async function* gen() {
  yield 1
}

Channel.fromAsyncIterableArray(gen(), (e) => new Error(String(e)))
// => emits [1]
```

## Resource management

### `acquireUseRelease`

Acquires a resource, uses it to build a channel, and runs `release` with the channel's `Exit` when
it finishes, fails, or is interrupted. Acquisition is uninterruptible.

```ts
import { Channel, Effect } from "effect"

Channel.acquireUseRelease(
  Effect.succeed("conn"),
  (conn) => Channel.succeed(conn.toUpperCase()),
  (conn, _exit) => Effect.log(`closing ${conn}`)
)
```

### `acquireRelease`

Acquires a resource, emits it as a single element, and registers `release` in the channel scope.

```ts
import { Channel, Effect } from "effect"

Channel.acquireRelease(
  Effect.succeed("conn"),
  (conn, _exit) => Effect.log(`closing ${conn}`)
)
// => emits "conn"
```

### `scoped`

Runs a channel with a `Scope` provided for its lifetime, removing the `Scope` requirement.

```ts
import { Channel, Scope } from "effect"

declare const scopedChannel: Channel.Channel<number, never, void, unknown, unknown, unknown, Scope.Scope>
const ch = Channel.scoped(scopedChannel) // Scope removed from Env
```

### `ensuring`

Attaches a finalizer that always runs once the channel begins, regardless of outcome.

```ts
import { Channel, Console } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.ensuring(Console.log("done"))
)
```

### `onExit`

Attaches an exit-aware finalizer that receives the channel's `Exit`.

```ts
import { Channel, Console, Exit } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.onExit((exit) =>
    Console.log(Exit.isSuccess(exit) ? "ok" : "failed")
  )
)
```

### `onError`

Attaches a finalizer that runs **only on failure**, receiving the failure `Cause`.

```ts
import { Channel, Console } from "effect"

Channel.fail("boom").pipe(
  Channel.onError((cause) => Console.error(cause))
)
```

### `onStart`

Runs an effect before the channel starts; its success is ignored, its failure aborts the channel.

```ts
import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.onStart(Console.log("starting"))
)
```

### `onFirst`

Runs an effect the first time the channel emits an element (which is still emitted unchanged).

```ts
import { Channel, Console } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.onFirst((n) => Console.log(`first: ${n}`))
)
```

### `onEnd`

Runs an effect when the channel completes successfully, before the done value propagates.

```ts
import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.onEnd(Console.log("finished"))
)
```

## Transforming output

### `map`

Maps each output element with a pure function (receives the element and its index).

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(Channel.map((n) => n * 2))
// => emits 2, 4, 6
```

### `mapDone`

Maps the channel's done value.

```ts
import { Channel } from "effect"

Channel.end(3).pipe(Channel.mapDone((n) => n + 1))
// => done = 4
```

### `mapDoneEffect`

Maps the done value with an effectful function.

```ts
import { Channel, Effect } from "effect"

Channel.end(3).pipe(
  Channel.mapDoneEffect((n) => Effect.succeed(n * 10))
)
// => done = 30
```

### `mapEffect`

Maps each element with an effect. `options.concurrency` and `options.unordered` control parallelism.

```ts
import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.mapEffect((n) => Effect.succeed(n * 2), { concurrency: 2 })
)
// => emits 2, 4, 6
```

### `mapInput`

Maps the channel's **input** elements with an effectful function (contravariant side).

```ts
import { Channel, Effect } from "effect"

Channel.identity<number, never, void>().pipe(
  Channel.mapInput((s: string) => Effect.succeed(s.length))
)
```

### `mapInputError`

Maps the channel's **input** errors.

```ts
import { Channel, Effect } from "effect"

Channel.identity<number, Error, void>().pipe(
  Channel.mapInputError((s: string) => Effect.succeed(new Error(s)))
)
```

### `tap`

Runs a side effect on each element, leaving the element unchanged.

```ts
import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.tap((n) => Console.log(n))
)
// => emits 1, 2 while logging each
```

### `flatMap`

Maps each element to a channel and flattens the children. The optional `concurrency` and
`bufferSize` options control how child channels run in parallel and how their output is buffered.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.flatMap((n) => Channel.fromArray([n, n * 10]))
)
// => emits 1, 10, 2, 20
```

### `flatten`

Flattens a channel whose elements are themselves channels.

```ts
import { Channel } from "effect"

Channel.flatten(
  Channel.fromArray([Channel.fromArray([1, 2]), Channel.fromArray([3])])
)
// => emits 1, 2, 3
```

### `flattenArray`

Flattens a channel of arrays into individual elements.

```ts
import { Channel } from "effect"

Channel.flattenArray(Channel.fromArray([[1, 2], [3, 4]]))
// => emits 1, 2, 3, 4
```

### `flattenTake`

Flattens a channel of `Take` values: successes become non-empty arrays, failures fail, done
completes.

```ts
import { Channel } from "effect"

// Each `Take` here is a non-empty array of elements.
Channel.flattenTake(Channel.fromArray([[1, 2], [3]] as const))
// => emits [1, 2], [3]
```

### `filter`

Keeps only elements matching a predicate (refinements narrow the output type).

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3, 4]).pipe(Channel.filter((n) => n % 2 === 0))
// => emits 2, 4
```

### `filterMap`

Keeps and maps elements with a `Filter`; rejected elements are dropped.

```ts
import { Channel, Filter } from "effect"

Channel.fromArray([1, 2, 3, 4]).pipe(
  Channel.filterMap(Filter.fromPredicate((n: number) => n % 2 === 0))
)
// => emits 2, 4
```

### `filterEffect`

Filters elements with an effectful predicate.

```ts
import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.filterEffect((n) => Effect.succeed(n > 1))
)
// => emits 2, 3
```

### `filterMapEffect`

Filters and maps elements with an effectful `Filter`.

```ts
import { Channel, Effect, Result } from "effect"

// A FilterEffect returns Effect<Result<Pass, Fail>>: success keeps & maps, failure drops.
Channel.fromArray([1, 2, 3]).pipe(
  Channel.filterMapEffect((n: number) =>
    Effect.succeed(n > 1 ? Result.succeed(n * 10) : Result.fail(n))
  )
)
// => emits 20, 30
```

### `filterArray`

Filters each element **inside emitted arrays**, emitting only non-empty filtered arrays.

```ts
import { Channel } from "effect"

Channel.fromArray([[1, 2, 3], [4, 5]]).pipe(
  Channel.filterArray((n) => n % 2 === 1)
)
// => emits [1, 3], [5]
```

### `filterMapArray`

Filters and maps each element inside emitted arrays with a `Filter`.

```ts
import { Channel, Filter } from "effect"

Channel.fromArray([[1, 2], [3, 4]]).pipe(
  Channel.filterMapArray(Filter.fromPredicate((n: number) => n % 2 === 0))
)
// => emits [2], [4]
```

### `filterArrayEffect`

Filters each element inside emitted arrays with an effectful predicate.

```ts
import { Channel, Effect } from "effect"

Channel.fromArray([[1, 2], [3, 4]]).pipe(
  Channel.filterArrayEffect((n) => Effect.succeed(n > 2))
)
// => emits [3, 4]
```

### `filterMapArrayEffect`

Filters and maps each element inside emitted arrays with an effectful `Filter`.

```ts
import { Channel, Effect, Result } from "effect"

Channel.fromArray([[1, 2], [3, 4]]).pipe(
  Channel.filterMapArrayEffect((n: number) =>
    Effect.succeed(n > 2 ? Result.succeed(n) : Result.fail(n))
  )
)
// => emits [3, 4]
```

### `mapAccum`

Stateful map: each element updates an accumulator and yields zero or more output values. The
`onHalt` option can flush remaining state.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.mapAccum(() => 0, (sum, n) => [sum + n, [sum + n]] as const)
)
// => emits 1, 3, 6
```

### `scan`

Emits the initial accumulator plus each intermediate result as it folds over the elements.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(Channel.scan(0, (sum, n) => sum + n))
// => emits 0, 1, 3, 6
```
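
The difference between `mapAccum` and `scan` is easy to miss: `scan` emits the initial accumulator and then every intermediate state, while `mapAccum` emits only what the step function returns. A small model over arrays, written in plain TypeScript rather than with the Effect API (`scanModel` and `mapAccumModel` are hypothetical names):

```ts
// scan: emit the initial state, then each updated state.
function scanModel<S, A>(
  items: ReadonlyArray<A>,
  initial: S,
  f: (state: S, a: A) => S
): Array<S> {
  const out: Array<S> = [initial]
  let state = initial
  for (const item of items) {
    state = f(state, item)
    out.push(state)
  }
  return out
}

// mapAccum: update the state and emit zero or more outputs per element.
function mapAccumModel<S, A, B>(
  items: ReadonlyArray<A>,
  initial: S,
  f: (state: S, a: A) => readonly [S, ReadonlyArray<B>]
): Array<B> {
  const out: Array<B> = []
  let state = initial
  for (const item of items) {
    const [next, outputs] = f(state, item)
    state = next
    out.push(...outputs)
  }
  return out
}

const scanned = scanModel([1, 2, 3], 0, (sum, n) => sum + n)
// [0, 1, 3, 6]
const accumulated = mapAccumModel([1, 2, 3], 0, (sum, n) => [sum + n, [sum + n]] as const)
// [1, 3, 6]
```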

### `scanEffect`

Like `scan`, but the accumulator function is effectful.

```ts
import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.scanEffect(0, (sum, n) => Effect.succeed(sum + n))
)
// => emits 0, 1, 3, 6
```

### `switchMap`

Maps each element to a channel, but starting a new child interrupts the previous one (default
concurrency `1`).

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.switchMap((n) => Channel.fromArray([`v-${n}`]))
)
// => emits "v-1", "v-2", "v-3"
```

### `embedInput`

Runs an input handler against upstream while the wrapped channel runs without receiving input
directly.

```ts
import { Channel, Effect } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.embedInput((upstream) =>
    upstream.pipe(Effect.forever, Effect.ignore)
  )
)
```

### `buffer`

Buffers individual output elements in a queue so a fast producer can run ahead of a slow consumer.
`strategy` defaults to `"suspend"` (backpressure); `"dropping"` and `"sliding"` may discard elements.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.buffer({ capacity: 16, strategy: "suspend" })
)
```
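
The two lossy strategies differ in which element they discard when the buffer is full. The sketch below models that with a plain array. It assumes the usual meaning of the terms (`"dropping"` discards the incoming element, `"sliding"` discards the oldest buffered one) and is not the library's queue implementation. `offerModel` is a hypothetical helper, and `"suspend"` is left out because it makes the producer wait instead of discarding.

```ts
type LossyStrategy = "dropping" | "sliding"

// Offer one element to a bounded buffer and return the new buffer contents.
function offerModel<A>(
  buffer: ReadonlyArray<A>,
  capacity: number,
  strategy: LossyStrategy,
  value: A
): Array<A> {
  if (buffer.length < capacity) return [...buffer, value]
  return strategy === "dropping"
    ? [...buffer] // full: the new element is discarded
    : [...buffer.slice(1), value] // full: the oldest element is discarded
}

// Offer 1, 2, 3, 4 to a buffer of capacity 2 with no consumer draining it.
const offerAllModel = (strategy: LossyStrategy): Array<number> =>
  [1, 2, 3, 4].reduce<Array<number>>((buf, n) => offerModel(buf, 2, strategy, n), [])

const afterDropping = offerAllModel("dropping") // [1, 2]
const afterSliding = offerAllModel("sliding") // [3, 4]
```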

### `bufferArray`

Buffers array output elements; upstream array boundaries are not preserved.

```ts
import { Channel } from "effect"

Channel.fromArray([[1, 2], [3]]).pipe(
  Channel.bufferArray({ capacity: 16 })
)
```

## Composition

### `concat`

Runs the second channel after the first completes, concatenating their output.

```ts
import { Channel } from "effect"

Channel.concat(Channel.fromArray([1, 2]), Channel.fromArray([3, 4]))
// => emits 1, 2, 3, 4
```

### `concatWith`

Like `concat`, but the second channel is built from the first channel's done value.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.concatWith(() => Channel.succeed(99))
)
// => emits 1, 2, 99
```

### `combine`

Combines two channels with a stateful pull function that reads from both sides.

```ts
import { Channel, Effect } from "effect"

Channel.combine(
  Channel.fromArray([1, 2]),
  Channel.fromArray([10, 20]),
  () => 0,
  (s, left, right) =>
    Effect.map(Effect.zip(left, right), ([a, b]) => [a + b, s] as const)
)
// => emits 11, 22
```

### `orElseIfEmpty`

Runs a fallback channel if the source completes without emitting any element.

```ts
import { Channel } from "effect"

Channel.empty.pipe(
  Channel.orElseIfEmpty(() => Channel.succeed("fallback"))
)
// => emits "fallback"
```

### `pipeTo`

Pipes this channel's output into another channel's input. The result keeps this channel's input
type and the other's output/done types.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.pipeTo(Channel.identity<number, never, void>())
)
// => emits 1, 2, 3
```

### `pipeToOrFail`

Like `pipeTo`, but this channel's failures are preserved and not observed by the downstream channel.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.pipeToOrFail(Channel.identity<number, never, void>())
)
```

### `merge`

Runs two channels concurrently, interleaving their output. `haltStrategy` controls when the merged
channel stops (`"left" | "right" | "both" | "either"`, default `"both"`).

```ts
import { Channel } from "effect"

Channel.merge(Channel.fromArray([1, 2]), Channel.fromArray([3, 4]), {
  haltStrategy: "either"
})
// => emits 1, 2, 3, 4 (order may interleave)
```
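
The four halt strategies can be summarized as a predicate over which sides have finished. The function below is a reading aid under the assumption that the strategy names mean what they say (stop when the left side is done, when the right side is done, when both are done, or when either is done). It is not taken from the library source, and `shouldHalt` is a hypothetical name.

```ts
type HaltStrategyModel = "left" | "right" | "both" | "either"

// Decide whether the merged channel stops, given which sides have completed.
function shouldHalt(
  strategy: HaltStrategyModel,
  leftDone: boolean,
  rightDone: boolean
): boolean {
  switch (strategy) {
    case "left":
      return leftDone
    case "right":
      return rightDone
    case "both":
      return leftDone && rightDone
    case "either":
      return leftDone || rightDone
  }
}

// Left side finished, right side still running:
const haltsOnBoth = shouldHalt("both", true, false) // false
const haltsOnEither = shouldHalt("either", true, false) // true
```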

### `mergeAll`

Merges a channel of channels with bounded `concurrency` and `bufferSize`.

```ts
import { Channel } from "effect"

Channel.mergeAll({ concurrency: 2 })(
  Channel.fromArray([Channel.fromArray([1, 2]), Channel.fromArray([3, 4])])
)
// => emits 1, 2, 3, 4 (order may interleave)
```

### `mergeEffect`

Runs an effect concurrently with a channel, emitting only the channel's output and failing if the
effect fails.

```ts
import { Channel, Console } from "effect"

Channel.fromArray([1, 2]).pipe(
  Channel.mergeEffect(Console.log("running alongside"))
)
```

## Repetition

### `repeat`

Repeats the channel according to a `Schedule`.

```ts
import { Channel, Schedule } from "effect"

Channel.fromArray([1, 2]).pipe(Channel.repeat(Schedule.recurs(1)))
// => emits 1, 2, 1, 2
```

### `forever`

Repeats the channel forever (done type becomes `never`).

```ts
import { Channel } from "effect"

Channel.forever(Channel.fromArray([1, 2]))
// => emits 1, 2, 1, 2, ...
```

### `schedule`

Runs a schedule step for each output element, applying delays between elements.

```ts
import { Channel, Schedule } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.schedule(Schedule.spaced("100 millis"))
)
```

### `drain`

Consumes all output, emitting nothing and preserving only the done value.

```ts
import { Channel } from "effect"

Channel.drain(Channel.fromArray([1, 2, 3]))
// => emits nothing
```

## Error handling

### `catchCause`

Recovers from any failure `Cause` by building a replacement channel.

```ts
import { Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catchCause(() => Channel.succeed("recovered"))
)
// => emits "recovered"
```

### `catch` (`catch_`)

Recovers from a typed error value by building a replacement channel.

```ts
import { Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catch((err) => Channel.succeed(`saw ${err}`))
)
// => emits "saw boom"
```

### `catchCauseIf`

Recovers from a failure `Cause` only when a predicate over the cause matches.

```ts
import { Cause, Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catchCauseIf(Cause.hasFails, () => Channel.succeed("ok"))
)
```

### `catchCauseFilter`

Recovers using a `Filter` over the `Cause`; the recovery function receives the selected value.

```ts
import { Cause, Channel } from "effect"

// `Cause.findError` is a Filter<Cause<E>, E, ...> selecting the first failure error.
Channel.fail("boom").pipe(
  Channel.catchCauseFilter(Cause.findError, (err) => Channel.succeed(`saw ${err}`))
)
```

### `catchIf`

Recovers from typed errors that match a predicate or refinement; non-matching errors propagate.

```ts
import { Channel } from "effect"

Channel.fail("boom").pipe(
  Channel.catchIf((e): e is string => typeof e === "string", () => Channel.succeed("ok"))
)
```

### `catchFilter`

Recovers from errors selected by a `Filter`; the recovery function receives the filtered value.

```ts
import { Channel, Filter } from "effect"

Channel.fail({ _tag: "Net" } as const).pipe(
  Channel.catchFilter(
    Filter.fromPredicate((e: { _tag: string }) => e._tag === "Net"),
    () => Channel.succeed("ok")
  )
)
```

### `catchTag`

Recovers from a specific tagged error (or array of tags), with an optional `orElse` for the rest.

```ts
import { Channel, Data } from "effect"

class NetError extends Data.TaggedError("NetError")<{ url: string }> {}

Channel.fail(new NetError({ url: "/x" })).pipe(
  Channel.catchTag("NetError", (e) => Channel.succeed(`retry ${e.url}`))
)
```

### `catchReason`

Recovers from a specific `reason` tag nested inside a tagged error.

```ts
import { Channel, Data } from "effect"

class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit }> {}

Channel.fail(new AiError({ reason: new RateLimit({ retryAfter: 60 }) })).pipe(
  Channel.catchReason("AiError", "RateLimit", (r) => Channel.succeed(`wait ${r.retryAfter}`))
)
```

### `catchReasons`

Recovers from multiple nested reasons using an object of handlers keyed by reason tag.

```ts
import { Channel, Data } from "effect"

class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class Quota extends Data.TaggedError("Quota")<{ limit: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit | Quota }> {}

Channel.fail(new AiError({ reason: new Quota({ limit: 5 }) })).pipe(
  Channel.catchReasons("AiError", {
    RateLimit: (r) => Channel.succeed(`wait ${r.retryAfter}`),
    Quota: (r) => Channel.succeed(`limit ${r.limit}`)
  })
)
```

### `unwrapReason`

Flattens a tagged error's nested `reason` into the channel's own error channel.

```ts
import { Channel, Data } from "effect"

class RateLimit extends Data.TaggedError("RateLimit")<{ retryAfter: number }> {}
class AiError extends Data.TaggedError("AiError")<{ reason: RateLimit }> {}

Channel.fail(new AiError({ reason: new RateLimit({ retryAfter: 60 }) })).pipe(
  Channel.unwrapReason("AiError")
)
// now fails with RateLimit directly
```

### `tapCause`

Runs an effect with the failure `Cause`, then re-raises the original failure.

```ts
import { Channel, Console } from "effect"

Channel.fail("boom").pipe(
  Channel.tapCause((cause) => Console.error(cause))
)
```

### `tapError`

Runs an effect when the channel fails with a typed error, then preserves the failure.

```ts
import { Channel, Console } from "effect"

Channel.fail("boom").pipe(
  Channel.tapError((err) => Console.error(err))
)
```

### `mapError`

Transforms the channel's typed error.

```ts
import { Channel } from "effect"

Channel.fail("boom").pipe(Channel.mapError((s) => new Error(s)))
```

### `orDie`

Converts typed errors into unrecoverable defects (error channel becomes `never`).

```ts
import { Channel } from "effect"

Channel.fail("boom").pipe(Channel.orDie)
```

### `ignore`

Ignores typed errors, turning a failed channel into an empty one. Use `{ log: true }` to log the
cause first.

```ts
import { Channel } from "effect"

Channel.fail("boom").pipe(Channel.ignore())
// => emits nothing, no failure
```

### `ignoreCause`

Like `ignore`, but also swallows defects.

```ts
import { Channel } from "effect"

Channel.die("bug").pipe(Channel.ignoreCause())
```

### `retry`

Retries the channel according to a `Schedule` whenever it fails.

```ts
import { Channel, Schedule } from "effect"

Channel.fail("boom").pipe(Channel.retry(Schedule.recurs(3)))
```

## Interruption

### `interruptWhen`

Races the channel against an effect; if the effect finishes first, its value becomes the done value.

```ts
import { Channel, Effect } from "effect"

Channel.never.pipe(
  Channel.interruptWhen(Effect.as(Effect.sleep("1 second"), "timeout"))
)
// => done = "timeout"
```

### `haltWhen`

Stops the channel when an effect completes or fails; the effect's success becomes the done value,
its failure fails the channel.

```ts
import { Channel, Effect } from "effect"

Channel.fromArray([1, 2, 3]).pipe(
  Channel.haltWhen(Effect.as(Effect.sleep("1 second"), "stop"))
)
```

## Text codecs

These operate on **chunks** (`NonEmptyReadonlyArray`), matching the shape `Stream` uses.

### `splitLines`

Splits incoming string chunks into lines, recognizing `\n`, `\r\n`, and standalone `\r`, even across
chunk boundaries.

```ts
import { Effect, Stream } from "effect"

Effect.runPromise(
  Stream.runCollect(Stream.splitLines(Stream.make("hel", "lo\r\nwor", "ld\n")))
).then(console.log)
// => [ "hello", "world" ]
```
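
To make the "even across chunk boundaries" part concrete, here is a minimal plain-TypeScript model of chunk-aware line splitting. It is a sketch under stated assumptions, not the library's implementation: `splitLinesAcrossChunks` is a hypothetical helper, and it works on a finished array of chunks rather than a live channel.

```ts
// Hypothetical helper: models splitting on "\n", "\r\n", and standalone "\r"
// when a line terminator may be split between two chunks.
function splitLinesAcrossChunks(chunks: ReadonlyArray<string>): Array<string> {
  const lines: Array<string> = []
  let buffer = "" // text of the current, still unterminated line
  let pendingCR = false // true when the previous non-empty chunk ended with "\r"

  for (const chunk of chunks) {
    if (chunk.length === 0) continue
    let i = 0
    // A "\n" directly after a trailing "\r" completes a "\r\n" pair: skip it.
    if (pendingCR && chunk.startsWith("\n")) i = 1
    pendingCR = false

    for (; i < chunk.length; i++) {
      const ch = chunk[i]
      if (ch === "\n") {
        lines.push(buffer)
        buffer = ""
      } else if (ch === "\r") {
        lines.push(buffer)
        buffer = ""
        if (i + 1 < chunk.length) {
          // "\r\n" inside one chunk counts as a single terminator.
          if (chunk[i + 1] === "\n") i++
        } else {
          // The matching "\n" may arrive at the start of the next chunk.
          pendingCR = true
        }
      } else {
        buffer += ch
      }
    }
  }

  // Emit a trailing line that had no terminator.
  if (buffer.length > 0) lines.push(buffer)
  return lines
}

const lines = splitLinesAcrossChunks(["hel", "lo\r\nwor", "ld\n"])
// lines is ["hello", "world"]
```

The key design point is the carried state (`buffer` and `pendingCR`): without it, `"hel"` and `"lo"` would be reported as separate lines.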

### `decodeText`

Decodes incoming `Uint8Array` chunks into string chunks using `TextDecoder` with streaming enabled,
so multi-byte characters that span chunk boundaries are decoded correctly. Highlighted in the v4
channel improvements.

```ts
import { Channel } from "effect"

const decode = Channel.decodeText<never, void>("utf-8")
// InElem: NonEmptyReadonlyArray<Uint8Array>  ->  OutElem: NonEmptyReadonlyArray<string>
```
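
The effect of streaming mode can be seen with the standard `TextDecoder` alone. The sketch below does not use `Channel`; it only illustrates the underlying behaviour, assuming a runtime where `TextEncoder` and `TextDecoder` are available as globals (Node.js and browsers).

```ts
// "€" is three bytes in UTF-8 (0xE2 0x82 0xAC); split it across two chunks.
const euro = new TextEncoder().encode("€")
const byteChunks = [euro.subarray(0, 1), euro.subarray(1)]

// One decoder reused with { stream: true } buffers the incomplete sequence
// and emits the character once the remaining bytes arrive.
const streamingDecoder = new TextDecoder("utf-8")
const decodedStreaming =
  byteChunks.map((bytes) => streamingDecoder.decode(bytes, { stream: true })).join("") +
  streamingDecoder.decode() // final call flushes any buffered bytes

// Decoding each chunk independently cannot reassemble the character
// and yields U+FFFD replacement characters instead.
const decodedPerChunk = byteChunks
  .map((bytes) => new TextDecoder("utf-8").decode(bytes))
  .join("")

console.log(decodedStreaming === "€") // true
console.log(decodedPerChunk.includes("\uFFFD")) // true
```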

### `encodeText`

Encodes incoming string chunks into `Uint8Array` chunks using `TextEncoder`.

```ts
import { Channel } from "effect"

const encode = Channel.encodeText<never, void>()
// InElem: NonEmptyReadonlyArray<string>  ->  OutElem: NonEmptyReadonlyArray<Uint8Array>
```
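
As a plain-TypeScript illustration of what happens to each string in a chunk, the sketch below calls the standard `TextEncoder` directly. It does not use `Channel`, and the variable names are illustrative. Note that `TextEncoder` always produces UTF-8.

```ts
// One chunk of strings, in the shape a chunked channel would carry.
const textChunk = ["héllo", "wörld"]

const encoder = new TextEncoder() // always encodes to UTF-8
const encodedChunk = textChunk.map((text) => encoder.encode(text))

// Non-ASCII characters take more than one byte, so byte length exceeds string length.
const byteLengths = encodedChunk.map((bytes) => bytes.length)
// byteLengths is [6, 6]: five characters each, with "é" and "ö" taking two bytes

// Decoding whole strings with the same encoding round-trips the text.
const roundTripped = encodedChunk.map((bytes) => new TextDecoder("utf-8").decode(bytes))
```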

## Context and services

### `contextWith`

Builds a channel from the surrounding `Context`.

```ts
import { Channel, Context } from "effect"

const Config = Context.Reference("Config", { defaultValue: () => ({ n: 1 }) })

Channel.contextWith((ctx: Context.Context<typeof Config>) =>
  Channel.succeed(Context.get(ctx, Config).n)
)
```

### `provideContext`

Provides a `Context`, removing those service requirements.

```ts
import { Channel, Context } from "effect"

declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, { svc: string }>
declare const ctx: Context.Context<{ svc: string }>
const provided = Channel.provideContext(ch, ctx)
```

### `provideService`

Provides a single service by key.

```ts
import { Channel, Context } from "effect"

const Greeting = Context.Reference("Greeting", { defaultValue: () => "hi" })

Channel.contextWith((ctx: Context.Context<typeof Greeting>) =>
  Channel.succeed(Context.get(ctx, Greeting))
).pipe(Channel.provideService(Greeting, "hello"))
```

### `provideServiceEffect`

Provides a service obtained from an effect (which may fail).

```ts
import { Channel, Context, Effect } from "effect"

const Token = Context.Reference("Token", { defaultValue: () => "" })

declare const ch: Channel.Channel<string, never, void, unknown, unknown, unknown, typeof Token>
Channel.provideServiceEffect(ch, Token, Effect.succeed("abc"))
```

### `provide`

Provides a `Layer` or `Context`, removing the corresponding requirements. `options.local` builds a
fresh layer instance.

```ts
import { Channel, Layer } from "effect"

declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, { db: string }>
declare const DbLive: Layer.Layer<{ db: string }>
const provided = Channel.provide(ch, DbLive)
```

### `updateContext`

Transforms the surrounding context before the channel runs.

```ts
import { Channel, Context } from "effect"

declare const ch: Channel.Channel<number, never, void>
Channel.updateContext(ch, (ctx: Context.Context<never>) => ctx)
```

### `updateService`

Reads a service from context, transforms it, and provides the updated value.

```ts
import { Channel, Context } from "effect"

const Count = Context.Reference("Count", { defaultValue: () => 0 })

declare const ch: Channel.Channel<number, never, void, unknown, unknown, unknown, typeof Count>
Channel.updateService(ch, Count, (n) => n + 1)
```

### `withSpan`

Runs the channel inside a tracing span, ending the span with the channel's exit.

```ts
import { Channel } from "effect"

Channel.fromArray([1, 2, 3]).pipe(Channel.withSpan("process-batch"))
```

## Do notation

### `Do`

The starting point for do notation: a channel that emits a single empty object.

```ts
import { Channel } from "effect"

Channel.Do // => emits {}
```

### `bind`

Adds a field produced by running another channel for each emitted object.

```ts
import { Channel } from "effect"

Channel.Do.pipe(
  Channel.bind("x", () => Channel.succeed(1)),
  Channel.bind("y", () => Channel.succeed(2))
)
// => emits { x: 1, y: 2 }
```

### `bindTo`

Wraps each emitted element in an object under the given field name.

```ts
import { Channel } from "effect"

Channel.succeed(42).pipe(Channel.bindTo("answer"))
// => emits { answer: 42 }
```

### `unwrap`

Builds a channel from a (possibly scoped) effect that yields a channel.

```ts
import { Channel, Effect } from "effect"

Channel.unwrap(Effect.succeed(Channel.fromArray([1, 2, 3])))
// => emits 1, 2, 3
```

## Running and converting

These interpret a channel that no longer needs upstream input, producing an `Effect`.

### `runCollect`

Collects all output elements into an array.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(Channel.runCollect(Channel.fromArray([1, 2, 3]))).then(console.log)
// => [ 1, 2, 3 ]
```

### `runDrain`

Discards all output, returning the done value.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(
  Channel.fromArray([1, 2]).pipe(
    Channel.concatWith(() => Channel.end("done")),
    Channel.runDrain
  )
).then(console.log)
// => "done"
```

### `runCount`

Counts the emitted elements.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(Channel.runCount(Channel.fromArray([1, 2, 3]))).then(console.log)
// => 3
```

### `runForEach`

Runs an effect for each output element, returning the done value.

```ts
import { Channel, Console, Effect } from "effect"

Effect.runPromise(
  Channel.runForEach(Channel.fromArray([1, 2]), (n) => Console.log(n))
)
```

### `runForEachWhile`

Runs an effectful predicate per element, stopping early when it returns `false`.

```ts
import { Channel, Effect } from "effect"

Channel.runForEachWhile(Channel.fromArray([1, 2, 3]), (n) => Effect.succeed(n < 3))
// the predicate returns false for 3, so the run stops there
```

### `runFold`

Folds over all output elements with a pure accumulator.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(
  Channel.runFold(Channel.fromArray([1, 2, 3]), () => 0, (acc, n) => acc + n)
).then(console.log)
// => 6
```

### `runFoldEffect`

Folds over all output elements with an effectful accumulator.

```ts
import { Channel, Effect } from "effect"

Channel.runFoldEffect(
  Channel.fromArray([1, 2, 3]),
  () => 0,
  (acc, n) => Effect.succeed(acc + n)
)
```

### `runHead`

Returns the first output element as an `Option`.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(Channel.runHead(Channel.fromArray([1, 2, 3]))).then(console.log)
// => Option.some(1)
```

### `runLast`

Returns the last output element as an `Option`.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(Channel.runLast(Channel.fromArray([1, 2, 3]))).then(console.log)
// => Option.some(3)
```

### `runDone`

Runs the channel and returns only its done value.

```ts
import { Channel, Effect } from "effect"

Effect.runPromise(Channel.runDone(Channel.end("done"))).then(console.log)
// => "done"
```
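
The runners above differ mainly in whether they keep the emitted elements or the done value. As a rough analogy only (plain TypeScript, not the `Channel` API), a generator also yields elements and then returns one final value. The names `source`, `collect`, and `drain` below are hypothetical and exist only for this sketch.

```ts
// Analogy: yielded values play the role of OutElem, the return value of OutDone.
function* source(): Generator<number, string, void> {
  yield 1
  yield 2
  return "done"
}

// Keeps the yielded elements and ignores the return value (compare `runCollect`).
function collect<A, D>(gen: Generator<A, D, void>): Array<A> {
  const out: Array<A> = []
  let step = gen.next()
  while (!step.done) {
    out.push(step.value)
    step = gen.next()
  }
  return out
}

// Discards the yielded elements and keeps the return value (compare `runDrain`).
function drain<A, D>(gen: Generator<A, D, void>): D {
  let step = gen.next()
  while (!step.done) step = gen.next()
  return step.value
}

const collected = collect(source()) // [1, 2]
const doneValue = drain(source()) // "done"
```

The analogy stops there: a generator has no typed error, environment, or input side, and running it is synchronous.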

### `toPull`

Converts the channel to a scoped `Pull` for manual, low-level consumption (pulls are serialized).

```ts
import { Channel, Effect } from "effect"

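// Effect.scoped closes the scope as soon as this effect completes, so in real
// code consume the pull inside the scoped region rather than returning it.
const pull = Effect.scoped(Channel.toPull(Channel.fromArray([1, 2, 3])))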
```

### `toPullScoped`

Converts the channel to a `Pull` within an existing scope you control.

```ts
import { Channel, Effect, Scope } from "effect"

Effect.gen(function*() {
  const scope = yield* Scope.make()
  const pull = yield* Channel.toPullScoped(Channel.fromArray([1, 2, 3]), scope)
  return pull
})
```

### `runIntoQueue`

Runs the channel, offering each element into a queue; ends or fails the queue accordingly.

```ts
import { Channel, Effect, Queue } from "effect"

Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Channel.runIntoQueue(Channel.fromArray([1, 2, 3]), queue)
})
```

### `runIntoQueueArray`

Like `runIntoQueue`, for a channel emitting non-empty arrays (each element is offered individually).

```ts
import { Channel, Effect, Queue } from "effect"

Effect.gen(function*() {
  const queue = yield* Queue.unbounded<number>()
  yield* Channel.runIntoQueueArray(Channel.fromArray([[1, 2], [3]]), queue)
})
```

### `toQueue`

Creates a scoped queue and forks the channel to feed it for concurrent consumption.

```ts
import { Channel, Effect } from "effect"

Effect.scoped(
  Channel.toQueue(Channel.fromArray([1, 2, 3]), { capacity: 16 })
)
```

### `toQueueArray`

Like `toQueue`, for an array-emitting channel (offers each element individually).

```ts
import { Channel, Effect } from "effect"

Effect.scoped(
  Channel.toQueueArray(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)
```

### `runIntoPubSub`

Runs the channel, publishing each element to a `PubSub`. Set `options.shutdownOnEnd` to shut the
`PubSub` down when the channel finishes.

```ts
import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  yield* Channel.runIntoPubSub(Channel.fromArray([1, 2, 3]), pubsub)
})
```

### `runIntoPubSubArray`

Like `runIntoPubSub`, for an array-emitting channel (publishes each element individually).

```ts
import { Channel, Effect, PubSub } from "effect"

Effect.gen(function*() {
  const pubsub = yield* PubSub.unbounded<number>()
  yield* Channel.runIntoPubSubArray(Channel.fromArray([[1, 2], [3]]), pubsub)
})
```

### `toPubSub`

Creates a scoped `PubSub` and forks the channel to feed it. `shutdownOnEnd` defaults to `true`.

```ts
import { Channel, Effect } from "effect"

Effect.scoped(
  Channel.toPubSub(Channel.fromArray([1, 2, 3]), { capacity: 16 })
)
```

### `toPubSubArray`

Like `toPubSub`, for an array-emitting channel (publishes each element individually).

```ts
import { Channel, Effect } from "effect"

Effect.scoped(
  Channel.toPubSubArray(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)
```

### `toPubSubTake`

Creates a scoped `PubSub` of `Take` values; the channel's final `Exit` is published so subscribers
can observe completion or failure.

```ts
import { Channel, Effect } from "effect"

Effect.scoped(
  Channel.toPubSubTake(Channel.fromArray([[1, 2], [3]]), { capacity: 16 })
)
```

## See also

- [Creating Streams](https://effect.plants.sh/streaming/creating-streams/) — the high-level API most code should use.
- [Sink](https://effect.plants.sh/streaming/sink/) — consuming streams; sinks are channels too.
- [Transforming Streams](https://effect.plants.sh/streaming/transforming-streams/) — stream operators built on channels.