Observable transactional state (TxSubscriptionRef)

A TxSubscriptionRef<A> is a TxRef paired with a TxPubSub: it holds a current value and lets observers see every committed change. Each subscriber first receives the value that is current at subscription time, then every value published by later updates — so a consumer never misses the “starting point” of the state it is watching.

import { Effect, Stream, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  // Observable transactional state, initialized to 0.
  const ref = yield* TxSubscriptionRef.make(0)

  // Drive two committed updates.
  yield* TxSubscriptionRef.set(ref, 1)
  yield* TxSubscriptionRef.set(ref, 2)

  // A new subscriber starts from the *current* value (2), not the history.
  const values = yield* Stream.runCollect(
    TxSubscriptionRef.changesStream(ref).pipe(Stream.take(1))
  )
  console.log(values) // => [2]
})

The key idea: because the value lives in a TxRef, an update composes inside Effect.tx like any other transactional write. You can update a TxSubscriptionRef atomically alongside a TxRef, a TxHashMap, or any other Tx* data structure, and observers only see the new value once the whole transaction commits.
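
The commit-time visibility described above can be pictured with a small, dependency-free model. This is only a sketch: `ToyCell` and `ToyTx` are hypothetical names invented for illustration, not part of effect, and a real STM also tracks reads, detects conflicts, and retries. The one point it shows is that writes are staged, and observers are notified only after every write in the transaction has been applied.

```typescript
// Toy model only: ToyCell and ToyTx are hypothetical names, not effect APIs.
// Conflict detection and retries, which a real STM performs, are not modeled.
class ToyCell<A> {
  value: A
  readonly observers: Array<(value: A) => void> = []
  constructor(initial: A) {
    this.value = initial
  }
}

class ToyTx {
  private staged: Array<{ apply: () => void; notify: () => void }> = []

  // Stage a write. Nothing changes and nobody is notified yet.
  set<A>(cell: ToyCell<A>, value: A): void {
    this.staged.push({
      apply: () => {
        cell.value = value
      },
      notify: () => {
        for (const observer of cell.observers) observer(value)
      }
    })
  }

  // Apply every staged write first, then notify observers.
  commit(): void {
    const staged = this.staged
    this.staged = []
    for (const write of staged) write.apply()
    for (const write of staged) write.notify()
  }

  // Drop the staged writes: cells and observers are left untouched.
  abort(): void {
    this.staged = []
  }
}

const total = new ToyCell(0)
const lastOrder = new ToyCell("none")
const seen: Array<string> = []

// The observer records the new order together with the total it can read at that moment.
lastOrder.observers.push((order) => {
  seen.push(`${order}:${total.value}`)
})

// An aborted transaction publishes nothing.
const aborted = new ToyTx()
aborted.set(lastOrder, "order-1")
aborted.set(total, 10)
aborted.abort()

// A committed transaction publishes once, after both writes are in place.
const committed = new ToyTx()
committed.set(lastOrder, "order-2")
committed.set(total, 25)
committed.commit()

console.log(seen) // => ["order-2:25"]
```

The observer never sees `order-1`, and when it sees `order-2` the related total already holds 25, even though the total was staged after the order inside the transaction.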

The common real-world shape is a service that owns a piece of shared state, lets the rest of the app mutate it transactionally, and exposes a Stream so any number of fibers can react to changes — including the current value on subscription.

import { Context, Effect, Fiber, Layer, Stream, TxSubscriptionRef } from "effect"

interface AppConfig {
  readonly logLevel: "info" | "debug"
  readonly maxConnections: number
}

class Config extends Context.Service<Config, {
  // Atomically derive the next config from the current one and publish it.
  readonly update: (f: (current: AppConfig) => AppConfig) => Effect.Effect<void>
  // Observe every committed config, starting with the one current at subscription.
  readonly changes: Stream.Stream<AppConfig>
}>()("app/Config") {
  static layer = Layer.effect(
    Config,
    Effect.gen(function*() {
      const ref = yield* TxSubscriptionRef.make<AppConfig>({
        logLevel: "info",
        maxConnections: 10
      })
      return Config.of({
        update: (f) => TxSubscriptionRef.update(ref, f),
        changes: TxSubscriptionRef.changesStream(ref)
      })
    })
  )
}

const program = Effect.gen(function*() {
  const config = yield* Config

  // A reactor that logs every config it observes (including the initial one).
  const reactor = yield* config.changes.pipe(
    Stream.take(3),
    Stream.runForEach((c) => Effect.log(`config -> ${JSON.stringify(c)}`)),
    Effect.forkChild
  )

  // Each update is published to the reactor above once it commits.
  yield* config.update((c) => ({ ...c, logLevel: "debug" }))
  yield* config.update((c) => ({ ...c, maxConnections: 50 }))

  yield* Fiber.join(reactor)
}).pipe(Effect.provide(Config.layer))

The reactor prints the initial config, then the debug update, then the maxConnections update. It reacts to state changes without polling, and any number of consumers can subscribe independently.

TxSubscriptionRef is the STM-backed sibling of SubscriptionRef. Both implement the same observable-state idea — a current value plus a change feed that replays the current value to each new subscriber — but they differ in how updates compose:

  • SubscriptionRef serializes effectful updates under an internal semaphore. Reach for it when the observable state stands alone.
  • TxSubscriptionRef stores the value in a TxRef, so an update participates in a transaction and commits atomically together with other Tx* operations. Reach for it when the observable state must change in lockstep with other transactional data.

The model type: TxSubscriptionRef<A> wraps a TxRef<A> (the current value) and a TxPubSub<A> (the change feed). It is Pipeable and Inspectable. You rarely reference the type directly except in service signatures.

import { Effect, TxSubscriptionRef } from "effect"

// Annotate shared transactional state in a service constructor.
const make: Effect.Effect<TxSubscriptionRef.TxSubscriptionRef<number>> =
  TxSubscriptionRef.make(0)
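
To make the pairing of a current value with a change feed concrete, here is a minimal dependency-free sketch. `ToyObservableRef`, its methods, and the array-based inbox are all hypothetical and stand in for the TxRef and TxPubSub roles only loosely; nothing here is transactional or taken from the effect implementation. It shows two behaviors: a new subscriber's feed is seeded with the current value, and a subscriber that has been removed receives nothing further.

```typescript
// Toy model only: ToyObservableRef is a hypothetical name, not an effect API.
class ToyObservableRef<A> {
  private current: A
  private readonly subscribers = new Set<Array<A>>()

  constructor(initial: A) {
    this.current = initial
  }

  // Read the current value once, without subscribing.
  get(): A {
    return this.current
  }

  // Replace the value and append it to every subscriber's inbox.
  set(value: A): void {
    this.current = value
    for (const inbox of this.subscribers) inbox.push(value)
  }

  // A new inbox starts with the current value, so the subscriber
  // always has a starting point before later updates arrive.
  subscribe(): { readonly inbox: Array<A>; readonly unsubscribe: () => void } {
    const inbox: Array<A> = [this.current]
    this.subscribers.add(inbox)
    return {
      inbox,
      unsubscribe: () => {
        this.subscribers.delete(inbox)
      }
    }
  }
}

const toyRef = new ToyObservableRef(0)
toyRef.set(1)
toyRef.set(2)

// Subscribing late replays only the current value (2), not 0 or 1.
const late = toyRef.subscribe()
toyRef.set(3)

// After unsubscribing, further updates are no longer delivered.
late.unsubscribe()
toyRef.set(4)

console.log(late.inbox) // => [2, 3]
console.log(toyRef.get()) // => 4
```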

TxSubscriptionRef.make creates a new TxSubscriptionRef from an initial value. Because it allocates both a TxRef and an unbounded TxPubSub, the whole construction runs inside a transaction.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(42)
  return yield* TxSubscriptionRef.get(ref) // => 42
})

TxSubscriptionRef.isTxSubscriptionRef is a type guard that returns true when the value is a TxSubscriptionRef. It is useful for narrowing an unknown value before calling the module’s operations.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  TxSubscriptionRef.isTxSubscriptionRef(ref) // => true
  TxSubscriptionRef.isTxSubscriptionRef(42) // => false
})

TxSubscriptionRef.get reads the current value once, without subscribing to future changes. It delegates to TxRef.get, so it composes inside Effect.tx.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make("hello")
  return yield* TxSubscriptionRef.get(ref) // => "hello"
})

TxSubscriptionRef.set replaces the current value and atomically publishes the new value to all current subscribers.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  yield* TxSubscriptionRef.set(ref, 42)
  return yield* TxSubscriptionRef.get(ref) // => 42
})

TxSubscriptionRef.update derives the next value from the current value with a function, then publishes it. Use it when the new value depends on the old one.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(5)
  yield* TxSubscriptionRef.update(ref, (n) => n * 2)
  return yield* TxSubscriptionRef.get(ref) // => 10
})

TxSubscriptionRef.modify computes a separate return value and the next state in one transactional update, then publishes the new value. The function passed to it returns the tuple [returnValue, newValue].

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(10)
  const result = yield* TxSubscriptionRef.modify(ref, (n) => [`was ${n}`, n + 1])
  console.log(result) // => "was 10"
  console.log(yield* TxSubscriptionRef.get(ref)) // => 11
})

TxSubscriptionRef.getAndSet replaces the value, publishes the new one, and returns the previous value.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make("a")
  const old = yield* TxSubscriptionRef.getAndSet(ref, "b")
  console.log(old) // => "a"
  console.log(yield* TxSubscriptionRef.get(ref)) // => "b"
})

TxSubscriptionRef.getAndUpdate derives and publishes a new value, and returns the previous value.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(1)
  const old = yield* TxSubscriptionRef.getAndUpdate(ref, (n) => n + 10)
  console.log(old) // => 1
  console.log(yield* TxSubscriptionRef.get(ref)) // => 11
})

TxSubscriptionRef.updateAndGet derives and publishes a new value, and returns that new value.

import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(3)
  const result = yield* TxSubscriptionRef.updateAndGet(ref, (n) => n * 3)
  console.log(result) // => 9
})

TxSubscriptionRef.changes subscribes to all committed changes through a scoped TxQueue. The returned dequeue first yields the value current at subscription time, then every subsequent update. Because the subscription is scoped, closing the scope removes the subscriber. This is the transactional, queue-based counterpart to changesStream, and it composes with other TxQueue operations.

import { Effect, TxQueue, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)

  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxSubscriptionRef.changes(ref)

      // The subscription replays the current value first.
      const initial = yield* TxQueue.take(sub)
      console.log(initial) // => 0

      // Subsequent committed updates arrive in order.
      yield* TxSubscriptionRef.set(ref, 1)
      const next = yield* TxQueue.take(sub)
      console.log(next) // => 1
    })
  )
})

TxSubscriptionRef.changesStream exposes the same change feed as a Stream: it emits the current value followed by every subsequent committed update. Use it to consume changes from stream pipelines (e.g. Stream.take, Stream.runForEach). Taking one element yields the value that is current when the stream is run.

import { Effect, Stream, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  yield* TxSubscriptionRef.set(ref, 1)
  yield* TxSubscriptionRef.set(ref, 2)

  // A fresh subscriber starts from the current value (2).
  const values = yield* Stream.runCollect(
    TxSubscriptionRef.changesStream(ref).pipe(Stream.take(1))
  )
  console.log(values) // => [2]
})