Observable transactional state (TxSubscriptionRef)
A TxSubscriptionRef<A> is a TxRef paired
with a TxPubSub: it holds a
current value and lets observers see every committed change. Each subscriber
first receives the value that is current at subscription time, then every value
published by later updates — so a consumer never misses the “starting point” of
the state it is watching.
import { Effect, Stream, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  // Observable transactional state, initialized to 0.
  const ref = yield* TxSubscriptionRef.make(0)

  // Drive two committed updates.
  yield* TxSubscriptionRef.set(ref, 1)
  yield* TxSubscriptionRef.set(ref, 2)

  // A new subscriber starts from the *current* value (2), not the history.
  const values = yield* Stream.runCollect(
    TxSubscriptionRef.changesStream(ref).pipe(Stream.take(1))
  )
  console.log(values) // => [2]
})

The key idea: because the value lives in a TxRef, an update composes inside
Effect.tx like any other transactional write. You can update a
TxSubscriptionRef atomically alongside a TxRef,
a TxHashMap, or any other Tx*
data structure, and observers only see the new value once the whole transaction
commits.
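To make the commit rule concrete, here is a dependency-free sketch in plain TypeScript. It is a model of the behavior only, not how Effect implements transactions: `Cell`, `makeCell`, and `atomically` are hypothetical names invented for this illustration. Writes are staged in a journal, applied together, and only then published, so an observer of one cell always finds the other cell already updated.

```typescript
// Hypothetical model: a cell holds a committed value plus observers to notify on commit.
interface Cell<A> {
  value: A
  observers: Array<(value: A) => void>
}

const makeCell = <A>(initial: A): Cell<A> => ({ value: initial, observers: [] })

// A staged write: running it stores the value and returns the matching notification.
type Staged = () => () => void

// Runs `body` against a journal. Nothing is visible until every write has been applied.
const atomically = (
  body: (write: <A>(cell: Cell<A>, next: A) => void) => void
): void => {
  const journal: Array<Staged> = []
  const write = <A>(cell: Cell<A>, next: A): void => {
    journal.push(() => {
      cell.value = next
      return () => cell.observers.forEach((notify) => notify(next))
    })
  }
  body(write) // if this throws, the journal is discarded and nothing changes
  const notifications = journal.map((apply) => apply()) // phase 1: apply all writes
  notifications.forEach((notify) => notify()) // phase 2: publish to observers
}

const stock = makeCell(5)
const lastOrder = makeCell("none")
const seen: Array<string> = []

// The observer records what it sees of both cells at notification time.
lastOrder.observers.push((order) => seen.push(`${order} / stock=${stock.value}`))

atomically((write) => {
  write(stock, stock.value - 1)
  write(lastOrder, "order-1")
})

// A failing transaction publishes nothing and leaves both cells untouched.
try {
  atomically((write) => {
    write(lastOrder, "order-2")
    throw new Error("out of stock")
  })
} catch (e) {
  // ignored on purpose: the point is that no observer was notified
}

console.log(seen) // => ["order-1 / stock=4"]
```

In the real library the role of `atomically` is played by Effect.tx, and the journal, retry, and conflict handling are managed for you; the sketch only shows why observers never see a half-applied update.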
Shared observable config with a service
The common real-world shape is a service that owns a piece of shared state, lets
the rest of the app mutate it transactionally, and exposes a Stream so any
number of fibers can react to changes — including the current value on
subscription.
import { Context, Effect, Fiber, Layer, Stream, TxSubscriptionRef } from "effect"

interface AppConfig {
  readonly logLevel: "info" | "debug"
  readonly maxConnections: number
}

class Config extends Context.Service<Config, {
  // Atomically derive the next config from the current one and publish it.
  readonly update: (f: (current: AppConfig) => AppConfig) => Effect.Effect<void>
  // Observe every committed config, starting with the one current at subscription.
  readonly changes: Stream.Stream<AppConfig>
}>()("app/Config") {
  static layer = Layer.effect(
    Config,
    Effect.gen(function*() {
      const ref = yield* TxSubscriptionRef.make<AppConfig>({
        logLevel: "info",
        maxConnections: 10
      })
      return Config.of({
        update: (f) => TxSubscriptionRef.update(ref, f),
        changes: TxSubscriptionRef.changesStream(ref)
      })
    })
  )
}

const program = Effect.gen(function*() {
  const config = yield* Config

  // A reactor that logs every config it observes (including the initial one).
  const reactor = yield* config.changes.pipe(
    Stream.take(3),
    Stream.runForEach((c) => Effect.log(`config -> ${JSON.stringify(c)}`)),
    Effect.forkChild
  )

  // Each update is published to the reactor above once it commits.
  yield* config.update((c) => ({ ...c, logLevel: "debug" }))
  yield* config.update((c) => ({ ...c, maxConnections: 50 }))

  yield* Fiber.join(reactor)
}).pipe(Effect.provide(Config.layer))

The reactor prints the initial config, then the debug update, then the
maxConnections update. It reacts to state changes without polling, and any
number of consumers can subscribe independently.
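The independence of consumers can be pictured with a small dependency-free model. This is a sketch of the semantics only, under the assumption that each subscriber gets its own buffer; `ObservableState` is a hypothetical class written for this illustration, not part of Effect. Every subscriber's buffer is seeded with the value current at subscription time, and later updates fan out to all buffers.

```typescript
// Hypothetical model of "replay the current value, then fan out later updates".
class ObservableState<A> {
  private queues: Array<Array<A>> = []

  constructor(private current: A) {}

  // Each subscriber owns a private queue seeded with the current value.
  subscribe(): Array<A> {
    const queue = [this.current]
    this.queues.push(queue)
    return queue
  }

  // Store the new value, then deliver it to every subscriber's queue.
  set(next: A): void {
    this.current = next
    this.queues.forEach((queue) => queue.push(next))
  }
}

const state = new ObservableState("info")

const early = state.subscribe() // starts from "info"
state.set("debug")

const late = state.subscribe() // starts from "debug", not from the history
state.set("trace")

console.log(early) // => ["info", "debug", "trace"]
console.log(late) // => ["debug", "trace"]
```

The late subscriber misses nothing it needs: it begins at the state that was current when it joined and then tracks every later change, which is the guarantee the service above relies on.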
Relationship to SubscriptionRef
TxSubscriptionRef is the STM-backed sibling of
SubscriptionRef. Both implement the same
observable-state idea — a current value plus a change feed that replays the
current value to each new subscriber — but they differ in how updates compose:
- SubscriptionRef serializes effectful updates under an internal semaphore. Reach for it when the observable state stands alone.
- TxSubscriptionRef stores the value in a TxRef, so an update participates in a transaction and commits atomically together with other Tx* operations. Reach for it when the observable state must change in lockstep with other transactional data.
TxSubscriptionRef reference
TxSubscriptionRef
The model type: TxSubscriptionRef<A> wraps a TxRef<A> (the current value) and
a TxPubSub<A> (the change feed). It is Pipeable and Inspectable. You rarely
reference the type directly except in service signatures.
import { Effect, TxSubscriptionRef } from "effect"

// Annotate shared transactional state in a service constructor.
const make: Effect.Effect<TxSubscriptionRef.TxSubscriptionRef<number>> =
  TxSubscriptionRef.make(0)

make
Creates a new TxSubscriptionRef from an initial value. Allocates a TxRef plus
an unbounded TxPubSub, so the whole construction runs inside a transaction.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(42)
  return yield* TxSubscriptionRef.get(ref) // => 42
})

isTxSubscriptionRef
Type guard that returns true when the value is a TxSubscriptionRef. Useful for
narrowing an unknown before calling the module’s operations.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)

  TxSubscriptionRef.isTxSubscriptionRef(ref) // => true
  TxSubscriptionRef.isTxSubscriptionRef(42) // => false
})

get
Reads the current value once, without subscribing to future changes. Delegates to
TxRef.get, so it composes inside Effect.tx.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make("hello")
  return yield* TxSubscriptionRef.get(ref) // => "hello"
})

set
Replaces the current value and publishes the new value to all current subscribers, atomically.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  yield* TxSubscriptionRef.set(ref, 42)
  return yield* TxSubscriptionRef.get(ref) // => 42
})

update
Derives the next value from the current value with a function, then publishes it. Use this when the new value depends on the old one.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(5)
  yield* TxSubscriptionRef.update(ref, (n) => n * 2)
  return yield* TxSubscriptionRef.get(ref) // => 10
})

modify
Computes a separate return value and the next state in one transactional update,
publishing the new value. The function returns [returnValue, newValue].
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(10)
  const result = yield* TxSubscriptionRef.modify(ref, (n) => [`was ${n}`, n + 1])
  console.log(result) // => "was 10"
  console.log(yield* TxSubscriptionRef.get(ref)) // => 11
})

getAndSet
Replaces the value and returns the previous value, publishing the new one.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make("a")
  const old = yield* TxSubscriptionRef.getAndSet(ref, "b")
  console.log(old) // => "a"
  console.log(yield* TxSubscriptionRef.get(ref)) // => "b"
})

getAndUpdate
Derives and publishes a new value while returning the previous value.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(1)
  const old = yield* TxSubscriptionRef.getAndUpdate(ref, (n) => n + 10)
  console.log(old) // => 1
  console.log(yield* TxSubscriptionRef.get(ref)) // => 11
})

updateAndGet
Derives and publishes a new value while returning the new value.
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(3)
  const result = yield* TxSubscriptionRef.updateAndGet(ref, (n) => n * 3)
  console.log(result) // => 9
})

changes
Subscribes to all committed changes through a scoped TxQueue. The returned
dequeue first yields the value current at subscription time, then every
subsequent update. Because it is scoped, leaving the scope removes the
subscriber. This is the transactional, queue-based counterpart to changesStream,
and it composes with other TxQueue operations.
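The scoping rule is easy to state and easy to overlook, so here is a dependency-free sketch of it. It models the behavior only and makes no claim about Effect's internals: `Feed` and `withSubscription` are hypothetical names for this illustration, and `withSubscription` stands in for a scope by releasing the subscription in a `finally` block.

```typescript
// Hypothetical model of a change feed whose subscriptions can be released.
class Feed<A> {
  private subscribers: Array<Array<A>> = []

  constructor(private current: A) {}

  // Registers a queue seeded with the current value and returns it with its release function.
  subscribe(): { queue: Array<A>; release: () => void } {
    const queue = [this.current]
    this.subscribers.push(queue)
    const release = (): void => {
      this.subscribers = this.subscribers.filter((q) => q !== queue)
    }
    return { queue, release }
  }

  publish(next: A): void {
    this.current = next
    this.subscribers.forEach((queue) => queue.push(next))
  }

  subscriberCount(): number {
    return this.subscribers.length
  }
}

// Stand-in for a scope: run `use`, then always remove the subscriber.
const withSubscription = <A, B>(feed: Feed<A>, use: (queue: Array<A>) => B): B => {
  const { queue, release } = feed.subscribe()
  try {
    return use(queue)
  } finally {
    release()
  }
}

const feed = new Feed(0)

const inside = withSubscription(feed, (queue) => {
  feed.publish(1) // delivered: the subscription is still open
  return queue.slice()
})

feed.publish(2) // not delivered anywhere: the scope has closed

console.log(inside) // => [0, 1]
console.log(feed.subscriberCount()) // => 0
```

With the real API, Effect.scoped plays the part of `withSubscription`, as the example that follows shows.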
import { Effect, TxQueue, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)

  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxSubscriptionRef.changes(ref)

      // The subscription replays the current value first.
      const initial = yield* TxQueue.take(sub)
      console.log(initial) // => 0

      // Subsequent committed updates arrive in order.
      yield* TxSubscriptionRef.set(ref, 1)
      const next = yield* TxQueue.take(sub)
      console.log(next) // => 1
    })
  )
})

changesStream
Exposes the same change feed as a Stream: it emits the current
value followed by every subsequent committed update. Use this to consume changes
from stream pipelines (e.g. Stream.take, Stream.runForEach). Taking one
element yields the value current when the stream is run.
import { Effect, Stream, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  yield* TxSubscriptionRef.set(ref, 1)
  yield* TxSubscriptionRef.set(ref, 2)

  // A fresh subscriber starts from the current value (2).
  const values = yield* Stream.runCollect(
    TxSubscriptionRef.changesStream(ref).pipe(Stream.take(1))
  )
  console.log(values) // => [2]
})

See also
- Transactional state (TxRef): the building block that holds the value and makes updates composable inside Effect.tx.
- Transactional data structures: covers TxPubSub (the broadcast hub behind the change feed) and TxQueue (the type returned by changes).
- SubscriptionRef: the non-transactional observable reference with the same change-feed model.