Skip to content

Transactional queues and PubSub

TxQueue, TxPriorityQueue, and TxPubSub are queues and broadcast hubs whose state lives in transactional memory. Every operation returns an Effect, and because the underlying state is a TxRef, you can offer, take, and inspect them atomically alongside any other transactional value. The blocking operations (take, peek) don’t block a thread — they call Effect.txRetry and park the fiber until the queue changes.

If you don’t need transactional composition, reach for the non-transactional Queue and PubSub instead. Use the Tx* variants when a queue operation must commit or retry together with other STM state.

TxQueue<A, E> is a transactional queue with built-in blocking semantics, which makes it a natural fit for producer/consumer pipelines. A consumer fiber simply calls take in a loop: when the queue is empty the transaction retries, so the fiber sleeps cheaply until a producer offers a value — no polling loop required.

import { Effect, Fiber, TxQueue } from "effect"
const program = Effect.gen(function*() {
// Bounded capacity gives natural back-pressure: producers wait when full.
const queue = yield* TxQueue.bounded<number>(16)
// Consumer: take blocks the transaction until an item arrives, so there is
// no polling loop — the fiber sleeps until a producer offers a value.
const consumer = yield* Effect.forkChild(
Effect.forever(
Effect.gen(function*() {
const job = yield* TxQueue.take(queue)
yield* Effect.log(`processing ${job}`)
})
)
)
// Producer: enqueue some work.
yield* TxQueue.offerAll(queue, [1, 2, 3])
yield* Effect.sleep("100 millis")
yield* Fiber.interrupt(consumer)
})

The strategy is chosen at construction time and decides what offer does when the queue is full:

ConstructorWhen full, offer
TxQueue.bounded(capacity)retries the transaction (producers wait)
TxQueue.dropping(capacity)returns false, rejecting the new value
TxQueue.sliding(capacity)drops the oldest value to make room, returns true
TxQueue.unbounded()always accepts while open (no capacity limit)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const dropping = yield* TxQueue.dropping<number>(2)
yield* TxQueue.offerAll(dropping, [1, 2]) // fills to capacity
const accepted = yield* TxQueue.offer(dropping, 3)
console.log(accepted) // => false (rejected)
const sliding = yield* TxQueue.sliding<number>(2)
yield* TxQueue.offerAll(sliding, [1, 2])
yield* TxQueue.offer(sliding, 3) // evicts 1
const first = yield* TxQueue.take(sliding)
console.log(first) // => 2
})

A TxQueue moves through three states (its State type):

  1. Open — accepting offers and serving takes normally.
  2. Closing — no new offers accepted, but buffered items are still served until the queue drains.
  3. Done — terminal. Blocking consumers fail with the stored Cause; poll returns Option.none.

You move the queue toward completion with:

  • end — signal a clean finish via Cause.Done (queues whose E includes Cause.Done). Buffered items drain first, then takes fail with Cause.Done.
  • fail / failCause — complete with an error or arbitrary cause. fail discards buffered items and goes straight to Done; failCause enters Closing if items remain.
  • interrupt — complete with the current fiber’s interruption cause, draining buffered items first.
  • shutdown — clear everything immediately and interrupt.

Observe state with isOpen / isClosing / isDone / isShutdown, and wait for the terminal state with awaitCompletion.

import { Cause, Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number, Cause.Done>(8)
yield* TxQueue.offerAll(queue, [1, 2])
// Closing: buffered items can still be taken.
yield* TxQueue.end(queue)
console.log(yield* TxQueue.isClosing(queue)) // => true
console.log(yield* TxQueue.take(queue)) // => 1
console.log(yield* TxQueue.take(queue)) // => 2
// Now drained → Done. take fails with Cause.Done.
const result = yield* Effect.flip(TxQueue.take(queue))
console.log(Cause.isDone(result)) // => true
})

TxQueue<A, E> stores values of type A, exposes write-only TxEnqueue and read-only TxDequeue handles, and can complete or fail with a Cause observed by consumers.

The full queue interface; extends both TxEnqueue and TxDequeue. The E channel defaults to never.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue: TxQueue.TxQueue<number> = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offer(queue, 42)
console.log(yield* TxQueue.take(queue)) // => 42
})

The write-only view of a queue: offer, offerAll, fail, failCause, end, interrupt, shutdown, plus state inspection. Accept this type when a function should only produce.

