Semaphore and Latch
Once you have many fibers, you need ways to coordinate them. Two small, composable primitives cover most needs:
- A Semaphore limits how many fibers may hold a resource at once, which makes it a good fit for capping concurrency against a connection pool, an API quota, or any shared resource that can’t take unlimited load.
- A Latch is a gate: fibers wait on it until something opens it, then all waiters proceed. Use it to hold work back until initialization is done, or to pause and resume a pipeline.
Semaphore
A semaphore holds a fixed number of permits. withPermits(n) acquires n
permits before running an effect and releases them when the effect finishes,
whether by success, failure, or interruption. If not enough permits are free,
the fiber suspends until they are. A semaphore with one permit is a mutex.
    import { Effect, Semaphore } from "effect"

    const program = Effect.gen(function*() {
      // Create a semaphore that allows at most 2 holders at a time.
      const semaphore = yield* Semaphore.make(2)

      const task = (n: number) =>
        // `withPermit` takes exactly one permit for the duration of the effect.
        semaphore.withPermit(
          Effect.gen(function*() {
            yield* Effect.log(`task ${n} running`)
            yield* Effect.sleep("500 millis")
            yield* Effect.log(`task ${n} done`)
          })
        )

      // Six tasks are forked, but the semaphore lets only 2 run concurrently.
      yield* Effect.forEach([1, 2, 3, 4, 5, 6], task, { concurrency: "unbounded" })
    })

    Effect.runFork(program)

Here Effect.forEach with concurrency: "unbounded" wants to run all six
tasks at once, but each task wraps itself in withPermit, so the semaphore caps
the effective concurrency at two. The permit is released automatically when each
task’s effect exits, so the next waiting fiber is admitted immediately.
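To make the permit accounting concrete, here is a minimal sketch in plain TypeScript, with no Effect imports. It is not how Effect implements its semaphore: `PermitModel`, `take`, and `give` are hypothetical names, fibers are stood in for by callbacks, and first-come-first-served admission is an assumption of the model rather than a documented guarantee.

```typescript
// Illustrative model only: NOT Effect's implementation.
// `PermitModel`, `take`, and `give` are hypothetical names.
type Waiter = { readonly n: number; readonly resume: () => void }

class PermitModel {
  private free: number
  private readonly waiters: Array<Waiter> = []

  constructor(permits: number) {
    this.free = permits
  }

  // Ask for `n` permits. `resume` runs at once if enough are free;
  // otherwise the request is queued, modeling a suspended fiber.
  take(n: number, resume: () => void): void {
    if (this.waiters.length === 0 && this.free >= n) {
      this.free -= n
      resume()
    } else {
      this.waiters.push({ n, resume })
    }
  }

  // Return `n` permits, then admit queued requests in arrival order
  // (an assumption of this model) while the head of the queue fits.
  give(n: number): void {
    this.free += n
    let head = this.waiters[0]
    while (head !== undefined && this.free >= head.n) {
      this.waiters.shift()
      this.free -= head.n
      head.resume()
      head = this.waiters[0]
    }
  }

  get available(): number {
    return this.free
  }
}

// Six tasks compete for 2 permits, mirroring the example above.
const model = new PermitModel(2)
const started: Array<number> = []
for (const id of [1, 2, 3, 4, 5, 6]) {
  model.take(1, () => started.push(id))
}
const startedBeforeRelease = [...started]

// One running task finishes and hands its permit back,
// which admits the next queued task.
model.give(1)
const startedAfterRelease = [...started]
```

In this model only the first two tasks start until a permit is handed back, which is the behavior withPermit provides automatically when the wrapped effect exits.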
Sharing a semaphore from a service
A semaphore is most useful when one instance is shared everywhere. Hold it in a service so every consumer goes through the same gate:
    import { Context, Effect, Layer, Semaphore } from "effect"

    // A client that allows at most 5 concurrent outbound requests, process-wide.
    export class RateLimitedApi extends Context.Service<RateLimitedApi, {
      request<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
    }>()("app/RateLimitedApi") {
      static readonly layer = Layer.effect(
        RateLimitedApi,
        Effect.gen(function*() {
          // One shared semaphore for the lifetime of the layer.
          const semaphore = yield* Semaphore.make(5)

          return RateLimitedApi.of({
            // Every request, from anywhere, competes for the same 5 permits.
            request: (effect) => Semaphore.withPermit(semaphore, effect)
          })
        })
      )
    }

Semaphore.make(n) and the standalone Semaphore.withPermit / withPermits
functions are the data-first equivalents of the methods on the semaphore object;
use whichever style reads better. You can also acquire several permits at once
with withPermits(n), try without blocking via withPermitsIfAvailable, or
adjust the limit at runtime with Semaphore.resize.
Latch
A Latch is a one-bit gate that fibers can wait on. Its await effect suspends
the calling fiber while the latch is closed, and resumes once it is
opened. Opening is sticky: every fiber that waits after the latch opens
proceeds immediately.
    import { Effect, Latch } from "effect"

    const program = Effect.gen(function*() {
      // Start closed: waiters will suspend until we open it.
      const latch = yield* Latch.make(false)

      // Fork a worker that must wait for the green light before doing anything.
      yield* Effect.forkChild(
        Effect.gen(function*() {
          yield* Effect.log("worker: waiting for the latch")
          yield* latch.await
          yield* Effect.log("worker: go!")
        })
      )

      // Do some setup, then open the latch to release the worker.
      yield* Effect.sleep("500 millis")
      yield* Effect.log("main: setup done, opening latch")
      yield* latch.open
    })

    Effect.runFork(program)
    // worker: waiting for the latch
    // main: setup done, opening latch
    // worker: go!

Open, release, and close
A latch gives you three ways to control waiters, which matter when work arrives continuously:
- open: release all current waiters and let future waiters pass immediately. The gate stays open.
- release: wake the fibers currently waiting, but leave the latch closed so later waiters suspend again. A one-shot pulse.
- close: re-close an open latch so future awaits suspend once more.
    import { Effect, Latch } from "effect"

    const program = Effect.gen(function*() {
      const latch = yield* Latch.make(false)

      // Pause a pipeline: close the latch and have stages await it.
      yield* Latch.close(latch)

      // ...later, resume just the in-flight waiters without re-opening...
      yield* Latch.release(latch)

      // ...or open it for good so everything flows.
      yield* Latch.open(latch)
    })

This open/release/close trio makes a latch a natural pause/resume switch for a
streaming pipeline or a pool of workers: close to pause, open to
resume. For a single one-time hand-off of a value between fibers (a producer
computing a result that one consumer awaits), reach for a
Deferred instead, which carries the value (or failure)
along with the signal.
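The difference between open, release, and close in this section can be traced with a small plain-TypeScript model. This is a sketch under stated assumptions, not Effect's implementation: `LatchModel` and `wait` are hypothetical names, and waiting fibers are represented by callbacks that run when the fiber would resume.

```typescript
// Illustrative model only: NOT Effect's implementation.
// `LatchModel` and `wait` are hypothetical names.
class LatchModel {
  private isOpen: boolean
  private waiters: Array<() => void> = []

  constructor(open: boolean) {
    this.isOpen = open
  }

  // Models `await`: pass straight through when open, otherwise queue.
  wait(resume: () => void): void {
    if (this.isOpen) {
      resume()
    } else {
      this.waiters.push(resume)
    }
  }

  // Wake the current waiters; the gate itself stays as it was.
  release(): void {
    const current = this.waiters
    this.waiters = []
    for (const resume of current) {
      resume()
    }
  }

  // Wake the current waiters and let later ones pass immediately.
  open(): void {
    this.isOpen = true
    this.release()
  }

  // Later waiters queue again.
  close(): void {
    this.isOpen = false
  }
}

const latch = new LatchModel(false)
const log: Array<string> = []

latch.wait(() => log.push("a")) // queued: the latch starts closed
latch.release()                 // pulse: "a" resumes, gate stays closed
latch.wait(() => log.push("b")) // queued again
latch.open()                    // "b" resumes, gate stays open
latch.wait(() => log.push("c")) // passes immediately
latch.close()
latch.wait(() => log.push("d")) // queued: the latch is closed once more
```

In the model, the waiter registered after close never resumes, because nothing opens or releases the latch again. That matches the pause half of the pause/resume pattern described above.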