Grouping, Chunking & Buffering

Streams rarely flow at a convenient granularity. Events arrive one at a time, but you want to write them to a database in batches; a websocket fires faster than your renderer can keep up; a noisy sensor needs debouncing. This page covers the operators that regroup a stream by size, time, or key, or that buffer it, before you consume it.

Common case: batching events for a bulk write

The single most useful operator here is Stream.groupedWithin: it emits a batch as soon as either the chunk fills up or a time window elapses, whichever comes first. That is exactly the policy you want for a bulk insert: never wait too long, never write rows one at a time.

import { Context, Effect, Stream } from "effect"
// A service that persists a batch of events in a single round-trip.
class Database extends Context.Service<Database>()("app/Database", {
  make: Effect.succeed({
    insertMany: (rows: ReadonlyArray<{ readonly id: number }>) =>
      Effect.log(`INSERT ${rows.length} rows: [${rows.map((r) => r.id).join(", ")}]`)
  })
}) {}
const ingest = (events: Stream.Stream<{ readonly id: number }>) =>
  events.pipe(
    // Flush every 100 events, or every 2 seconds, whichever comes first.
    Stream.groupedWithin(100, "2 seconds"),
    Stream.runForEach((batch) =>
      Effect.flatMap(Database, (db) => db.insertMany(batch))
    )
  )
// Stream.range emits numbers, so wrap each one in the { id } shape that ingest expects.
const events = Stream.range(1, 250).pipe(Stream.map((id) => ({ id })))
const program = ingest(events).pipe(
  Effect.provideService(Database, {
    insertMany: (rows) =>
      Effect.log(`INSERT ${rows.length} rows: [${rows.map((r) => r.id).join(", ")}]`)
  })
)
Effect.runPromise(program)
// One INSERT for the first 100 rows, another for the next 100,
// then a final flush of the remaining 50 when the source ends.

The result of groupedWithin(n, duration) is a Stream<Array<A>> — each element is one batch. runForEach then sees whole batches, so a single insertMany call handles up to n rows at once.

| You want to… | Use |
| --- | --- |
| Batch by count only | grouped |
| Batch by count or time | groupedWithin |
| Collapse consecutive equal-keyed runs | groupAdjacentBy |
| Split into one substream per key | groupBy / groupByKey |
| Reshape the internal chunk boundaries | rechunk, chunks |
| Emit overlapping windows | sliding, slidingSize |
| Fold with a Sink | transduce, aggregate, aggregateWithin |
| Emit only the latest after a quiet period | debounce |
| Rate-limit with a token bucket | throttle, throttleEffect |
| Decouple a fast producer from a slow consumer | buffer, bufferArray |

Stream.grouped partitions the stream into non-empty arrays of a fixed size, preserving order. The final array may be smaller if too few elements remain to fill it.

import { Effect, Stream } from "effect"
const program = Stream.range(1, 8).pipe(
  Stream.grouped(3),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8 ] ]

Stream.groupedWithin emits a batch when the chunk size is reached or the duration elapses, whichever happens first. The timer resets after each emitted batch.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3).pipe(
  // 3 never fills a batch of 2. The source ends right away, so it is
  // flushed on completion rather than after the 5-second window.
  Stream.groupedWithin(2, "5 seconds"),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3 ] ]
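
To make the "size or time, whichever comes first" policy concrete, here is a minimal sketch in plain TypeScript. It is a model of the batching rule, not Effect's implementation: `Timed`, `batchWithin`, and the timestamps are hypothetical, and the model assumes a window restarts at the arrival that follows an expiry. It only computes which elements end up in which batch; the real operator emits a timed batch when the window elapses, without waiting for the next element.

```typescript
// Hypothetical model of size-or-time batching over timestamped events.
interface Timed<A> {
  readonly at: number // arrival time in milliseconds
  readonly value: A
}

function batchWithin<A>(
  events: ReadonlyArray<Timed<A>>,
  maxSize: number,
  windowMs: number
): Array<Array<A>> {
  const batches: Array<Array<A>> = []
  let current: Array<A> = []
  let windowStart = 0
  for (const { at, value } of events) {
    // The window elapsed before this event arrived: flush whatever is pending.
    if (at - windowStart >= windowMs) {
      if (current.length > 0) batches.push(current)
      current = []
      windowStart = at // assumption: the window restarts at this arrival
    }
    current.push(value)
    // The batch is full: flush immediately and reset the timer.
    if (current.length >= maxSize) {
      batches.push(current)
      current = []
      windowStart = at
    }
  }
  // The source ended: flush the remainder.
  if (current.length > 0) batches.push(current)
  return batches
}

// Size wins: the first two elements fill a batch long before the window ends.
const bySize = batchWithin(
  [{ at: 0, value: 1 }, { at: 1, value: 2 }, { at: 2, value: 3 }],
  2,
  5000
)
// bySize is [[1, 2], [3]]

// Time wins: "a" sits alone for more than a second, so it is flushed by itself.
const byTime = batchWithin(
  [{ at: 0, value: "a" }, { at: 1500, value: "b" }, { at: 1600, value: "c" }],
  100,
  1000
)
// byTime is [["a"], ["b", "c"]]
```

The first call mirrors the example above: the batch of two is cut by size, and the leftover element is flushed when the input ends.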

Stream.groupAdjacentBy groups consecutive elements whose keys are equal (by Equal.equals) into [key, group] pairs. Use it when the stream is already ordered by key; a later, non-adjacent run with the same key is emitted as a separate group.

import { Effect, Stream } from "effect"
const program = Stream.make("a", "a", "b", "b", "a").pipe(
  Stream.groupAdjacentBy((s) => s),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ "a", [ "a", "a" ] ], [ "b", [ "b", "b" ] ], [ "a", [ "a" ] ] ]
// note: the trailing "a" is a NEW group, not merged with the first run

groupBy and groupByKey fan a stream out into one substream per key. The outer stream emits [key, Stream<value>] pairs; you then process each inner stream, typically with mapEffect and concurrency: "unbounded" so groups run in parallel. Each inner stream is backed by a bounded queue (default capacity 4096); tune it with bufferSize, and use idleTimeToLive to close groups that have gone quiet.

Stream.groupBy classifies each element with an effectful function that returns [key, value], and emits one substream per distinct key.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.groupBy((n) =>
    Effect.succeed([n % 2 === 0 ? "even" : "odd", n] as const)
  ),
  Stream.mapEffect(
    Effect.fnUntraced(function* ([key, stream]) {
      return [key, yield* Stream.runCollect(stream)] as const
    }),
    { concurrency: "unbounded" }
  ),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ "odd", [ 1, 3, 5 ] ], [ "even", [ 2, 4 ] ] ]

Stream.groupByKey is like groupBy, but takes a pure key function and keeps the original element as the value. Prefer it when the key can be computed synchronously.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.groupByKey((n) => (n % 2 === 0 ? "even" : "odd")),
  Stream.mapEffect(
    ([key, stream]) =>
      Stream.runCollect(stream).pipe(
        Effect.map((values) => [key, values] as const)
      ),
    { concurrency: "unbounded" }
  ),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ "odd", [ 1, 3, 5 ] ], [ "even", [ 2, 4 ] ] ]

A Stream carries elements in internal arrays (“chunks”). Most operators are chunk-transparent, but these let you observe and reshape the chunk boundaries — useful for performance tuning and for adapting to APIs that want fixed-size batches.

Stream.chunks exposes the stream's internal arrays as its elements, turning Stream<A> into Stream<NonEmptyReadonlyArray<A>>. Combine it with rechunk to control the resulting array sizes.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.rechunk(2),
  Stream.chunks,
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]

Stream.rechunk re-batches the stream into internal chunks of the given size (clamped to at least 1) without changing the element type. Downstream still sees a flat Stream<A>; only the internal chunking changes. Pair it with chunks to make the batches visible.

import { Effect, Stream } from "effect"
const program = Stream.range(1, 5).pipe(
  Stream.rechunk(2),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3, 4, 5 ] (flat; only the internal chunk sizes changed)

Stream.sliding emits overlapping windows of n consecutive elements, advancing by one element each time.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.sliding(2),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ] ]

Stream.slidingSize(chunkSize, stepSize) is like sliding, but lets you set the step (how far each window advances) separately from the window size.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.slidingSize(3, 2),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ 1, 2, 3 ], [ 3, 4, 5 ] ]
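
The relationship between window size and step may be easier to see on a plain array. The sketch below is a dependency-free model, not the library's code: `windows` is a hypothetical helper, and it assumes only full windows are emitted. How the real operators treat a trailing partial window is not covered here and should be checked against the API reference.

```typescript
// Hypothetical array model of sliding windows with an explicit step.
function windows<A>(
  items: ReadonlyArray<A>,
  size: number,
  step: number
): Array<Array<A>> {
  if (size < 1 || step < 1) {
    throw new RangeError("size and step must be at least 1")
  }
  const out: Array<Array<A>> = []
  // Each window starts `step` elements after the previous one.
  for (let start = 0; start + size <= items.length; start += step) {
    out.push(items.slice(start, start + size))
  }
  return out
}

const stepOne = windows([1, 2, 3, 4, 5], 2, 1)
// stepOne is [[1, 2], [2, 3], [3, 4], [4, 5]], the same shape as sliding(2)

const stepTwo = windows([1, 2, 3, 4, 5], 3, 2)
// stepTwo is [[1, 2, 3], [3, 4, 5]], the same shape as slidingSize(3, 2)
```

A step smaller than the size makes windows overlap; a step equal to the size gives disjoint groups.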

For anything more sophisticated than fixed counts, fold the stream with a Sink. A sink consumes some prefix of the stream, produces a result, and hands back any leftover input; these operators run the sink repeatedly and emit each result.
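
The "consume a prefix, produce a result, hand back the leftover" cycle can be sketched without the library. Everything below is a hypothetical model (`PrefixSink`, `takeN`, `sumOf`, and `runRepeatedly` are illustration names, not Effect APIs), and it works on a complete array rather than on a stream that arrives over time.

```typescript
// A sink, modeled as a function from the remaining input to a result plus leftover.
type PrefixSink<A, B> = (
  input: ReadonlyArray<A>
) => { readonly result: B; readonly leftover: ReadonlyArray<A> }

// Consumes up to n elements and returns them as the result.
const takeN = <A>(n: number): PrefixSink<A, ReadonlyArray<A>> => (input) => ({
  result: input.slice(0, n),
  leftover: input.slice(n)
})

// Consumes up to n numbers and returns their sum.
const sumOf = (n: number): PrefixSink<number, number> => (input) => ({
  result: input.slice(0, n).reduce((sum, x) => sum + x, 0),
  leftover: input.slice(n)
})

// Runs the sink again and again on whatever the previous run left over.
function runRepeatedly<A, B>(
  input: ReadonlyArray<A>,
  sink: PrefixSink<A, B>
): Array<B> {
  const results: Array<B> = []
  let rest = input
  while (rest.length > 0) {
    const { result, leftover } = sink(rest)
    // A sink that consumed nothing would loop forever, so stop instead.
    if (leftover.length >= rest.length) break
    results.push(result)
    rest = leftover
  }
  return results
}

const pairs = runRepeatedly([1, 2, 3, 4], takeN<number>(2))
// pairs is [[1, 2], [3, 4]]

const sums = runRepeatedly([1, 2, 3, 4, 5, 6], sumOf(3))
// sums is [6, 15]
```

The operators in this section differ mainly in where the sink runs and in what may force it to emit early, not in this basic cycle.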

Stream.transduce repeatedly applies a sink as a transducer, emitting each sink result. The sink runs synchronously, inline with the stream (no separate fiber), so it cannot flush on its own; it emits only when the sink signals completion.

import { Effect, Sink, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4).pipe(
  Stream.transduce(Sink.take(2)),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3, 4 ] ]

Stream.aggregate is like transduce, but runs the upstream and the sink in separate fibers, so the sink can keep consuming input while downstream is still processing the previous result. It works best when downstream work is the bottleneck.

import { Effect, Sink, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  // Sum until 3 inputs are seen, then emit and restart.
  Stream.aggregate(
    Sink.foldUntil(() => 0, 3, (sum, n: number) => Effect.succeed(sum + n))
  ),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ 6, 15 ] (1+2+3, then 4+5+6)

Stream.aggregateWithin is aggregate plus a Schedule: the schedule can flush the current aggregation even if the sink has not yet completed. This is the time-aware building block behind groupedWithin. The schedule receives the most recent output as an Option<B>.

import { Effect, Schedule, Sink, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  Stream.aggregateWithin(
    Sink.foldUntil(() => 0, 3, (sum, n: number) => Effect.succeed(sum + n)),
    // Force a flush at most once per minute (here the sink completes first).
    Schedule.spaced("1 minute")
  ),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ 6, 15 ]

Stream.debounce drops all but the latest element of each burst and emits that element once the source has been quiet for duration. It is ideal for search-as-you-type, resize events, and other bursty signals where only the final value matters.

import { Duration, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(
    Stream.fromEffect(Effect.sleep(Duration.millis(50)).pipe(Effect.as(4)))
  ),
  Stream.concat(Stream.make(5)),
  Stream.debounce(Duration.millis(30))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 3, 5 ]
// 1,2,3 arrive in a burst -> only 3 survives; then 4,5 arrive -> only 5
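
The rule that decides which elements survive can be written down as a pure function over timestamped arrivals. This is a hedged sketch, not the operator's implementation: `Timed`, `debounceModel`, and the timestamps are hypothetical, and the last element is assumed to be emitted when the source ends.

```typescript
// Hypothetical model of debouncing over timestamped events.
interface Timed<A> {
  readonly at: number // arrival time in milliseconds
  readonly value: A
}

function debounceModel<A>(
  events: ReadonlyArray<Timed<A>>,
  quietMs: number
): Array<A> {
  const out: Array<A> = []
  events.forEach((event, i) => {
    const next = events[i + 1]
    // An element survives only if nothing else arrives within quietMs after it.
    if (next === undefined || next.at - event.at >= quietMs) {
      out.push(event.value)
    }
  })
  return out
}

// Same timeline as the example above: a burst at 0 ms, another at 50 ms.
const survivors = debounceModel(
  [
    { at: 0, value: 1 },
    { at: 0, value: 2 },
    { at: 0, value: 3 },
    { at: 50, value: 4 },
    { at: 50, value: 5 }
  ],
  30
)
// survivors is [3, 5]
```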

Stream.throttle rate-limits the stream using a token bucket with a synchronous cost function. The bucket holds up to units + burst tokens and refills units tokens per duration. With strategy: "shape" (the default), chunks are delayed to fit the budget; with "enforce", over-budget chunks are dropped.

import { Effect, Schedule, Stream } from "effect"
const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.throttle({
    cost: (chunk) => chunk.length, // one token per element
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 0, 1, 2, 3, 4, 5 ] (emitted no faster than the budget allows)
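
The token-bucket arithmetic behind the two strategies can be sketched in plain TypeScript. This is an illustrative model under stated assumptions, not the library's implementation: `makeTokenBucket` is a hypothetical helper, the bucket is assumed to start with `units` tokens, time is passed in explicitly, and the "shape" delay is measured from each chunk's arrival.

```typescript
// Hypothetical token-bucket model with explicit timestamps.
interface TokenBucket {
  shape(cost: number, nowMs: number): number // delay in ms before emitting
  enforce(cost: number, nowMs: number): boolean // true = emit, false = drop
}

function makeTokenBucket(
  units: number,
  durationMs: number,
  burst = 0
): TokenBucket {
  const capacity = units + burst
  let tokens = units // assumption: the bucket starts with `units` tokens
  let lastMs = 0
  // Add tokens for the time elapsed since the last call, capped at capacity.
  const refill = (nowMs: number): void => {
    const elapsed = Math.max(0, nowMs - lastMs)
    tokens = Math.min(capacity, tokens + (elapsed / durationMs) * units)
    lastMs = nowMs
  }
  return {
    shape(cost, nowMs) {
      refill(nowMs)
      tokens -= cost // may go negative: that debt is paid off by waiting
      return tokens >= 0 ? 0 : Math.ceil((-tokens / units) * durationMs)
    },
    enforce(cost, nowMs) {
      refill(nowMs)
      if (cost > tokens) return false // over budget: drop, keep the tokens
      tokens -= cost
      return true
    }
  }
}

// One-token chunks arriving every 50 ms against a budget of 1 token per 100 ms.
const arrivals = [0, 50, 100, 150]

const enforcing = makeTokenBucket(1, 100)
const admitted = arrivals.map((t) => enforcing.enforce(1, t))
// admitted is [true, false, true, false]: every second chunk is dropped

const shaping = makeTokenBucket(1, 100)
const delays = arrivals.map((t) => shaping.shape(1, t))
// delays is [0, 50, 100, 150]: emission times 0, 100, 200, 300 ms
```

In the model, "shape" keeps every chunk and stretches the timeline to one chunk per 100 ms, while "enforce" keeps the timeline and discards what does not fit.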

Stream.throttleEffect is the same token-bucket throttle, but the cost of a chunk is computed by an effect (for example, when sizing requires a lookup). It takes the same units / duration / burst / strategy options as throttle.

import { Effect, Schedule, Stream } from "effect"
const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.throttleEffect({
    cost: (chunk) => Effect.succeed(chunk.length),
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 0, 1, 2, 3, 4, 5 ]

Buffering lets a fast producer run ahead of a slow consumer by holding elements in a queue between them. The queue strategy controls what happens when the buffer is full:

  • "suspend" (default) — apply backpressure: the producer pauses until space frees up. Nothing is lost.
  • "dropping" — discard new items when full.
  • "sliding" — discard the oldest buffered items to make room.

With capacity: "unbounded", the buffer never blocks and never drops, so memory use can grow without limit if the consumer falls behind.
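
What each strategy does to a full buffer can be shown with a small array-backed model. This is a sketch for illustration only: `offer`, `Outcome`, and `fill` are hypothetical names, and "suspend" is represented by a "wait" outcome because a synchronous function cannot actually pause the producer.

```typescript
// Hypothetical model of a bounded buffer and its three overflow strategies.
type Strategy = "suspend" | "dropping" | "sliding"
type Outcome = "accepted" | "wait" | "dropped-newest" | "dropped-oldest"

function offer<A>(
  queue: Array<A>,
  capacity: number,
  strategy: Strategy,
  item: A
): Outcome {
  if (queue.length < capacity) {
    queue.push(item)
    return "accepted"
  }
  if (strategy === "suspend") {
    // Backpressure: the queue is untouched and the producer must retry later.
    return "wait"
  }
  if (strategy === "dropping") {
    // The new item is discarded; buffered items stay.
    return "dropped-newest"
  }
  // "sliding": evict the oldest buffered item to make room for the new one.
  queue.shift()
  queue.push(item)
  return "dropped-oldest"
}

// Offer 1, 2, 3 to a buffer of capacity 2 and report what is left in it.
function fill(strategy: Strategy): { queue: Array<number>; last: Outcome } {
  const queue: Array<number> = []
  let last: Outcome = "accepted"
  for (const n of [1, 2, 3]) last = offer(queue, 2, strategy, n)
  return { queue, last }
}

const suspended = fill("suspend") // queue [1, 2], last "wait"
const dropping = fill("dropping") // queue [1, 2], last "dropped-newest"
const sliding = fill("sliding") // queue [2, 3], last "dropped-oldest"
```

Only "suspend" preserves every element; the other two trade completeness for a producer that never waits.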

Stream.buffer buffers up to capacity elements. This combinator destroys chunking, so apply rechunk afterward if you need fixed chunk sizes downstream.

import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3).pipe(
  Stream.buffer({ capacity: 16, strategy: "suspend" }),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3 ] (output unchanged; the producer can run ahead)

Stream.bufferArray buffers up to capacity chunks (arrays) instead of individual elements. It preserves chunking and performs best with power-of-2 capacities. Prefer it when downstream relies on chunk boundaries.

import { Effect, Stream } from "effect"
const program = Stream.fromArrays([1, 2], [3, 4]).pipe(
  Stream.bufferArray({ capacity: 2 }),
  Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3, 4 ] (chunk boundaries preserved internally)

  • Sinks: the Sink argument to transduce, aggregate, and aggregateWithin (including Sink.take, Sink.foldUntil, and friends).
  • Schedule: building flush policies for aggregateWithin (and the Schedule.spaced used by groupedWithin).
  • Consuming streams: runCollect, runForEach, runDrain, and other terminal operators used above.