import { Effect, TxQueue } from "effect"
const produce = (out: TxQueue.TxEnqueue<number>) => TxQueue.offer(out, 1)
const program = Effect.gen(function*() {
const queue = yield* TxQueue.unbounded<number>()
yield* produce(queue) // TxQueue is assignable to TxEnqueue
})

The read-only view of a queue: take, takeAll, takeN, takeBetween, poll, peek, plus state inspection. Accept this type when a function should only consume.

import { Effect, TxQueue } from "effect"
const consume = (input: TxQueue.TxDequeue<number>) => TxQueue.take(input)
const program = Effect.gen(function*() {
const queue = yield* TxQueue.unbounded<number>()
yield* TxQueue.offer(queue, 7)
console.log(yield* consume(queue)) // => 7
})

The lifecycle tag of a queue: { _tag: "Open" }, { _tag: "Closing", cause }, or { _tag: "Done", cause }. You normally inspect it via the isOpen / isClosing / isDone helpers rather than reading it directly.

import type { TxQueue } from "effect"
declare const state: TxQueue.State<string, Error>
if (state._tag === "Open") {
// accepting items
} else {
console.log(state.cause) // => the completion Cause (Closing | Done)
}

Type guard for a full TxQueue.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(1)
console.log(TxQueue.isTxQueue(queue)) // => true
console.log(TxQueue.isTxQueue("nope")) // => false
})

Type guard for a TxEnqueue (write-only) handle.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(1)
console.log(TxQueue.isTxEnqueue(queue)) // => true
})

Type guard for a TxDequeue (read-only) handle.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(1)
console.log(TxQueue.isTxDequeue(queue)) // => true
})

Creates a bounded queue. Producers retry (wait) when the queue is full.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(2)
yield* TxQueue.offer(queue, 1)
console.log(yield* TxQueue.size(queue)) // => 1
})

Creates a queue with unlimited capacity; offer always succeeds while open.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.unbounded<string>()
yield* TxQueue.offerAll(queue, ["a", "b"])
console.log(yield* TxQueue.size(queue)) // => 2
})

Creates a bounded queue that drops new items when full (offer returns false).

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.dropping<number>(1)
yield* TxQueue.offer(queue, 1)
console.log(yield* TxQueue.offer(queue, 2)) // => false
})

Creates a bounded queue that evicts the oldest item when full to accept the new one.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.sliding<number>(1)
yield* TxQueue.offer(queue, 1)
yield* TxQueue.offer(queue, 2) // evicts 1
console.log(yield* TxQueue.take(queue)) // => 2
})

Offers one item, returning true if accepted. Bounded queues retry while full; dropping queues return false; sliding queues evict; closing/done queues return false.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
console.log(yield* TxQueue.offer(queue, 42)) // => true
})

Offers many items, returning the array of items that were not accepted.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.dropping<number>(2)
const rejected = yield* TxQueue.offerAll(queue, [1, 2, 3])
console.log(rejected) // => [3]
})

Takes the next item, retrying while empty. Fails with the completion cause if the queue is done.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offer(queue, 1)
console.log(yield* TxQueue.take(queue)) // => 1
})

Takes all buffered items at once, returning a NonEmptyArray. Blocks until at least one item is available.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offerAll(queue, [1, 2, 3])
console.log(yield* TxQueue.takeAll(queue)) // => [1, 2, 3]
})

Takes exactly n items, waiting until that many are available (capped at the queue’s capacity). Closing queues drain what they have.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(5)
yield* TxQueue.offerAll(queue, [1, 2, 3, 4])
console.log(yield* TxQueue.takeN(queue, 4)) // => [1, 2, 3, 4]
})

Takes between min and max items, waiting for at least min on an open queue and taking up to max of what is available.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5, 6])
console.log(yield* TxQueue.takeBetween(queue, 2, 5)) // => [1, 2, 3, 4, 5]
})

Tries to take one item without blocking, returning an Option.

import { Effect, Option, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
console.log(Option.isNone(yield* TxQueue.poll(queue))) // => true
yield* TxQueue.offer(queue, 42)
console.log(yield* TxQueue.poll(queue)) // => Option.some(42)
})

