Skip to content

Queue and PubSub

When fibers need to hand work to one another, two complementary primitives cover the field:

  • A Queue is a buffer that distributes messages — each item is taken by exactly one consumer. Use it to spread a stream of jobs across a pool of workers.
  • A PubSub broadcasts messages — each item is delivered to every subscriber. Use it to fan a domain event out to many independent listeners.

Both are asynchronous and fiber-safe: producers and consumers suspend rather than busy-wait, and a bounded variant applies backpressure so a fast producer can’t overwhelm slow consumers.

Queue.offer adds a message; Queue.take removes and returns one, suspending if the queue is empty. Because each message is taken once, a queue is the natural backbone of a producer/consumer or worker-pool design.

import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
// A bounded queue: producers suspend when it's full (backpressure).
const queue = yield* Queue.bounded<number>(16)
// A worker that pulls jobs forever, one at a time.
const worker = (id: number) =>
Effect.forever(
Effect.gen(function*() {
const job = yield* Queue.take(queue)
yield* Effect.log(`worker ${id} handling job ${job}`)
yield* Effect.sleep("100 millis")
})
)
// Start three workers as supervised children.
yield* Effect.forEach([1, 2, 3], (id) => Effect.forkChild(worker(id)))
// Feed in ten jobs; the three workers share them out between themselves.
yield* Effect.forEach([...Array(10).keys()], (n) => Queue.offer(queue, n))
yield* Effect.sleep("1 second")
})
Effect.runFork(program)

Queue.bounded(capacity) applies backpressure: once full, offer suspends the producer until a consumer makes room. That’s the safe default, but you can choose a different strategy when you’d rather drop messages than slow producers:

| Constructor | When full, offer… | | --- | --- | | Queue.bounded(n) | suspends the producer (backpressure) | | Queue.dropping(n) | discards the new message and returns false | | Queue.sliding(n) | discards the oldest message to make room | | Queue.unbounded() | never blocks; grows without limit |

import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
// A sliding queue of size 2 keeps only the most recent values.
const queue = yield* Queue.sliding<string>(2)
yield* Queue.offer(queue, "a")
yield* Queue.offer(queue, "b")
yield* Queue.offer(queue, "c") // evicts "a"
const all = yield* Queue.takeAll(queue)
yield* Effect.log(all) // ["b", "c"]
})
Effect.runFork(program)

A PubSub is the broadcast counterpart of a queue. Publishing delivers the message to every current subscriber, so it’s the tool for an in-process event bus, live update notifications, or any one-to-many fan-out. Each subscriber gets its own buffered view via PubSub.subscribe, scoped so it’s cleaned up automatically.

The idiomatic shape is to wrap the PubSub in a service and expose a typed publish plus a subscribe stream:

import { Context, Effect, Layer, PubSub, Stream } from "effect"
export type OrderEvent =
| { readonly _tag: "OrderPlaced"; readonly orderId: string }
| { readonly _tag: "PaymentCaptured"; readonly orderId: string }
| { readonly _tag: "OrderShipped"; readonly orderId: string }
export class OrderEvents extends Context.Service<OrderEvents, {
publish(event: OrderEvent): Effect.Effect<void>
publishAll(events: ReadonlyArray<OrderEvent>): Effect.Effect<void>
readonly subscribe: Stream.Stream<OrderEvent>
}>()("acme/OrderEvents") {
static readonly layer = Layer.effect(
OrderEvents,
Effect.gen(function*() {
// Bounded for backpressure; `replay` lets late subscribers catch up on
// the most recent events (handy after a reconnect).
const pubsub = yield* PubSub.bounded<OrderEvent>({
capacity: 256,
replay: 50
})
// Shut the PubSub down when the service's scope closes.
yield* Effect.addFinalizer(() => PubSub.shutdown(pubsub))
const publish = Effect.fn("OrderEvents.publish")(
function*(event: OrderEvent) {
yield* PubSub.publish(pubsub, event)
}
)
const publishAll = Effect.fn("OrderEvents.publishAll")(
function*(events: ReadonlyArray<OrderEvent>) {
yield* PubSub.publishAll(pubsub, events)
}
)
// Every consumer of `subscribe` gets *its own* stream of all events
// published after it subscribes (plus the replay buffer, if any).
const subscribe = Stream.fromPubSub(pubsub)
return OrderEvents.of({ publish, publishAll, subscribe })
})
)
}

Consumers each run their own stream and receive every event independently:

import { Effect, Stream } from "effect"
// One subscriber updates a read model; another sends notifications.
// Both see *all* events because PubSub broadcasts.
const projectionWorker = Effect.gen(function*() {
const events = yield* OrderEvents
yield* events.subscribe.pipe(
Stream.tap((event) => Effect.log(`projection: ${event._tag}`)),
Stream.runDrain
)
})
const notificationWorker = Effect.gen(function*() {
const events = yield* OrderEvents
yield* events.subscribe.pipe(
Stream.tap((event) => Effect.log(`notify: ${event._tag}`)),
Stream.runDrain
)
})

The choice comes down to delivery:

| | Queue | PubSub | | --- | --- | --- | | Each message goes to | exactly one consumer | every subscriber | | Typical use | work distribution, job pools | event bus, live fan-out | | Adding a consumer | takes a share of the load | gets the full stream |

If you need both — durable work distribution with delivery guarantees across processes — look at Cluster and the messaging built on top of these primitives. For coordinating a single hand-off of one value between two fibers, a Deferred is lighter than either.