Transforming Streams
Streams compose with operators the same way Effect does: each operator takes a
stream and returns a new stream, so you build a pipeline with .pipe(...). Pure
transforms (map, filter) reshape elements without running effects; effectful
transforms (mapEffect, flatMap) let each element trigger more work — with
explicit, opt-in concurrency. The example below walks an order-processing
pipeline from raw events to enriched, taxed orders.
import { Stream } from "effect"
interface Order { readonly id: string readonly status: "paid" | "refunded" readonly subtotalCents: number readonly shippingCents: number readonly country: "US" | "CA" | "NZ"}
interface NormalizedOrder extends Order { readonly totalCents: number}
const orderEvents: Stream.Stream<Order> = Stream.make( { id: "ord_1", status: "paid", subtotalCents: 4_500, shippingCents: 500, country: "US" }, { id: "ord_2", status: "refunded", subtotalCents: 2_000, shippingCents: 0, country: "CA" })
// `Stream.map` applies a pure, total function to every element.const normalized = orderEvents.pipe( Stream.map((order): NormalizedOrder => ({ ...order, totalCents: order.subtotalCents + order.shippingCents })))
// `Stream.filter` drops elements that fail the predicate.const paid = normalized.pipe( Stream.filter((order) => order.status === "paid"))Effectful transforms with concurrency
Section titled “Effectful transforms with concurrency”Stream.mapEffect is the workhorse for per-element side effects — a lookup, an
HTTP call, a write. By default it processes elements one at a time, preserving
order. Pass { concurrency: n } to run up to n effects in flight at once; the
stream takes care of backpressure so you never overwhelm the downstream
consumer.
import { Effect, Stream } from "effect"
interface NormalizedOrder { readonly id: string readonly totalCents: number readonly country: "US" | "CA" | "NZ"}
interface EnrichedOrder extends NormalizedOrder { readonly taxCents: number readonly grandTotalCents: number}
// Define the per-element effect with `Effect.fn` — never a plain function that// returns `Effect.gen`. This one simulates an effectful tax lookup.const enrichOrder = Effect.fn("enrichOrder")(function*(order: NormalizedOrder) { yield* Effect.sleep("5 millis") // stand-in for a tax/risk service call
const taxRate = order.country === "US" ? 0.08 : 0.13 const taxCents = Math.round(order.totalCents * taxRate)
return { ...order, taxCents, grandTotalCents: order.totalCents + taxCents } satisfies EnrichedOrder})
const paid: Stream.Stream<NormalizedOrder> = Stream.make( { id: "ord_1", totalCents: 5_000, country: "US" as const })
// Run up to 4 enrichment effects concurrently. Output order is still preserved.export const enriched = paid.pipe( Stream.mapEffect(enrichOrder, { concurrency: 4 }))Stream.flatMap is the other effectful combinator: it maps each element to a
stream and flattens the results, again with optional { concurrency }. Use it
when one input fans out into many outputs (one customer → their order history,
one URL → its paginated pages).
import { Stream } from "effect"
// Each country fans out into a stream of synthetic orders; the inner streams are// flattened into one. `concurrency: 2` runs two country sub-streams at once.export const allOrders = Stream.make("US", "CA", "NZ").pipe( Stream.flatMap( (country) => Stream.range(1, 50).pipe( Stream.map((i) => ({ id: `ord_${country}_${i}`, country })) ), { concurrency: 2 } ))Grouping and batching
Section titled “Grouping and batching”Streaming pipelines often need to batch elements — to write rows in chunks, to emit windows for aggregation, or to partition by key. These operators reshape the cardinality of the stream rather than individual elements:
import { Effect, Stream } from "effect"
const events = Stream.range(1, 10)
// `Stream.grouped(n)` batches elements into non-empty arrays of size `n`// (the last batch may be smaller). Ideal for bulk writes.export const batches = events.pipe(Stream.grouped(3)) // Stream<NonEmptyArray<number>>
// `Stream.groupedWithin(n, duration)` emits a batch as soon as it reaches `n`// elements OR the time window elapses — whichever comes first. This bounds// latency for slow or bursty sources.export const timeBatches = events.pipe(Stream.groupedWithin(3, "5 seconds"))
// `Stream.groupByKey` splits a stream into a stream of `[key, subStream]` pairs,// one substream per distinct key. Each substream can be consumed independently.export const byCountry = Stream.make( { country: "US", amount: 10 }, { country: "CA", amount: 20 }, { country: "US", amount: 5 }).pipe( Stream.groupByKey((o) => o.country), // For each group, sum its amounts and log the per-country total. Stream.mapEffect(([country, group]) => group.pipe( Stream.runFold(() => 0, (acc, o) => acc + o.amount), Effect.tap((total) => Effect.logInfo(`${country}: ${total}`)) ) ))Windowing operators
Section titled “Windowing operators”take, drop, and their *While variants slice a stream without buffering it:
import { Stream } from "effect"
const orders = Stream.range(1, 100)
// First two elements, then the stream stops pulling.export const firstTwo = orders.pipe(Stream.take(2))
// Skip the first element (e.g. a warm-up sample).export const afterWarmup = orders.pipe(Stream.drop(1))
// Emit elements only while the predicate holds, then stop.export const untilBig = orders.pipe(Stream.takeWhile((n) => n < 50))These do not collect anything — they return new streams. Combine them with a destructor from Consuming streams to actually run the pipeline. When an element or effect can fail, see Error handling.