# Transactional queues and PubSub

`TxQueue`, `TxPriorityQueue`, and `TxPubSub` are queues and broadcast hubs whose
state lives in transactional memory. Every operation returns an `Effect`, and
because the underlying state is a [`TxRef`](https://effect.plants.sh/transactions/transactional-state/),
you can offer, take, and inspect them atomically alongside any other
transactional value. The blocking operations (`take`, `peek`) don't block a
thread — they call `Effect.txRetry` and park the fiber until the queue changes.

If you don't need transactional composition, reach for the non-transactional
[`Queue` and `PubSub`](https://effect.plants.sh/concurrency/queue-and-pubsub/) instead. Use the `Tx*`
variants when a queue operation must commit or retry together with other STM
state.

## Producer/consumer with TxQueue

`TxQueue<A, E>` is a transactional queue with built-in blocking semantics, which
makes it a natural fit for producer/consumer pipelines. A consumer fiber simply
calls `take` in a loop: when the queue is empty the transaction retries, so the
fiber sleeps cheaply until a producer offers a value — no polling loop required.

```ts
import { Effect, Fiber, TxQueue } from "effect"

const program = Effect.gen(function*() {
  // Bounded capacity gives natural back-pressure: producers wait when full.
  const queue = yield* TxQueue.bounded<number>(16)

  // Consumer: take blocks the transaction until an item arrives, so there is
  // no polling loop — the fiber sleeps until a producer offers a value.
  const consumer = yield* Effect.forkChild(
    Effect.forever(
      Effect.gen(function*() {
        const job = yield* TxQueue.take(queue)
        yield* Effect.log(`processing ${job}`)
      })
    )
  )

  // Producer: enqueue some work.
  yield* TxQueue.offerAll(queue, [1, 2, 3])

  yield* Effect.sleep("100 millis")
  yield* Fiber.interrupt(consumer)
})
```

**Blocking is just txRetry:** `take` and `peek` are "blocking" only in
transactional terms. Under the hood they call `Effect.txRetry`, so the fiber
parks cheaply and wakes when the queue changes; no thread is blocked and nothing
busy-waits. Use `poll`, which returns an `Option`, when an empty queue should
produce an immediate answer.
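
To make the "park and wake" idea concrete, here is a minimal plain-TypeScript model. It does not use Effect and does not reproduce how `Effect.txRetry` is implemented; `ParkingQueue`, `takeOrPark`, and `offerAndWake` are hypothetical names invented for this sketch. It only illustrates that a waiting consumer is a stored continuation rather than a spinning loop.

```ts
// Hypothetical model: a consumer that finds the queue empty is recorded as a
// waiter and resumed by the next offer. No loop ever polls the buffer.
interface ParkingQueue<A> {
  readonly buffer: Array<A>
  readonly waiters: Array<(value: A) => void>
}

const makeParkingQueue = <A>(): ParkingQueue<A> => ({ buffer: [], waiters: [] })

// "take": deliver immediately if an item is buffered, otherwise park.
const takeOrPark = <A>(queue: ParkingQueue<A>, resume: (value: A) => void): void => {
  if (queue.buffer.length > 0) {
    resume(queue.buffer.shift() as A)
  } else {
    queue.waiters.push(resume)
  }
}

// "offer": hand the value to the oldest parked consumer, or buffer it.
const offerAndWake = <A>(queue: ParkingQueue<A>, value: A): void => {
  const waiter = queue.waiters.shift()
  if (waiter !== undefined) {
    waiter(value)
  } else {
    queue.buffer.push(value)
  }
}

const parkingDemo = makeParkingQueue<number>()
const received: Array<number> = []

takeOrPark(parkingDemo, (n) => received.push(n)) // queue is empty: the consumer parks
const deliveredBeforeOffer = received.length // 0, nothing has been delivered yet
offerAndWake(parkingDemo, 7) // wakes the parked consumer with 7

console.log(deliveredBeforeOffer, received)
```

In the real library the waiting and waking are handled by the transaction machinery, so you never manage waiters yourself; the model only shows why an idle consumer costs nothing while it waits.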

### Back-pressure strategies

The strategy is chosen at construction time and decides what `offer` does when
the queue is full:

| Constructor | When full, `offer`… |
| --- | --- |
| `TxQueue.bounded(capacity)` | **retries** the transaction (producers wait) |
| `TxQueue.dropping(capacity)` | returns `false`, rejecting the new value |
| `TxQueue.sliding(capacity)` | drops the **oldest** value to make room, returns `true` |
| `TxQueue.unbounded()` | always accepts while open (no capacity limit) |

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const dropping = yield* TxQueue.dropping<number>(2)
  yield* TxQueue.offerAll(dropping, [1, 2]) // fills to capacity
  const accepted = yield* TxQueue.offer(dropping, 3)
  console.log(accepted) // => false (rejected)

  const sliding = yield* TxQueue.sliding<number>(2)
  yield* TxQueue.offerAll(sliding, [1, 2])
  yield* TxQueue.offer(sliding, 3) // evicts 1
  const first = yield* TxQueue.take(sliding)
  console.log(first) // => 2
})
```
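
The table can also be restated as a pure function. The following is a plain-TypeScript model of the four strategies, not the library's code: `Strategy`, `OfferOutcome`, and `offerModel` are hypothetical names, the queue is assumed to be open with a capacity of at least 1, and the `"retry"` outcome stands in for the transaction retrying.

```ts
type Strategy = "bounded" | "dropping" | "sliding" | "unbounded"

// What a single offer does to an open queue, following the table above.
type OfferOutcome<A> =
  | { readonly kind: "accepted"; readonly buffer: ReadonlyArray<A> } // offer returns true
  | { readonly kind: "rejected"; readonly buffer: ReadonlyArray<A> } // offer returns false
  | { readonly kind: "retry" } // producer waits; nothing changes

const offerModel = <A>(
  strategy: Strategy,
  capacity: number,
  buffer: ReadonlyArray<A>,
  value: A
): OfferOutcome<A> => {
  const full = strategy !== "unbounded" && buffer.length >= capacity
  if (!full) return { kind: "accepted", buffer: [...buffer, value] }
  switch (strategy) {
    case "bounded":
      return { kind: "retry" }
    case "dropping":
      return { kind: "rejected", buffer }
    case "sliding":
      // Drop the oldest element, then append the new one.
      return { kind: "accepted", buffer: [...buffer.slice(1), value] }
    default:
      // Unbounded queues are never full, so this branch is not reached.
      return { kind: "accepted", buffer: [...buffer, value] }
  }
}

console.log(offerModel("dropping", 2, [1, 2], 3)) // rejected, buffer stays [1, 2]
console.log(offerModel("sliding", 2, [1, 2], 3)) // accepted, buffer becomes [2, 3]
console.log(offerModel("bounded", 2, [1, 2], 3)) // retry
```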

### Lifecycle and completion

A `TxQueue` moves through three states (its [`State`](#state) type):

1. **Open** — accepting offers and serving takes normally.
2. **Closing** — no new offers accepted, but buffered items are still served
   until the queue drains.
3. **Done** — terminal. Blocking consumers fail with the stored `Cause`; `poll`
   returns `Option.none`.

You move the queue toward completion with:

- `end`: signals a clean finish with `Cause.Done`. It is available only on
  queues whose `E` includes `Cause.Done`. Buffered items drain first, then
  takes fail with `Cause.Done`.
- `fail` / `failCause`: complete with an error or an arbitrary cause. `fail`
  discards buffered items and goes straight to **Done**; `failCause` enters
  **Closing** if items remain.
- `interrupt`: completes with the current fiber's interruption cause, draining
  buffered items first.
- `shutdown`: discards all buffered items and interrupts the queue immediately.

Observe state with `isOpen` / `isClosing` / `isDone` / `isShutdown`, and wait
for the terminal state with `awaitCompletion`.

```ts
import { Cause, Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number, Cause.Done>(8)
  yield* TxQueue.offerAll(queue, [1, 2])

  // Closing: buffered items can still be taken.
  yield* TxQueue.end(queue)
  console.log(yield* TxQueue.isClosing(queue)) // => true
  console.log(yield* TxQueue.take(queue)) // => 1
  console.log(yield* TxQueue.take(queue)) // => 2

  // Now drained → Done. take fails with Cause.Done.
  const result = yield* Effect.flip(TxQueue.take(queue))
  console.log(Cause.isDone(result)) // => true
})
```

**Producer-only and consumer-only views:** A `TxQueue<A, E>` is both a
[`TxEnqueue`](#txenqueue) (the write-only interface: `offer`, `offerAll`,
`fail`, `end`, …) and a [`TxDequeue`](#txdequeue) (the read-only interface:
`take`, `poll`, `peek`, …). Accept a `TxEnqueue` parameter for producers and a
`TxDequeue` parameter for consumers to make the data-flow direction explicit in
your types.
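
The lifecycle transitions described in this section can be summarized as a small state machine. This is a plain-TypeScript model under simplifying assumptions, not the library's implementation: causes are reduced to strings, the buffer holds numbers, and `QueueModel`, `completeDraining`, `completeDiscarding`, and `takeModel` are hypothetical names.

```ts
type LifecycleState =
  | { readonly _tag: "Open" }
  | { readonly _tag: "Closing"; readonly cause: string }
  | { readonly _tag: "Done"; readonly cause: string }

interface QueueModel {
  readonly state: LifecycleState
  readonly buffer: ReadonlyArray<number>
}

// Completion that lets buffered items drain (modelled on end / failCause / interrupt).
const completeDraining = (queue: QueueModel, cause: string): QueueModel => {
  if (queue.state._tag !== "Open") return queue // already closing or done: no change
  return queue.buffer.length > 0
    ? { state: { _tag: "Closing", cause }, buffer: queue.buffer }
    : { state: { _tag: "Done", cause }, buffer: [] }
}

// Completion that discards buffered items (modelled on fail).
const completeDiscarding = (queue: QueueModel, cause: string): QueueModel =>
  queue.state._tag !== "Open" ? queue : { state: { _tag: "Done", cause }, buffer: [] }

// Serving one item; taking the last item of a Closing queue moves it to Done.
const takeModel = (
  queue: QueueModel
): { readonly item: number | undefined; readonly queue: QueueModel } => {
  // Empty buffer: the real queue would retry (Open) or fail with the cause (Done).
  if (queue.buffer.length === 0) return { item: undefined, queue }
  const [item, ...rest] = queue.buffer
  if (queue.state._tag === "Closing" && rest.length === 0) {
    return { item, queue: { state: { _tag: "Done", cause: queue.state.cause }, buffer: [] } }
  }
  return { item, queue: { state: queue.state, buffer: rest } }
}

const openQueue: QueueModel = { state: { _tag: "Open" }, buffer: [1, 2] }
const closingQueue = completeDraining(openQueue, "Done") // items remain: Closing
const afterFirstTake = takeModel(closingQueue) // serves 1, still Closing
const afterSecondTake = takeModel(afterFirstTake.queue) // serves 2, now Done

console.log(
  closingQueue.state._tag,
  afterFirstTake.queue.state._tag,
  afterSecondTake.queue.state._tag
) // Closing Closing Done
```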

## TxQueue

`TxQueue<A, E>` stores values of type `A`, exposes write-only `TxEnqueue` and
read-only `TxDequeue` handles, and can complete or fail with a `Cause` observed
by consumers.

### TxQueue (type)

The full queue interface; extends both [`TxEnqueue`](#txenqueue) and
[`TxDequeue`](#txdequeue). The `E` channel defaults to `never`.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue: TxQueue.TxQueue<number> = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 42)
  console.log(yield* TxQueue.take(queue)) // => 42
})
```

### TxEnqueue

The write-only view of a queue: `offer`, `offerAll`, `fail`, `failCause`, `end`,
`interrupt`, `shutdown`, plus state inspection. Accept this type when a function
should only produce.

```ts
import { Effect, TxQueue } from "effect"

const produce = (out: TxQueue.TxEnqueue<number>) => TxQueue.offer(out, 1)

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.unbounded<number>()
  yield* produce(queue) // TxQueue is assignable to TxEnqueue
})
```

### TxDequeue

The read-only view of a queue: `take`, `takeAll`, `takeN`, `takeBetween`,
`poll`, `peek`, plus state inspection. Accept this type when a function should
only consume.

```ts
import { Effect, TxQueue } from "effect"

const consume = (input: TxQueue.TxDequeue<number>) => TxQueue.take(input)

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.unbounded<number>()
  yield* TxQueue.offer(queue, 7)
  console.log(yield* consume(queue)) // => 7
})
```

### State

The lifecycle state of a queue, represented as a tagged union:
`{ _tag: "Open" }`, `{ _tag: "Closing", cause }`, or `{ _tag: "Done", cause }`.
You normally inspect it via the `isOpen` / `isClosing` / `isDone` helpers rather
than reading it directly.

```ts
import type { TxQueue } from "effect"

declare const state: TxQueue.State<string, Error>

if (state._tag === "Open") {
  // accepting items
} else {
  console.log(state.cause) // => the completion Cause (Closing | Done)
}
```

### isTxQueue

Type guard for a full `TxQueue`.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(1)
  console.log(TxQueue.isTxQueue(queue)) // => true
  console.log(TxQueue.isTxQueue("nope")) // => false
})
```

### isTxEnqueue

Type guard for a `TxEnqueue` (write-only) handle.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(1)
  console.log(TxQueue.isTxEnqueue(queue)) // => true
})
```

### isTxDequeue

Type guard for a `TxDequeue` (read-only) handle.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(1)
  console.log(TxQueue.isTxDequeue(queue)) // => true
})
```

### bounded

Creates a bounded queue. Producers **retry** (wait) when the queue is full.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(2)
  yield* TxQueue.offer(queue, 1)
  console.log(yield* TxQueue.size(queue)) // => 1
})
```

### unbounded

Creates a queue with unlimited capacity; `offer` always succeeds while open.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.unbounded<string>()
  yield* TxQueue.offerAll(queue, ["a", "b"])
  console.log(yield* TxQueue.size(queue)) // => 2
})
```

### dropping

Creates a bounded queue that drops new items when full (`offer` returns
`false`).

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.dropping<number>(1)
  yield* TxQueue.offer(queue, 1)
  console.log(yield* TxQueue.offer(queue, 2)) // => false
})
```

### sliding

Creates a bounded queue that evicts the oldest item when full to accept the new
one.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.sliding<number>(1)
  yield* TxQueue.offer(queue, 1)
  yield* TxQueue.offer(queue, 2) // evicts 1
  console.log(yield* TxQueue.take(queue)) // => 2
})
```

### offer

Offers one item, returning `true` if it was accepted. When the queue is full, a
bounded queue retries, a dropping queue returns `false`, and a sliding queue
evicts its oldest item to make room. A closing or done queue always returns
`false`.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.offer(queue, 42)) // => true
})
```

### offerAll

Offers many items, returning the array of items that were **not** accepted.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.dropping<number>(2)
  const rejected = yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(rejected) // => [3]
})
```

### take

Takes the next item, retrying while empty. Fails with the completion cause if
the queue is done.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 1)
  console.log(yield* TxQueue.take(queue)) // => 1
})
```

### takeAll

Takes all buffered items at once, returning a `NonEmptyArray`. Blocks until at
least one item is available.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(yield* TxQueue.takeAll(queue)) // => [1, 2, 3]
})
```

### takeN

Takes `n` items, waiting until that many are available; the requested count is
capped at the queue's capacity. A closing queue returns whatever items remain.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(5)
  yield* TxQueue.offerAll(queue, [1, 2, 3, 4])
  console.log(yield* TxQueue.takeN(queue, 4)) // => [1, 2, 3, 4]
})
```

### takeBetween

Takes between `min` and `max` items, waiting for at least `min` on an open
queue and taking up to `max` of what is available.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5, 6])
  console.log(yield* TxQueue.takeBetween(queue, 2, 5)) // => [1, 2, 3, 4, 5]
})
```
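
The `min` / `max` rule for an open queue can be written down as a pure function. This is a plain-TypeScript model rather than the library's code; `TakeBetweenOutcome` and `takeBetweenModel` are hypothetical names, and the `"retry"` outcome stands in for the transaction waiting.

```ts
// Result of one takeBetween attempt on an open queue (hypothetical model).
type TakeBetweenOutcome<A> =
  | { readonly kind: "retry" } // fewer than `min` items buffered: wait
  | {
    readonly kind: "taken"
    readonly taken: ReadonlyArray<A>
    readonly rest: ReadonlyArray<A>
  }

const takeBetweenModel = <A>(
  buffer: ReadonlyArray<A>,
  min: number,
  max: number
): TakeBetweenOutcome<A> =>
  buffer.length < min
    ? { kind: "retry" }
    : { kind: "taken", taken: buffer.slice(0, max), rest: buffer.slice(max) }

console.log(takeBetweenModel([1, 2, 3, 4, 5, 6], 2, 5)) // taken [1, 2, 3, 4, 5], rest [6]
console.log(takeBetweenModel([1], 2, 5)) // retry
```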

### poll

Tries to take one item without blocking, returning an `Option`.

```ts
import { Effect, Option, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(Option.isNone(yield* TxQueue.poll(queue))) // => true
  yield* TxQueue.offer(queue, 42)
  console.log(yield* TxQueue.poll(queue)) // => Option.some(42)
})
```

### peek

Waits for and returns the next item **without** removing it. Fails with the
completion cause if the queue is done.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 42)
  console.log(yield* TxQueue.peek(queue)) // => 42
  console.log(yield* TxQueue.size(queue)) // => 1 (still there)
})
```

### size

The number of buffered items.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(yield* TxQueue.size(queue)) // => 3
})
```

### isEmpty

Whether the queue currently has no buffered items.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.isEmpty(queue)) // => true
})
```

### isFull

Whether the queue is at capacity (always `false` for unbounded queues).

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(2)
  yield* TxQueue.offerAll(queue, [1, 2])
  console.log(yield* TxQueue.isFull(queue)) // => true
})
```

### isOpen

Whether the queue is in the **Open** state.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.isOpen(queue)) // => true
  yield* TxQueue.interrupt(queue)
  console.log(yield* TxQueue.isOpen(queue)) // => false
})
```

### isClosing

Whether the queue is **Closing** (draining buffered items before completion).

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offer(queue, 1)
  yield* TxQueue.interrupt(queue) // items remain → Closing
  console.log(yield* TxQueue.isClosing(queue)) // => true
})
```

### isDone

Whether the queue has reached the terminal **Done** state.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.interrupt(queue) // empty → Done immediately
  console.log(yield* TxQueue.isDone(queue)) // => true
})
```

### isShutdown

Alias for `isDone`, kept for compatibility.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.shutdown(queue)
  console.log(yield* TxQueue.isShutdown(queue)) // => true
})
```

### fail

Fails the queue with an error, discarding buffered items and going straight to
**Done**. Returns `false` if already closing/done.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number, string>(10)
  console.log(yield* TxQueue.fail(queue, "connection lost")) // => true
  const err = yield* Effect.flip(TxQueue.take(queue))
  console.log(err) // => "connection lost"
})
```

### failCause

Completes the queue with an arbitrary `Cause`. Enters **Closing** if items
remain so they can drain, otherwise goes directly to **Done**.

```ts
import { Cause, Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.failCause(queue, Cause.interrupt())) // => true
})
```

### end

Convenience wrapper that completes a queue with `Cause.Done` (the `E` channel
must include `Cause.Done`). Buffered items drain first; later takes fail with
`Cause.Done`.

```ts
import { Cause, Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number, Cause.Done>(10)
  console.log(yield* TxQueue.end(queue)) // => true
  const result = yield* Effect.flip(TxQueue.take(queue))
  console.log(Cause.isDone(result)) // => true
})
```

### clear

Removes and returns all currently buffered items without changing the lifecycle
state. Returns `[]` if the queue is done via `Cause.Done`; propagates other
causes.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  console.log(yield* TxQueue.clear(queue)) // => [1, 2, 3]
  console.log(yield* TxQueue.size(queue)) // => 0
})
```

### interrupt

Completes the queue with the current fiber's interruption cause, draining
buffered items first. Returns `false` if already closing/done.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  console.log(yield* TxQueue.interrupt(queue)) // => true
})
```

### shutdown

Clears all items and interrupts the queue immediately.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  yield* TxQueue.shutdown(queue)
  console.log(yield* TxQueue.size(queue)) // => 0
})
```

### awaitCompletion

Retries until the queue reaches **Done**, then succeeds. Useful for waiting on
another fiber to finish producing.

```ts
import { Effect, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* TxQueue.bounded<number>(10)
  yield* Effect.forkChild(
    Effect.delay(TxQueue.interrupt(queue), "50 millis")
  )
  yield* TxQueue.awaitCompletion(queue)
  console.log("queue completed")
})
```

## TxPriorityQueue

`TxPriorityQueue<A>` keeps elements ordered by an `Order<A>` supplied at
construction. `take` and `peek` return the **smallest** element by that order,
so `Order.Number` dequeues lower numbers first — reverse the order if larger
values should have higher priority. Unlike `TxQueue`, it has no capacity or
lifecycle; it is a pure ordered multiset (equal values are kept, not merged).

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
  console.log(yield* TxPriorityQueue.take(pq)) // => 2

  // Highest-first: flip the order.
  const maxFirst = yield* TxPriorityQueue.empty<number>(
    Order.flip(Order.Number)
  )
  yield* TxPriorityQueue.offerAll(maxFirst, [1, 5, 3])
  console.log(yield* TxPriorityQueue.take(maxFirst)) // => 5
})
```
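
For intuition about the "ordered multiset" behaviour, here is a plain-TypeScript sketch of sorted insertion driven by a comparator. It is only a model: `Compare` and `insertSorted` are hypothetical names, and the library's internal representation may differ.

```ts
// Comparator in the usual style: negative if a < b, positive if a > b, 0 if equal.
type Compare<A> = (a: A, b: A) => number

// Insert before the first element that is strictly greater than `value`, so
// equal elements are kept (not merged) and the result stays sorted.
const insertSorted = <A>(
  sorted: ReadonlyArray<A>,
  value: A,
  compare: Compare<A>
): Array<A> => {
  const index = sorted.findIndex((existing) => compare(existing, value) > 0)
  const at = index === -1 ? sorted.length : index
  return [...sorted.slice(0, at), value, ...sorted.slice(at)]
}

const ascending: Compare<number> = (a, b) => a - b
const descending: Compare<number> = (a, b) => b - a // flipped order: largest first

const smallestFirst = [3, 1, 2, 1].reduce<Array<number>>(
  (acc, n) => insertSorted(acc, n, ascending),
  []
)
const largestFirst = [1, 5, 3].reduce<Array<number>>(
  (acc, n) => insertSorted(acc, n, descending),
  []
)

console.log(smallestFirst) // [1, 1, 2, 3]
console.log(largestFirst) // [5, 3, 1]
```

Taking from the front of such an array yields the smallest element under the chosen comparator, which is why flipping the comparator is enough to turn a min-first queue into a max-first one.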

### TxPriorityQueue (type)

A transactional priority queue backed by a sorted chunk and an `Order<A>`.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq: TxPriorityQueue.TxPriorityQueue<number> =
    yield* TxPriorityQueue.empty<number>(Order.Number)
  yield* TxPriorityQueue.offer(pq, 1)
})
```

### empty

Creates an empty priority queue with the given `Order`.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(yield* TxPriorityQueue.isEmpty(pq)) // => true
})
```

### make

Creates a priority queue from variadic elements. Curried on the `Order`.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.make(Order.Number)(3, 1, 2)
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})
```

### fromIterable

Creates a priority queue from an iterable, sorting on construction.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.toArray(pq)) // => [1, 2, 3]
})
```

### isTxPriorityQueue

Type guard for a `TxPriorityQueue`.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(TxPriorityQueue.isTxPriorityQueue(pq)) // => true
})
```

### offer

Inserts a value into its sorted position.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  yield* TxPriorityQueue.offer(pq, 2)
  yield* TxPriorityQueue.offer(pq, 1)
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})
```

### offerAll

Inserts every element of an iterable.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  yield* TxPriorityQueue.offerAll(pq, [3, 1, 2])
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})
```

### take

Takes the smallest element, retrying while the queue is empty.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.take(pq)) // => 1
})
```

### takeOption

Tries to take the smallest element, returning `Option.none` when empty (no
retry).

```ts
import { Effect, Option, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(Option.isNone(yield* TxPriorityQueue.takeOption(pq))) // => true
})
```

### takeAll

Takes every element in priority order.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.takeAll(pq)) // => [1, 2, 3]
})
```

### takeUpTo

Takes up to `n` elements in priority order.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [5, 3, 1, 4, 2])
  console.log(yield* TxPriorityQueue.takeUpTo(pq, 2)) // => [1, 2]
})
```

### peek

Observes the smallest element without removing it, retrying while empty.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.peek(pq)) // => 1
})
```

### peekOption

Observes the smallest element without removing it, returning `Option.none` when
empty.

```ts
import { Effect, Option, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(Option.isNone(yield* TxPriorityQueue.peekOption(pq))) // => true
})
```

### removeIf

Removes every element matching the predicate.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3, 4, 5])
  yield* TxPriorityQueue.removeIf(pq, (n) => n % 2 === 0)
  console.log(yield* TxPriorityQueue.takeAll(pq)) // => [1, 3, 5]
})
```

### retainIf

Keeps only the elements matching the predicate.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3, 4, 5])
  yield* TxPriorityQueue.retainIf(pq, (n) => n % 2 === 0)
  console.log(yield* TxPriorityQueue.takeAll(pq)) // => [2, 4]
})
```

### size

The number of elements in the queue.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1, 2, 3])
  console.log(yield* TxPriorityQueue.size(pq)) // => 3
})
```

### isEmpty

Whether the queue has no elements.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.empty<number>(Order.Number)
  console.log(yield* TxPriorityQueue.isEmpty(pq)) // => true
})
```

### isNonEmpty

Whether the queue has at least one element.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [1])
  console.log(yield* TxPriorityQueue.isNonEmpty(pq)) // => true
})
```

### toArray

Returns all elements in priority order without removing them.

```ts
import { Effect, Order, TxPriorityQueue } from "effect"

const program = Effect.gen(function*() {
  const pq = yield* TxPriorityQueue.fromIterable(Order.Number, [3, 1, 2])
  console.log(yield* TxPriorityQueue.toArray(pq)) // => [1, 2, 3]
})
```

## TxPubSub

`TxPubSub<A>` is a transactional broadcast hub. Each subscriber owns its own
`TxQueue`, and a `publish` offers the value to every currently-registered
subscriber queue in one atomic transaction. Subscribers only receive values
published **after** they subscribe. `subscribe` is `Scope`-managed: the
subscriber queue is registered on acquire and removed (and shut down) when the
scope closes.

The hub's strategy (`bounded`, `dropping`, `sliding`, `unbounded`) is applied to
each subscriber queue, so it controls back-pressure per subscriber.

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()

  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub1 = yield* TxPubSub.subscribe(hub)
      const sub2 = yield* TxPubSub.subscribe(hub)

      yield* TxPubSub.publish(hub, "broadcast")

      console.log(yield* TxQueue.take(sub1)) // => "broadcast"
      console.log(yield* TxQueue.take(sub2)) // => "broadcast"
    })
  )
})
```

**Tip:** For an observable single value rather than a stream of events, see
`TxSubscriptionRef` on the
[transactional data structures](https://effect.plants.sh/transactions/transactional-data-structures/)
page. It composes `TxPubSub.acquireSubscriber` with a `TxRef` to broadcast
state changes.
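
The broadcast semantics described at the top of this section (one queue per subscriber, delivery only to subscribers registered at publish time) can be modelled in a few lines of plain TypeScript. This is an illustrative sketch of an unbounded hub, not the library's implementation; `HubModel`, `subscribeModel`, `unsubscribeModel`, and `publishModel` are hypothetical names.

```ts
// Hypothetical model of a broadcast hub: one buffer per subscriber.
interface HubModel<A> {
  readonly subscribers: Set<Array<A>>
}

const makeHubModel = <A>(): HubModel<A> => ({ subscribers: new Set<Array<A>>() })

// Subscribing registers a fresh, empty buffer; earlier messages are not replayed.
const subscribeModel = <A>(hub: HubModel<A>): Array<A> => {
  const buffer: Array<A> = []
  hub.subscribers.add(buffer)
  return buffer
}

// Unsubscribing mirrors what happens when the subscription's scope closes.
const unsubscribeModel = <A>(hub: HubModel<A>, buffer: Array<A>): void => {
  hub.subscribers.delete(buffer)
}

// Publishing appends the value to every buffer registered right now.
const publishModel = <A>(hub: HubModel<A>, value: A): void => {
  hub.subscribers.forEach((buffer) => {
    buffer.push(value)
  })
}

const hubDemo = makeHubModel<string>()
publishModel(hubDemo, "early") // no subscribers yet: nobody receives this
const firstSubscriber = subscribeModel(hubDemo)
publishModel(hubDemo, "a")
const secondSubscriber = subscribeModel(hubDemo)
publishModel(hubDemo, "b")
unsubscribeModel(hubDemo, firstSubscriber)
publishModel(hubDemo, "c")

console.log(firstSubscriber) // ["a", "b"]
console.log(secondSubscriber) // ["b", "c"]
```

In the real hub the per-subscriber buffers are `TxQueue`s and a publish updates all of them in one transaction, so no subscriber can observe a message that another registered subscriber missed.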

### TxPubSub (type)

The hub interface. Carries the broadcast strategy and capacity.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub: TxPubSub.TxPubSub<number> = yield* TxPubSub.bounded<number>(16)
})
```

### bounded

Creates a hub where a full subscriber queue makes publishers **retry** until
space is available (back-pressure).

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.bounded<number>(16)
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 42)
      console.log(yield* TxQueue.take(sub)) // => 42
    })
  )
})
```

### dropping

Creates a hub where a full subscriber queue drops the new message; `publish`
returns `false`.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.dropping<number>(2)
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 1)
      yield* TxPubSub.publish(hub, 2)
      console.log(yield* TxPubSub.publish(hub, 3)) // => false (dropped)
    })
  )
})
```

### sliding

Creates a hub where a full subscriber queue evicts its oldest message to keep
the newest.

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.sliding<number>(2)
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2, 3]) // evicts 1
      console.log(yield* TxQueue.take(sub)) // => 2
    })
  )
})
```

### unbounded

Creates a hub whose subscriber queues grow without bound; messages are always
accepted.

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "msg")
      console.log(yield* TxQueue.take(sub)) // => "msg"
    })
  )
})
```

### isTxPubSub

Type guard for a `TxPubSub`.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(TxPubSub.isTxPubSub(hub)) // => true
})
```

### publish

Broadcasts one value to all current subscribers. Returns `true` when delivered
to every subscriber, or `false` if the hub is shut down or the message was
dropped for any subscriber. With no subscribers it is a no-op returning `true`.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()
  console.log(yield* TxPubSub.publish(hub, "no one listening")) // => true
})
```

### publishAll

Broadcasts every message from an iterable. Returns `true` if all were delivered
to all subscribers.

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2, 3])
      console.log(yield* TxQueue.takeAll(sub)) // => [1, 2, 3]
    })
  )
})
```

### subscribe

Subscribes to the hub, returning a `Scope`-managed `TxQueue` that receives
messages published after subscription. The queue is removed and shut down when
the scope closes.

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<string>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "hello")
      console.log(yield* TxQueue.take(sub)) // => "hello"
    })
  )
})
```

### acquireSubscriber

The transactional acquire step of `subscribe`: creates and registers a
subscriber queue inside a transaction (requires `Effect.Transaction`). Use it
when registration must be atomic with other Tx operations; pair with
`releaseSubscriber`.

```ts
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  const sub = yield* Effect.tx(TxPubSub.acquireSubscriber(hub))
  yield* TxPubSub.publish(hub, 1)
  console.log(yield* TxQueue.take(sub)) // => 1
  yield* Effect.tx(TxPubSub.releaseSubscriber(hub, sub))
})
```

### releaseSubscriber

The transactional release step of `subscribe`: removes a manually acquired queue
from the hub and shuts it down. Composes inside `Effect.tx` with other cleanup.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  const sub = yield* Effect.tx(TxPubSub.acquireSubscriber(hub))
  yield* Effect.tx(TxPubSub.releaseSubscriber(hub, sub)) // removes + shuts down
})
```

### capacity

The configured capacity of the hub (a plain number, not an `Effect`).

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.bounded<number>(16)
  console.log(TxPubSub.capacity(hub)) // => 16
})
```

### size

The maximum number of pending messages in any single subscriber queue (not the
total across queues).

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* Effect.scoped(
    Effect.gen(function*() {
      yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2])
      console.log(yield* TxPubSub.size(hub)) // => 2
    })
  )
})
```

### isEmpty

Whether all subscriber queues are empty.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(yield* TxPubSub.isEmpty(hub)) // => true
})
```

### isFull

Whether any subscriber queue is at capacity (always `false` for unbounded
hubs).

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.bounded<number>(2)
  console.log(yield* TxPubSub.isFull(hub)) // => false
})
```
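
The way `size`, `isEmpty`, and `isFull` aggregate over subscriber queues can be stated as three small functions. This is a plain-TypeScript model over an array of per-subscriber pending counts, not the library's code; the function names are hypothetical, and returning `0` from `hubSizeModel` for a hub without subscribers is an assumption of this sketch.

```ts
// Each entry is the number of pending messages in one subscriber queue.
type PendingCounts = ReadonlyArray<number>

// size: the largest backlog of any single subscriber, not the sum.
const hubSizeModel = (pending: PendingCounts): number =>
  pending.reduce((max, n) => Math.max(max, n), 0)

// isEmpty: every subscriber queue is empty (trivially true with no subscribers).
const hubIsEmptyModel = (pending: PendingCounts): boolean =>
  pending.every((n) => n === 0)

// isFull: at least one subscriber queue has reached the capacity.
const hubIsFullModel = (pending: PendingCounts, capacity: number): boolean =>
  pending.some((n) => n >= capacity)

console.log(hubSizeModel([2, 0, 1])) // 2
console.log(hubIsEmptyModel([])) // true
console.log(hubIsFullModel([2, 0, 1], 2)) // true
console.log(hubIsFullModel([], 2)) // false
```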

### isShutdown

Whether the hub has been shut down.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(yield* TxPubSub.isShutdown(hub)) // => false
  yield* TxPubSub.shutdown(hub)
  console.log(yield* TxPubSub.isShutdown(hub)) // => true
})
```

### shutdown

Shuts down the hub and the subscriber queues registered at that moment. After
shutdown, `publish` / `publishAll` return `false`. Idempotent.

```ts
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* TxPubSub.shutdown(hub)
  console.log(yield* TxPubSub.publish(hub, 1)) // => false
})
```

### awaitShutdown

Retries until the hub is shut down, then succeeds.

```ts
import { Effect, Fiber, TxPubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* TxPubSub.unbounded<number>()
  const fiber = yield* Effect.forkChild(TxPubSub.awaitShutdown(hub))
  yield* TxPubSub.shutdown(hub)
  yield* Fiber.await(fiber)
})
```

## See also

- [Transactional state](https://effect.plants.sh/transactions/transactional-state/) — `TxRef`, the atom
  of STM that backs these structures.
- [Transactional data structures](https://effect.plants.sh/transactions/transactional-data-structures/)
  — `TxHashMap`, `TxChunk`, `TxSubscriptionRef`, and friends.
- [Queue and PubSub](https://effect.plants.sh/concurrency/queue-and-pubsub/) — the non-transactional
  `Queue` and `PubSub` for general fiber communication.