Waits for and returns the next item without removing it. Fails with the completion cause if the queue is done.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offer(queue, 42)
console.log(yield* TxQueue.peek(queue)) // => 42
console.log(yield* TxQueue.size(queue)) // => 1 (still there)
})

The number of buffered items.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offerAll(queue, [1, 2, 3])
console.log(yield* TxQueue.size(queue)) // => 3
})

Whether the queue currently has no buffered items.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
console.log(yield* TxQueue.isEmpty(queue)) // => true
})

Whether the queue is at capacity (always false for unbounded queues).

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(2)
yield* TxQueue.offerAll(queue, [1, 2])
console.log(yield* TxQueue.isFull(queue)) // => true
})

Whether the queue is in the Open state.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
console.log(yield* TxQueue.isOpen(queue)) // => true
yield* TxQueue.interrupt(queue)
console.log(yield* TxQueue.isOpen(queue)) // => false
})

Whether the queue is Closing (draining buffered items before completion).

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offer(queue, 1)
yield* TxQueue.interrupt(queue) // items remain → Closing
console.log(yield* TxQueue.isClosing(queue)) // => true
})

Whether the queue has reached the terminal Done state.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.interrupt(queue) // empty → Done immediately
console.log(yield* TxQueue.isDone(queue)) // => true
})

Alias for isDone, kept for compatibility.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.shutdown(queue)
console.log(yield* TxQueue.isShutdown(queue)) // => true
})

Fails the queue with an error, discarding buffered items and going straight to Done. Returns false if already closing/done.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number, string>(10)
console.log(yield* TxQueue.fail(queue, "connection lost")) // => true
const err = yield* Effect.flip(TxQueue.take(queue))
console.log(err) // => "connection lost"
})

Completes the queue with an arbitrary Cause. Enters Closing if items remain so they can drain, otherwise goes directly to Done.

import { Cause, Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
console.log(yield* TxQueue.failCause(queue, Cause.interrupt())) // => true
})

Convenience wrapper that completes a queue with Cause.Done (the E channel must include Cause.Done). Buffered items drain first; later takes fail with Cause.Done.

import { Cause, Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number, Cause.Done>(10)
console.log(yield* TxQueue.end(queue)) // => true
const result = yield* Effect.flip(TxQueue.take(queue))
console.log(Cause.isDone(result)) // => true
})

Removes and returns all currently buffered items without changing the lifecycle state. Returns [] if the queue is done via Cause.Done; propagates other causes.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offerAll(queue, [1, 2, 3])
console.log(yield* TxQueue.clear(queue)) // => [1, 2, 3]
console.log(yield* TxQueue.size(queue)) // => 0
})

Completes the queue with the current fiber’s interruption cause, draining buffered items first. Returns false if already closing/done.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
console.log(yield* TxQueue.interrupt(queue)) // => true
})

Clears all items and interrupts the queue immediately.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* TxQueue.offerAll(queue, [1, 2, 3])
yield* TxQueue.shutdown(queue)
console.log(yield* TxQueue.size(queue)) // => 0
})

Retries until the queue reaches Done, then succeeds. Useful for waiting on another fiber to finish producing.

import { Effect, TxQueue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* TxQueue.bounded<number>(10)
yield* Effect.forkChild(
Effect.delay(TxQueue.interrupt(queue), "50 millis")
)
yield* TxQueue.awaitCompletion(queue)
console.log("queue completed")
})

TxPriorityQueue<A> keeps elements ordered by an Order<A> supplied at construction. take and peek return the smallest element by that order, so Order.Number dequeues lower numbers first — reverse the order if larger values should have higher priority. Unlike TxQueue, it has no capacity or lifecycle; it is a pure ordered multiset (equal values are kept, not merged).

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
console.log(yield* TxPriorityQueue.take(pq)) // => 1
console.log(yield* TxPriorityQueue.take(pq)) // => 2
// Highest-first: flip the order.
const maxFirst = yield* TxPriorityQueue.empty<number>(
Order.flip(Order.Number)
)
yield* TxPriorityQueue.offerAll(maxFirst, [1, 5, 3])
console.log(yield* TxPriorityQueue.take(maxFirst)) // => 5
})

A transactional priority queue backed by a sorted chunk and an Order<A>.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq: TxPriorityQueue.TxPriorityQueue<number> =
yield* TxPriorityQueue.empty<number>(Order.Number)
yield* TxPriorityQueue.offer(pq, 1)
})

