# Pull & Take

`Pull` and `Take` are the two low-level primitives that sit underneath
[`Stream`](https://effect.plants.sh/streaming/creating-streams/), [`Sink`](https://effect.plants.sh/streaming/sink/), and
`Channel`. You rarely reach for them directly, but understanding them demystifies
how streams actually move data, and they become useful when you manually drive a
stream or implement a custom producer.

**Low-level / advanced:** Most streaming work should use `Stream`, `Sink`, and
`Channel` directly. Reach for `Pull` / `Take` only when you are bridging to a
manual pull loop or building a custom stream source. A `Pull` is "an `Effect`
with a completion signal hidden in its error channel", so generic `Effect` error
handling will catch that signal unless you filter it deliberately.

## Mental model

**A `Pull<A, E, Done, R>` is one pull step.** It is just an `Effect` whose error
channel is widened with a special completion signal:

```ts
import type { Cause, Effect } from "effect"

interface Pull<out A, out E = never, out Done = void, out R = never>
  extends Effect.Effect<A, E | Cause.Done<Done>, R> {}
```

Evaluating a `Pull` does one of three things:

- **emit** — succeed with a value of type `A` (for streams, a non-empty batch of elements),
- **fail** — fail with an ordinary error `E`,
- **complete** — fail with `Cause.Done<Done>`, the end-of-input signal, optionally carrying a `Done` value (leftover state or a final result).

Completion lives in the *error* channel as a `Cause.Done`. That is what lets a
consumer loop `yield* pull` repeatedly and distinguish "no more input" from a
real failure. The `Pull` module exists to separate that `Done` signal back out
from ordinary failures.
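
To make the three outcomes concrete without the library, here is a minimal plain-TypeScript analogy. It is not how Effect implements `Pull`: a thrown `Done` marker stands in for `Cause.Done` in the error channel, and `Done`, `PullStep`, `makePull`, and `drain` are hypothetical names invented for this sketch.

```typescript
// Hypothetical stand-in for Cause.Done: a marker thrown to signal completion.
class Done<L> {
  readonly leftover: L
  constructor(leftover: L) {
    this.leftover = leftover
  }
}

type Batch<A> = readonly [A, ...A[]]

// One pull step: returns the next non-empty batch, throws Done on completion,
// and throws anything else on an ordinary failure.
type PullStep<A> = () => Batch<A>

// Build a pull step over a fixed list of batches (illustration only).
const makePull = <A, L>(batches: ReadonlyArray<Batch<A>>, leftover: L): PullStep<A> => {
  let index = 0
  return () => {
    const batch = batches[index]
    if (batch === undefined) throw new Done(leftover)
    index += 1
    return batch
  }
}

// Drive the pull until it signals completion; rethrow real failures.
const drain = <A>(pull: PullStep<A>): { elements: A[]; leftover: unknown } => {
  const elements: A[] = []
  while (true) {
    try {
      elements.push(...pull())
    } catch (signal) {
      // Completion and failure share one channel, so the consumer must tell them apart.
      if (signal instanceof Done) return { elements, leftover: signal.leftover }
      throw signal
    }
  }
}

const drained = drain(makePull<number, string>([[1, 2], [3]], "leftover"))
console.log(drained)
// => { elements: [ 1, 2, 3 ], leftover: 'leftover' }
```

The `catch` branch carries the point of the analogy: completion and failure arrive through the same channel, so the consumer has to check for the done marker before treating anything as an error. In the real library the `Pull` helpers play the role of that `instanceof` check.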

**A `Take<A, E, Done>` is one stored pull result.** Where `Pull` is the *effect*,
`Take` is the plain *value* you can park in a queue or `PubSub` and replay later:

```ts
import type { Array, Exit } from "effect"

type Take<A, E = never, Done = void> =
  | Array.NonEmptyReadonlyArray<A> // a non-empty batch of emitted values
  | Exit.Exit<Done, E> // a completion (success Exit) or failure (failure Exit)
```

So a `Take` is either a non-empty batch of elements, a successful `Exit` carrying
the `Done` value (normal completion, no elements), or a failed `Exit` (an
ordinary failure). [`Take.toPull`](#topull) interprets a stored `Take` back into
a `Pull` step.
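
The mapping from a stored value back to a step outcome can be sketched without the library. This is a simplified model under stated assumptions, not the definitions from `effect`: `ExitLike`, `NonEmpty`, `TakeLike`, `StepOutcome`, and `interpret` are hypothetical names, and the real `Exit` carries a full `Cause` rather than a bare error.

```typescript
// Simplified stand-ins for Exit and Take (hypothetical, not the library's types).
type ExitLike<Done, E> =
  | { readonly _tag: "Success"; readonly value: Done }
  | { readonly _tag: "Failure"; readonly error: E }

type NonEmpty<A> = readonly [A, ...A[]]
type TakeLike<A, E, Done> = NonEmpty<A> | ExitLike<Done, E>

// What one pull step would do when driven by a stored take.
type StepOutcome<A, E, Done> =
  | { readonly kind: "emit"; readonly batch: NonEmpty<A> }
  | { readonly kind: "done"; readonly leftover: Done }
  | { readonly kind: "fail"; readonly error: E }

const interpret = <A, E, Done>(take: TakeLike<A, E, Done>): StepOutcome<A, E, Done> => {
  // Arrays have no `_tag` key, so anything without one is a batch.
  if (!("_tag" in take)) return { kind: "emit", batch: take }
  return take._tag === "Success"
    ? { kind: "done", leftover: take.value }
    : { kind: "fail", error: take.error }
}

// Takes are plain values, so they can sit in an ordinary array and be replayed.
const stored: ReadonlyArray<TakeLike<number, string, string>> = [
  [1, 2],
  [3],
  { _tag: "Success", value: "final" }
]

const outcomes = stored.map((take) => interpret(take))
console.log(outcomes.map((outcome) => outcome.kind))
// => [ 'emit', 'emit', 'done' ]
```

Because a take is inert data, nothing runs until it is interpreted. That is what makes it safe to park in a queue or `PubSub` and replay later.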

## Where these surface

You encounter `Pull` and `Take` through a handful of `Stream` / `Channel` APIs:

- `Stream.toPull(stream)` — get a scoped `Pull` to manually consume a stream's batches.
- `Stream.fromPull(effect)` — build a `Stream` from an `Effect` that produces a `Pull`.
- `Stream.flattenTake(stream)` — turn a `Stream<Take<A, E>>` into a `Stream<A>`.
- `Stream.fromPubSubTake` / `Stream.toPubSubTake` — move `Take` values across a `PubSub`.
- `Channel.fromPull` / `Channel.toPull` — the `Channel`-level pull bridge.

## Manually pulling a stream

`Stream.toPull` returns a *scoped* `Effect` that yields a `Pull`. Each time you
evaluate the pull you get the next non-empty batch; when the stream ends, the
pull fails with `Cause.Done`. Use the `Pull` helpers to interpret that signal:

```ts
import { Console, Effect, Pull, Stream } from "effect"

const stream = Stream.make(1, 2, 3)

const program = Effect.scoped(
  Effect.gen(function* () {
    // A scoped pull over the stream's batches.
    const pull = yield* Stream.toPull(stream)

    // Drive it in a loop until it completes.
    while (true) {
      // Recover the Done signal into a Result-like branch.
      const step = yield* pull.pipe(
        Effect.map((batch) => ({ done: false as const, batch })),
        // catchDone turns the Cause.Done completion back into a success.
        Pull.catchDone(() => Effect.succeed({ done: true as const }))
      )
      if (step.done) break
      yield* Console.log("batch:", step.batch)
      // => batch: [ 1, 2, 3 ]
    }
  })
)

Effect.runPromise(program)
```

`Stream.toPull` always produces **batches** (`NonEmptyReadonlyArray`), never
single elements — that is how streams stay efficient.

## Interpreting Take values

A `Take` is the stored form, so you build one from an `Exit` or an array and turn
it back into a pull with `Take.toPull`. This is what `Stream.flattenTake` does
under the hood:

```ts
import { Effect, Exit, Stream, Take } from "effect"

// A stream of *stored* pull results.
const takes: Stream.Stream<Take.Take<number, string>> = Stream.make(
  [1, 2] as const, // a batch of elements
  [3] as const, // another batch
  Exit.succeed(undefined) // a success Exit -> completion (carries Done = void)
)

const program = Stream.flattenTake(takes).pipe(Stream.runCollect)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3 ]
```

A failure `Take` is just a failed `Exit`, and it propagates as an ordinary error
when interpreted. The three shapes look like this as plain values:

```ts
import { Exit, Take } from "effect"

const ok: Take.Take<number, string> = [1, 2, 3] // batch -> emits
const done: Take.Take<number, string> = Exit.succeed(undefined) // completion
const failed: Take.Take<number, string> = Exit.fail("boom") // failure
```

---

# Pull reference

Import as `import { Pull } from "effect"`. The module's job is to tell `Cause.Done`
completion apart from ordinary failures, and to convert between the two.

### Pull

The pull-step type itself: an `Effect<A, E | Cause.Done<Done>, R>`. Construct one
with any `Effect` plus `Cause.done(...)` for the completion signal.

```ts
import { Cause, Effect, Pull } from "effect"

// A pull that emits 42, then completes carrying a leftover string.
let emitted = false
const pull: Pull.Pull<number, never, string> = Effect.suspend(() =>
  emitted ? Cause.done("leftover") : ((emitted = true), Effect.succeed(42))
)
```

### catchDone

Recovers from the `Cause.Done` completion signal while leaving ordinary failures
in the error channel. The handler receives the done leftover value.

```ts
import { Cause, Effect, Pull } from "effect"

const program = Cause.done("finished").pipe(
  Pull.catchDone((leftover) => Effect.succeed(`done with: ${leftover}`))
)

Effect.runPromise(program).then(console.log)
// => done with: finished
```

### matchEffect

Handles all three `Pull` outcomes — success, ordinary failure, and done — with
effectful handlers.

```ts
import { Cause, Effect, Pull } from "effect"

const pull = Cause.done("stream ended")

const program = Pull.matchEffect(pull, {
  onSuccess: (value) => Effect.succeed(`got value: ${value}`),
  onFailure: (cause) => Effect.succeed(`got error: ${cause}`),
  onDone: (leftover) => Effect.succeed(`halted with: ${leftover}`)
})

Effect.runPromise(program).then(console.log)
// => halted with: stream ended
```

### isDoneCause

Returns `true` when a whole `Cause` contains any `Cause.Done` completion reason.
Use it for a boolean branch when you do not need the done payload.

```ts
import { Cause, Pull } from "effect"

Pull.isDoneCause(Cause.fail(Cause.Done("x"))) // => true
Pull.isDoneCause(Cause.fail("boom")) // => false
```

### isDoneFailure

A predicate over a single `Cause.Reason`: `true` when the reason is a `Fail`
whose error is a `Cause.Done`. Use it when traversing `cause.reasons` directly.

```ts
import { Cause, Pull } from "effect"

const cause = Cause.fail(Cause.Done("x"))
cause.reasons.some(Pull.isDoneFailure) // => true
```

### filterDone

Finds a `Cause.Done` in a `Cause`, preserving its typed value. Returns a
successful `Result` with the `Cause.Done` when present, otherwise a failed
`Result` carrying the non-done cause.

```ts
import { Cause, Pull, Result } from "effect"

const r = Pull.filterDone(Cause.fail(Cause.Done("leftover")))
Result.isSuccess(r) // => true
// r.success is the Cause.Done<"leftover"> value

const r2 = Pull.filterDone(Cause.fail("boom"))
Result.isFailure(r2) // => true  (r2.failure is the non-done Cause)
```

### filterDoneVoid

Like `filterDone` but for cases where the done payload does not matter — returns a
plain `Cause.Done` marker on success.

```ts
import { Cause, Pull, Result } from "effect"

const r = Pull.filterDoneVoid(Cause.fail(Cause.Done()))
Result.isSuccess(r) // => true
```

### filterDoneLeftover

Extracts *only* the leftover value carried by a `Cause.Done` completion (rather
than the `Done` wrapper). Powers `catchDone` internally.

```ts
import { Cause, Pull, Result } from "effect"

const r = Pull.filterDoneLeftover(Cause.fail(Cause.Done("leftover")))
Result.isSuccess(r) // => true
// r.success === "leftover"
```

### filterNoDone

The inverse of `filterDone`: keeps a `Cause` only when it has *no* `Cause.Done`
failures. Use it to isolate ordinary failures for handling.

```ts
import { Cause, Pull, Result } from "effect"

const r = Pull.filterNoDone(Cause.fail("boom"))
Result.isSuccess(r) // => true  (no done present)

const r2 = Pull.filterNoDone(Cause.fail(Cause.Done()))
Result.isFailure(r2) // => true  (a done is present)
```

### doneExitFromCause

Converts a `Cause` into an `Exit`, treating `Cause.Done` as success (its leftover
becomes the success value) and any remaining cause as failure. Useful for
finalizing a pull loop into an `Exit`.

```ts
import { Cause, Exit, Pull } from "effect"

const ok = Pull.doneExitFromCause(Cause.fail(Cause.Done("result")))
Exit.isSuccess(ok) // => true  (value === "result")

const err = Pull.doneExitFromCause(Cause.fail("boom"))
Exit.isFailure(err) // => true
```

## Pull type extractors

These are *type-level* helpers for writing generic pull signatures. They take a
`Pull` type and project out one of its parameters.

### Pull.Success

Extracts the success (emitted) type `A` from a `Pull`.

```ts
import type { Pull } from "effect"

type P = Pull.Pull<number, "err", string>
type A = Pull.Success<P> // => number
```

### Pull.Error

Extracts the ordinary error type `E`, *excluding* the `Cause.Done` completion.

```ts
import type { Pull } from "effect"

type P = Pull.Pull<number, "err", string>
type E = Pull.Error<P> // => "err"
```

### Pull.Leftover

Extracts the completion leftover type `Done`.

```ts
import type { Pull } from "effect"

type P = Pull.Pull<number, "err", string>
type L = Pull.Leftover<P> // => string
```

### Pull.Services

Extracts the required services (context) type `R`.

```ts
import type { Pull } from "effect"

type P = Pull.Pull<number, "err", string, "MyService">
type R = Pull.Services<P> // => "MyService"
```

### Pull.ExcludeDone

A utility type that removes `Cause.Done` members from an error union — the error
type that remains once completion has been filtered out.

```ts
import type { Cause, Pull } from "effect"

type Raw = "err" | Cause.Done<string>
type E = Pull.ExcludeDone<Raw> // => "err"
```

---

# Take reference

Import as `import { Take } from "effect"`. The module is intentionally tiny: a
`Take` is a plain union value, so you construct one directly and convert it with
`toPull`.

### Take

The stored pull-result type: `NonEmptyReadonlyArray<A> | Exit<Done, E>`. A batch
emits elements, a success `Exit` signals completion (no elements), and a failure
`Exit` signals an ordinary failure.

```ts
import { Exit, Take } from "effect"

const batch: Take.Take<number, string> = [1, 2, 3] // emits 1, 2, 3
const done: Take.Take<number, string> = Exit.succeed(undefined) // completion (Done = void)
const failed: Take.Take<number, string> = Exit.fail("boom") // failure
```

### toPull

Interprets a stored `Take` as a `Pull` step: batches succeed with their elements,
failure exits fail, and success exits become `Cause.Done` completion.

```ts
import { Effect, Exit, Pull, Take } from "effect"

// A batch -> a pull that succeeds with the batch.
const p1: Pull.Pull<readonly number[], never, void> = Take.toPull([1, 2, 3])
Effect.runPromise(p1).then(console.log)
// => [ 1, 2, 3 ]

// A success Exit -> a pull that completes (fails with Cause.Done).
const p2 = Take.toPull(Exit.succeed("final"))
Effect.runPromise(
  p2.pipe(Pull.catchDone((leftover) => Effect.succeed(`done: ${leftover}`)))
).then(console.log)
// => done: final
```

## See also

- [Creating Streams](https://effect.plants.sh/streaming/creating-streams/) — the high-level producers built on these primitives.
- [Consuming Streams](https://effect.plants.sh/streaming/consuming-streams/) — `runCollect`, `runForEach`, and friends.
- [Sinks](https://effect.plants.sh/streaming/sink/) — the consumer side of streaming.