# Grouping, Chunking & Buffering

Streams rarely flow at a convenient granularity. Events arrive one at a time but
you want to write them to a database in batches; a websocket fires faster than
your renderer can keep up; a noisy sensor needs debouncing. This page covers the
operators that reshape a stream by **size**, **time**, or **key**, or that
**buffer** it, before you consume it.

## Common case: batching events for a bulk write

The single most useful operator here is [`Stream.groupedWithin`](#groupedwithin):
it emits a batch as soon as **either** the chunk fills up **or** a time window
elapses, whichever comes first. That is exactly the policy you want for a bulk
insert: never wait too long, never write rows one at a time.

```ts
import { Context, Effect, Stream } from "effect"

// A service that persists a batch of events in a single round-trip.
class Database extends Context.Service<Database>()("app/Database", {
  make: Effect.succeed({
    insertMany: (rows: ReadonlyArray<{ readonly id: number }>) =>
      Effect.log(`INSERT ${rows.length} rows: [${rows.map((r) => r.id).join(", ")}]`)
  })
}) {}

const ingest = (events: Stream.Stream<{ readonly id: number }>) =>
  events.pipe(
    // Flush every 100 events, or every 2 seconds, whichever comes first.
    Stream.groupedWithin(100, "2 seconds"),
    Stream.runForEach((batch) =>
      Effect.flatMap(Database, (db) => db.insertMany(batch))
    )
  )

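const program = ingest(
  // Wrap each number so it matches the `{ id }` shape that `ingest` expects.
  Stream.range(1, 250).pipe(Stream.map((id) => ({ id })))
).pipe(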
  Effect.provideService(Database, {
    insertMany: (rows) =>
      Effect.log(`INSERT ${rows.length} rows: [${rows.map((r) => r.id).join(", ")}]`)
  })
)

Effect.runPromise(program)
// One INSERT for the first 100 rows, another for the next 100,
// then a final flush of the remaining 50 when the source ends.
```

The result of `groupedWithin(n, duration)` is a `Stream<Array<A>>` — each element
is one batch. `runForEach` then sees whole batches, so a single `insertMany` call
handles up to `n` rows at once.

:::note
`groupedWithin` is built on [`aggregateWithin`](#aggregatewithin) with
`Sink.take(n)` and `Schedule.spaced(duration)`. If you need a smarter batching
policy (cost-based sizes, custom flush schedules) reach for `aggregateWithin`
directly — see the [Sink reference](https://effect.plants.sh/streaming/sink/) and
[Schedule](https://effect.plants.sh/scheduling/schedule/).
:::

## Choosing an operator

| You want to…                                       | Use                                          |
| -------------------------------------------------- | -------------------------------------------- |
| Batch by count only                                | [`grouped`](#grouped)                        |
| Batch by count **or** time                         | [`groupedWithin`](#groupedwithin)            |
| Collapse consecutive equal-keyed runs              | [`groupAdjacentBy`](#groupadjacentby)        |
| Split into one substream **per key**               | [`groupBy`](#groupby) / [`groupByKey`](#groupbykey) |
| Reshape the internal chunk boundaries              | [`rechunk`](#rechunk), [`chunks`](#chunks)   |
| Emit overlapping windows                           | [`sliding`](#sliding), [`slidingSize`](#slidingsize) |
| Fold with a [`Sink`](https://effect.plants.sh/streaming/sink/)             | [`transduce`](#transduce), [`aggregate`](#aggregate), [`aggregateWithin`](#aggregatewithin) |
| Emit only the latest after a quiet period          | [`debounce`](#debounce)                      |
| Rate-limit with a token bucket                     | [`throttle`](#throttle), [`throttleEffect`](#throttleeffect) |
| Decouple a fast producer from a slow consumer      | [`buffer`](#buffer), [`bufferArray`](#bufferarray) |

---

## Fixed grouping

### grouped

Partitions the stream into non-empty arrays of a fixed size, preserving order.
The final array may be smaller if there are not enough elements left to fill it.

```ts
import { Effect, Stream } from "effect"

const program = Stream.range(1, 8).pipe(
  Stream.grouped(3),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8 ] ]
```

### groupedWithin

Emits an array when the chunk size is reached **or** the duration passes,
whichever happens first. The timer resets after each emitted batch.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  // 3 never fills a chunk of 2. Here it is flushed when the source ends; on a
  // long-lived stream it would be flushed when the 5-second window elapses.
  Stream.groupedWithin(2, "5 seconds"),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3 ] ]
```
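
To make the "size or time, whichever comes first" policy concrete, here is a library-free sketch that replays timestamped events through the same rule. It is a model for intuition, not how Effect implements `groupedWithin`: the `TimedEvent` shape, the `batchBySizeOrTime` helper, and the assumption that the first window opens at time 0 are all hypothetical.

```typescript
// Hypothetical model of a "size OR time" flush policy (not Effect's code).
interface TimedEvent<A> {
  readonly at: number // arrival time in milliseconds
  readonly value: A
}

function batchBySizeOrTime<A>(
  events: ReadonlyArray<TimedEvent<A>>,
  size: number,
  windowMs: number
): Array<Array<A>> {
  const batches: Array<Array<A>> = []
  let current: Array<A> = []
  let deadline = windowMs // assumption: the first window opens at time 0

  // Emit the buffered batch, skipping empty ones.
  const flush = (): void => {
    if (current.length > 0) {
      batches.push(current)
      current = []
    }
  }

  for (const event of events) {
    // Every window that elapsed before this event flushes what was buffered.
    while (event.at >= deadline) {
      flush()
      deadline += windowMs
    }
    current.push(event.value)
    if (current.length === size) {
      flush()
      deadline = event.at + windowMs // the timer restarts after a batch
    }
  }
  flush() // the source ended: emit whatever is left
  return batches
}

const burst = [0, 10, 20].map((at, i) => ({ at, value: i + 1 }))
console.log(JSON.stringify(batchBySizeOrTime(burst, 2, 5000)))
// => [[1,2],[3]]     (size flush, then end-of-source flush)

const slow = [0, 1500, 1600, 1700].map((at, i) => ({ at, value: i + 1 }))
console.log(JSON.stringify(batchBySizeOrTime(slow, 3, 1000)))
// => [[1],[2,3,4]]   (time flush, then size flush)
```

The second run shows the time bound at work: the lone first event is flushed once its window has passed instead of waiting for two more events.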

### groupAdjacentBy

Groups **consecutive** elements whose keys are equal (by `Equal.equals`) into
`[key, group]` pairs. Use it when the stream is already ordered by the key; later
non-adjacent runs with the same key are emitted as separate groups.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make("a", "a", "b", "b", "a").pipe(
  Stream.groupAdjacentBy((s) => s),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ "a", [ "a", "a" ] ], [ "b", [ "b", "b" ] ], [ "a", [ "a" ] ] ]
//    note: the trailing "a" is a NEW group, not merged with the first run
```
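
The "adjacent only" rule is easy to see in a plain-array sketch. The `groupAdjacent` helper below is hypothetical and compares keys with `===` for brevity, whereas the operator itself uses `Equal.equals`; treat it as an illustration of the grouping rule only.

```typescript
// Hypothetical array-based model of adjacent grouping (keys compared with ===).
function groupAdjacent<A, K>(
  items: ReadonlyArray<A>,
  key: (a: A) => K
): Array<[K, Array<A>]> {
  const groups: Array<[K, Array<A>]> = []
  for (const item of items) {
    const k = key(item)
    const last = groups[groups.length - 1]
    if (last !== undefined && last[0] === k) {
      last[1].push(item) // same key as the previous element: extend the run
    } else {
      groups.push([k, [item]]) // key changed: start a new run
    }
  }
  return groups
}

const readings = [
  { sensor: "t1", value: 20 },
  { sensor: "t1", value: 21 },
  { sensor: "t2", value: 7 },
  { sensor: "t1", value: 22 }
]

const runs = groupAdjacent(readings, (r) => r.sensor).map(
  ([sensor, group]) => [sensor, group.map((r) => r.value)]
)
console.log(JSON.stringify(runs))
// => [["t1",[20,21]],["t2",[7]],["t1",[22]]]
```

Because only the previous group is ever inspected, memory stays proportional to the longest run, which is why the operator suits input that is already ordered by key.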

---

## Keyed grouping

`groupBy` and `groupByKey` fan a stream out into **one substream per key**. The
outer stream emits `[key, Stream<value>]` pairs; you then process each inner
stream, typically with `mapEffect` and `concurrency: "unbounded"` so groups run
in parallel. Each inner stream is backed by a bounded queue (default capacity
4096); tune it with `bufferSize`, and use `idleTimeToLive` to close groups that
have gone quiet.
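
As a mental model, keyed grouping is the streaming counterpart of partitioning an array into a `Map`. The sketch below is an eager, in-memory analogue with a hypothetical `partitionByKey` helper; it ignores queues, backpressure, and timeouts, which are exactly what the streaming operators add on top.

```typescript
// Hypothetical eager analogue of keyed grouping: one bucket per distinct key.
function partitionByKey<A, K>(
  items: ReadonlyArray<A>,
  key: (a: A) => K
): Map<K, Array<A>> {
  const groups = new Map<K, Array<A>>()
  for (const item of items) {
    const k = key(item)
    const bucket = groups.get(k)
    if (bucket === undefined) {
      groups.set(k, [item]) // first time this key is seen: open a new group
    } else {
      bucket.push(item) // known key: route the element to its group
    }
  }
  return groups
}

const parity = partitionByKey([1, 2, 3, 4, 5], (n) => (n % 2 === 0 ? "even" : "odd"))
console.log(JSON.stringify(Array.from(parity.entries())))
// => [["odd",[1,3,5]],["even",[2,4]]]
```

Unlike adjacent grouping, elements with the same key land in the same group no matter how far apart they arrive, and groups appear in the order their keys are first seen.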

### groupBy

Classifies each element with an **effectful** function returning `[key, value]`,
emitting one substream per distinct key.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.groupBy((n) =>
    Effect.succeed([n % 2 === 0 ? "even" : "odd", n] as const)
  ),
  Stream.mapEffect(
    Effect.fnUntraced(function* ([key, stream]) {
      return [key, yield* Stream.runCollect(stream)] as const
    }),
    { concurrency: "unbounded" }
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ "odd", [ 1, 3, 5 ] ], [ "even", [ 2, 4 ] ] ]
```

### groupByKey

Like `groupBy` but takes a **pure** key function and keeps the original element
as the value. Prefer this when the key can be computed synchronously.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.groupByKey((n) => (n % 2 === 0 ? "even" : "odd")),
  Stream.mapEffect(
    ([key, stream]) =>
      Stream.runCollect(stream).pipe(
        Effect.map((values) => [key, values] as const)
      ),
    { concurrency: "unbounded" }
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ "odd", [ 1, 3, 5 ] ], [ "even", [ 2, 4 ] ] ]
```

:::caution
You must consume **every** emitted substream (e.g. with `mapEffect` /
`concurrency: "unbounded"`), otherwise its bounded queue fills and the whole
pipeline stalls. If you only care about some keys, drain the substreams you
ignore with `Stream.runDrain` instead of leaving them unconsumed.
:::

---

## Chunk control

A `Stream` carries elements in internal arrays ("chunks"). Most operators are
chunk-transparent, but these let you observe and reshape the chunk boundaries —
useful for performance tuning and for adapting to APIs that want fixed-size
batches.

### chunks

Exposes the stream's internal arrays as the stream's elements: turns
`Stream<A>` into `Stream<NonEmptyReadonlyArray<A>>`. Combine with `rechunk` to
control the resulting array sizes.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.rechunk(2),
  Stream.chunks,
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

### rechunk

Re-batches the stream into internal chunks of the given size (clamped to at
least 1) **without** changing the element type. The downstream still sees a flat
`Stream<A>`; only the internal chunking changes. Pair with `chunks` to make the
batches visible.

```ts
import { Effect, Stream } from "effect"

const program = Stream.range(1, 5).pipe(
  Stream.rechunk(2),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3, 4, 5 ]   (flat — only internal chunk sizes changed)
```
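
What "only internal chunk sizes changed" means can be modelled with nested arrays, treating the outer array as the sequence of chunks. The `rechunkModel` and `flatten` helpers below are hypothetical and purely illustrative; they are not how the library represents chunks.

```typescript
// Hypothetical model: a stream's chunks as an array of arrays.
function rechunkModel<A>(
  chunks: ReadonlyArray<ReadonlyArray<A>>,
  size: number
): Array<Array<A>> {
  const n = Math.max(1, Math.floor(size)) // clamp to at least 1
  const out: Array<Array<A>> = []
  let current: Array<A> = []
  for (const chunk of chunks) {
    for (const item of chunk) {
      current.push(item)
      if (current.length === n) {
        out.push(current)
        current = []
      }
    }
  }
  if (current.length > 0) out.push(current) // trailing partial chunk
  return out
}

// Concatenate all chunks into the flat element sequence.
const flatten = <A>(chunks: ReadonlyArray<ReadonlyArray<A>>): Array<A> =>
  chunks.reduce<Array<A>>((acc, chunk) => acc.concat(chunk), [])

const before = [[1], [2, 3, 4], [5]]
const after = rechunkModel(before, 2)

console.log(JSON.stringify(after))
// => [[1,2],[3,4],[5]]
console.log(JSON.stringify(flatten(after)) === JSON.stringify(flatten(before)))
// => true   (same elements in the same order; only the boundaries moved)
```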

### sliding

Emits overlapping windows of `n` consecutive elements, advancing by one element
each time.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.sliding(2),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ] ]
```

### slidingSize

Like `sliding`, but lets you set the step (advance) separately from the window
size. `slidingSize(chunkSize, stepSize)`.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.slidingSize(3, 2),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2, 3 ], [ 3, 4, 5 ] ]
```
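
The relationship between window size and step can be sketched on a plain array. The `slidingWindows` helper is hypothetical, and it makes one simplifying assumption: it emits full windows only, so it says nothing about how the library treats a short trailing remainder.

```typescript
// Hypothetical array model of sliding windows (full windows only).
function slidingWindows<A>(
  items: ReadonlyArray<A>,
  size: number,
  step: number = 1
): Array<Array<A>> {
  if (size < 1 || step < 1) {
    throw new RangeError("size and step must be at least 1")
  }
  const windows: Array<Array<A>> = []
  // Advance the window start by `step` while a full window still fits.
  for (let start = 0; start + size <= items.length; start += step) {
    windows.push(items.slice(start, start + size))
  }
  return windows
}

const values = [1, 2, 3, 4, 5]

console.log(JSON.stringify(slidingWindows(values, 2)))
// => [[1,2],[2,3],[3,4],[4,5]]   (step 1, like sliding(2))
console.log(JSON.stringify(slidingWindows(values, 3, 2)))
// => [[1,2,3],[3,4,5]]           (like slidingSize(3, 2))

// A typical use: a moving average over each window of 3.
const movingAverage = slidingWindows(values, 3).map(
  (w) => w.reduce((sum, x) => sum + x, 0) / w.length
)
console.log(JSON.stringify(movingAverage))
// => [2,3,4]
```

When the step equals the window size the windows stop overlapping and the result is the same as fixed-size grouping of the full windows.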

---

## Sink-driven aggregation

For anything more sophisticated than fixed counts, fold the stream with a
[`Sink`](https://effect.plants.sh/streaming/sink/). A sink consumes some prefix of the stream, produces a
result, and hands back any leftover input; these operators run the sink
repeatedly and emit each result.
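
The "consume a prefix, return a result plus leftovers, repeat" loop can be sketched without the library. Everything here (`SinkModel`, `takeModel`, `sumUntilModel`, `runRepeatedly`) is a hypothetical, synchronous stand-in meant to show the control flow; real sinks are effectful and incremental.

```typescript
// Hypothetical model: a sink consumes a prefix and hands back the rest.
type SinkModel<A, B> = (input: ReadonlyArray<A>) => {
  readonly result: B
  readonly leftover: ReadonlyArray<A>
}

// Collects the first n inputs.
const takeModel = <A>(n: number): SinkModel<A, ReadonlyArray<A>> => (input) => ({
  result: input.slice(0, n),
  leftover: input.slice(n)
})

// Sums the first n inputs.
const sumUntilModel = (n: number): SinkModel<number, number> => (input) => ({
  result: input.slice(0, n).reduce((sum, x) => sum + x, 0),
  leftover: input.slice(n)
})

// Runs the sink again and again on the leftovers, emitting each result.
function runRepeatedly<A, B>(
  input: ReadonlyArray<A>,
  sink: SinkModel<A, B>
): Array<B> {
  const results: Array<B> = []
  let rest = input
  while (rest.length > 0) {
    const { result, leftover } = sink(rest)
    results.push(result)
    if (leftover.length >= rest.length) break // sink consumed nothing: stop
    rest = leftover
  }
  return results
}

console.log(JSON.stringify(runRepeatedly([1, 2, 3, 4], takeModel<number>(2))))
// => [[1,2],[3,4]]
console.log(JSON.stringify(runRepeatedly([1, 2, 3, 4, 5, 6], sumUntilModel(3))))
// => [6,15]
```

The operators in this section differ mainly in where that loop runs relative to the rest of the pipeline and in whether something other than the sink can end a round early.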

### transduce

Repeatedly applies a sink as a **transducer**, emitting each sink result. The
sink runs synchronously inline with the stream (no separate fiber), so it cannot
flush on its own — it emits when the sink signals completion.

```ts
import { Effect, Sink, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4).pipe(
  Stream.transduce(Sink.take(2)),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3, 4 ] ]
```

### aggregate

Like `transduce`, but runs the upstream (feeding the sink) and the downstream in
**separate fibers**, so the sink can keep consuming input while downstream is
still processing the previous result. Best when downstream work is the bottleneck.

```ts
import { Effect, Sink, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  // Sum until 3 inputs are seen, then emit and restart.
  Stream.aggregate(
    Sink.foldUntil(() => 0, 3, (sum, n: number) => Effect.succeed(sum + n))
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 6, 15 ]   (1+2+3, then 4+5+6)
```

### aggregateWithin

`aggregate` plus a [`Schedule`](https://effect.plants.sh/scheduling/schedule/): the schedule can flush
the current aggregation even if the sink has not yet completed. This is the
time-aware building block behind `groupedWithin`. The schedule's input is the
`Option<B>` of the most recent output.

```ts
import { Effect, Schedule, Sink, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  Stream.aggregateWithin(
    Sink.foldUntil(() => 0, 3, (sum, n: number) => Effect.succeed(sum + n)),
    // Force a flush at most once per minute (here the sink completes first).
    Schedule.spaced("1 minute")
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 6, 15 ]
```

---

## Time-based operators

### debounce

Drops all but the **latest** element of each burst and emits it once the source
has been quiet for `duration`. Ideal for search-as-you-type, resize events, and
other bursty signals where only the final value matters.

```ts
import { Duration, Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(
    Stream.fromEffect(Effect.sleep(Duration.millis(50)).pipe(Effect.as(4)))
  ),
  Stream.concat(Stream.make(5)),
  Stream.debounce(Duration.millis(30))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 3, 5 ]
//    1,2,3 arrive in a burst -> only 3 survives; then 4,5 arrive -> only 5
```
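
The same rule can be replayed over explicit timestamps, which makes it easier to reason about which elements survive. The `Arrival` shape and `debounceModel` helper are hypothetical, and the sketch assumes that a pending element is emitted when the source ends.

```typescript
// Hypothetical timestamp-based model of debouncing.
interface Arrival<A> {
  readonly at: number // arrival time in milliseconds
  readonly value: A
}

function debounceModel<A>(
  arrivals: ReadonlyArray<Arrival<A>>,
  quietMs: number
): Array<A> {
  const emitted: Array<A> = []
  arrivals.forEach((arrival, i) => {
    const next = arrivals[i + 1]
    // An element survives only if nothing else arrives within the quiet
    // period after it (assumption: the last element is always emitted).
    if (next === undefined || next.at - arrival.at >= quietMs) {
      emitted.push(arrival.value)
    }
  })
  return emitted
}

const arrivals = [
  { at: 0, value: 1 },
  { at: 0, value: 2 },
  { at: 0, value: 3 },
  { at: 50, value: 4 },
  { at: 50, value: 5 }
]

console.log(JSON.stringify(debounceModel(arrivals, 30)))
// => [3,5]
console.log(JSON.stringify(debounceModel(arrivals, 100)))
// => [5]   (a 50 ms gap is no longer "quiet enough", so the bursts merge)
```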

### throttle

Rate-limits the stream using a **token bucket** with a synchronous `cost`
function. The bucket holds up to `units + burst` tokens and refills `units`
tokens per `duration`. With `strategy: "shape"` (the default) chunks are
**delayed** to fit the budget; with `"enforce"` over-budget chunks are
**dropped**.

```ts
import { Effect, Schedule, Stream } from "effect"

const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.throttle({
    cost: (chunk) => chunk.length, // one token per element
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 0, 1, 2, 3, 4, 5 ]   (emitted no faster than the budget allows)
```
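
The difference between `"shape"` and `"enforce"` is easiest to see in a small token-bucket sketch. The `makeTokenBucket` helper below is hypothetical: it assumes the bucket starts with `units` tokens and refills continuously in proportion to elapsed time, which may differ in detail from the library's accounting.

```typescript
// Hypothetical token bucket: capacity = units + burst, refilled over time.
interface ThrottleModelOptions {
  readonly units: number
  readonly durationMs: number
  readonly burst?: number
}

function makeTokenBucket(options: ThrottleModelOptions) {
  const { units, durationMs } = options
  const capacity = units + (options.burst ?? 0)
  let tokens = units // assumption: start with one period's allowance
  let last = 0 // time of the previous refill, in milliseconds

  // Add tokens for the time elapsed since the last call, capped at capacity.
  const refill = (now: number): void => {
    tokens = Math.min(capacity, tokens + ((now - last) / durationMs) * units)
    last = now
  }

  return {
    // "enforce": true lets the chunk through, false drops it.
    enforce(cost: number, now: number): boolean {
      refill(now)
      if (cost > tokens) return false
      tokens -= cost
      return true
    },
    // "shape": how many milliseconds to delay the chunk so it fits the budget.
    shape(cost: number, now: number): number {
      refill(now)
      tokens -= cost // may go negative: the delay pays off that debt
      return tokens >= 0 ? 0 : Math.ceil((-tokens / units) * durationMs)
    }
  }
}

// One token per 100 ms, single-element chunks arriving at 0, 50 and 150 ms.
const dropper = makeTokenBucket({ units: 1, durationMs: 100 })
console.log(JSON.stringify([0, 50, 150].map((now) => dropper.enforce(1, now))))
// => [true,false,true]   (the chunk at 50 ms is over budget and dropped)

const shaper = makeTokenBucket({ units: 1, durationMs: 100 })
console.log(JSON.stringify([0, 50].map((now) => shaper.shape(1, now))))
// => [0,50]              (the chunk at 50 ms is held back for 50 ms instead)
```

Shaping never loses data but adds latency, while enforcing keeps latency flat at the price of dropped chunks.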

### throttleEffect

The same token-bucket throttle, but the `cost` of a chunk is computed by an
**effect** (e.g. when sizing requires a lookup). Same `units` / `duration` /
`burst` / `strategy` options as `throttle`.

```ts
import { Effect, Schedule, Stream } from "effect"

const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.throttleEffect({
    cost: (chunk) => Effect.succeed(chunk.length),
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 0, 1, 2, 3, 4, 5 ]
```

---

## Buffering

Buffering lets a **fast producer** run ahead of a **slow consumer** by holding
elements in a queue between them. The queue `strategy` controls what happens when
the buffer is full:

- `"suspend"` (default) — apply **backpressure**: the producer pauses until space
  frees up. Nothing is lost.
- `"dropping"` — discard **new** items when full.
- `"sliding"` — discard the **oldest** buffered items to make room.

`capacity: "unbounded"` never blocks and never drops (watch memory).
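
The three strategies differ only in what happens to an offer when the buffer is full, which a tiny queue model makes explicit. The `makeBoundedBuffer` helper and its result labels are hypothetical; in particular, `"must-wait"` stands in for the producer being suspended until space frees up.

```typescript
// Hypothetical model of a bounded buffer and its three "full" strategies.
type FullStrategy = "suspend" | "dropping" | "sliding"
type OfferResult = "accepted" | "must-wait" | "dropped-new" | "evicted-oldest"

function makeBoundedBuffer<A>(capacity: number, strategy: FullStrategy) {
  const items: Array<A> = []
  return {
    offer(item: A): OfferResult {
      if (items.length < capacity) {
        items.push(item)
        return "accepted"
      }
      if (strategy === "suspend") {
        return "must-wait" // backpressure: nothing is lost, the producer pauses
      }
      if (strategy === "dropping") {
        return "dropped-new" // the incoming item is discarded
      }
      items.shift() // "sliding": evict the oldest item to make room
      items.push(item)
      return "evicted-oldest"
    },
    contents: (): ReadonlyArray<A> => items.slice()
  }
}

for (const strategy of ["suspend", "dropping", "sliding"] as const) {
  const buffer = makeBoundedBuffer<number>(2, strategy)
  const results = [1, 2, 3].map((n) => buffer.offer(n))
  console.log(strategy, results[2], JSON.stringify(buffer.contents()))
}
// suspend must-wait [1,2]
// dropping dropped-new [1,2]
// sliding evicted-oldest [2,3]
```

Pick `"sliding"` when only fresh data matters (live prices, UI state), `"dropping"` when the earliest data matters most, and the default `"suspend"` when nothing may be lost.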

### buffer

Buffers up to `capacity` **elements**. This combinator destroys chunking, so
apply [`rechunk`](#rechunk) afterward if you need fixed chunk sizes downstream.

```ts
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  Stream.buffer({ capacity: 16, strategy: "suspend" }),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3 ]   (output unchanged; the producer can run ahead)
```

### bufferArray

Buffers up to `capacity` **chunks** (arrays) instead of individual elements. It
**preserves** chunking and performs best with power-of-2 capacities — prefer it
when downstream relies on chunk boundaries.

```ts
import { Effect, Stream } from "effect"

const program = Stream.fromArrays([1, 2], [3, 4]).pipe(
  Stream.bufferArray({ capacity: 2 }),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3, 4 ]   (chunk boundaries preserved internally)
```

---

## See also

- [Sinks](https://effect.plants.sh/streaming/sink/) — the `Sink` argument to `transduce`, `aggregate`,
  and `aggregateWithin` (including `Sink.take`, `Sink.foldUntil`, and friends).
- [Schedule](https://effect.plants.sh/scheduling/schedule/) — building flush policies for
  `aggregateWithin` (and the `Schedule.spaced` used by `groupedWithin`).
- [Consuming streams](https://effect.plants.sh/streaming/consuming-streams/) — `runCollect`,
  `runForEach`, `runDrain`, and other terminal operators used above.