Pull & Take
Pull and Take are the two low-level primitives that sit underneath
Stream, Sink, and
Channel. You rarely reach for them directly, but understanding them demystifies
how streams actually move data, and they become useful when you manually drive a
stream or implement a custom producer.
Mental model
A Pull<A, E, Done, R> is one pull step. It is just an Effect whose error
channel is widened with a special completion signal:
    import type { Cause, Effect } from "effect"

    interface Pull<out A, out E = never, out Done = void, out R = never>
      extends Effect.Effect<A, E | Cause.Done<Done>, R> {}

Evaluating a Pull does one of three things:
- emit: succeed with a value of type A (for streams, a non-empty batch of elements),
- fail: fail with an ordinary error E,
- complete: fail with Cause.Done<Done>, the end-of-input signal, optionally carrying a Done value (leftover state or a final result).
Completion lives in the error channel as a Cause.Done. That is what lets a
consumer loop yield* pull repeatedly and distinguish “no more input” from a
real failure. The Pull module exists to separate that Done signal back out
from ordinary failures.
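To make the three outcomes concrete, here is a minimal, dependency-free sketch of the idea. It models a pull step as a plain function that returns a batch, throws a DoneSignal to complete, or throws anything else to fail. DoneSignal, PullStep, makeArrayPull, and drain are hypothetical names invented for this illustration; they are not part of the effect API, and a real Pull is an Effect rather than a throwing function.

```typescript
// Simplified model only: these names are illustrative, not the effect API.
// Completion travels on the same channel as failures (here: `throw`).
class DoneSignal {
  readonly leftover: string
  constructor(leftover: string) {
    this.leftover = leftover
  }
}

// One pull step: returns a batch, or throws.
type PullStep<A> = () => ReadonlyArray<A>

// Build a pull over fixed batches that completes with `leftover`.
function makeArrayPull<A>(
  batches: ReadonlyArray<ReadonlyArray<A>>,
  leftover: string
): PullStep<A> {
  let index = 0
  return () => {
    const batch = batches[index]
    if (batch === undefined) throw new DoneSignal(leftover) // complete
    index += 1
    return batch // emit
  }
}

type DrainResult<A> =
  | { readonly _tag: "Completed"; readonly items: ReadonlyArray<A>; readonly leftover: string }
  | { readonly _tag: "Failed"; readonly items: ReadonlyArray<A>; readonly error: unknown }

// The consumer loop: pull repeatedly, then separate "done" from real failures.
function drain<A>(pull: PullStep<A>): DrainResult<A> {
  const items: Array<A> = []
  while (true) {
    try {
      items.push(...pull())
    } catch (signal) {
      if (signal instanceof DoneSignal) {
        return { _tag: "Completed", items, leftover: signal.leftover }
      }
      return { _tag: "Failed", items, error: signal }
    }
  }
}

const completed = drain(makeArrayPull([[1, 2], [3]], "end"))
const failed = drain<number>(() => {
  throw new Error("boom")
})
```

In this model, `completed` ends in the "Completed" branch with the leftover attached, while `failed` ends in the "Failed" branch. The only thing separating the two is the check for the completion marker, which is the role the Pull helpers play for Cause.Done.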
A Take<A, E, Done> is one stored pull result. Where Pull is the effect,
Take is the plain value you can park in a queue or PubSub and replay later:
    import type { Array, Exit } from "effect"

    type Take<A, E = never, Done = void> =
      | Array.NonEmptyReadonlyArray<A> // a non-empty batch of emitted values
      | Exit.Exit<Done, E> // a completion (success Exit) or failure (failure Exit)

So a Take is either a non-empty batch of elements, a successful Exit carrying
the Done value (normal completion, no elements), or a failed Exit (an
ordinary failure). Take.toPull interprets a stored Take back into
a Pull step.
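The three-way split can be sketched without the library. The following is a simplified model, not the effect implementation: ModelExit, Batch, ModelTake, Step, isBatch, interpret, and replay are hypothetical stand-ins chosen for illustration. It shows how a stored value is told apart (array versus exit, then success versus failure) and how a list of stored values can be replayed in order.

```typescript
// Illustrative stand-ins only: not the effect library's Exit or Take types.
type ModelExit<Done, E> =
  | { readonly _tag: "Success"; readonly value: Done }
  | { readonly _tag: "Failure"; readonly error: E }

type Batch<A> = readonly [A, ...Array<A>] // non-empty by construction

type ModelTake<A, E, Done> = Batch<A> | ModelExit<Done, E>

type Step<A, E, Done> =
  | { readonly _tag: "Emit"; readonly batch: Batch<A> }
  | { readonly _tag: "Fail"; readonly error: E }
  | { readonly _tag: "Complete"; readonly done: Done }

// A batch is the only array-shaped member of the union.
function isBatch<A, E, Done>(take: ModelTake<A, E, Done>): take is Batch<A> {
  return Array.isArray(take)
}

// Interpret one stored value as one step outcome.
function interpret<A, E, Done>(take: ModelTake<A, E, Done>): Step<A, E, Done> {
  if (isBatch(take)) return { _tag: "Emit", batch: take }
  return take._tag === "Success"
    ? { _tag: "Complete", done: take.value }
    : { _tag: "Fail", error: take.error }
}

// Replay stored takes in order, stopping at the first completion or failure.
function replay<A, E, Done>(takes: ReadonlyArray<ModelTake<A, E, Done>>): {
  readonly items: ReadonlyArray<A>
  readonly end: "Complete" | "Fail" | "Exhausted"
} {
  const items: Array<A> = []
  for (const take of takes) {
    const step = interpret(take)
    if (step._tag !== "Emit") return { items, end: step._tag }
    items.push(...step.batch)
  }
  return { items, end: "Exhausted" }
}

const stored: ReadonlyArray<ModelTake<number, string, void>> = [
  [1, 2],
  [3],
  { _tag: "Success", value: undefined }
]
const replayed = replay(stored)
const broken = replay<number, string, void>([[1], { _tag: "Failure", error: "boom" }, [2]])
```

Here `replayed` collects every element before the success exit ends the replay, while `broken` stops at the failure and never reaches the batch stored after it.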
Where these surface
You encounter Pull and Take through a handful of Stream / Channel APIs:
- Stream.toPull(stream): get a scoped Pull to manually consume a stream’s batches.
- Stream.fromPull(effect): build a Stream from an Effect that produces a Pull.
- Stream.flattenTake(stream): turn a Stream<Take<A, E>> into a Stream<A>.
- Stream.fromPubSubTake / Stream.toPubSubTake: move Take values across a PubSub.
- Channel.fromPull / Channel.toPull: the Channel-level pull bridge.
Manually pulling a stream
Stream.toPull returns a scoped Effect that yields a Pull. Each time you
evaluate the pull you get the next non-empty batch; when the stream ends, the
pull fails with Cause.Done. Use the Pull helpers to interpret that signal:
    import { Cause, Console, Effect, Pull, Stream } from "effect"

    const stream = Stream.make(1, 2, 3)

    const program = Effect.scoped(
      Effect.gen(function* () {
        // A scoped pull over the stream's batches.
        const pull = yield* Stream.toPull(stream)

        // Drive it in a loop until it completes.
        while (true) {
          // Recover the Done signal into a Result-like branch.
          const step = yield* pull.pipe(
            Effect.map((batch) => ({ done: false as const, batch })),
            // catchDone turns the Cause.Done completion back into a success.
            Pull.catchDone(() => Effect.succeed({ done: true as const }))
          )
          if (step.done) break
          yield* Console.log("batch:", step.batch)
          // => batch: [ 1, 2, 3 ]
        }
      })
    )

    Effect.runPromise(program)

Stream.toPull always produces batches (NonEmptyReadonlyArray), never
single elements; that is how streams stay efficient.
Interpreting Take values
A Take is the stored form, so you build one from an Exit or an array and turn
it back into a pull with Take.toPull. This is what Stream.flattenTake does
under the hood:
    import { Effect, Exit, Stream, Take } from "effect"

    // A stream of *stored* pull results.
    const takes: Stream.Stream<Take.Take<number, string>> = Stream.make(
      [1, 2] as const, // a batch of elements
      [3] as const, // another batch
      Exit.succeed(undefined) // a success Exit -> completion (carries Done = void)
    )

    const program = Stream.flattenTake(takes).pipe(Stream.runCollect)

    Effect.runPromise(program).then(console.log)
    // => [ 1, 2, 3 ]

A failure Take is just a failed Exit, and it propagates as an ordinary error
when interpreted:

    import { Exit, Take } from "effect"

    const ok: Take.Take<number, string> = [1, 2, 3] // batch -> emits
    const done: Take.Take<number, string> = Exit.succeed(undefined) // completion
    const failed: Take.Take<number, string> = Exit.fail("boom") // failure

Pull reference
Import as import { Pull } from "effect". The module’s job is to tell Cause.Done
completion apart from ordinary failures, and to convert between the two.
The pull-step type itself: an Effect<A, E | Cause.Done<Done>, R>. Construct one
with any Effect plus Cause.done(...) for the completion signal.
    import { Cause, Effect, Pull } from "effect"

    // A pull that emits 42, then completes carrying a leftover string.
    let emitted = false
    const pull: Pull.Pull<number, never, string> = Effect.suspend(() =>
      emitted ? Cause.done("leftover") : ((emitted = true), Effect.succeed(42))
    )

catchDone
Recovers from the Cause.Done completion signal while leaving ordinary failures
in the error channel. The handler receives the done leftover value.
    import { Cause, Effect, Pull } from "effect"

    const program = Cause.done("finished").pipe(
      Pull.catchDone((leftover) => Effect.succeed(`done with: ${leftover}`))
    )

    Effect.runPromise(program).then(console.log)
    // => done with: finished

matchEffect
Handles all three Pull outcomes (success, ordinary failure, and done) with
effectful handlers.
    import { Cause, Effect, Pull } from "effect"

    const pull = Cause.done("stream ended")

    const program = Pull.matchEffect(pull, {
      onSuccess: (value) => Effect.succeed(`got value: ${value}`),
      onFailure: (cause) => Effect.succeed(`got error: ${cause}`),
      onDone: (leftover) => Effect.succeed(`halted with: ${leftover}`)
    })

    Effect.runPromise(program).then(console.log)
    // => halted with: stream ended

isDoneCause
Returns true when a whole Cause contains any Cause.Done completion reason.
Use it for a boolean branch when you do not need the done payload.
    import { Cause, Pull } from "effect"

    Pull.isDoneCause(Cause.fail(Cause.Done("x"))) // => true
    Pull.isDoneCause(Cause.fail("boom")) // => false

isDoneFailure
A predicate over a single Cause.Reason: true when the reason is a Fail
whose error is a Cause.Done. Use it when traversing cause.reasons directly.
    import { Cause, Pull } from "effect"

    const cause = Cause.fail(Cause.Done("x"))
    cause.reasons.some(Pull.isDoneFailure) // => true

filterDone
Finds a Cause.Done in a Cause, preserving its typed value. Returns a
successful Result with the Cause.Done when present, otherwise a failed
Result carrying the non-done cause.
    import { Cause, Pull, Result } from "effect"

    const r = Pull.filterDone(Cause.fail(Cause.Done("leftover")))
    Result.isSuccess(r) // => true
    // r.success is the Cause.Done value carrying "leftover"

    const r2 = Pull.filterDone(Cause.fail("boom"))
    Result.isFailure(r2) // => true (r2.failure is the non-done Cause)

filterDoneVoid
Like filterDone but for cases where the done payload does not matter: it returns a
plain Cause.Done marker on success.
    import { Cause, Pull, Result } from "effect"

    const r = Pull.filterDoneVoid(Cause.fail(Cause.Done()))
    Result.isSuccess(r) // => true

filterDoneLeftover
Extracts only the leftover value carried by a Cause.Done completion (rather
than the Done wrapper). Powers catchDone internally.
    import { Cause, Pull, Result } from "effect"

    const r = Pull.filterDoneLeftover(Cause.fail(Cause.Done("leftover")))
    Result.isSuccess(r) // => true
    // r.success === "leftover"

filterNoDone
The inverse of filterDone: keeps a Cause only when it has no Cause.Done
failures. Use it to isolate ordinary failures for handling.
    import { Cause, Pull, Result } from "effect"

    const r = Pull.filterNoDone(Cause.fail("boom"))
    Result.isSuccess(r) // => true (no done present)

    const r2 = Pull.filterNoDone(Cause.fail(Cause.Done()))
    Result.isFailure(r2) // => true (a done is present)

doneExitFromCause
Converts a Cause into an Exit, treating Cause.Done as success (its leftover
becomes the success value) and any remaining cause as failure. Useful for
finalizing a pull loop into an Exit.
    import { Cause, Exit, Pull } from "effect"

    const ok = Pull.doneExitFromCause(Cause.fail(Cause.Done("result")))
    Exit.isSuccess(ok) // => true (value === "result")

    const err = Pull.doneExitFromCause(Cause.fail("boom"))
    Exit.isFailure(err) // => true

Pull type extractors
These are type-level helpers for writing generic pull signatures. They take a
Pull type and project out one of its parameters.
Pull.Success
Extracts the success (emitted) type A from a Pull.
    import type { Pull } from "effect"

    type P = Pull.Pull<number, "err", string>
    type A = Pull.Success<P> // => number

Pull.Error
Extracts the ordinary error type E, excluding the Cause.Done completion.
    import type { Pull } from "effect"

    type P = Pull.Pull<number, "err", string>
    type E = Pull.Error<P> // => "err"

Pull.Leftover
Extracts the completion leftover type Done.
    import type { Pull } from "effect"

    type P = Pull.Pull<number, "err", string>
    type L = Pull.Leftover<P> // => string

Pull.Services
Extracts the required services (context) type R.
    import type { Pull } from "effect"

    type P = Pull.Pull<number, "err", string, "MyService">
    type R = Pull.Services<P> // => "MyService"

Pull.ExcludeDone
A utility type that removes Cause.Done members from an error union: the error
type that remains once completion has been filtered out.
    import type { Cause, Pull } from "effect"

    type Raw = "err" | Cause.Done<string>
    type E = Pull.ExcludeDone<Raw> // => "err"

Take reference
Import as import { Take } from "effect". The module is intentionally tiny: a
Take is a plain union value, so you construct one directly and convert it with
toPull.
The stored pull-result type: NonEmptyReadonlyArray<A> | Exit<Done, E>. A batch
emits elements, a success Exit signals completion (no elements), and a failure
Exit signals an ordinary failure.
    import { Exit, Take } from "effect"

    const batch: Take.Take<number, string> = [1, 2, 3] // emits 1, 2, 3
    const done: Take.Take<number, string> = Exit.succeed(undefined) // completion (Done = void)
    const failed: Take.Take<number, string> = Exit.fail("boom") // failure

toPull
Interprets a stored Take as a Pull step: batches succeed with their elements,
failure exits fail, and success exits become Cause.Done completion.
    import { Effect, Exit, Pull, Take } from "effect"

    // A batch -> a pull that succeeds with the batch.
    const p1: Pull.Pull<readonly number[], never, void> = Take.toPull([1, 2, 3])
    Effect.runPromise(p1).then(console.log)
    // => [ 1, 2, 3 ]

    // A success Exit -> a pull that completes (fails with Cause.Done).
    const p2 = Take.toPull(Exit.succeed("final"))
    Effect.runPromise(
      p2.pipe(Pull.catchDone((leftover) => Effect.succeed(`done: ${leftover}`)))
    ).then(console.log)
    // => done: final

See also
- Creating Streams: the high-level producers built on these primitives.
- Consuming Streams: runCollect, runForEach, and friends.
- Sinks: the consumer side of streaming.