Transforming Streams

Streams compose with operators the same way Effect does: each operator takes a stream and returns a new stream, so you build a pipeline with .pipe(...). Pure transforms (map, filter) reshape elements without running effects; effectful transforms (mapEffect, flatMap) let each element trigger more work — with explicit, opt-in concurrency. The example below walks an order-processing pipeline from raw events to enriched, taxed orders.

import { Stream } from "effect"

interface Order {
  readonly id: string
  readonly status: "paid" | "refunded"
  readonly subtotalCents: number
  readonly shippingCents: number
  readonly country: "US" | "CA" | "NZ"
}

interface NormalizedOrder extends Order {
  readonly totalCents: number
}

const orderEvents: Stream.Stream<Order> = Stream.make(
  { id: "ord_1", status: "paid", subtotalCents: 4_500, shippingCents: 500, country: "US" },
  { id: "ord_2", status: "refunded", subtotalCents: 2_000, shippingCents: 0, country: "CA" }
)

// `Stream.map` applies a pure, total function to every element.
const normalized = orderEvents.pipe(
  Stream.map((order): NormalizedOrder => ({
    ...order,
    totalCents: order.subtotalCents + order.shippingCents
  }))
)

// `Stream.filter` drops elements that fail the predicate.
const paid = normalized.pipe(
  Stream.filter((order) => order.status === "paid")
)

Stream.mapEffect is the workhorse for per-element side effects — a lookup, an HTTP call, a write. By default it processes elements one at a time, preserving order. Pass { concurrency: n } to run up to n effects in flight at once; the stream takes care of backpressure so you never overwhelm the downstream consumer.

import { Effect, Stream } from "effect"

interface NormalizedOrder {
  readonly id: string
  readonly totalCents: number
  readonly country: "US" | "CA" | "NZ"
}

interface EnrichedOrder extends NormalizedOrder {
  readonly taxCents: number
  readonly grandTotalCents: number
}

// Define the per-element effect with `Effect.fn`, never a plain function that
// returns `Effect.gen`. This one simulates an effectful tax lookup.
const enrichOrder = Effect.fn("enrichOrder")(function*(order: NormalizedOrder) {
  yield* Effect.sleep("5 millis") // stand-in for a tax/risk service call
  const taxRate = order.country === "US" ? 0.08 : 0.13
  const taxCents = Math.round(order.totalCents * taxRate)
  return {
    ...order,
    taxCents,
    grandTotalCents: order.totalCents + taxCents
  } satisfies EnrichedOrder
})

const paid: Stream.Stream<NormalizedOrder> = Stream.make(
  { id: "ord_1", totalCents: 5_000, country: "US" as const }
)

// Run up to 4 enrichment effects concurrently. Output order is still preserved.
export const enriched = paid.pipe(
  Stream.mapEffect(enrichOrder, { concurrency: 4 })
)
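
To make the arithmetic inside `enrichOrder` concrete, here is a minimal plain-TypeScript sketch of only the tax calculation, with the simulated service call and the stream left out. The helper name `computeTax` is hypothetical and exists only for this illustration; the rates are the placeholder values used in the example.

```typescript
// Hypothetical helper for illustration only: the pure part of `enrichOrder`.
type Country = "US" | "CA" | "NZ"

interface Totals {
  readonly taxCents: number
  readonly grandTotalCents: number
}

const computeTax = (totalCents: number, country: Country): Totals => {
  // Same placeholder rates as the example: 8% for US, 13% for everything else.
  const taxRate = country === "US" ? 0.08 : 0.13
  const taxCents = Math.round(totalCents * taxRate)
  return { taxCents, grandTotalCents: totalCents + taxCents }
}

// ord_1 from the example: 5_000 cents, shipped to the US.
const us = computeTax(5_000, "US") // taxCents 400, grandTotalCents 5_400

// The same total for a Canadian order falls through to the 13% branch.
const ca = computeTax(5_000, "CA") // taxCents 650, grandTotalCents 5_650
```

Keeping the calculation pure like this makes it easy to check on its own, while the effectful wrapper stays responsible for the lookup and its latency.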

Stream.flatMap is the other effectful combinator: it maps each element to a stream and flattens the results, again with optional { concurrency }. Use it when one input fans out into many outputs (one customer → their order history, one URL → its paginated pages).

import { Stream } from "effect"

// Each country fans out into a stream of synthetic orders; the inner streams are
// flattened into one. `concurrency: 2` runs two country sub-streams at once.
export const allOrders = Stream.make("US", "CA", "NZ").pipe(
  Stream.flatMap(
    (country) =>
      Stream.range(1, 50).pipe(
        Stream.map((i) => ({ id: `ord_${country}_${i}`, country }))
      ),
    { concurrency: 2 }
  )
)
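
The fan-out shape is easier to see in a small model. The sketch below is plain TypeScript with synchronous generators, not Effect code: `rangeInclusive`, `flatMapSync`, and `orderIdsFor` are hypothetical helpers written for this illustration, and they model only the sequential case, where one inner sequence is drained before the next begins. With `{ concurrency: 2 }`, elements from two inner streams may interleave, so no ordering across countries should be assumed there.

```typescript
// Hypothetical model of sequential flatMap using synchronous generators.
function* rangeInclusive(min: number, max: number): Generator<number> {
  for (let i = min; i <= max; i++) yield i
}

function* flatMapSync<A, B>(
  source: Iterable<A>,
  f: (a: A) => Iterable<B>
): Generator<B> {
  for (const a of source) {
    // Drain the inner iterable fully before moving to the next outer element.
    yield* f(a)
  }
}

// One country fans out into three synthetic order ids.
function* orderIdsFor(country: string): Generator<string> {
  for (const i of rangeInclusive(1, 3)) yield `ord_${country}_${i}`
}

const ids = Array.from(flatMapSync(["US", "CA", "NZ"], orderIdsFor))
// 3 countries x 3 orders each = 9 ids, in input order:
// ord_US_1, ord_US_2, ord_US_3, ord_CA_1, ..., ord_NZ_3
```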

Streaming pipelines often need to batch elements — to write rows in chunks, to emit windows for aggregation, or to partition by key. These operators reshape the cardinality of the stream rather than individual elements:

import { Effect, Stream } from "effect"

const events = Stream.range(1, 10)

// `Stream.grouped(n)` batches elements into non-empty arrays of size `n`
// (the last batch may be smaller). Ideal for bulk writes.
export const batches = events.pipe(Stream.grouped(3)) // Stream<NonEmptyArray<number>>

// `Stream.groupedWithin(n, duration)` emits a batch as soon as it reaches `n`
// elements OR the time window elapses, whichever comes first. This bounds
// latency for slow or bursty sources.
export const timeBatches = events.pipe(Stream.groupedWithin(3, "5 seconds"))

// `Stream.groupByKey` splits a stream into a stream of `[key, subStream]` pairs,
// one substream per distinct key. Each substream can be consumed independently.
export const byCountry = Stream.make(
  { country: "US", amount: 10 },
  { country: "CA", amount: 20 },
  { country: "US", amount: 5 }
).pipe(
  Stream.groupByKey((o) => o.country),
  // For each group, sum its amounts and log the per-country total.
  Stream.mapEffect(([country, group]) =>
    group.pipe(
      Stream.runFold(() => 0, (acc, o) => acc + o.amount),
      Effect.tap((total) => Effect.logInfo(`${country}: ${total}`))
    )
  )
)
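
As a rough model of what these operators produce, the plain-TypeScript sketch below applies the same reshaping to in-memory arrays. `chunksOf` and `sumByKey` are hypothetical helpers for this illustration, not Effect APIs, and they leave out everything that makes the streaming versions useful (laziness, time windows, independently consumed substreams). They show only the shape of the output.

```typescript
// Hypothetical array-based models; not part of Effect.

// Models the batching shape of `Stream.grouped(n)`: full batches of `n`,
// with a possibly smaller final batch. Assumes `n` is a positive integer.
const chunksOf = <A>(items: ReadonlyArray<A>, n: number): Array<Array<A>> => {
  const out: Array<Array<A>> = []
  for (let i = 0; i < items.length; i += n) {
    out.push(items.slice(i, i + n))
  }
  return out
}

// Models the per-key fold from the `groupByKey` example: one running total per key.
const sumByKey = <A>(
  items: ReadonlyArray<A>,
  key: (a: A) => string,
  amount: (a: A) => number
): Map<string, number> => {
  const totals = new Map<string, number>()
  for (const item of items) {
    const k = key(item)
    totals.set(k, (totals.get(k) ?? 0) + amount(item))
  }
  return totals
}

const modelBatches = chunksOf([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)
// [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]

const modelTotals = sumByKey(
  [
    { country: "US", amount: 10 },
    { country: "CA", amount: 20 },
    { country: "US", amount: 5 }
  ],
  (o) => o.country,
  (o) => o.amount
)
// US -> 15, CA -> 20
```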

take, drop, and their *While variants slice a stream without buffering it:

import { Stream } from "effect"

const orders = Stream.range(1, 100)

// First two elements, then the stream stops pulling.
export const firstTwo = orders.pipe(Stream.take(2))

// Skip the first element (e.g. a warm-up sample).
export const afterWarmup = orders.pipe(Stream.drop(1))

// Emit elements only while the predicate holds, then stop.
export const untilBig = orders.pipe(Stream.takeWhile((n) => n < 50))

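
The claim that `take` stops pulling can be illustrated with a small model. This is a plain-TypeScript sketch using synchronous generators, not Effect's implementation; `countingSource`, `takeSync`, and `takeWhileSync` are hypothetical names. The `pulled` counter records how many elements were actually requested from the source.

```typescript
// Hypothetical generator-based model of lazy slicing; not Effect code.
let pulled = 0

// A source of 1..100 that counts how many elements consumers actually request.
function* countingSource(): Generator<number> {
  for (let i = 1; i <= 100; i++) {
    pulled++
    yield i
  }
}

function* takeSync<A>(source: Iterable<A>, n: number): Generator<A> {
  if (n <= 0) return
  let taken = 0
  for (const a of source) {
    yield a
    // Stop before asking the source for an element we would discard.
    if (++taken >= n) return
  }
}

function* takeWhileSync<A>(
  source: Iterable<A>,
  pred: (a: A) => boolean
): Generator<A> {
  for (const a of source) {
    if (!pred(a)) return
    yield a
  }
}

const modelFirstTwo = Array.from(takeSync(countingSource(), 2)) // [1, 2]
const pulledByTake = pulled // 2: a third element was never requested

pulled = 0
const modelUntilBig = Array.from(takeWhileSync(countingSource(), (n) => n < 50))
const pulledByTakeWhile = pulled // 50: element 50 had to be inspected to fail the predicate
```

In this model, neither operator holds on to earlier elements: each one is passed downstream as soon as it arrives, which is the sense in which slicing happens without buffering.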
None of the operators on this page runs or collects anything; each one returns a new stream. Combine them with a destructor from Consuming streams to actually run the pipeline. When an element or effect can fail, see Error handling.