# Observable transactional state (TxSubscriptionRef)

A `TxSubscriptionRef<A>` is a [`TxRef`](https://effect.plants.sh/transactions/transactional-state/) paired
with a [`TxPubSub`](https://effect.plants.sh/transactions/transactional-data-structures/): it holds a
current value **and** lets observers see every committed change. Each subscriber
first receives the value that is current at subscription time, then every value
published by later updates — so a consumer never misses the "starting point" of
the state it is watching.

```ts
import { Effect, Stream, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  // Observable transactional state, initialized to 0.
  const ref = yield* TxSubscriptionRef.make(0)

  // Drive two committed updates.
  yield* TxSubscriptionRef.set(ref, 1)
  yield* TxSubscriptionRef.set(ref, 2)

  // A new subscriber starts from the *current* value (2), not the history.
  const values = yield* Stream.runCollect(
    TxSubscriptionRef.changesStream(ref).pipe(Stream.take(1))
  )
  console.log(values) // => [2]
})
```

The key idea: because the value lives in a `TxRef`, an update composes inside
`Effect.tx` like any other transactional write. You can update a
`TxSubscriptionRef` atomically alongside a [`TxRef`](https://effect.plants.sh/transactions/transactional-state/),
a [`TxHashMap`](https://effect.plants.sh/transactions/transactional-data-structures/), or any other `Tx*`
data structure, and observers only see the new value once the whole transaction
commits.

## Shared observable config with a service

The common real-world shape is a service that owns a piece of shared state, lets
the rest of the app mutate it transactionally, and exposes a `Stream` so any
number of fibers can react to changes — including the current value on
subscription.

```ts
import { Context, Effect, Fiber, Layer, Stream, TxSubscriptionRef } from "effect"

interface AppConfig {
  readonly logLevel: "info" | "debug"
  readonly maxConnections: number
}

class Config extends Context.Service<Config, {
  // Atomically derive the next config from the current one and publish it.
  readonly update: (f: (current: AppConfig) => AppConfig) => Effect.Effect<void>
  // Observe every committed config, starting with the one current at subscription.
  readonly changes: Stream.Stream<AppConfig>
}>()("app/Config") {
  static layer = Layer.effect(
    Config,
    Effect.gen(function*() {
      const ref = yield* TxSubscriptionRef.make<AppConfig>({
        logLevel: "info",
        maxConnections: 10
      })
      return Config.of({
        update: (f) => TxSubscriptionRef.update(ref, f),
        changes: TxSubscriptionRef.changesStream(ref)
      })
    })
  )
}

const program = Effect.gen(function*() {
  const config = yield* Config

  // A reactor that logs every config it observes (including the initial one).
  const reactor = yield* config.changes.pipe(
    Stream.take(3),
    Stream.runForEach((c) => Effect.log(`config -> ${JSON.stringify(c)}`)),
    Effect.forkChild
  )

  // Each update is published to the reactor above once it commits.
  yield* config.update((c) => ({ ...c, logLevel: "debug" }))
  yield* config.update((c) => ({ ...c, maxConnections: 50 }))

  yield* Fiber.join(reactor)
}).pipe(Effect.provide(Config.layer))
```

The reactor prints the initial config, then the `debug` update, then the
`maxConnections` update. It reacts to state changes without polling, and any
number of consumers can subscribe independently.

## Relationship to SubscriptionRef

`TxSubscriptionRef` is the STM-backed sibling of
[`SubscriptionRef`](https://effect.plants.sh/state-management/subscription-ref/). Both implement the same
observable-state idea — a current value plus a change feed that replays the
current value to each new subscriber — but they differ in how updates compose:

- [`SubscriptionRef`](https://effect.plants.sh/state-management/subscription-ref/) serializes effectful
  updates under an internal semaphore. Reach for it when the observable state
  stands alone.
- `TxSubscriptionRef` stores the value in a `TxRef`, so an update participates in
  a transaction and commits **atomically together with other `Tx*` operations**.
  Reach for it when the observable state must change in lockstep with other
  transactional data.
**Subscriptions start from the current value:** Subscribing reads the value current at subscription time and offers it to the
  subscriber's queue first, then delivers every subsequent committed update. The
  underlying [`TxPubSub`](https://effect.plants.sh/transactions/transactional-data-structures/) is
  unbounded, so publishing never blocks the writer; a slow subscriber accumulates
  pending values in its own queue until it catches up or its scope closes.
  Updates are published even when the new value equals the previous value.

## TxSubscriptionRef reference

### TxSubscriptionRef

The model type: `TxSubscriptionRef<A>` wraps a `TxRef<A>` (the current value) and
a `TxPubSub<A>` (the change feed). It is `Pipeable` and `Inspectable`. You rarely
reference the type directly except in service signatures.

```ts
import { Effect, TxSubscriptionRef } from "effect"

// Annotate shared transactional state in a service constructor.
const make: Effect.Effect<TxSubscriptionRef.TxSubscriptionRef<number>> =
  TxSubscriptionRef.make(0)
```

### make

Creates a new `TxSubscriptionRef` from an initial value. Allocates a `TxRef` plus
an unbounded `TxPubSub`, so the whole construction runs inside a transaction.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(42)
  return yield* TxSubscriptionRef.get(ref) // => 42
})
```

### isTxSubscriptionRef

Type guard that returns `true` when the value is a `TxSubscriptionRef`. Useful for
narrowing an `unknown` before calling the module's operations.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)

  TxSubscriptionRef.isTxSubscriptionRef(ref) // => true
  TxSubscriptionRef.isTxSubscriptionRef(42) // => false
})
```

### get

Reads the current value once, without subscribing to future changes. Delegates to
`TxRef.get`, so it composes inside `Effect.tx`.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make("hello")
  return yield* TxSubscriptionRef.get(ref) // => "hello"
})
```

### set

Replaces the current value and publishes the new value to all current
subscribers, atomically.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  yield* TxSubscriptionRef.set(ref, 42)
  return yield* TxSubscriptionRef.get(ref) // => 42
})
```

### update

Derives the next value from the current value with a function, then publishes it.
Use this when the new value depends on the old one.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(5)
  yield* TxSubscriptionRef.update(ref, (n) => n * 2)
  return yield* TxSubscriptionRef.get(ref) // => 10
})
```

### modify

Computes a separate return value and the next state in one transactional update,
publishing the new value. The function returns `[returnValue, newValue]`.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(10)
  const result = yield* TxSubscriptionRef.modify(ref, (n) => [`was ${n}`, n + 1])
  console.log(result) // => "was 10"
  console.log(yield* TxSubscriptionRef.get(ref)) // => 11
})
```

### getAndSet

Replaces the value and returns the **previous** value, publishing the new one.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make("a")
  const old = yield* TxSubscriptionRef.getAndSet(ref, "b")
  console.log(old) // => "a"
  console.log(yield* TxSubscriptionRef.get(ref)) // => "b"
})
```

### getAndUpdate

Derives and publishes a new value while returning the **previous** value.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(1)
  const old = yield* TxSubscriptionRef.getAndUpdate(ref, (n) => n + 10)
  console.log(old) // => 1
  console.log(yield* TxSubscriptionRef.get(ref)) // => 11
})
```

