Pull & Take

Pull and Take are the two low-level primitives that sit underneath Stream, Sink, and Channel. You rarely reach for them directly, but understanding them demystifies how streams actually move data, and they become useful when you manually drive a stream or implement a custom producer.

A Pull<A, E, Done, R> is one pull step. It is just an Effect whose error channel is widened with a special completion signal:

import type { Cause, Effect } from "effect"
interface Pull<out A, out E = never, out Done = void, out R = never>
  extends Effect.Effect<A, E | Cause.Done<Done>, R> {}

Evaluating a Pull does one of three things:

  • emit — succeed with a value of type A (for streams, a non-empty batch of elements),
  • fail — fail with an ordinary error E,
  • complete — fail with Cause.Done<Done>, the end-of-input signal, optionally carrying a Done value (leftover state or a final result).

Completion lives in the error channel as a Cause.Done. That is what lets a consumer loop yield* pull repeatedly and distinguish “no more input” from a real failure. The Pull module exists to separate that Done signal back out from ordinary failures.
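
The idea of carrying completion in the error channel can be sketched without the library at all. The following is a simplified model in plain TypeScript, not Effect's implementation: DoneSignal, StepResult, PullStep, and drain are hypothetical names introduced only for illustration, and a pull step is modeled as a synchronous function rather than an Effect.

```typescript
// Hypothetical stand-in for Cause.Done: a tagged completion marker.
interface DoneSignal<D> {
  readonly _tag: "Done"
  readonly value: D
}
function doneSignal<D>(value: D): DoneSignal<D> {
  return { _tag: "Done", value }
}
function isDoneSignal(u: unknown): u is DoneSignal<unknown> {
  return typeof u === "object" && u !== null && (u as { _tag?: unknown })._tag === "Done"
}

// Hypothetical model of one pull step: succeed with A, or fail with E | DoneSignal<D>.
type StepResult<A, E, D> =
  | { readonly ok: true; readonly value: A }
  | { readonly ok: false; readonly error: E | DoneSignal<D> }
type PullStep<A, E, D> = () => StepResult<A, E, D>

type Drained<A, E, D> =
  | { readonly outcome: "completed"; readonly batches: ReadonlyArray<A>; readonly leftover: D }
  | { readonly outcome: "failed"; readonly batches: ReadonlyArray<A>; readonly error: E }

// Pull repeatedly; a DoneSignal in the error slot ends the loop normally,
// while any other error is reported as a real failure.
function drain<A, E, D>(pull: PullStep<A, E, D>): Drained<A, E, D> {
  const batches: Array<A> = []
  while (true) {
    const step = pull()
    if (step.ok) {
      batches.push(step.value)
      continue
    }
    if (isDoneSignal(step.error)) {
      return { outcome: "completed", batches, leftover: step.error.value as D }
    }
    return { outcome: "failed", batches, error: step.error as E }
  }
}

// Two batches, then completion carrying a leftover value.
const queue: Array<StepResult<ReadonlyArray<number>, string, string>> = [
  { ok: true, value: [1, 2] },
  { ok: true, value: [3] },
  { ok: false, error: doneSignal("leftover") }
]
const pull: PullStep<ReadonlyArray<number>, string, string> = () => queue.shift()!
const result = drain(pull)
console.log(result.outcome)
// => completed
```

The consumer needs only one check on the failure path to tell the two cases apart, which is the role the Pull helpers play for real Effect values.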

A Take<A, E, Done> is one stored pull result. Where Pull is the effect, Take is the plain value you can park in a queue or PubSub and replay later:

import type { Array, Exit } from "effect"
type Take<A, E = never, Done = void> =
  | Array.NonEmptyReadonlyArray<A> // a non-empty batch of emitted values
  | Exit.Exit<Done, E> // a completion (success Exit) or failure (failure Exit)

So a Take is either a non-empty batch of elements, a successful Exit carrying the Done value (normal completion — no elements), or a failed Exit (an ordinary failure). Take.toPull interprets a stored Take back into a Pull step.
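
The three-way split described above can be modeled in plain TypeScript. This is a rough sketch under stated assumptions, not the library's code: ExitLike is a deliberately simplified stand-in for Exit (the real failure Exit carries a Cause rather than a bare error), and TakeLike, Interpreted, isBatch, and interpret are hypothetical names.

```typescript
// Hypothetical, simplified stand-in for Exit.
type ExitLike<D, E> =
  | { readonly _tag: "Success"; readonly value: D }
  | { readonly _tag: "Failure"; readonly error: E }

type NonEmpty<A> = readonly [A, ...Array<A>]

// Same shape as Take: a non-empty batch, or an exit.
type TakeLike<A, E, D> = NonEmpty<A> | ExitLike<D, E>

type Interpreted<A, E, D> =
  | { readonly kind: "emit"; readonly batch: NonEmpty<A> }
  | { readonly kind: "done"; readonly leftover: D }
  | { readonly kind: "fail"; readonly error: E }

// A batch is the only array-shaped member of the union.
function isBatch<A, E, D>(take: TakeLike<A, E, D>): take is NonEmpty<A> {
  return Array.isArray(take)
}

// Batch -> emit, success exit -> completion, failure exit -> ordinary failure.
function interpret<A, E, D>(take: TakeLike<A, E, D>): Interpreted<A, E, D> {
  if (isBatch<A, E, D>(take)) return { kind: "emit", batch: take }
  return take._tag === "Success"
    ? { kind: "done", leftover: take.value }
    : { kind: "fail", error: take.error }
}

const stored: ReadonlyArray<TakeLike<number, string, void>> = [
  [1, 2],
  [3],
  { _tag: "Success", value: undefined }
]
const kinds = stored.map((t) => interpret<number, string, void>(t).kind)
console.log(kinds)
// => [ 'emit', 'emit', 'done' ]
```

Because every member is a plain value, a sequence like stored can sit in a queue and be interpreted later, which is the property that makes Take suitable for transport.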

You encounter Pull and Take through a handful of Stream / Channel APIs:

  • Stream.toPull(stream) — get a scoped Pull to manually consume a stream’s batches.
  • Stream.fromPull(effect) — build a Stream from an Effect that produces a Pull.
  • Stream.flattenTake(stream) — turn a Stream<Take<A, E>> into a Stream<A>.
  • Stream.fromPubSubTake / Stream.toPubSubTake — move Take values across a PubSub.
  • Channel.fromPull / Channel.toPull — the Channel-level pull bridge.

Stream.toPull returns a scoped Effect that yields a Pull. Each time you evaluate the pull you get the next non-empty batch; when the stream ends, the pull fails with Cause.Done. Use the Pull helpers to interpret that signal:

import { Console, Effect, Pull, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
const program = Effect.scoped(
  Effect.gen(function* () {
    // A scoped pull over the stream's batches.
    const pull = yield* Stream.toPull(stream)
    // Drive it in a loop until it completes.
    while (true) {
      // Fold each step into a tagged value: either a batch or the done marker.
      const step = yield* pull.pipe(
        Effect.map((batch) => ({ done: false as const, batch })),
        // catchDone turns the Cause.Done completion back into a success.
        Pull.catchDone(() => Effect.succeed({ done: true as const }))
      )
      if (step.done) break
      yield* Console.log("batch:", step.batch)
      // => batch: [ 1, 2, 3 ]
    }
  })
)
Effect.runPromise(program)

Stream.toPull always produces batches (NonEmptyReadonlyArray), never single elements — that is how streams stay efficient.

A Take is the stored form, so you build one from an Exit or an array and turn it back into a pull with Take.toPull. This is what Stream.flattenTake does under the hood:

import { Effect, Exit, Stream, Take } from "effect"
// A stream of *stored* pull results.
const takes: Stream.Stream<Take.Take<number, string>> = Stream.make(
  [1, 2] as const, // a batch of elements
  [3] as const, // another batch
  Exit.succeed(undefined) // a success Exit -> completion (carries Done = void)
)
const program = Stream.flattenTake(takes).pipe(Stream.runCollect)
Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3 ]

A failure Take is just a failed Exit, and it propagates as an ordinary error when interpreted:

import { Exit, Take } from "effect"
const ok: Take.Take<number, string> = [1, 2, 3] // batch -> emits
const done: Take.Take<number, string> = Exit.succeed(undefined) // completion
const failed: Take.Take<number, string> = Exit.fail("boom") // failure

Import the module with import { Pull } from "effect". Its job is to tell Cause.Done completion apart from ordinary failures, and to convert between the two.

Pull.Pull is the pull-step type itself: an Effect<A, E | Cause.Done<Done>, R>. Construct one with any Effect plus Cause.done(...) for the completion signal.

import { Cause, Effect, Pull } from "effect"
// A pull that emits 42, then completes carrying a leftover string.
let emitted = false
const pull: Pull.Pull<number, never, string> = Effect.suspend(() =>
  emitted ? Cause.done("leftover") : ((emitted = true), Effect.succeed(42))
)

Pull.catchDone recovers from the Cause.Done completion signal while leaving ordinary failures in the error channel. The handler receives the done leftover value.

import { Cause, Effect, Pull } from "effect"
const program = Cause.done("finished").pipe(
  Pull.catchDone((leftover) => Effect.succeed(`done with: ${leftover}`))
)
Effect.runPromise(program).then(console.log)
// => done with: finished

Pull.matchEffect handles all three Pull outcomes (success, ordinary failure, and done) with effectful handlers.

import { Cause, Effect, Pull } from "effect"
const pull = Cause.done("stream ended")
const program = Pull.matchEffect(pull, {
  onSuccess: (value) => Effect.succeed(`got value: ${value}`),
  onFailure: (cause) => Effect.succeed(`got error: ${cause}`),
  onDone: (leftover) => Effect.succeed(`halted with: ${leftover}`)
})
Effect.runPromise(program).then(console.log)
// => halted with: stream ended

Pull.isDoneCause returns true when a whole Cause contains any Cause.Done completion reason. Use it for a boolean branch when you do not need the done payload.

import { Cause, Pull } from "effect"
Pull.isDoneCause(Cause.fail(Cause.Done("x"))) // => true
Pull.isDoneCause(Cause.fail("boom")) // => false

Pull.isDoneFailure is a predicate over a single Cause.Reason: it is true when the reason is a Fail whose error is a Cause.Done. Use it when traversing cause.reasons directly.

import { Cause, Pull } from "effect"
const cause = Cause.fail(Cause.Done("x"))
cause.reasons.some(Pull.isDoneFailure) // => true

Pull.filterDone finds a Cause.Done in a Cause, preserving its typed value. It returns a successful Result with the Cause.Done when present, otherwise a failed Result carrying the non-done cause.

import { Cause, Pull, Result } from "effect"
const r = Pull.filterDone(Cause.fail(Cause.Done("leftover")))
Result.isSuccess(r) // => true
// r.success is the Cause.Done<"leftover"> value
const r2 = Pull.filterDone(Cause.fail("boom"))
Result.isFailure(r2) // => true (r2.failure is the non-done Cause)

Pull.filterDoneVoid is like filterDone, but for cases where the done payload does not matter: it returns a plain Cause.Done marker on success.

import { Cause, Pull, Result } from "effect"
const r = Pull.filterDoneVoid(Cause.fail(Cause.Done()))
Result.isSuccess(r) // => true

Pull.filterDoneLeftover extracts only the leftover value carried by a Cause.Done completion (rather than the Done wrapper). It powers catchDone internally.

import { Cause, Pull, Result } from "effect"
const r = Pull.filterDoneLeftover(Cause.fail(Cause.Done("leftover")))
Result.isSuccess(r) // => true
// r.success === "leftover"

Pull.filterNoDone is the inverse of filterDone: it keeps a Cause only when it has no Cause.Done failures. Use it to isolate ordinary failures for handling.

import { Cause, Pull, Result } from "effect"
const r = Pull.filterNoDone(Cause.fail("boom"))
Result.isSuccess(r) // => true (no done present)
const r2 = Pull.filterNoDone(Cause.fail(Cause.Done()))
Result.isFailure(r2) // => true (a done is present)

Pull.doneExitFromCause converts a Cause into an Exit, treating Cause.Done as success (its leftover becomes the success value) and any remaining cause as failure. It is useful for finalizing a pull loop into an Exit.

import { Cause, Exit, Pull } from "effect"
const ok = Pull.doneExitFromCause(Cause.fail(Cause.Done("result")))
Exit.isSuccess(ok) // => true (value === "result")
const err = Pull.doneExitFromCause(Cause.fail("boom"))
Exit.isFailure(err) // => true

These are type-level helpers for writing generic pull signatures. They take a Pull type and project out one of its parameters.

Pull.Success extracts the success (emitted) type A from a Pull.

import type { Pull } from "effect"
type P = Pull.Pull<number, "err", string>
type A = Pull.Success<P> // => number

Pull.Error extracts the ordinary error type E, excluding the Cause.Done completion.

import type { Pull } from "effect"
type P = Pull.Pull<number, "err", string>
type E = Pull.Error<P> // => "err"

Pull.Leftover extracts the completion leftover type Done.

import type { Pull } from "effect"
type P = Pull.Pull<number, "err", string>
type L = Pull.Leftover<P> // => string

Pull.Services extracts the required services (context) type R.

import type { Pull } from "effect"
type P = Pull.Pull<number, "err", string, "MyService">
type R = Pull.Services<P> // => "MyService"

Pull.ExcludeDone is a utility type that removes Cause.Done members from an error union, leaving the error type that remains once completion has been filtered out.

import type { Cause, Pull } from "effect"
type Raw = "err" | Cause.Done<string>
type E = Pull.ExcludeDone<Raw> // => "err"

Import the module with import { Take } from "effect". It is intentionally tiny: a Take is a plain union value, so you construct one directly and convert it with toPull.

Take.Take is the stored pull-result type: NonEmptyReadonlyArray<A> | Exit<Done, E>. A batch emits elements, a success Exit signals completion (no elements), and a failure Exit signals an ordinary failure.

import { Exit, Take } from "effect"
const batch: Take.Take<number, string> = [1, 2, 3] // emits 1, 2, 3
const done: Take.Take<number, string> = Exit.succeed(undefined) // completion (Done = void)
const failed: Take.Take<number, string> = Exit.fail("boom") // failure

Take.toPull interprets a stored Take as a Pull step: batches succeed with their elements, failure exits fail, and success exits become Cause.Done completion.

import { Effect, Exit, Pull, Take } from "effect"
// A batch -> a pull that succeeds with the batch.
const p1: Pull.Pull<readonly number[], never, void> = Take.toPull([1, 2, 3])
Effect.runPromise(p1).then(console.log)
// => [ 1, 2, 3 ]
// A success Exit -> a pull that completes (fails with Cause.Done).
const p2 = Take.toPull(Exit.succeed("final"))
Effect.runPromise(
  p2.pipe(Pull.catchDone((leftover) => Effect.succeed(`done: ${leftover}`)))
).then(console.log)
// => done: final