Creates an empty priority queue with the given Order.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
console.log(yield* TxPriorityQueue.isEmpty(pq)) // => true
})

Creates a priority queue from variadic elements. Curried on the Order.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.make(Order.Number)(3, 1, 2)
console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

Creates a priority queue from an iterable, sorting on construction.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
console.log(yield* TxPriorityQueue.toArray(pq)) // => [1, 2, 3]
})

Type guard for a TxPriorityQueue.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
console.log(TxPriorityQueue.isTxPriorityQueue(pq)) // => true
})

Inserts a value into its sorted position.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
yield* TxPriorityQueue.offer(pq, 2)
yield* TxPriorityQueue.offer(pq, 1)
console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

Inserts every element of an iterable.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
yield* TxPriorityQueue.offerAll(pq, [3, 1, 2])
console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

Takes the smallest element, retrying while the queue is empty.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
console.log(yield* TxPriorityQueue.take(pq)) // => 1
})

Tries to take the smallest element, returning Option.none when empty (no retry).

import { Effect, Option, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
console.log(Option.isNone(yield* TxPriorityQueue.takeOption(pq))) // => true
})

Takes every element in priority order.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
console.log(yield* TxPriorityQueue.takeAll(pq)) // => [1, 2, 3]
})

Takes up to n elements in priority order.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [5, 3, 1, 4, 2])
console.log(yield* TxPriorityQueue.takeUpTo(pq, 2)) // => [1, 2]
})

Observes the smallest element without removing it, retrying while empty.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
console.log(yield* TxPriorityQueue.peek(pq)) // => 1
})

Observes the smallest element without removing it, returning Option.none when empty.

import { Effect, Option, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
console.log(Option.isNone(yield* TxPriorityQueue.peekOption(pq))) // => true
})

Removes every element matching the predicate.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3, 4, 5])
yield* TxPriorityQueue.removeIf(pq, (n) => n % 2 === 0)
console.log(yield* TxPriorityQueue.takeAll(pq)) // => [1, 3, 5]
})

Keeps only the elements matching the predicate.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3, 4, 5])
yield* TxPriorityQueue.retainIf(pq, (n) => n % 2 === 0)
console.log(yield* TxPriorityQueue.takeAll(pq)) // => [2, 4]
})

The number of elements in the queue.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3])
console.log(yield* TxPriorityQueue.size(pq)) // => 3
})

Whether the queue has no elements.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
console.log(yield* TxPriorityQueue.isEmpty(pq)) // => true
})

Whether the queue has at least one element.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1])
console.log(yield* TxPriorityQueue.isNonEmpty(pq)) // => true
})

Returns all elements in priority order without removing them.

import { Effect, Order, TxPriorityQueue } from "effect"
const program = Effect.gen(function*() {
const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
console.log(yield* TxPriorityQueue.toArray(pq)) // => [1, 2, 3]
})

TxPubSub<A> is a transactional broadcast hub. Each subscriber owns its own TxQueue, and a publish offers the value to every currently-registered subscriber queue in one atomic transaction. Subscribers only receive values published after they subscribe. subscribe is Scope-managed: the subscriber queue is registered on acquire and removed (and shut down) when the scope closes.

The hub’s strategy (bounded, dropping, sliding, unbounded) is applied to each subscriber queue, so it controls back-pressure per subscriber.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<string>()
yield* Effect.scoped(
Effect.gen(function*() {
const sub1 = yield* TxPubSub.subscribe(hub)
const sub2 = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publish(hub, "broadcast")
console.log(yield* TxQueue.take(sub1)) // => "broadcast"
console.log(yield* TxQueue.take(sub2)) // => "broadcast"
})
)
})

The hub interface. Carries the broadcast strategy and capacity.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub: TxPubSub.TxPubSub<number> = yield* TxPubSub.bounded<number>(16)
})

Creates a hub where a full subscriber queue makes publishers retry until space is available (back-pressure).

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.bounded<number>(16)
yield* Effect.scoped(
Effect.gen(function*() {
const sub = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publish(hub, 42)
console.log(yield* TxQueue.take(sub)) // => 42
})
)
})