### updateAndGet

Derives and publishes a new value while returning the **new** value.

```ts
import { Effect, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(3)
  const result = yield* TxSubscriptionRef.updateAndGet(ref, (n) => n * 3)
  console.log(result) // => 9
})
```

### changes

Subscribes to all committed changes through a scoped
[`TxQueue`](https://effect.plants.sh/transactions/transactional-data-structures/). The returned dequeue
first yields the value current at subscription time, then every subsequent
update. Because it is scoped, leaving the scope removes the subscriber. This is
the transactional, queue-based counterpart to [`changesStream`](#changesstream),
and it composes with other `TxQueue` operations.

```ts
import { Effect, TxQueue, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)

  yield* Effect.scoped(
    Effect.gen(function*() {
      const sub = yield* TxSubscriptionRef.changes(ref)

      // The subscription replays the current value first.
      const initial = yield* TxQueue.take(sub)
      console.log(initial) // => 0

      // Subsequent committed updates arrive in order.
      yield* TxSubscriptionRef.set(ref, 1)
      const next = yield* TxQueue.take(sub)
      console.log(next) // => 1
    })
  )
})
```

### changesStream

Exposes the same change feed as a [`Stream`](https://effect.plants.sh/streaming/): it emits the current
value followed by every subsequent committed update. Use this to consume changes
from stream pipelines (e.g. `Stream.take`, `Stream.runForEach`). Taking one
element yields the value current when the stream is run.

```ts
import { Effect, Stream, TxSubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* TxSubscriptionRef.make(0)
  yield* TxSubscriptionRef.set(ref, 1)
  yield* TxSubscriptionRef.set(ref, 2)

  // A fresh subscriber starts from the current value (2).
  const values = yield* Stream.runCollect(
    TxSubscriptionRef.changesStream(ref).pipe(Stream.take(1))
  )
  console.log(values) // => [2]
})
```

## See also

- [Transactional state (`TxRef`)](https://effect.plants.sh/transactions/transactional-state/) — the
  building block that holds the value and makes updates composable inside
  `Effect.tx`.
- [Transactional data structures](https://effect.plants.sh/transactions/transactional-data-structures/) —
  covers `TxPubSub` (the broadcast hub behind the change feed) and `TxQueue` (the
  type returned by [`changes`](#changes)).
- [`SubscriptionRef`](https://effect.plants.sh/state-management/subscription-ref/) — the
  non-transactional observable reference with the same change-feed model.