Transactional queues and PubSub
TxQueue, TxPriorityQueue, and TxPubSub are queues and broadcast hubs whose
state lives in transactional memory. Apart from the type guards and
TxPubSub.capacity, every operation returns an Effect, and because the
underlying state is a TxRef, you can offer, take, and inspect them atomically
alongside any other transactional value. The blocking operations (take, peek)
don’t block a thread: they call Effect.txRetry and park the fiber until the
queue changes.
If you don’t need transactional composition, reach for the non-transactional
Queue and PubSub instead. Use the Tx*
variants when a queue operation must commit or retry together with other STM
state.
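As a rough illustration of "commit or retry together", the sketch below moves a job from one queue to another in a single transaction. It assumes that Effect.tx (used further down this page for acquireSubscriber) can wrap several Tx operations into one transaction; the names inbox, inFlight, and claim are hypothetical.

```typescript
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  // Hypothetical queues: `inbox` holds pending jobs, `inFlight` holds claimed ones.
  const inbox = yield* TxQueue.unbounded<string>()
  const inFlight = yield* TxQueue.unbounded<string>()
  yield* TxQueue.offerAll(inbox, ["job-1", "job-2"])

  // Assumption: Effect.tx runs both steps as one transaction, so a job either
  // leaves `inbox` and lands in `inFlight`, or the whole transaction retries.
  const claim = Effect.tx(
    Effect.gen(function*() {
      const job = yield* TxQueue.take(inbox)
      yield* TxQueue.offer(inFlight, job)
      return job
    })
  )

  const claimed = yield* claim
  const inboxSize = yield* TxQueue.size(inbox)
  const inFlightSize = yield* TxQueue.size(inFlight)
  return { claimed, inboxSize, inFlightSize }
})

Effect.runPromise(program).then((result) => console.log(result))
```

With the non-transactional Queue, the take and the offer would be two separate steps, and another fiber could observe the job in neither queue.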
Producer/consumer with TxQueue
TxQueue<A, E> is a transactional queue with built-in blocking semantics, which
makes it a natural fit for producer/consumer pipelines. A consumer fiber simply
calls take in a loop: when the queue is empty the transaction retries, so the
fiber sleeps cheaply until a producer offers a value — no polling loop required.

import { Effect, Fiber, TxQueue } from "effect"

const program = Effect.gen(function*() {
  // Bounded capacity gives natural back-pressure: producers wait when full.
  const queue = yield* TxQueue.bounded<number>(16)

  // Consumer: take blocks the transaction until an item arrives, so there is
  // no polling loop; the fiber sleeps until a producer offers a value.
  const consumer = yield* Effect.forkChild(
    Effect.forever(
      Effect.gen(function*() {
        const job = yield* TxQueue.take(queue)
        yield* Effect.log(`processing ${job}`)
      })
    )
  )

  // Producer: enqueue some work.
  yield* TxQueue.offerAll(queue, [1, 2, 3])

  yield* Effect.sleep("100 millis")
  yield* Fiber.interrupt(consumer)
})

Back-pressure strategies
The strategy is chosen at construction time and decides what offer does when
the queue is full:
| Constructor | When full, offer… |
|---|---|
| TxQueue.bounded(capacity) | retries the transaction (producers wait) |
| TxQueue.dropping(capacity) | returns false, rejecting the new value |
| TxQueue.sliding(capacity) | drops the oldest value to make room, returns true |
| TxQueue.unbounded() | always accepts while open (no capacity limit) |
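
The rules in the table can be restated as a small pure function. This is a plain TypeScript model for illustration only, not how TxQueue is implemented: Strategy, OfferResult, and offerModel are hypothetical names, and "Retry" stands in for the transaction retry that a bounded queue performs.

```typescript
// Hypothetical model of the "when full" rules; not part of the TxQueue API.
type Strategy = "bounded" | "dropping" | "sliding" | "unbounded"

type OfferResult<A> =
  | { readonly _tag: "Accepted"; readonly buffer: ReadonlyArray<A> }
  | { readonly _tag: "Rejected"; readonly buffer: ReadonlyArray<A> }
  | { readonly _tag: "Retry" } // the real queue retries the transaction here

const offerModel = <A>(
  strategy: Strategy,
  capacity: number,
  buffer: ReadonlyArray<A>,
  value: A
): OfferResult<A> => {
  const full = strategy !== "unbounded" && buffer.length >= capacity
  if (!full) return { _tag: "Accepted", buffer: [...buffer, value] }
  switch (strategy) {
    case "bounded":
      return { _tag: "Retry" }
    case "dropping":
      return { _tag: "Rejected", buffer }
    case "sliding":
      // Evict the oldest value to make room for the new one.
      return { _tag: "Accepted", buffer: [...buffer.slice(1), value] }
    case "unbounded":
      // Never reached: an unbounded queue is never full.
      return { _tag: "Accepted", buffer: [...buffer, value] }
  }
}

console.log(offerModel("bounded", 2, [1, 2], 3)._tag) // => "Retry"
console.log(offerModel("dropping", 2, [1, 2], 3)._tag) // => "Rejected"
console.log(offerModel("sliding", 2, [1, 2], 3)) // => Accepted with buffer [2, 3]
```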

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const dropping = yield* TxQueue.dropping<number>(2)
  yield* TxQueue.offerAll(dropping, [1, 2]) // fills to capacity
  const accepted = yield* TxQueue.offer(dropping, 3)
  console.log(accepted) // => false (rejected)

  const sliding = yield* TxQueue.sliding<number>(2)
  yield* TxQueue.offerAll(sliding, [1, 2])
  yield* TxQueue.offer(sliding, 3) // evicts 1
  const first = yield* TxQueue.take(sliding)
  console.log(first) // => 2
})

Lifecycle and completion
A TxQueue moves through three states (its State type):
- Open: accepting offers and serving takes normally.
- Closing: no new offers accepted, but buffered items are still served until the queue drains.
- Done: terminal. Blocking consumers fail with the stored Cause; poll returns Option.none.

You move the queue toward completion with:

- end: signals a clean finish via Cause.Done (for queues whose E includes Cause.Done). Buffered items drain first, then takes fail with Cause.Done.
- fail / failCause: complete with an error or an arbitrary cause. fail discards buffered items and goes straight to Done; failCause enters Closing if items remain.
- interrupt: completes with the current fiber’s interruption cause, draining buffered items first.
- shutdown: clears everything immediately and interrupts.

Observe state with isOpen / isClosing / isDone / isShutdown, and wait
for the terminal state with awaitCompletion.
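
The transitions listed above can be summarised as a tiny state machine. The following is a plain TypeScript sketch written from that description, not the library's implementation: QueueModel, complete, and takeOne are hypothetical names, causes are left out, and completion signals sent to a queue that is no longer Open are simply ignored as a simplification.

```typescript
// Hypothetical model of the lifecycle rules; not part of the TxQueue API.
type Lifecycle = "Open" | "Closing" | "Done"

interface QueueModel<A> {
  readonly state: Lifecycle
  readonly items: ReadonlyArray<A>
}

type Completion = "end" | "fail" | "failCause" | "interrupt" | "shutdown"

const complete = <A>(q: QueueModel<A>, signal: Completion): QueueModel<A> => {
  // Simplification: only an Open queue reacts to a completion signal here.
  if (q.state !== "Open") return q
  // fail and shutdown discard buffered items and go straight to Done.
  if (signal === "fail" || signal === "shutdown") return { state: "Done", items: [] }
  // end, failCause, and interrupt let buffered items drain first.
  return q.items.length > 0
    ? { state: "Closing", items: q.items }
    : { state: "Done", items: [] }
}

// Taking the last buffered item from a Closing queue moves it to Done.
const takeOne = <A>(q: QueueModel<A>): readonly [A | undefined, QueueModel<A>] => {
  if (q.items.length === 0) return [undefined, q]
  const [head, ...rest] = q.items
  const state: Lifecycle = q.state === "Closing" && rest.length === 0 ? "Done" : q.state
  return [head, { state, items: rest }]
}

const closing = complete({ state: "Open", items: [1, 2] }, "end")
console.log(closing.state) // => "Closing"
const [, afterFirst] = takeOne(closing)
const [, afterSecond] = takeOne(afterFirst)
console.log(afterSecond.state) // => "Done"
```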

import { Cause, Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number, Cause.Done>(8)
  yield* TxQueue.offerAll(queue, [1, 2])

  // Closing: buffered items can still be taken.
  yield* TxQueue.end(queue)
  console.log(yield* TxQueue.isClosing(queue)) // => true
  console.log(yield* TxQueue.take(queue)) // => 1
  console.log(yield* TxQueue.take(queue)) // => 2

  // Now drained → Done. take fails with Cause.Done.
  const result = yield* Effect.flip(TxQueue.take(queue))
  console.log(Cause.isDone(result)) // => true
})

TxQueue
TxQueue<A, E> stores values of type A, exposes write-only TxEnqueue and
read-only TxDequeue handles, and can complete or fail with a Cause observed
by consumers.
TxQueue (type)
The full queue interface; extends both TxEnqueue and
TxDequeue. The E channel defaults to never.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue: TxQueue.TxQueue<number> = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 42)
  console.log(yield* TxQueue.take(queue)) // => 42
})

TxEnqueue
The write-only view of a queue: offer, offerAll, fail, failCause, end,
interrupt, shutdown, plus state inspection. Accept this type when a function
should only produce.

import { Effect, TxQueue } from "effect"

const produce = (out: TxQueue.TxEnqueue<number>) => TxQueue.offer(out, 1)

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.unbounded<number>()
  yield* produce(queue) // TxQueue is assignable to TxEnqueue
})

TxDequeue
The read-only view of a queue: take, takeAll, takeN, takeBetween,
poll, peek, plus state inspection. Accept this type when a function should
only consume.

import { Effect, TxQueue } from "effect"

const consume = (input: TxQueue.TxDequeue<number>) => TxQueue.take(input)

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.unbounded<number>()
  yield* TxQueue.offer(queue, 7)
  console.log(yield* consume(queue)) // => 7
})

State
The lifecycle tag of a queue: { _tag: "Open" }, { _tag: "Closing", cause },
or { _tag: "Done", cause }. You normally inspect it via the isOpen /
isClosing / isDone helpers rather than reading it directly.

import type { TxQueue } from "effect"

declare const state: TxQueue.State<string, Error>

if (state._tag === "Open") {
  // accepting items
} else {
  console.log(state.cause) // => the completion Cause (Closing | Done)
}

isTxQueue
Type guard for a full TxQueue.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(1)
  console.log(TxQueue.isTxQueue(queue)) // => true
  console.log(TxQueue.isTxQueue("nope")) // => false
})

isTxEnqueue
Type guard for a TxEnqueue (write-only) handle.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(1)
  console.log(TxQueue.isTxEnqueue(queue)) // => true
})

isTxDequeue
Type guard for a TxDequeue (read-only) handle.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(1)
  console.log(TxQueue.isTxDequeue(queue)) // => true
})

bounded
Creates a bounded queue. Producers retry (wait) when the queue is full.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(2)
  yield* TxQueue.offer(queue, 1)
  console.log(yield* TxQueue.size(queue)) // => 1
})

unbounded
Creates a queue with unlimited capacity; offer always succeeds while open.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.unbounded<string>()
  yield* TxQueue.offerAll(queue, ["a", "b"])
  console.log(yield* TxQueue.size(queue)) // => 2
})

dropping
Creates a bounded queue that drops new items when full (offer returns
false).

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.dropping<number>(1)
  yield* TxQueue.offer(queue, 1)
  console.log(yield* TxQueue.offer(queue, 2)) // => false
})

sliding
Creates a bounded queue that evicts the oldest item when full to accept the new one.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.sliding<number>(1)
  yield* TxQueue.offer(queue, 1)
  yield* TxQueue.offer(queue, 2) // evicts 1
  console.log(yield* TxQueue.take(queue)) // => 2
})

offer
Offers one item, returning true if accepted. Bounded queues retry while full;
dropping queues return false; sliding queues evict; closing/done queues return
false.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.offer(queue, 42)) // => true
})

offerAll
Offers many items, returning the array of items that were not accepted.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.dropping<number>(2)
  const rejected = yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(rejected) // => [3]
})

take
Takes the next item, retrying while empty. Fails with the completion cause if the queue is done.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 1)
  console.log(yield* TxQueue.take(queue)) // => 1
})

takeAll
Takes all buffered items at once, returning a NonEmptyArray. Blocks until at
least one item is available.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(yield* TxQueue.takeAll(queue)) // => [1, 2, 3]
})

takeN
Takes exactly n items, waiting until that many are available (capped at the
queue’s capacity). Closing queues drain what they have.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(5)
  yield* TxQueue.offerAll(queue, [1, 2, 3, 4])
  console.log(yield* TxQueue.takeN(queue, 4)) // => [1, 2, 3, 4]
})

takeBetween
Takes between min and max items, waiting for at least min on an open
queue and taking up to max of what is available.
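
The min/max rule for an open queue can be written as a small pure function. This is a hypothetical plain TypeScript model (takeBetweenModel is not a library function), with "retry" marking the point where the real operation would retry its transaction; closing and done queues are not modelled.

```typescript
// Hypothetical model of takeBetween on an open queue; not part of the TxQueue API.
type TakeBetweenResult<A> =
  | "retry"
  | { readonly taken: ReadonlyArray<A>; readonly rest: ReadonlyArray<A> }

const takeBetweenModel = <A>(
  buffer: ReadonlyArray<A>,
  min: number,
  max: number
): TakeBetweenResult<A> => {
  // Fewer than `min` items buffered: the real queue would wait here.
  if (buffer.length < min) return "retry"
  // Otherwise take as many as are available, up to `max`.
  const count = Math.min(max, buffer.length)
  return { taken: buffer.slice(0, count), rest: buffer.slice(count) }
}

console.log(takeBetweenModel([1], 2, 5)) // => "retry"
console.log(takeBetweenModel([1, 2, 3], 2, 5)) // => taken [1, 2, 3], rest []
console.log(takeBetweenModel([1, 2, 3, 4, 5, 6], 2, 5)) // => taken [1, 2, 3, 4, 5], rest [6]
```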

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5, 6])
  console.log(yield* TxQueue.takeBetween(queue, 2, 5)) // => [1, 2, 3, 4, 5]
})

poll
Tries to take one item without blocking, returning an Option.

import { Effect, Option, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(Option.isNone(yield* TxQueue.poll(queue))) // => true
  yield* TxQueue.offer(queue, 42)
  console.log(yield* TxQueue.poll(queue)) // => Option.some(42)
})

peek
Waits for and returns the next item without removing it. Fails with the completion cause if the queue is done.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 42)
  console.log(yield* TxQueue.peek(queue)) // => 42
  console.log(yield* TxQueue.size(queue)) // => 1 (still there)
})

size
The number of buffered items.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(yield* TxQueue.size(queue)) // => 3
})

isEmpty
Whether the queue currently has no buffered items.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.isEmpty(queue)) // => true
})

isFull
Whether the queue is at capacity (always false for unbounded queues).

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(2)
  yield* TxQueue.offerAll(queue, [1, 2])
  console.log(yield* TxQueue.isFull(queue)) // => true
})

isOpen
Whether the queue is in the Open state.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.isOpen(queue)) // => true
  yield* TxQueue.interrupt(queue)
  console.log(yield* TxQueue.isOpen(queue)) // => false
})

isClosing
Whether the queue is Closing (draining buffered items before completion).

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 1)
  yield* TxQueue.interrupt(queue) // items remain → Closing
  console.log(yield* TxQueue.isClosing(queue)) // => true
})

isDone
Whether the queue has reached the terminal Done state.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.interrupt(queue) // empty → Done immediately
  console.log(yield* TxQueue.isDone(queue)) // => true
})

isShutdown
Alias for isDone, kept for compatibility.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.shutdown(queue)
  console.log(yield* TxQueue.isShutdown(queue)) // => true
})

fail
Fails the queue with an error, discarding buffered items and going straight to
Done. Returns false if already closing/done.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number, string>(10)
  console.log(yield* TxQueue.fail(queue, "connection lost")) // => true
  const err = yield* Effect.flip(TxQueue.take(queue))
  console.log(err) // => "connection lost"
})

failCause
Completes the queue with an arbitrary Cause. Enters Closing if items
remain so they can drain, otherwise goes directly to Done.

import { Cause, Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.failCause(queue, Cause.interrupt())) // => true
})

end
Convenience wrapper that completes a queue with Cause.Done (the E channel
must include Cause.Done). Buffered items drain first; later takes fail with
Cause.Done.

import { Cause, Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number, Cause.Done>(10)
  console.log(yield* TxQueue.end(queue)) // => true
  const result = yield* Effect.flip(TxQueue.take(queue))
  console.log(Cause.isDone(result)) // => true
})

clear
Removes and returns all currently buffered items without changing the lifecycle
state. Returns [] if the queue is done via Cause.Done; propagates other
causes.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(yield* TxQueue.clear(queue)) // => [1, 2, 3]
  console.log(yield* TxQueue.size(queue)) // => 0
})

interrupt
Completes the queue with the current fiber’s interruption cause, draining
buffered items first. Returns false if already closing/done.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.interrupt(queue)) // => true
})

shutdown
Clears all items and interrupts the queue immediately.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  yield* TxQueue.shutdown(queue)
  console.log(yield* TxQueue.size(queue)) // => 0
})

awaitCompletion
Retries until the queue reaches Done, then succeeds. Useful for waiting on another fiber to finish producing.

import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* Effect.forkChild(
    Effect.delay(TxQueue.interrupt(queue), "50 millis")
  )
  yield* TxQueue.awaitCompletion(queue)
  console.log("queue completed")
})

TxPriorityQueue
TxPriorityQueue<A> keeps elements ordered by an Order<A> supplied at
construction. take and peek return the smallest element by that order,
so Order.Number dequeues lower numbers first — reverse the order if larger
values should have higher priority. Unlike TxQueue, it has no capacity or
lifecycle; it is a pure ordered multiset (equal values are kept, not merged).
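
To see what "ordered multiset" means in practice, here is a plain TypeScript sketch of sorted insertion. It is only a model of the behaviour described above, not the library's code: insertSorted, ascending, and descending are hypothetical names, and a plain comparison function stands in for Order<A>.

```typescript
// Hypothetical helper: inserts `value` before the first element that sorts
// strictly after it, so equal values are kept side by side rather than merged.
const insertSorted = <A>(
  sorted: ReadonlyArray<A>,
  value: A,
  compare: (a: A, b: A) => number
): Array<A> => {
  const index = sorted.findIndex((existing) => compare(existing, value) > 0)
  return index === -1
    ? [...sorted, value]
    : [...sorted.slice(0, index), value, ...sorted.slice(index)]
}

const ascending = (a: number, b: number) => a - b
// Reversing the comparison puts larger values at the front.
const descending = (a: number, b: number) => b - a

const input = [3, 1, 2, 1]
const minFirst = input.reduce<Array<number>>((acc, n) => insertSorted(acc, n, ascending), [])
const maxFirst = input.reduce<Array<number>>((acc, n) => insertSorted(acc, n, descending), [])

console.log(minFirst) // => [1, 1, 2, 3]
console.log(maxFirst) // => [3, 2, 1, 1]
```

In this model, take corresponds to removing the first element of the sorted array, which is why the comparison alone decides which value comes out first.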

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
  console.log(yield* TxPriorityQueue.take(pq)) // => 2

  // Highest-first: flip the order.
  const maxFirst = yield* TxPriorityQueue.empty<number>(
    Order.flip(Order.Number)
  )
  yield* TxPriorityQueue.offerAll(maxFirst, [1, 5, 3])
  console.log(yield* TxPriorityQueue.take(maxFirst)) // => 5
})

TxPriorityQueue (type)
A transactional priority queue backed by a sorted chunk and an Order<A>.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq: TxPriorityQueue.TxPriorityQueue<number> =
    yield* TxPriorityQueue.empty<number>(Order.Number)
  yield* TxPriorityQueue.offer(pq, 1)
})

empty
Creates an empty priority queue with the given Order.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(yield* TxPriorityQueue.isEmpty(pq)) // => true
})

make
Creates a priority queue from variadic elements. Curried on the Order.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.make(Order.Number)(3, 1, 2)
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

fromIterable
Creates a priority queue from an iterable, sorting on construction.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.toArray(pq)) // => [1, 2, 3]
})

isTxPriorityQueue
Type guard for a TxPriorityQueue.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(TxPriorityQueue.isTxPriorityQueue(pq)) // => true
})

offer
Inserts a value into its sorted position.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  yield* TxPriorityQueue.offer(pq, 2)
  yield* TxPriorityQueue.offer(pq, 1)
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

offerAll
Inserts every element of an iterable.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  yield* TxPriorityQueue.offerAll(pq, [3, 1, 2])
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

take
Takes the smallest element, retrying while the queue is empty.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

takeOption
Tries to take the smallest element, returning Option.none when empty (no
retry).

import { Effect, Option, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(Option.isNone(yield* TxPriorityQueue.takeOption(pq))) // => true
})

takeAll
Takes every element in priority order.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.takeAll(pq)) // => [1, 2, 3]
})

takeUpTo
Takes up to n elements in priority order.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [5, 3, 1, 4, 2])
  console.log(yield* TxPriorityQueue.takeUpTo(pq, 2)) // => [1, 2]
})

peek
Observes the smallest element without removing it, retrying while empty.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.peek(pq)) // => 1
})

peekOption
Observes the smallest element without removing it, returning Option.none when
empty.

import { Effect, Option, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(Option.isNone(yield* TxPriorityQueue.peekOption(pq))) // => true
})

removeIf
Removes every element matching the predicate.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3, 4, 5])
  yield* TxPriorityQueue.removeIf(pq, (n) => n % 2 === 0)
  console.log(yield* TxPriorityQueue.takeAll(pq)) // => [1, 3, 5]
})

retainIf
Keeps only the elements matching the predicate.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3, 4, 5])
  yield* TxPriorityQueue.retainIf(pq, (n) => n % 2 === 0)
  console.log(yield* TxPriorityQueue.takeAll(pq)) // => [2, 4]
})

size
The number of elements in the queue.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3])
  console.log(yield* TxPriorityQueue.size(pq)) // => 3
})

isEmpty
Whether the queue has no elements.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(yield* TxPriorityQueue.isEmpty(pq)) // => true
})

isNonEmpty
Whether the queue has at least one element.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1])
  console.log(yield* TxPriorityQueue.isNonEmpty(pq)) // => true
})

toArray
Returns all elements in priority order without removing them.

import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.toArray(pq)) // => [1, 2, 3]
})

TxPubSub
TxPubSub<A> is a transactional broadcast hub. Each subscriber owns its own
TxQueue, and a publish offers the value to every currently-registered
subscriber queue in one atomic transaction. Subscribers only receive values
published after they subscribe. subscribe is Scope-managed: the
subscriber queue is registered on acquire and removed (and shut down) when the
scope closes.
The hub’s strategy (bounded, dropping, sliding, unbounded) is applied to
each subscriber queue, so it controls back-pressure per subscriber.
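
The per-subscriber behaviour described above can be modelled in plain TypeScript. This is only a sketch, not how TxPubSub is implemented: HubModel, makeHub, subscribe, and publish are hypothetical names, the buffers are ordinary arrays, and the dropping strategy is assumed so that a full subscriber shows up in the return value. The model also assumes that subscribers with free space still receive a value that another subscriber dropped.

```typescript
// Hypothetical model of a dropping hub; not part of the TxPubSub API.
interface HubModel<A> {
  readonly capacity: number
  readonly subscribers: Set<Array<A>>
}

const makeHub = <A>(capacity: number): HubModel<A> => ({
  capacity,
  subscribers: new Set<Array<A>>()
})

// Each subscriber gets its own buffer and only sees values published afterwards.
const subscribe = <A>(hub: HubModel<A>): Array<A> => {
  const buffer: Array<A> = []
  hub.subscribers.add(buffer)
  return buffer
}

// Returns true when every subscriber accepted the value (or there are none).
const publish = <A>(hub: HubModel<A>, value: A): boolean => {
  let deliveredToAll = true
  for (const buffer of hub.subscribers) {
    if (buffer.length >= hub.capacity) {
      deliveredToAll = false // this subscriber is full: the value is dropped for it
    } else {
      buffer.push(value)
    }
  }
  return deliveredToAll
}

const hub = makeHub<string>(1)
console.log(publish(hub, "early")) // => true (no subscribers, nobody receives it)
const sub = subscribe(hub)
console.log(publish(hub, "first")) // => true
console.log(publish(hub, "second")) // => false (sub is full, value dropped)
console.log(sub) // => ["first"]
```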

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()

  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub1 = yield* TxPubSub.subscribe(hub)
      const sub2 = yield* TxPubSub.subscribe(hub)

      yield* TxPubSub.publish(hub, "broadcast")

      console.log(yield* TxQueue.take(sub1)) // => "broadcast"
      console.log(yield* TxQueue.take(sub2)) // => "broadcast"
    })
  )
})

TxPubSub (type)
The hub interface. Carries the broadcast strategy and capacity.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub: TxPubSub.TxPubSub<number> = yield* TxPubSub.bounded<number>(16)
})

bounded
Creates a hub where a full subscriber queue makes publishers retry until space is available (back-pressure).

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.bounded<number>(16)
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 42)
      console.log(yield* TxQueue.take(sub)) // => 42
    })
  )
})

dropping
Creates a hub where a full subscriber queue drops the new message; publish
returns false.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.dropping<number>(2)
  yield* Effect.scoped(
    Effect.gen(function*() {
      // The subscriber never takes, so its queue fills after two messages.
      yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 1)
      yield* TxPubSub.publish(hub, 2)
      console.log(yield* TxPubSub.publish(hub, 3)) // => false (dropped)
    })
  )
})

sliding
Creates a hub where a full subscriber queue evicts its oldest message to keep the newest.

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.sliding<number>(2)
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2, 3]) // evicts 1
      console.log(yield* TxQueue.take(sub)) // => 2
    })
  )
})

unbounded
Creates a hub whose subscriber queues grow without bound; messages are always accepted.

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "msg")
      console.log(yield* TxQueue.take(sub)) // => "msg"
    })
  )
})

isTxPubSub
Type guard for a TxPubSub.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(TxPubSub.isTxPubSub(hub)) // => true
})

publish
Broadcasts one value to all current subscribers. Returns true when delivered
to every subscriber, or false if the hub is shut down or the message was
dropped for any subscriber. With no subscribers it is a no-op returning true.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()
  console.log(yield* TxPubSub.publish(hub, "no one listening")) // => true
})

publishAll
Broadcasts every message from an iterable. Returns true if all were delivered
to all subscribers.

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2, 3])
      console.log(yield* TxQueue.takeAll(sub)) // => [1, 2, 3]
    })
  )
})

subscribe
Subscribes to the hub, returning a Scope-managed TxQueue that receives
messages published after subscription. The queue is removed and shut down when
the scope closes.

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "hello")
      console.log(yield* TxQueue.take(sub)) // => "hello"
    })
  )
})

acquireSubscriber
The transactional acquire step of subscribe: creates and registers a
subscriber queue inside a transaction (requires Effect.Transaction). Use it
when registration must be atomic with other Tx operations; pair with
releaseSubscriber.

import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  const sub = yield* Effect.tx(TxPubSub.acquireSubscriber(hub))
  yield* TxPubSub.publish(hub, 1)
  console.log(yield* TxQueue.take(sub)) // => 1
  yield* Effect.tx(TxPubSub.releaseSubscriber(hub, sub))
})

releaseSubscriber
The transactional release step of subscribe: removes a manually acquired queue
from the hub and shuts it down. Composes inside Effect.tx with other cleanup.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  const sub = yield* Effect.tx(TxPubSub.acquireSubscriber(hub))
  yield* Effect.tx(TxPubSub.releaseSubscriber(hub, sub)) // removes + shuts down
})

capacity
The configured capacity of the hub (a plain number, not an Effect).

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.bounded<number>(16)
  console.log(TxPubSub.capacity(hub)) // => 16
})

size
The maximum number of pending messages in any single subscriber queue (not the total across queues).
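
The difference between "maximum" and "total" only shows up with more than one subscriber. The sketch below uses only operations documented on this page; the names fast and the unnamed slow subscriber are hypothetical, and the result stated in the comment follows from the description above rather than from a verified run.

```typescript
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.scoped(
  Effect.gen(function*() {
    const hub = yield* TxPubSub.unbounded<number>()
    const fast = yield* TxPubSub.subscribe(hub)
    yield* TxPubSub.subscribe(hub) // a slow subscriber that never takes

    yield* TxPubSub.publishAll(hub, [1, 2, 3])
    yield* TxQueue.take(fast)
    yield* TxQueue.take(fast)

    // fast now holds 1 pending message and the slow subscriber holds 3.
    // If size is the per-subscriber maximum as described, this is 3, not 4.
    return yield* TxPubSub.size(hub)
  })
)

Effect.runPromise(program).then((size) => console.log(size))
```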

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2])
      console.log(yield* TxPubSub.size(hub)) // => 2
    })
  )
})

isEmpty
Whether all subscriber queues are empty.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(yield* TxPubSub.isEmpty(hub)) // => true
})

isFull
Whether any subscriber queue is at capacity (always false for unbounded
hubs).

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.bounded<number>(2)
  console.log(yield* TxPubSub.isFull(hub)) // => false
})

isShutdown
Whether the hub has been shut down.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(yield* TxPubSub.isShutdown(hub)) // => false
  yield* TxPubSub.shutdown(hub)
  console.log(yield* TxPubSub.isShutdown(hub)) // => true
})

shutdown
Shuts down the hub and the subscriber queues registered at that moment. After
shutdown, publish / publishAll return false. Idempotent.

import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* TxPubSub.shutdown(hub)
  console.log(yield* TxPubSub.publish(hub, 1)) // => false
})

awaitShutdown
Retries until the hub is shut down, then succeeds.

import { Effect, Fiber, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  const fiber = yield* Effect.forkChild(TxPubSub.awaitShutdown(hub))
  yield* TxPubSub.shutdown(hub)
  yield* Fiber.await(fiber)
})

See also
- Transactional state: TxRef, the atom of STM that backs these structures.
- Transactional data structures: TxHashMap, TxChunk, TxSubscriptionRef, and friends.
- Queue and PubSub: the non-transactional Queue and PubSub for general fiber communication.