Creates a hub where a full subscriber queue drops the new message; publish returns false.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.dropping<number>(2)
yield* Effect.scoped(
Effect.gen(function*() {
const sub = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publish(hub, 1)
yield* TxPubSub.publish(hub, 2)
console.log(yield* TxPubSub.publish(hub, 3)) // => false (dropped)
})
)
})

Creates a hub where a full subscriber queue evicts its oldest message to keep the newest.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.sliding<number>(2)
yield* Effect.scoped(
Effect.gen(function*() {
const sub = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publishAll(hub, [1, 2, 3]) // evicts 1
console.log(yield* TxQueue.take(sub)) // => 2
})
)
})

Creates a hub whose subscriber queues grow without bound; messages are always accepted.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<string>()
yield* Effect.scoped(
Effect.gen(function*() {
const sub = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publish(hub, "msg")
console.log(yield* TxQueue.take(sub)) // => "msg"
})
)
})

Type guard for a TxPubSub.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
console.log(TxPubSub.isTxPubSub(hub)) // => true
})

Broadcasts one value to all current subscribers. Returns true when delivered to every subscriber, or false if the hub is shut down or the message was dropped for any subscriber. With no subscribers it is a no-op returning true.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<string>()
console.log(yield* TxPubSub.publish(hub, "no one listening")) // => true
})

Broadcasts every message from an iterable. Returns true if all were delivered to all subscribers.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
yield* Effect.scoped(
Effect.gen(function*() {
const sub = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publishAll(hub, [1, 2, 3])
console.log(yield* TxQueue.takeAll(sub)) // => [1, 2, 3]
})
)
})

Subscribes to the hub, returning a Scope-managed TxQueue that receives messages published after subscription. The queue is removed and shut down when the scope closes.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<string>()
yield* Effect.scoped(
Effect.gen(function*() {
const sub = yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publish(hub, "hello")
console.log(yield* TxQueue.take(sub)) // => "hello"
})
)
})

The transactional acquire step of subscribe: creates and registers a subscriber queue inside a transaction (requires Effect.Transaction). Use it when registration must be atomic with other Tx operations; pair with releaseSubscriber.

import { Effect, TxPubSub, TxQueue } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
const sub = yield* Effect.tx(TxPubSub.acquireSubscriber(hub))
yield* TxPubSub.publish(hub, 1)
console.log(yield* TxQueue.take(sub)) // => 1
yield* Effect.tx(TxPubSub.releaseSubscriber(hub, sub))
})

The transactional release step of subscribe: removes a manually acquired queue from the hub and shuts it down. Composes inside Effect.tx with other cleanup.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
const sub = yield* Effect.tx(TxPubSub.acquireSubscriber(hub))
yield* Effect.tx(TxPubSub.releaseSubscriber(hub, sub)) // removes + shuts down
})

The configured capacity of the hub (a plain number, not an Effect).

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.bounded<number>(16)
console.log(TxPubSub.capacity(hub)) // => 16
})

The maximum number of pending messages in any single subscriber queue (not the total across queues).

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
yield* Effect.scoped(
Effect.gen(function*() {
yield* TxPubSub.subscribe(hub)
yield* TxPubSub.publishAll(hub, [1, 2])
console.log(yield* TxPubSub.size(hub)) // => 2
})
)
})

Whether all subscriber queues are empty.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
console.log(yield* TxPubSub.isEmpty(hub)) // => true
})

Whether any subscriber queue is at capacity (always false for unbounded hubs).

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.bounded<number>(2)
console.log(yield* TxPubSub.isFull(hub)) // => false
})

Whether the hub has been shut down.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
console.log(yield* TxPubSub.isShutdown(hub)) // => false
yield* TxPubSub.shutdown(hub)
console.log(yield* TxPubSub.isShutdown(hub)) // => true
})

Shuts down the hub and the subscriber queues registered at that moment. After shutdown, publish / publishAll return false. Idempotent.

import { Effect, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
yield* TxPubSub.shutdown(hub)
console.log(yield* TxPubSub.publish(hub, 1)) // => false
})

Retries until the hub is shut down, then succeeds.

import { Effect, Fiber, TxPubSub } from "effect"
const program = Effect.gen(function*() {
const hub = yield* TxPubSub.unbounded<number>()
const fiber = yield* Effect.forkChild(TxPubSub.awaitShutdown(hub))
yield* TxPubSub.shutdown(hub)
yield* Fiber.await(fiber)
})