Queue and PubSub
When fibers need to hand work to one another, two complementary primitives cover the field:
- A Queue is a buffer that distributes messages: each item is taken by exactly one consumer. Use it to spread a stream of jobs across a pool of workers.
- A PubSub broadcasts messages: each item is delivered to every subscriber. Use it to fan a domain event out to many independent listeners.
Both are asynchronous and fiber-safe: producers and consumers suspend rather than busy-wait, and a bounded variant applies backpressure so a fast producer can’t overwhelm slow consumers.
Queue — distributing work
Queue.offer adds a message; Queue.take removes and returns one, suspending if
the queue is empty. Because each message is taken once, a queue is the natural
backbone of a producer/consumer or worker-pool design.
    import { Effect, Queue } from "effect"

    const program = Effect.gen(function*() {
      // A bounded queue: producers suspend when it's full (backpressure).
      const queue = yield* Queue.bounded<number>(16)

      // A worker that pulls jobs forever, one at a time.
      const worker = (id: number) =>
        Effect.forever(
          Effect.gen(function*() {
            const job = yield* Queue.take(queue)
            yield* Effect.log(`worker ${id} handling job ${job}`)
            yield* Effect.sleep("100 millis")
          })
        )

      // Start three workers as supervised children.
      yield* Effect.forEach([1, 2, 3], (id) => Effect.forkChild(worker(id)))

      // Feed in ten jobs; the three workers share them out between themselves.
      yield* Effect.forEach([...Array(10).keys()], (n) => Queue.offer(queue, n))

      yield* Effect.sleep("1 second")
    })

    Effect.runFork(program)

Backpressure strategies
Queue.bounded(capacity) applies backpressure: once full, offer suspends
the producer until a consumer makes room. That’s the safe default, but you can
choose a different strategy when you’d rather drop messages than slow producers:
| Constructor | When full, offer… |
| --- | --- |
| Queue.bounded(n) | suspends the producer (backpressure) |
| Queue.dropping(n) | discards the new message and returns false |
| Queue.sliding(n) | discards the oldest message to make room |
| Queue.unbounded() | never blocks; grows without limit |
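
To make the table concrete, here is a minimal sketch in plain TypeScript that models what each strategy does to a full buffer. It is a toy model, not Effect's implementation: `offerModel`, `Strategy`, `OfferResult`, and `simulate` are hypothetical names invented for illustration, a plain array stands in for the queue, and suspension is only reported as an outcome instead of actually parking a fiber.

```typescript
// Toy model only: a synchronous array stands in for the queue.
type Strategy = "bounded" | "dropping" | "sliding" | "unbounded"
type OfferResult = "accepted" | "would-suspend" | "dropped" | "evicted-oldest"

// Hypothetical helper (not part of Effect): applies one offer to `buffer`.
function offerModel<A>(
  strategy: Strategy,
  capacity: number,
  buffer: Array<A>,
  item: A
): OfferResult {
  // There is room (or no limit at all): the item is simply enqueued.
  if (strategy === "unbounded" || buffer.length < capacity) {
    buffer.push(item)
    return "accepted"
  }
  // From here on the buffer is full.
  if (strategy === "bounded") {
    // A real bounded queue would suspend the producer until a consumer takes.
    return "would-suspend"
  }
  if (strategy === "dropping") {
    // The new item is discarded; the buffer is left untouched.
    return "dropped"
  }
  // Sliding: discard the oldest item to make room for the new one.
  buffer.shift()
  buffer.push(item)
  return "evicted-oldest"
}

// Offer "a", "b", "c" to a buffer of capacity 2 under a given strategy.
function simulate(strategy: Strategy) {
  const buffer: Array<string> = []
  const results = ["a", "b", "c"].map((item) =>
    offerModel(strategy, 2, buffer, item)
  )
  return { buffer, results }
}

const boundedRun = simulate("bounded")
const droppingRun = simulate("dropping")
const slidingRun = simulate("sliding")
const unboundedRun = simulate("unbounded")
```

In this model the third offer is the only one that differs between strategies: the bounded run reports that the producer would suspend, the dropping run keeps "a" and "b", the sliding run ends with "b" and "c", and the unbounded run holds all three items.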
    import { Effect, Queue } from "effect"

    const program = Effect.gen(function*() {
      // A sliding queue of size 2 keeps only the most recent values.
      const queue = yield* Queue.sliding<string>(2)

      yield* Queue.offer(queue, "a")
      yield* Queue.offer(queue, "b")
      yield* Queue.offer(queue, "c") // evicts "a"

      const all = yield* Queue.takeAll(queue)
      yield* Effect.log(all) // ["b", "c"]
    })

    Effect.runFork(program)

PubSub — broadcasting events
A PubSub is the broadcast counterpart of a queue. Publishing delivers the
message to every current subscriber, so it’s the tool for an in-process event
bus, live update notifications, or any one-to-many fan-out. Each subscriber gets
its own buffered view via PubSub.subscribe, scoped so it’s cleaned up
automatically.
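
The idea of "its own buffered view" can be sketched in plain TypeScript. This is a toy model under stated assumptions, not how Effect implements PubSub: `BroadcastModel` and its methods are hypothetical names, delivery is synchronous, and an explicit `unsubscribe` function stands in for scope-based cleanup. The optional `replay` count models late subscribers starting with the most recent messages, the behaviour the `replay` option in the service example on this page relies on.

```typescript
// Toy model only (not Effect's implementation): every subscriber owns a
// private buffer, and publish appends the message to each of them.
class BroadcastModel<A> {
  private readonly subscribers = new Set<Array<A>>()
  private readonly recent: Array<A> = []
  private readonly replay: number

  // `replay` is how many past messages a new subscriber starts with.
  constructor(replay: number = 0) {
    this.replay = replay
  }

  publish(message: A): void {
    // Deliver to every current subscriber.
    for (const buffer of this.subscribers) buffer.push(message)
    // Remember only the last `replay` messages for late subscribers.
    this.recent.push(message)
    if (this.recent.length > this.replay) this.recent.shift()
  }

  // Returns the subscriber's private buffer plus an unsubscribe function.
  subscribe(): { buffer: Array<A>; unsubscribe: () => void } {
    const buffer = [...this.recent]
    this.subscribers.add(buffer)
    return {
      buffer,
      unsubscribe: () => {
        this.subscribers.delete(buffer)
      }
    }
  }
}

const bus = new BroadcastModel<string>(1)
const early = bus.subscribe()
bus.publish("OrderPlaced")
bus.publish("PaymentCaptured")
const late = bus.subscribe() // starts with the one replayed message
bus.publish("OrderShipped")
early.unsubscribe()
bus.publish("late-only") // `early` no longer receives anything
```

Here `early.buffer` ends up holding the first three messages, while `late.buffer` holds the replayed "PaymentCaptured" followed by "OrderShipped" and "late-only". Neither subscriber consumes messages on behalf of the other.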
The idiomatic shape is to wrap the PubSub in a service and expose a typed
publish plus a subscribe stream:
    import { Context, Effect, Layer, PubSub, Stream } from "effect"

    export type OrderEvent =
      | { readonly _tag: "OrderPlaced"; readonly orderId: string }
      | { readonly _tag: "PaymentCaptured"; readonly orderId: string }
      | { readonly _tag: "OrderShipped"; readonly orderId: string }

    export class OrderEvents extends Context.Service<OrderEvents, {
      publish(event: OrderEvent): Effect.Effect<void>
      publishAll(events: ReadonlyArray<OrderEvent>): Effect.Effect<void>
      readonly subscribe: Stream.Stream<OrderEvent>
    }>()("acme/OrderEvents") {
      static readonly layer = Layer.effect(
        OrderEvents,
        Effect.gen(function*() {
          // Bounded for backpressure; `replay` lets late subscribers catch up on
          // the most recent events (handy after a reconnect).
          const pubsub = yield* PubSub.bounded<OrderEvent>({
            capacity: 256,
            replay: 50
          })

          // Shut the PubSub down when the service's scope closes.
          yield* Effect.addFinalizer(() => PubSub.shutdown(pubsub))

          const publish = Effect.fn("OrderEvents.publish")(
            function*(event: OrderEvent) {
              yield* PubSub.publish(pubsub, event)
            }
          )

          const publishAll = Effect.fn("OrderEvents.publishAll")(
            function*(events: ReadonlyArray<OrderEvent>) {
              yield* PubSub.publishAll(pubsub, events)
            }
          )

          // Every consumer of `subscribe` gets *its own* stream of all events
          // published after it subscribes (plus the replay buffer, if any).
          const subscribe = Stream.fromPubSub(pubsub)

          return OrderEvents.of({ publish, publishAll, subscribe })
        })
      )
    }

Consumers each run their own stream and receive every event independently:
    import { Effect, Stream } from "effect"
    // OrderEvents is the service class defined in the previous example.

    // One subscriber updates a read model; another sends notifications.
    // Both see *all* events because PubSub broadcasts.
    const projectionWorker = Effect.gen(function*() {
      const events = yield* OrderEvents
      yield* events.subscribe.pipe(
        Stream.tap((event) => Effect.log(`projection: ${event._tag}`)),
        Stream.runDrain
      )
    })

    const notificationWorker = Effect.gen(function*() {
      const events = yield* OrderEvents
      yield* events.subscribe.pipe(
        Stream.tap((event) => Effect.log(`notify: ${event._tag}`)),
        Stream.runDrain
      )
    })

Queue or PubSub?
The choice comes down to delivery:
| | Queue | PubSub |
| --- | --- | --- |
| Each message goes to | exactly one consumer | every subscriber |
| Typical use | work distribution, job pools | event bus, live fan-out |
| Adding a consumer | takes a share of the load | gets the full stream |
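
The "Adding a consumer" row can be illustrated with a small plain-TypeScript sketch that counts how many messages each consumer ends up with. The helpers `distribute`, `broadcast`, `queueLoad`, and `pubsubLoad` are hypothetical and exist only for this illustration. Round-robin assignment is an assumption made to keep the model deterministic; a real queue hands each message to whichever consumer takes it first.

```typescript
// Queue-like delivery: each message goes to exactly one consumer.
// Round-robin is an assumption of this sketch, not a guarantee of a real queue.
function distribute<A>(
  messages: ReadonlyArray<A>,
  consumers: number
): Array<Array<A>> {
  const inboxes: Array<Array<A>> = Array.from({ length: consumers }, () => [])
  messages.forEach((message, index) => {
    inboxes[index % consumers]?.push(message)
  })
  return inboxes
}

// PubSub-like delivery: every consumer receives every message.
function broadcast<A>(
  messages: ReadonlyArray<A>,
  consumers: number
): Array<Array<A>> {
  return Array.from({ length: consumers }, () => [...messages])
}

const jobs = [1, 2, 3, 4, 5, 6]

// Number of messages each consumer handles, for a given consumer count.
const queueLoad = (consumers: number) =>
  distribute(jobs, consumers).map((inbox) => inbox.length)
const pubsubLoad = (consumers: number) =>
  broadcast(jobs, consumers).map((inbox) => inbox.length)

const queueWithTwo = queueLoad(2) // [3, 3]
const queueWithThree = queueLoad(3) // [2, 2, 2]
const pubsubWithTwo = pubsubLoad(2) // [6, 6]
const pubsubWithThree = pubsubLoad(3) // [6, 6, 6]
```

Going from two to three consumers lowers each queue consumer's share from three messages to two, while every PubSub subscriber still sees all six.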
If you need both — durable work distribution with delivery guarantees across
processes — look at Cluster and the messaging built on top of these
primitives. For coordinating a single hand-off of one value between two fibers,
a Deferred is lighter than either.