Grouping, Chunking & Buffering
Streams rarely flow at a convenient granularity. Events arrive one at a time but you want to write them to a database in batches; a websocket fires faster than your renderer can keep up; a noisy sensor needs debouncing. This page covers the operators that reshape a stream by size, time, key, or buffering before you consume it.
Common case: batching events for a bulk write
The single most useful operator here is Stream.groupedWithin:
it emits a batch as soon as either the chunk fills up or a time window
elapses, whichever comes first. That is exactly the policy you want for a bulk
insert: never wait too long, never write rows one at a time.
import { Context, Effect, Stream } from "effect"

// A service that persists a batch of events in a single round-trip.
class Database extends Context.Service<Database>()("app/Database", {
  make: Effect.succeed({
    insertMany: (rows: ReadonlyArray<{ readonly id: number }>) =>
      Effect.log(`INSERT ${rows.length} rows: [${rows.map((r) => r.id).join(", ")}]`)
  })
}) {}

const ingest = (events: Stream.Stream<{ readonly id: number }>) =>
  events.pipe(
    // Flush every 100 events, or every 2 seconds, whichever comes first.
    Stream.groupedWithin(100, "2 seconds"),
    Stream.runForEach((batch) =>
      Effect.flatMap(Database, (db) => db.insertMany(batch))
    )
  )

// Stream.range emits plain numbers, so wrap each one in the { id } shape
// that ingest expects.
const program = ingest(
  Stream.range(1, 250).pipe(Stream.map((id) => ({ id })))
).pipe(
  Effect.provideService(Database, {
    insertMany: (rows) =>
      Effect.log(`INSERT ${rows.length} rows: [${rows.map((r) => r.id).join(", ")}]`)
  })
)

Effect.runPromise(program)
// One INSERT for the first 100 rows, another for the next 100,
// then a final flush of the remaining 50 when the source ends.

The result of groupedWithin(n, duration) is a Stream<Array<A>>: each element
is one batch. runForEach then sees whole batches, so a single insertMany call
handles up to n rows at once.
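To make the "size or time, whichever comes first" policy concrete, here is a minimal plain-TypeScript model of it. It is not how Effect implements groupedWithin: the `Timed` type, its `at` field, and the `batchWithin` function are hypothetical names for illustration, events carry explicit timestamps instead of using real timers, and the window is assumed to start at the first element of each batch.

```typescript
// Plain-TypeScript model of a "size or time" batching policy.
// Hypothetical names; events carry timestamps so no real timers are needed.
interface Timed<A> {
  readonly at: number // arrival time in milliseconds
  readonly value: A
}

function batchWithin<A>(
  events: ReadonlyArray<Timed<A>>,
  maxSize: number,
  windowMs: number
): A[][] {
  const batches: A[][] = []
  let current: A[] = []
  let windowStart = 0

  const flush = (): void => {
    if (current.length > 0) batches.push(current)
    current = []
  }

  for (const event of events) {
    // Time-based flush: the window opened at `windowStart` has elapsed.
    if (current.length > 0 && event.at - windowStart >= windowMs) flush()
    // Assumption: a new window starts with the first element of a batch.
    if (current.length === 0) windowStart = event.at
    current.push(event.value)
    // Size-based flush: the batch is full.
    if (current.length === maxSize) flush()
  }
  flush() // the source ended: emit whatever is left
  return batches
}

const events: Timed<number>[] = [
  { at: 0, value: 1 },
  { at: 10, value: 2 },
  { at: 20, value: 3 },
  { at: 5000, value: 4 }
]

console.log(batchWithin(events, 2, 2000))
// => [ [ 1, 2 ], [ 3 ], [ 4 ] ]
```

The model only reproduces which elements end up in which batch. A timer-driven operator would emit the `[ 3 ]` batch as soon as its window elapsed rather than waiting for the next event to arrive.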
Choosing an operator
| You want to… | Use |
|---|---|
| Batch by count only | grouped |
| Batch by count or time | groupedWithin |
| Collapse consecutive equal-keyed runs | groupAdjacentBy |
| Split into one substream per key | groupBy / groupByKey |
| Reshape the internal chunk boundaries | rechunk, chunks |
| Emit overlapping windows | sliding, slidingSize |
| Fold with a Sink | transduce, aggregate, aggregateWithin |
| Emit only the latest after a quiet period | debounce |
| Rate-limit with a token bucket | throttle, throttleEffect |
| Decouple a fast producer from a slow consumer | buffer, bufferArray |
Fixed grouping
grouped
Partitions the stream into non-empty arrays of a fixed size, preserving order. The final array may be smaller if there are not enough elements left to fill it.
import { Effect, Stream } from "effect"

const program = Stream.range(1, 8).pipe(
  Stream.grouped(3),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8 ] ]

groupedWithin
Emits an array when the chunk size is reached or the duration passes, whichever happens first. The timer resets after each emitted batch.
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  // 3 never fills a chunk of 2; because this source is finite, it is
  // flushed when the source ends rather than after the 5-second window.
  Stream.groupedWithin(2, "5 seconds"),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3 ] ]

groupAdjacentBy
Groups consecutive elements whose keys are equal (by Equal.equals) into
[key, group] pairs. Use it when the stream is already ordered by the key; later
non-adjacent runs with the same key are emitted as separate groups.
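The adjacency rule can be sketched over a plain array. This is an illustrative model, not the library's code: the local `groupAdjacentBy` function is a stand-in, and it compares keys with `===` where the operator uses Equal.equals.

```typescript
// Plain-array model of adjacent grouping (hypothetical helper).
// Assumption: keys are compared with ===, not Equal.equals.
function groupAdjacentBy<A, K>(
  items: ReadonlyArray<A>,
  key: (a: A) => K
): Array<[K, A[]]> {
  const groups: Array<[K, A[]]> = []
  for (const item of items) {
    const k = key(item)
    const last = groups[groups.length - 1]
    if (last !== undefined && last[0] === k) {
      last[1].push(item) // same key as the previous element: extend the run
    } else {
      groups.push([k, [item]]) // key changed: start a new run
    }
  }
  return groups
}

// Group readings by their first letter; "a3" is not adjacent to "a1"/"a2".
const runs = groupAdjacentBy(["a1", "a2", "b1", "a3"], (s) => s.charAt(0))
console.log(JSON.stringify(runs))
// => [["a",["a1","a2"]],["b",["b1"]],["a",["a3"]]]
```

Because only the previous group is inspected, the function needs no map of all keys seen so far, which is why the input should already be ordered by key if you want one group per key.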
import { Effect, Stream } from "effect"

const program = Stream.make("a", "a", "b", "b", "a").pipe(
  Stream.groupAdjacentBy((s) => s),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ "a", [ "a", "a" ] ], [ "b", [ "b", "b" ] ], [ "a", [ "a" ] ] ]
// note: the trailing "a" is a NEW group, not merged with the first run

Keyed grouping
groupBy and groupByKey fan a stream out into one substream per key. The
outer stream emits [key, Stream<value>] pairs; you then process each inner
stream, typically with mapEffect and concurrency: "unbounded" so groups run
in parallel. Each inner stream is backed by a bounded queue (default capacity
4096); tune it with bufferSize, and use idleTimeToLive to close groups that
have gone quiet.
groupBy
Classifies each element with an effectful function returning [key, value],
emitting one substream per distinct key.
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.groupBy((n) =>
    Effect.succeed([n % 2 === 0 ? "even" : "odd", n] as const)
  ),
  Stream.mapEffect(
    Effect.fnUntraced(function* ([key, stream]) {
      return [key, yield* Stream.runCollect(stream)] as const
    }),
    { concurrency: "unbounded" }
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ "odd", [ 1, 3, 5 ] ], [ "even", [ 2, 4 ] ] ]

groupByKey
Like groupBy but takes a pure key function and keeps the original element
as the value. Prefer this when the key can be computed synchronously.
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.groupByKey((n) => (n % 2 === 0 ? "even" : "odd")),
  Stream.mapEffect(
    ([key, stream]) =>
      Stream.runCollect(stream).pipe(
        Effect.map((values) => [key, values] as const)
      ),
    { concurrency: "unbounded" }
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ "odd", [ 1, 3, 5 ] ], [ "even", [ 2, 4 ] ] ]

Chunk control
A Stream carries elements in internal arrays (“chunks”). Most operators are
chunk-transparent, but these let you observe and reshape the chunk boundaries —
useful for performance tuning and for adapting to APIs that want fixed-size
batches.
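One way to picture the difference between elements and chunk boundaries is to model a stream as an array of arrays. The sketch below is a plain-TypeScript illustration with a hypothetical local `rechunk` helper, not the library's implementation: it moves the boundaries while leaving the flattened element sequence untouched.

```typescript
// Model: a "stream" is an array of chunks; rechunking only moves boundaries.
// Hypothetical helper for illustration.
function rechunk<A>(
  chunks: ReadonlyArray<ReadonlyArray<A>>,
  size: number
): A[][] {
  const n = Math.max(1, Math.floor(size)) // clamp the size to at least 1
  const out: A[][] = []
  let current: A[] = []
  for (const chunk of chunks) {
    for (const a of chunk) {
      current.push(a)
      if (current.length === n) {
        out.push(current)
        current = []
      }
    }
  }
  if (current.length > 0) out.push(current) // trailing, smaller chunk
  return out
}

const before: number[][] = [[1], [2, 3, 4], [5]]
const after = rechunk(before, 2)

console.log(after)
// => [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
console.log(([] as number[]).concat(...after))
// => [ 1, 2, 3, 4, 5 ]  (same elements, same order)
```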
chunks
Exposes the stream’s internal arrays as the stream’s elements: turns
Stream<A> into Stream<NonEmptyReadonlyArray<A>>. Combine with rechunk to
control the resulting array sizes.
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.rechunk(2),
  Stream.chunks,
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]

rechunk
Re-batches the stream into internal chunks of the given size (clamped to at
least 1) without changing the element type. The downstream still sees a flat
Stream<A>; only the internal chunking changes. Pair with chunks to make the
batches visible.
import { Effect, Stream } from "effect"

const program = Stream.range(1, 5).pipe(
  Stream.rechunk(2),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3, 4, 5 ] (flat: only internal chunk sizes changed)

sliding
Emits overlapping windows of n consecutive elements, advancing by one element
each time.
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.sliding(2),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ] ]

slidingSize
Like sliding, but lets you set the step (advance) separately from the window
size. The signature is slidingSize(chunkSize, stepSize).
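The relationship between window size and step is easy to check on a plain array. The `slidingWindows` helper below is a hypothetical stand-in written for this page; it emits full windows only, so how the real operators treat a trailing partial window is deliberately not modelled.

```typescript
// Plain-array model of sliding windows (hypothetical helper).
// Assumption: only full windows are emitted.
function slidingWindows<A>(
  items: ReadonlyArray<A>,
  size: number,
  step: number
): A[][] {
  if (size < 1 || step < 1) {
    throw new RangeError("size and step must be at least 1")
  }
  const windows: A[][] = []
  // Each window starts `step` elements after the previous one.
  for (let start = 0; start + size <= items.length; start += step) {
    windows.push(items.slice(start, start + size))
  }
  return windows
}

console.log(slidingWindows([1, 2, 3, 4, 5], 2, 1))
// => [ [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ] ]
console.log(slidingWindows([1, 2, 3, 4, 5], 3, 2))
// => [ [ 1, 2, 3 ], [ 3, 4, 5 ] ]
```

With a step of 1 this matches the sliding example; consecutive windows overlap by `size - step` elements, so a step equal to the size gives non-overlapping groups.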
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.slidingSize(3, 2),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2, 3 ], [ 3, 4, 5 ] ]

Sink-driven aggregation
For anything more sophisticated than fixed counts, fold the stream with a
Sink. A sink consumes some prefix of the stream, produces a
result, and hands back any leftover input; these operators run the sink
repeatedly and emit each result.
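The "run the sink repeatedly" idea can be sketched without any library. In this simplified model, `FoldSink`, `runRepeatedly`, and `sumUntil` are hypothetical names: a sink is a fold plus a completion test, it always consumes whole elements (so leftovers are not modelled), and a partially filled sink is assumed to emit its result when the input ends.

```typescript
// Simplified model of a sink: a fold with a "done" test.
// Hypothetical types; real sinks can also fail, run effects, and return leftovers.
interface FoldSink<A, S, B> {
  readonly initial: () => S
  readonly step: (state: S, input: A) => S
  readonly isDone: (state: S) => boolean
  readonly result: (state: S) => B
}

// Run the sink over the input again and again, emitting each result.
function runRepeatedly<A, S, B>(
  inputs: ReadonlyArray<A>,
  sink: FoldSink<A, S, B>
): B[] {
  const results: B[] = []
  let state = sink.initial()
  let pending = false // true while the current run has consumed input
  for (const input of inputs) {
    state = sink.step(state, input)
    pending = true
    if (sink.isDone(state)) {
      results.push(sink.result(state))
      state = sink.initial() // restart the sink for the next run
      pending = false
    }
  }
  // Assumption: an unfinished run still emits when the input ends.
  if (pending) results.push(sink.result(state))
  return results
}

// Sum inputs until `n` of them have been seen.
const sumUntil = (
  n: number
): FoldSink<number, { sum: number; count: number }, number> => ({
  initial: () => ({ sum: 0, count: 0 }),
  step: (s, x) => ({ sum: s.sum + x, count: s.count + 1 }),
  isDone: (s) => s.count >= n,
  result: (s) => s.sum
})

console.log(runRepeatedly([1, 2, 3, 4, 5, 6], sumUntil(3)))
// => [ 6, 15 ]
console.log(runRepeatedly([1, 2, 3, 4, 5, 6, 7], sumUntil(3)))
// => [ 6, 15, 7 ]
```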
transduce
Repeatedly applies a sink as a transducer, emitting each sink result. The sink runs synchronously inline with the stream (no separate fiber), so it cannot flush on its own; it emits only when the sink signals completion.
import { Effect, Sink, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4).pipe(
  Stream.transduce(Sink.take(2)),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ [ 1, 2 ], [ 3, 4 ] ]

aggregate
Like transduce, but runs the upstream and the sink in separate fibers, so
the sink can keep consuming input while downstream is still processing the
previous result. Best when downstream work is the bottleneck.
import { Effect, Sink, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  // Sum until 3 inputs are seen, then emit and restart.
  Stream.aggregate(
    Sink.foldUntil(() => 0, 3, (sum, n: number) => Effect.succeed(sum + n))
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 6, 15 ] (1+2+3, then 4+5+6)

aggregateWithin
aggregate plus a Schedule: the schedule can flush
the current aggregation even if the sink has not yet completed. This is the
time-aware building block behind groupedWithin. The schedule’s input is the
Option<B> of the most recent output.
import { Effect, Schedule, Sink, Stream } from "effect"

const program = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  Stream.aggregateWithin(
    Sink.foldUntil(() => 0, 3, (sum, n: number) => Effect.succeed(sum + n)),
    // Force a flush at most once per minute (here the sink completes first).
    Schedule.spaced("1 minute")
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 6, 15 ]

Time-based operators
debounce
Within each quiet window, drops earlier elements and emits only the latest
one once the source pauses for duration. Ideal for search-as-you-type, resize
events, and other bursty signals where only the final value matters.
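The debounce rule can be expressed as a pure function over timestamped events, which makes it easy to reason about without timers. This is a sketch under stated assumptions rather than the operator's implementation: `Arrival` and `debounceModel` are hypothetical names, the events must already be sorted by arrival time, and the last event is assumed to be emitted when the source ends.

```typescript
// Pure model of debouncing over timestamped events (hypothetical names).
interface Arrival<A> {
  readonly at: number // arrival time in milliseconds
  readonly value: A
}

function debounceModel<A>(
  arrivals: ReadonlyArray<Arrival<A>>,
  quietMs: number
): A[] {
  const out: A[] = []
  arrivals.forEach((arrival, i) => {
    const next = arrivals[i + 1]
    // An element survives only if nothing else arrives within `quietMs`
    // after it, or if it is the final element of the source.
    if (next === undefined || next.at - arrival.at >= quietMs) {
      out.push(arrival.value)
    }
  })
  return out
}

// 1, 2, 3 arrive together; 4 and 5 arrive together 50 ms later.
const arrivals: Arrival<number>[] = [
  { at: 0, value: 1 },
  { at: 0, value: 2 },
  { at: 0, value: 3 },
  { at: 50, value: 4 },
  { at: 50, value: 5 }
]

console.log(debounceModel(arrivals, 30))
// => [ 3, 5 ]
```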
import { Duration, Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(
    Stream.fromEffect(Effect.sleep(Duration.millis(50)).pipe(Effect.as(4)))
  ),
  Stream.concat(Stream.make(5)),
  Stream.debounce(Duration.millis(30))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 3, 5 ]
// 1,2,3 arrive in a burst -> only 3 survives; then 4,5 arrive -> only 5

throttle
Rate-limits the stream using a token bucket with a synchronous cost
function. The bucket holds up to units + burst tokens and refills units
tokens per duration. With strategy: "shape" (the default) chunks are
delayed to fit the budget; with "enforce" over-budget chunks are
dropped.
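To see how the token budget plays out under "enforce", the bucket can be simulated over timestamped chunks. Everything in this sketch is an assumption made for illustration, not the library's algorithm: `Costed` and `enforceModel` are hypothetical names, the clock starts at 0, the bucket starts with `units` tokens, and tokens refill continuously in proportion to elapsed time.

```typescript
// Token-bucket model of the "enforce" strategy (hypothetical names).
interface Costed<A> {
  readonly at: number // arrival time in milliseconds, non-decreasing
  readonly cost: number // tokens this chunk needs
  readonly value: A
}

function enforceModel<A>(
  arrivals: ReadonlyArray<Costed<A>>,
  units: number,
  durationMs: number,
  burst: number = 0
): A[] {
  const capacity = units + burst // the most the bucket can ever hold
  let tokens = units // assumption: the bucket starts with `units` tokens
  let last = 0 // assumption: the clock starts at 0
  const kept: A[] = []
  for (const arrival of arrivals) {
    // Refill `units` tokens per `durationMs`, capped at the capacity.
    const refill = ((arrival.at - last) / durationMs) * units
    tokens = Math.min(capacity, tokens + refill)
    last = arrival.at
    if (arrival.cost <= tokens) {
      tokens -= arrival.cost // within budget: pass the chunk through
      kept.push(arrival.value)
    }
    // Otherwise the chunk is over budget and is dropped.
  }
  return kept
}

// One element every 50 ms, budget of 1 token per 100 ms.
const ticks: Costed<number>[] = [0, 1, 2, 3, 4, 5].map((n) => ({
  at: n * 50,
  cost: 1,
  value: n
}))

console.log(enforceModel(ticks, 1, 100))
// => [ 0, 2, 4 ]
```

Under "shape" the same over-budget elements would be delayed until enough tokens had accumulated instead of being discarded, so every element would eventually come through.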
import { Effect, Schedule, Stream } from "effect"

const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.throttle({
    cost: (chunk) => chunk.length, // one token per element
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 0, 1, 2, 3, 4, 5 ] (emitted no faster than the budget allows)

throttleEffect
The same token-bucket throttle, but the cost of a chunk is computed by an
effect (e.g. when sizing requires a lookup). Same units / duration /
burst / strategy options as throttle.
import { Effect, Schedule, Stream } from "effect"

const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.throttleEffect({
    cost: (chunk) => Effect.succeed(chunk.length),
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// => [ 0, 1, 2, 3, 4, 5 ]

Buffering
Buffering lets a fast producer run ahead of a slow consumer by holding
elements in a queue between them. The queue strategy controls what happens when
the buffer is full:
- "suspend" (default): apply backpressure; the producer pauses until space frees up. Nothing is lost.
- "dropping": discard new items when full.
- "sliding": discard the oldest buffered items to make room.
capacity: "unbounded" never blocks and never drops (watch memory).
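The three strategies differ only in what happens when an item is offered to a full queue, which a small synchronous model can show. The `offer` and `fill` helpers and the `Outcome` type are hypothetical; in particular, a synchronous function cannot pause a producer, so "suspend" is represented by a "must-wait" outcome that a real producer would react to by pausing and retrying.

```typescript
// Synchronous model of a bounded buffer's overflow strategies.
// Hypothetical helpers for illustration only.
type Strategy = "suspend" | "dropping" | "sliding"
type Outcome = "stored" | "dropped" | "must-wait"

function offer<A>(
  queue: A[],
  capacity: number,
  strategy: Strategy,
  item: A
): Outcome {
  if (queue.length < capacity) {
    queue.push(item)
    return "stored"
  }
  if (strategy === "dropping") {
    return "dropped" // the new item is discarded
  }
  if (strategy === "sliding") {
    queue.shift() // evict the oldest buffered item
    queue.push(item)
    return "stored"
  }
  return "must-wait" // "suspend": the producer would pause until space frees up
}

// Offer 1..4 to an empty buffer of capacity 2 with no consumer running.
function fill(strategy: Strategy): { queue: number[]; outcomes: Outcome[] } {
  const queue: number[] = []
  const outcomes = [1, 2, 3, 4].map((n) => offer(queue, 2, strategy, n))
  return { queue, outcomes }
}

console.log(fill("dropping").queue)
// => [ 1, 2 ]  (3 and 4 were discarded)
console.log(fill("sliding").queue)
// => [ 3, 4 ]  (1 and 2 were evicted)
console.log(fill("suspend").outcomes)
// => [ "stored", "stored", "must-wait", "must-wait" ]
```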
buffer
Buffers up to capacity elements. This combinator destroys chunking, so
apply rechunk afterward if you need fixed chunk sizes downstream.
import { Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  Stream.buffer({ capacity: 16, strategy: "suspend" }),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3 ] (output unchanged; the producer can run ahead)

bufferArray
Buffers up to capacity chunks (arrays) instead of individual elements. It
preserves chunking and performs best with power-of-2 capacities — prefer it
when downstream relies on chunk boundaries.
import { Effect, Stream } from "effect"

const program = Stream.fromArrays([1, 2], [3, 4]).pipe(
  Stream.bufferArray({ capacity: 2 }),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// => [ 1, 2, 3, 4 ] (chunk boundaries preserved internally)

See also
- Sinks: the Sink argument to transduce, aggregate, and aggregateWithin (including Sink.take, Sink.foldUntil, and friends).
- Schedule: building flush policies for aggregateWithin (and the Schedule.spaced used by groupedWithin).
- Consuming streams: runCollect, runForEach, runDrain, and other terminal operators used above.