Transactional data structures

A bare TxRef holds a single value, but shared state is often a collection: a registry of sessions, an inventory by SKU, a work queue. Effect ships transactional versions of the common data structures — TxHashMap, TxHashSet, TxChunk, TxQueue, and friends — each backed by a TxRef so that every operation participates in the same journal. That means you can read from a map, update a set, and offer to a queue inside one Effect.tx block and have it all commit atomically.

import { Context, Effect, Layer, Option, TxHashMap } from "effect"

// A shared inventory keyed by SKU. Multiple fibers reserve stock concurrently;
// each reservation must check availability and decrement atomically.
class Inventory extends Context.Service<Inventory, {
  readonly reserve: (sku: string, qty: number) => Effect.Effect<boolean>
  readonly stock: (sku: string) => Effect.Effect<number>
}>()("app/Inventory") {
  static layer = Layer.effect(Inventory)(
    Effect.gen(function*() {
      const items = yield* TxHashMap.make(["laptop", 5], ["mouse", 20])
      const reserve = Effect.fn("Inventory.reserve")(function*(
        sku: string,
        qty: number
      ) {
        // The read + check + write happen in one transaction, so two fibers
        // can never both pass the availability check for the last unit.
        return yield* Effect.tx(
          Effect.gen(function*() {
            const current = yield* TxHashMap.get(items, sku)
            if (Option.isNone(current) || current.value < qty) {
              return false
            }
            yield* TxHashMap.set(items, sku, current.value - qty)
            return true
          })
        )
      })
      const stock = Effect.fn("Inventory.stock")(function*(sku: string) {
        const current = yield* TxHashMap.get(items, sku)
        return Option.getOrElse(current, () => 0)
      })
      return Inventory.of({ reserve, stock })
    })
  )
}
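
To picture what "participating in the same journal" means, here is a toy model. It is not how Effect implements transactions, and it leaves out conflict detection between concurrent fibers entirely. Writes to several cells are buffered in one journal and applied together when the transaction body returns, or thrown away if the body throws. `Cell`, `Journal`, and `atomically` are hypothetical names introduced only for this sketch.

```typescript
// Toy model of a transaction journal. Hypothetical names; not Effect's implementation.
class Cell<A> {
  value: A
  constructor(initial: A) {
    this.value = initial
  }
}

class Journal {
  // Pending writes, keyed by the cell they target.
  private readonly writes = new Map<Cell<unknown>, unknown>()

  // Reads see this transaction's own pending writes first.
  read<A>(cell: Cell<A>): A {
    const key = cell as Cell<unknown>
    return this.writes.has(key) ? (this.writes.get(key) as A) : cell.value
  }

  write<A>(cell: Cell<A>, value: A): void {
    this.writes.set(cell as Cell<unknown>, value)
  }

  // Apply every buffered write in one step.
  commit(): void {
    this.writes.forEach((value, cell) => {
      cell.value = value
    })
  }
}

// Run `body` against a fresh journal; commit only if it returns normally.
const atomically = <R>(body: (journal: Journal) => R): R => {
  const journal = new Journal()
  const result = body(journal) // a throw here skips commit, so nothing is applied
  journal.commit()
  return result
}

const stock = new Cell<number>(1)
const reserved = new Cell<ReadonlyArray<string>>([])

// Two cells are updated, but both changes land together or not at all.
const reserve = (who: string): boolean =>
  atomically((tx) => {
    const left = tx.read(stock)
    if (left < 1) return false
    tx.write(stock, left - 1)
    tx.write(reserved, [...tx.read(reserved), who])
    return true
  })

const first = reserve("alice")
const second = reserve("bob")

// A transaction that fails midway leaves no partial write behind.
let aborted = false
try {
  atomically((tx) => {
    tx.write(stock, 99)
    throw new Error("abort")
  })
} catch (_error) {
  aborted = true
}
```

In this model the second reservation sees the committed stock of zero and declines, and the aborted transaction's write of 99 never reaches the cell.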

TxHashMap<K, V> is a transactional key-value map. Constructors return an Effect: TxHashMap.empty(), TxHashMap.fromIterable(entries), or TxHashMap.make([k1, v1], [k2, v2], ...). Reads that may be absent return an Option, so handle both cases.

| Operation | Result |
| --- | --- |
| TxHashMap.get(map, key) | Effect<Option<V>> |
| TxHashMap.has(map, key) | Effect<boolean> |
| TxHashMap.set(map, key, value) | Effect<void> |
| TxHashMap.modify(map, key, f) | Effect<Option<V>>: applies f to an existing entry and returns the previous value, or None (no write) if the key is absent |
| TxHashMap.remove(map, key) | Effect<void> |
| TxHashMap.size(map) | Effect<number> |
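
The `modify` row is the easiest one to misread, so here is a sketch of that contract on a plain `Map`. It only models the behavior described in the table (previous value returned, no write when the key is absent). It does not use the library, and `modifyModel` and the local `Option` type are hypothetical stand-ins.

```typescript
// Plain-Map model of the documented `modify` contract; not the library's code.
type Option<A> =
  | { readonly _tag: "Some"; readonly value: A }
  | { readonly _tag: "None" }

const modifyModel = <K, V>(
  map: Map<K, V>,
  key: K,
  f: (value: V) => V
): Option<V> => {
  if (!map.has(key)) {
    return { _tag: "None" } // absent key: nothing is written
  }
  const previous = map.get(key) as V
  map.set(key, f(previous))
  return { _tag: "Some", value: previous } // the value before f was applied
}

const stockModel = new Map<string, number>([["laptop", 5]])
const hit = modifyModel(stockModel, "laptop", (n) => n - 1)
const miss = modifyModel(stockModel, "tablet", (n) => n - 1)
```

Here `hit` carries the old value 5 while the map now holds 4, and `miss` is `None` with no "tablet" entry created.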

import { Effect, TxHashMap } from "effect"

// Atomically rename a key: read the old value, set the new key, drop the old.
const rename = (
  map: TxHashMap.TxHashMap<string, number>,
  from: string,
  to: string
) =>
  Effect.tx(
    Effect.gen(function*() {
      const value = yield* TxHashMap.get(map, from)
      if (value._tag === "Some") {
        yield* TxHashMap.set(map, to, value.value)
        yield* TxHashMap.remove(map, from)
      }
    })
  )

TxHashSet<V> is the transactional set. Build one with TxHashSet.empty(), TxHashSet.fromIterable(values), or TxHashSet.make(v1, v2, ...), and use add, remove, has, and size. It is ideal for tracking membership that several fibers update — active connections, seen ids, a set of locked resources.

import { Effect, TxHashSet } from "effect"

const program = Effect.gen(function*() {
  const active = yield* TxHashSet.empty<string>()
  // Atomically claim a slot only if it isn't already taken.
  const claim = (id: string) =>
    Effect.tx(
      Effect.gen(function*() {
        if (yield* TxHashSet.has(active, id)) {
          return false
        }
        yield* TxHashSet.add(active, id)
        return true
      })
    )
  return { claimed: yield* claim("session-1"), again: yield* claim("session-1") }
  // { claimed: true, again: false }
})

TxChunk<A> is a transactional, ordered sequence backed by a Chunk. Create one with TxChunk.empty(), TxChunk.fromIterable(values), or TxChunk.make(chunk), then work with it through append, get (which returns the whole Chunk), take, modify, and size. Use it as a transactional buffer or log when you need indexed, ordered access rather than map/set semantics.

import { Chunk, Effect, TxChunk } from "effect"

const program = Effect.gen(function*() {
  const buffer = yield* TxChunk.empty<string>()
  yield* Effect.tx(
    Effect.gen(function*() {
      yield* TxChunk.append(buffer, "a")
      yield* TxChunk.append(buffer, "b")
    })
  )
  const all = yield* TxChunk.get(buffer)
  return Chunk.toReadonlyArray(all) // ["a", "b"]
})

TxQueue<A, E> is a transactional queue with built-in blocking semantics, which makes it a natural fit for producer/consumer pipelines. Choose a back-pressure strategy at construction time:

  • TxQueue.bounded(capacity): producers retry when the queue is full
  • TxQueue.dropping(capacity): offer returns false when full
  • TxQueue.sliding(capacity): drops the oldest value to make room
  • TxQueue.unbounded(): always accepts while open

offer(queue, value) adds an item (returning false if the queue is closed or a full dropping queue rejected it). take(queue) removes one item, blocking the transaction (via Effect.txRetry) until a value is available. Use poll(queue) when absence should be immediate — it returns an Option.
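
The four strategies differ only in what happens when an offer meets a full queue. The sketch below is a non-blocking toy model of that decision, not the library's implementation. `QueueModel`, `Strategy`, and `OfferOutcome` are hypothetical names, "retry" stands in for the producer waiting, and `undefined` stands in for the `None` that `poll` returns.

```typescript
// Toy, non-blocking model of the four strategies; hypothetical names throughout.
type Strategy = "bounded" | "dropping" | "sliding" | "unbounded"
type OfferOutcome = "accepted" | "rejected" | "retry"

class QueueModel<A> {
  readonly items: Array<A> = []
  readonly strategy: Strategy
  readonly capacity: number

  constructor(strategy: Strategy, capacity: number) {
    this.strategy = strategy
    // An unbounded queue ignores the capacity argument.
    this.capacity = strategy === "unbounded" ? Infinity : capacity
  }

  offer(value: A): OfferOutcome {
    if (this.items.length >= this.capacity) {
      switch (this.strategy) {
        case "bounded":
          return "retry" // the producer would wait and try again
        case "dropping":
          return "rejected" // the new value is discarded
        case "sliding":
          this.items.shift() // evict the oldest value to make room
          break
        case "unbounded":
          break // never full, so this branch is not reached
      }
    }
    this.items.push(value)
    return "accepted"
  }

  // Like poll: answers immediately, with undefined standing in for None.
  poll(): A | undefined {
    return this.items.shift()
  }
}

// Offer three values to a queue of capacity 2 and report the third outcome.
const fill = (strategy: Strategy) => {
  const queue = new QueueModel<number>(strategy, 2)
  queue.offer(1)
  queue.offer(2)
  const outcome = queue.offer(3)
  return { outcome, items: queue.items }
}

const boundedResult = fill("bounded")
const droppingResult = fill("dropping")
const slidingResult = fill("sliding")
const unboundedResult = fill("unbounded")
const emptyPoll = new QueueModel<number>("bounded", 2).poll()
```

Only the sliding queue ends up holding the newest value at the cost of the oldest. The bounded and dropping queues both keep their first two items and differ only in whether the producer is told to wait or is turned away.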

import { Effect, Fiber, TxQueue } from "effect"

const program = Effect.gen(function*() {
  // Bounded capacity gives natural back-pressure: producers wait when full.
  const queue = yield* TxQueue.bounded<number>(16)
  // Consumer: take blocks the transaction until an item arrives, so there is
  // no polling loop; the fiber sleeps until a producer offers a value.
  const consumer = yield* Effect.forkChild(
    Effect.forever(
      Effect.gen(function*() {
        const job = yield* TxQueue.take(queue)
        yield* Effect.log(`processing ${job}`)
      })
    )
  )
  // Producer: enqueue some work.
  yield* TxQueue.offerAll(queue, [1, 2, 3])
  yield* Effect.sleep("100 millis")
  yield* Fiber.interrupt(consumer)
})
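
The "blocks until a value is available" behavior of take can be pictured as retry-on-change. The following single-threaded toy model is an illustration only, not Effect's scheduler. A consumer that finds the queue empty is parked rather than spinning, and it runs again after the next offer. All names (`takeModel`, `offerModel`, `runConsumer`, `parked`) are hypothetical.

```typescript
// Single-threaded toy model of "retry until a value is available".
type Retry = "retry"

const pendingJobs: Array<number> = []
const parked: Array<() => void> = [] // consumers waiting for the queue to change
const processed: Array<number> = []

// Returns a job, or the retry signal when the queue is empty.
const takeModel = (): number | Retry => {
  const job = pendingJobs.shift()
  return job === undefined ? "retry" : job
}

// Run one consumer step; park it if it asks to retry.
const runConsumer = (): void => {
  const job = takeModel()
  if (job === "retry") {
    parked.push(runConsumer)
    return
  }
  processed.push(job)
}

// Offering a value wakes every parked consumer so it can run again.
const offerModel = (job: number): void => {
  pendingJobs.push(job)
  for (const wake of parked.splice(0)) wake()
}

runConsumer() // queue is empty: the consumer parks instead of looping
const parkedWhileEmpty = parked.length
offerModel(42) // wakes the consumer, which now takes 42
```

The consumer does no work between parking and the offer, which is the property the comment in the example above describes as "no polling loop".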

| You need… | Use |
| --- | --- |
| A single atomic value | TxRef |
| Keyed lookup / registry | TxHashMap |
| Membership tracking | TxHashSet |
| Ordered, indexed buffer | TxChunk |
| Producer/consumer with blocking | TxQueue |

The library also includes TxPriorityQueue, TxPubSub, and TxSubscriptionRef for priority ordering, transactional broadcast, and observable transactional state respectively. They follow the same pattern: every operation returns an Effect and composes inside Effect.tx.

When the thing you need to coordinate is not data but access — permits, locks, one-shot signals — reach for the coordination primitives.