Concurrency Options
Most of the time you don’t fork fibers by hand. Combinators like
Effect.forEach and Effect.all run a collection of effects for you, and
they take a concurrency option that controls how many run at once. This is the
everyday way to add parallelism: change one option and the same code goes from
sequential to fully concurrent — with all the structured-concurrency guarantees
intact (if one effect fails, the rest are interrupted).
import { Effect } from "effect"
interface User { readonly id: number}
// Pretend this hits a rate-limited API.const fetchUser = (id: number) => Effect.succeed<User>({ id }).pipe(Effect.delay("100 millis"))
const program = Effect.gen(function*() { const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// Process the collection with at most 4 requests in flight at any moment. // Results come back in input order regardless of completion order. const users = yield* Effect.forEach(ids, fetchUser, { concurrency: 4 })
yield* Effect.log(`fetched ${users.length} users`)})
Effect.runFork(program)The concurrency option
Section titled “The concurrency option”The option accepts three kinds of value:
type Concurrency = number | "unbounded" | "inherit"Sequential (the default)
Section titled “Sequential (the default)”If you omit concurrency, effects run one at a time, in order. Each starts
only after the previous one finishes. This is the safe default — opt in to
parallelism deliberately.
import { Effect } from "effect"
const task = (n: number) => Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// Runs task 1, then task 2, then task 3 — total ~300ms.const sequential = Effect.forEach([1, 2, 3], task)A number — bounded concurrency
Section titled “A number — bounded concurrency”Pass a number to cap how many effects run simultaneously. This is the option
you’ll reach for most: it lets you parallelize without overwhelming a database,
an API rate limit, or a connection pool.
import { Effect } from "effect"
const task = (n: number) => Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// At most 2 tasks run at once; as one finishes the next one starts.const bounded = Effect.forEach([1, 2, 3, 4, 5], task, { concurrency: 2})"unbounded" — as many as possible
Section titled “"unbounded" — as many as possible”Start every effect immediately, with no limit. Use this when you know the collection is small and bounded, or when the work is cheap and you want maximum throughput.
import { Effect } from "effect"
const task = (n: number) => Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// All five run concurrently — total ~100ms.const unbounded = Effect.forEach([1, 2, 3, 4, 5], task, { concurrency: "unbounded"})"inherit" — take it from the context
Section titled “"inherit" — take it from the context”"inherit" defers to a concurrency level set on the surrounding context with
Effect.withConcurrency. If nothing is set, it behaves as "unbounded". This
lets a caller tune the parallelism of code that didn’t hard-code a limit.
import { Effect } from "effect"
const task = (n: number) => Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// The inner combinator inherits whatever concurrency the caller configures.const work = Effect.forEach([1, 2, 3, 4, 5], task, { concurrency: "inherit"})
// The caller pins the level to 2 for everything inside `work`.const program = work.pipe(Effect.withConcurrency(2))Where these options apply
Section titled “Where these options apply”The concurrency option isn’t unique to forEach. It appears on the whole
family of “run a collection” combinators, including:
Effect.forEach(items, f, { concurrency })— map each item to an effect and collect results in order.Effect.all([...effects], { concurrency })— run a tuple or record of effects.- Many stream operators (see Streaming) accept the same option.
Discarding results
Section titled “Discarding results”When you only care about the effects and not their return values — running a
batch of writes, say — add discard: true. The combinator still runs everything
(with your chosen concurrency) but returns void instead of building an array,
avoiding the allocation.
import { Effect } from "effect"
const writeRecord = (id: number) => Effect.log(`wrote ${id}`).pipe(Effect.delay("50 millis"))
const program = Effect.forEach([1, 2, 3, 4, 5], writeRecord, { concurrency: 3, discard: true // returns Effect<void>, not Effect<void[]>})Failure interrupts the rest
Section titled “Failure interrupts the rest”These combinators preserve structured concurrency: if any effect fails, the others that are still running are interrupted, and the combined effect fails with that cause. You don’t have to clean up the in-flight work yourself — see Interruption for the details of how that cancellation propagates.
If instead you want to run everything to completion and collect all outcomes
(successes and failures alike), Effect.all supports mode: "result", which
yields a Result per element rather than short-circuiting on the
first failure.