Transactional data structures
A bare TxRef holds a single value, but
shared state is often a collection: a registry of sessions, an inventory by
SKU, a work queue. Effect ships transactional versions of the common data
structures — TxHashMap, TxHashSet, TxChunk, TxQueue, and friends — each
backed by a TxRef so that every operation participates in the same journal.
That means you can read from a map, update a set, and offer to a queue inside one
Effect.tx block and have it all commit atomically.
import { Context, Effect, Layer, Option, TxHashMap } from "effect"
// A shared inventory keyed by SKU. Multiple fibers reserve stock concurrently;// each reservation must check availability and decrement atomically.class Inventory extends Context.Service<Inventory, { readonly reserve: (sku: string, qty: number) => Effect.Effect<boolean> readonly stock: (sku: string) => Effect.Effect<number>}>()("app/Inventory") { static layer = Layer.effect(Inventory)( Effect.gen(function*() { const items = yield* TxHashMap.make(["laptop", 5], ["mouse", 20])
const reserve = Effect.fn("Inventory.reserve")(function*( sku: string, qty: number ) { // The read + check + write happen in one transaction, so two fibers // can never both pass the availability check for the last unit. return yield* Effect.tx( Effect.gen(function*() { const current = yield* TxHashMap.get(items, sku) if (Option.isNone(current) || current.value < qty) { return false } yield* TxHashMap.set(items, sku, current.value - qty) return true }) ) })
const stock = Effect.fn("Inventory.stock")(function*(sku: string) { const current = yield* TxHashMap.get(items, sku) return Option.getOrElse(current, () => 0) })
return Inventory.of({ reserve, stock }) }) )}TxHashMap
Section titled “TxHashMap”TxHashMap<K, V> is a transactional key-value map. Constructors return an
Effect: TxHashMap.empty(), TxHashMap.fromIterable(entries), or
TxHashMap.make([k1, v1], [k2, v2], ...). Reads that may be absent return an
Option, so handle both cases.
| Operation | Result |
| --- | --- |
| TxHashMap.get(map, key) | Effect<Option<V>> |
| TxHashMap.has(map, key) | Effect<boolean> |
| TxHashMap.set(map, key, value) | Effect<void> |
| TxHashMap.modify(map, key, f) | Effect<Option<V>> — applies f to an existing entry and returns the previous value, or None (no write) if the key is absent |
| TxHashMap.remove(map, key) | Effect<void> |
| TxHashMap.size(map) | Effect<number> |
import { Effect, TxHashMap } from "effect"
// Atomically rename a key: read the old value, set the new key, drop the old.const rename = ( map: TxHashMap.TxHashMap<string, number>, from: string, to: string) => Effect.tx( Effect.gen(function*() { const value = yield* TxHashMap.get(map, from) if (value._tag === "Some") { yield* TxHashMap.set(map, to, value.value) yield* TxHashMap.remove(map, from) } }) )TxHashSet
Section titled “TxHashSet”TxHashSet<V> is the transactional set. Build one with TxHashSet.empty(),
TxHashSet.fromIterable(values), or TxHashSet.make(v1, v2, ...), and use
add, remove, has, and size. It is ideal for tracking membership that
several fibers update — active connections, seen ids, a set of locked resources.
import { Effect, TxHashSet } from "effect"
const program = Effect.gen(function*() { const active = yield* TxHashSet.empty<string>()
// Try to claim a slot only if it isn't already taken — atomically. const claim = (id: string) => Effect.tx( Effect.gen(function*() { if (yield* TxHashSet.has(active, id)) { return false } yield* TxHashSet.add(active, id) return true }) )
return { claimed: yield* claim("session-1"), again: yield* claim("session-1") } // { claimed: true, again: false }})TxChunk
Section titled “TxChunk”TxChunk<A> is a transactional, ordered sequence backed by a
Chunk. Create one with TxChunk.empty(),
TxChunk.fromIterable(values), or TxChunk.make(chunk), then append, get
(returns the whole Chunk), take, modify, and size. Use it as a
transactional buffer or log when you need indexed, ordered access rather than
map/set semantics.
import { Chunk, Effect, TxChunk } from "effect"
const program = Effect.gen(function*() { const buffer = yield* TxChunk.empty<string>()
yield* Effect.tx( Effect.gen(function*() { yield* TxChunk.append(buffer, "a") yield* TxChunk.append(buffer, "b") }) )
const all = yield* TxChunk.get(buffer) return Chunk.toReadonlyArray(all) // ["a", "b"]})TxQueue
Section titled “TxQueue”TxQueue<A, E> is a transactional queue with built-in blocking semantics, which
makes it a natural fit for producer/consumer pipelines. Choose a back-pressure
strategy at construction time:
TxQueue.bounded(capacity)— producers retry when the queue is fullTxQueue.dropping(capacity)—offerreturnsfalsewhen fullTxQueue.sliding(capacity)— dropping the oldest value to make roomTxQueue.unbounded()— always accepts while open
offer(queue, value) adds an item (returning false if the queue is closed or
a full dropping queue rejected it). take(queue) removes one item, blocking
the transaction (via Effect.txRetry) until a value is available. Use
poll(queue) when absence should be immediate — it returns an Option.
import { Effect, Fiber, TxQueue } from "effect"
const program = Effect.gen(function*() { // Bounded capacity gives natural back-pressure: producers wait when full. const queue = yield* TxQueue.bounded<number>(16)
// Consumer: take blocks the transaction until an item arrives, so there is // no polling loop — the fiber sleeps until a producer offers a value. const consumer = yield* Effect.forkChild( Effect.forever( Effect.gen(function*() { const job = yield* TxQueue.take(queue) yield* Effect.log(`processing ${job}`) }) ) )
// Producer: enqueue some work. yield* TxQueue.offerAll(queue, [1, 2, 3])
yield* Effect.sleep("100 millis") yield* Fiber.interrupt(consumer)})Choosing a structure
Section titled “Choosing a structure”| You need… | Use |
| --- | --- |
| A single atomic value | TxRef |
| Keyed lookup / registry | TxHashMap |
| Membership tracking | TxHashSet |
| Ordered, indexed buffer | TxChunk |
| Producer/consumer with blocking | TxQueue |
The library also includes TxPriorityQueue, TxPubSub, and
TxSubscriptionRef for priority ordering, transactional broadcast, and
observable transactional state respectively. They follow the same pattern: every
operation returns an Effect and composes inside Effect.tx.
When the thing you need to coordinate is not data but access — permits, locks, one-shot signals — reach for the coordination primitives.