Skip to content

Concurrency Options

Most of the time you don’t fork fibers by hand. Combinators like Effect.forEach and Effect.all run a collection of effects for you, and they take a concurrency option that controls how many run at once. This is the everyday way to add parallelism: change one option and the same code goes from sequential to fully concurrent — with all the structured-concurrency guarantees intact (if one effect fails, the rest are interrupted).

import { Effect } from "effect"
interface User {
readonly id: number
}
// Pretend this hits a rate-limited API.
const fetchUser = (id: number) =>
Effect.succeed<User>({ id }).pipe(Effect.delay("100 millis"))
const program = Effect.gen(function*() {
const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// Process the collection with at most 4 requests in flight at any moment.
// Results come back in input order regardless of completion order.
const users = yield* Effect.forEach(ids, fetchUser, {
concurrency: 4
})
yield* Effect.log(`fetched ${users.length} users`)
})
Effect.runFork(program)

The option accepts three kinds of value:

type Concurrency = number | "unbounded" | "inherit"

If you omit concurrency, effects run one at a time, in order. Each starts only after the previous one finishes. This is the safe default — opt in to parallelism deliberately.

import { Effect } from "effect"
const task = (n: number) =>
Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// Runs task 1, then task 2, then task 3 — total ~300ms.
const sequential = Effect.forEach([1, 2, 3], task)

Pass a number to cap how many effects run simultaneously. This is the option you’ll reach for most: it lets you parallelize without overwhelming a database, an API rate limit, or a connection pool.

import { Effect } from "effect"
const task = (n: number) =>
Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// At most 2 tasks run at once; as one finishes the next one starts.
const bounded = Effect.forEach([1, 2, 3, 4, 5], task, {
concurrency: 2
})

Start every effect immediately, with no limit. Use this when you know the collection is small and bounded, or when the work is cheap and you want maximum throughput.

import { Effect } from "effect"
const task = (n: number) =>
Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// All five run concurrently — total ~100ms.
const unbounded = Effect.forEach([1, 2, 3, 4, 5], task, {
concurrency: "unbounded"
})

"inherit" defers to a concurrency level set on the surrounding context with Effect.withConcurrency. If nothing is set, it behaves as "unbounded". This lets a caller tune the parallelism of code that didn’t hard-code a limit.

import { Effect } from "effect"
const task = (n: number) =>
Effect.log(`task ${n}`).pipe(Effect.delay("100 millis"))
// The inner combinator inherits whatever concurrency the caller configures.
const work = Effect.forEach([1, 2, 3, 4, 5], task, {
concurrency: "inherit"
})
// The caller pins the level to 2 for everything inside `work`.
const program = work.pipe(Effect.withConcurrency(2))

The concurrency option isn’t unique to forEach. It appears on the whole family of “run a collection” combinators, including:

  • Effect.forEach(items, f, { concurrency }) — map each item to an effect and collect results in order.
  • Effect.all([...effects], { concurrency }) — run a tuple or record of effects.
  • Many stream operators (see Streaming) accept the same option.

When you only care about the effects and not their return values — running a batch of writes, say — add discard: true. The combinator still runs everything (with your chosen concurrency) but returns void instead of building an array, avoiding the allocation.

import { Effect } from "effect"
const writeRecord = (id: number) =>
Effect.log(`wrote ${id}`).pipe(Effect.delay("50 millis"))
const program = Effect.forEach([1, 2, 3, 4, 5], writeRecord, {
concurrency: 3,
discard: true // returns Effect<void>, not Effect<void[]>
})

These combinators preserve structured concurrency: if any effect fails, the others that are still running are interrupted, and the combined effect fails with that cause. You don’t have to clean up the in-flight work yourself — see Interruption for the details of how that cancellation propagates.

If instead you want to run everything to completion and collect all outcomes (successes and failures alike), Effect.all supports mode: "result", which yields a Result per element rather than short-circuiting on the first failure.