# Key-based invalidation (Reactivity)

The `Reactivity` service (from `effect/unstable/reactivity/Reactivity`) provides
**process-local, key-based invalidation**: it connects writes to the dependent
reads that should re-run when those writes succeed.

It does **not** cache values. Instead it tracks a set of *keys*, lets you
register query handlers against those keys, and re-runs the registered effects
whenever a matching key is invalidated. That keeps queues, streams, UI
subscriptions, and read models fresh after a successful mutation.

**Mental model**

- A **query** registers one or more keys, runs once immediately, and publishes
  each result to a queue or stream.
- Invalidating any registered key **schedules the query to rerun**.
- A **mutation** wraps an effect and invalidates keys **only after it succeeds**.
- Keys can be a flat array, or a record whose property names act as broad
  **namespaces** and whose ids address individual records.

**Process-local:** The default `layer` is in-memory and process-local. It does **not**
coordinate invalidations across processes or cluster runners. For
distributed/UI state, see the Atom integration in
[/reactivity/server-state/](https://effect.plants.sh/reactivity/server-state/).

## Common case: a query and a mutation

The pattern is: read data with `Reactivity.query` (or `stream`) keyed on the
data it depends on, then wrap your writes with `Reactivity.mutation` keyed on the
same keys. When the write succeeds, the query reruns and pushes a fresh result.

```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

// An in-memory "table" of todos for the example.
const todos = new Map<number, string>([[1, "Buy milk"]])

const program = Effect.gen(function* () {
  // A query keyed on the "todos" namespace. Runs immediately, then again
  // every time the "todos" key is invalidated. The result is a Dequeue.
  const results = yield* Reactivity.query(["todos"])(
    Effect.sync(() => Array.from(todos.values()))
  )

  // First (immediate) result.
  console.log(yield* Queue.take(results)) // => [ 'Buy milk' ]

  // A mutation keyed on the same "todos" key. On success it invalidates the
  // key, which reruns the query above.
  yield* Reactivity.mutation(["todos"])(
    Effect.sync(() => {
      todos.set(2, "Walk dog")
    })
  )

  // The rerun pushes the new snapshot onto the queue.
  console.log(yield* Queue.take(results)) // => [ 'Buy milk', 'Walk dog' ]
}).pipe(Effect.scoped)

// query/mutation/invalidate require the Reactivity service.
program.pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

`Reactivity.query` returns a [`Queue.Dequeue`](https://effect.plants.sh/concurrency/queue-and-pubsub/). Its
registration lives in a [Scope](https://effect.plants.sh/resource-management/scope-and-finalizers/)
and is removed when the scope closes, which is why the example above ends with `Effect.scoped`.

### Keys: arrays vs. namespaced records

The `keys` argument accepts two shapes, and `query`, `stream`, `mutation`, and
`invalidate` all use the same matching rules.

```ts
// 1) A flat array of keys. Each element is matched independently.
Reactivity.invalidate(["todos", "users"])

// 2) A record of namespaces -> ids. A namespace key matches broadly; each id
//    additionally registers a "<namespace>:<id>" key for record-level matching.
Reactivity.invalidate({ todos: [1, 2], users: ["alice"] })
```

With the record form, invalidating `{ todos: [] }` (the namespace, no ids)
re-runs every query that registered the `todos` namespace, while
`{ todos: [1] }` also matches queries that scoped themselves to that specific id.

**Use stable keys:** Primitive keys (`string`, `number`, `bigint`, `boolean`) are matched by their
string value. **Non-primitive keys are matched by `Hash.hash`**, so a fresh
object literal will not match an earlier one unless it hashes the same. Prefer
stable primitive keys (or values implementing `Hash`) over mutable objects.
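
To make the matching rules concrete, here is a minimal sketch of how the two key shapes could expand into a flat list of match keys. `expandKeys` is a hypothetical helper written for illustration, not part of the `Reactivity` API, and it only models primitive keys; non-primitive keys, which the service matches by `Hash.hash`, are left out.

```typescript
type Primitive = string | number | bigint | boolean
type Keys = Array<Primitive> | Record<string, Array<Primitive>>

// Hypothetical model: expands either key shape into the flat list of match keys.
function expandKeys(keys: Keys): Array<string> {
  if (Array.isArray(keys)) {
    // Flat array: each primitive is matched by its string value.
    return keys.map((key) => String(key))
  }
  const out: Array<string> = []
  for (const [namespace, ids] of Object.entries(keys)) {
    out.push(namespace) // the broad namespace key
    for (const id of ids) {
      out.push(`${namespace}:${id}`) // the record-level "<namespace>:<id>" key
    }
  }
  return out
}

console.log(expandKeys({ todos: [1, 2], users: ["alice"] }))
// => [ 'todos', 'todos:1', 'todos:2', 'users', 'users:alice' ]
```

Under this model, `{ todos: [] }` expands to just `"todos"`, which is why it reaches every query registered on the namespace.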

## How reruns behave

A few semantics worth internalizing:

- **Runs once immediately.** A query executes its effect right away and offers
  the first result before any invalidation.
- **Coalescing.** If an invalidation arrives while the query's effect is still
  running, it is collapsed into a *single* follow-up run rather than queuing
  many runs.
- **Mutations only invalidate on success.** If the wrapped effect fails, the
  keys are not invalidated.
- **Failures propagate.** If a query's effect fails, its queue/stream fails with
  the same `Cause`.

```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  let attempts = 0
  const results = yield* Reactivity.query(["k"])(
    Effect.sync(() => ++attempts)
  )

  console.log(yield* Queue.take(results)) // => 1 (immediate run)

  yield* Reactivity.invalidate(["k"])
  console.log(yield* Queue.take(results)) // => 2 (rerun after invalidate)
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```
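
The coalescing rule is easier to see in isolation. The sketch below is a hypothetical, synchronous model of a query's rerun state (`CoalescingQuery` is an illustrative name, not a library export). It assumes the behavior described above: invalidations that arrive during a run collapse into one follow-up run.

```typescript
// Hypothetical model of coalesced reruns; not the library's implementation.
class CoalescingQuery {
  runs = 0
  private running = false
  private pending = false

  // Called when a matching key is invalidated.
  invalidate(): void {
    if (this.running) {
      this.pending = true // remember that one rerun is owed
    } else {
      this.start()
    }
  }

  // Called when the in-flight run completes.
  finish(): void {
    this.running = false
    if (this.pending) {
      this.pending = false
      this.start() // a single follow-up, however many invalidations arrived
    }
  }

  private start(): void {
    this.running = true
    this.runs++
  }
}

const query = new CoalescingQuery()
query.invalidate() // starts run 1
query.invalidate() // arrives mid-run
query.invalidate() // arrives mid-run, collapsed with the previous one
query.finish() // run 1 ends, one follow-up (run 2) starts
query.finish() // run 2 ends, nothing pending
console.log(query.runs) // => 2
```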

## Exhaustive reference

Everything below is exported from
`effect/unstable/reactivity/Reactivity`. The standalone accessors
(`query`, `stream`, `mutation`, `invalidate`) require the `Reactivity` service
in context; the service class exposes the same operations plus the `Unsafe`
variants and `withBatch` for coalescing.

### `layer`

The default `Layer` providing an in-memory `Reactivity` service. Provide it once
at the edge of your program.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

const main = Reactivity.invalidate(["todos"])

main.pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
// => provides Reactivity, then resolves
```

### `make`

An `Effect` that constructs the in-memory service value. `layer` is just
`Layer.effect(Reactivity)(make)`; use `make` only if you need to build the
service manually.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const service = yield* Reactivity.make
  // service: Reactivity (the same value layer provides)
}).pipe(Effect.runPromise)
```

### `query`

Runs an effect as a query tied to the supplied keys and returns a
`Queue.Dequeue` of results. It emits the initial result immediately and a new
result after any registered key is invalidated. The registration is removed when
the enclosing `Scope` closes. Dual: `query(keys)(effect)` or
`query(effect, keys)`.

```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  // Array form: keyed on the "users" namespace.
  const q1 = yield* Reactivity.query(["users"])(Effect.succeed("snapshot-A"))
  console.log(yield* Queue.take(q1)) // => "snapshot-A"

  // Record form: keyed on a specific user id.
  const q2 = yield* Reactivity.query({ users: [42] })(
    Effect.succeed({ id: 42, name: "Ada" })
  )
  console.log(yield* Queue.take(q2)) // => { id: 42, name: "Ada" }
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

### `stream`

Same as `query`, but yields a [`Stream`](https://effect.plants.sh/streaming/) of rerun results
instead of a queue (it is `query` mapped through `Stream.fromQueue`). The `Scope`
requirement is folded into the stream's lifecycle. Like `query`, it is dual:
`stream(keys)(effect)` or `stream(effect, keys)`.

```ts
import { Effect, Stream } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
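  // Effect.sync defers Date.now(), so each rerun reads a fresh timestamp.
  const updates = Reactivity.stream(["dashboard"])(
    Effect.sync(() => Date.now())
  )

  // Drive invalidations, take the first 2 emissions, then stop.
  yield* Effect.fork(
    Effect.gen(function* () {
      // Wait briefly so the stream below has registered before invalidating.
      yield* Effect.sleep("10 millis")
      yield* Reactivity.invalidate(["dashboard"])
    })
  )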

  const first2 = yield* Stream.take(updates, 2).pipe(Stream.runCollect)
  console.log(first2.length) // => 2 (initial run + one rerun)
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

### `mutation`

Wraps an effect so the supplied keys are invalidated **after the effect
succeeds** (it is `Effect.tap(effect, invalidate(keys))`). If the effect fails,
nothing is invalidated. Dual: `mutation(keys)(effect)` or `mutation(effect, keys)`.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

const createTodo = (text: string) =>
  // Invalidates both the broad "todos" namespace and the new record's id.
  Reactivity.mutation({ todos: [text] })(
    Effect.sync(() => ({ id: text, text }))
  )

createTodo("Buy milk").pipe(
  Effect.provide(Reactivity.layer),
  Effect.runPromise
)
// => { id: "Buy milk", text: "Buy milk" }, and "todos" queries rerun
```
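
The "invalidate only after success" ordering can be sketched without Effect. This is a hypothetical synchronous model (`mutationModel` and `record` are illustrative names), in which a thrown error stands in for a failed effect.

```typescript
// Hypothetical model: run the write first, invalidate only if it returned.
function mutationModel<A>(
  keys: ReadonlyArray<string>,
  write: () => A,
  invalidate: (keys: ReadonlyArray<string>) => void
): A {
  const result = write() // if this throws, the invalidation below never runs
  invalidate(keys)
  return result
}

const invalidated: Array<string> = []
const record = (keys: ReadonlyArray<string>): void => {
  invalidated.push(...keys)
}

mutationModel(["todos"], () => "ok", record)
try {
  mutationModel(["users"], () => {
    throw new Error("write failed")
  }, record)
} catch {
  // The failure still propagates to the caller.
}
console.log(invalidated) // => [ 'todos' ]
```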

### `invalidate`

Invalidates the supplied keys directly through the service. Use it when
invalidation is already part of your workflow and you do not want to wrap a
specific effect. Matching queries rerun immediately (or are collected until an
enclosing batch exits — see `withBatch`).

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  // Array form.
  yield* Reactivity.invalidate(["todos"])
  // Record form: invalidate the namespace AND a specific id.
  yield* Reactivity.invalidate({ todos: [1] })
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

## The `Reactivity` service

`Reactivity` is a `Context.Service` class. The standalone accessors above just
delegate to a `Reactivity` instance, but accessing the service directly gives you
the `Unsafe` (non-`Effect`) variants and batch coalescing. Grab it with
`Reactivity.use` or by yielding it in a generator.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const reactivity = yield* Reactivity.Reactivity
  yield* reactivity.invalidate(["todos"])
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

### `service.query` / `service.stream` / `service.mutation` / `service.invalidate`

The method forms of the four accessors. `query`, `stream`, and `mutation` take
`(keys, effect)` positionally, and `invalidate` takes just `keys`. There is no
dual/data-last form, and they behave identically to the standalone functions.

```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity

  const results = yield* r.query(["items"], Effect.succeed([1, 2, 3]))
  console.log(yield* Queue.take(results)) // => [1, 2, 3]

  yield* r.mutation(["items"], Effect.sync(() => {}))
  yield* r.invalidate(["items"])
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

### `service.registerUnsafe`

Registers a plain `() => void` handler against the given keys and returns an
**unregister** function. This is the low-level primitive `query` is built on; use
it to bridge non-`Effect` subscribers. It is synchronous and does no scope
management, so you are responsible for calling the returned cleanup.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity

  let runs = 0
  const unregister = r.registerUnsafe(["todos"], () => {
    runs++
  })

  r.invalidateUnsafe(["todos"])
  console.log(runs) // => 1

  unregister() // remove the handler
  r.invalidateUnsafe(["todos"])
  console.log(runs) // => 1 (handler no longer called)
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```
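
As a rough mental picture of what a register/unregister primitive involves, here is a minimal sketch of an in-memory handler registry. `HandlerRegistry` is a hypothetical class written for this page; the real service's internals may differ, and this version only handles string keys.

```typescript
type Handler = () => void

// Hypothetical in-memory registry; not the library's implementation.
class HandlerRegistry {
  private readonly handlers = new Map<string, Set<Handler>>()

  // Adds the handler under every key and returns a cleanup function.
  register(keys: ReadonlyArray<string>, handler: Handler): () => void {
    for (const key of keys) {
      let set = this.handlers.get(key)
      if (set === undefined) {
        set = new Set<Handler>()
        this.handlers.set(key, set)
      }
      set.add(handler)
    }
    return () => {
      for (const key of keys) {
        this.handlers.get(key)?.delete(handler)
      }
    }
  }

  // Synchronously runs every handler registered under the given keys.
  invalidate(keys: ReadonlyArray<string>): void {
    for (const key of keys) {
      this.handlers.get(key)?.forEach((handler) => handler())
    }
  }
}

const registry = new HandlerRegistry()
let handlerRuns = 0
const unregisterHandler = registry.register(["todos", "todos:1"], () => {
  handlerRuns++
})

registry.invalidate(["todos:1"]) // handler runs once
unregisterHandler() // removed from both keys
registry.invalidate(["todos"]) // no handlers left
console.log(handlerRuns) // => 1
```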

### `service.invalidateUnsafe`

The synchronous, non-`Effect` form of `invalidate`. It runs matching handlers
immediately and ignores any active batch. Prefer `invalidate` inside Effect code;
reach for this only from imperative callbacks.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity
  r.invalidateUnsafe({ users: ["alice"] })
  // => synchronously runs handlers registered for "users" / "users:alice"
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

### `service.withBatch`

Wraps an effect so that all `invalidate` calls inside it are **collected and
coalesced**, then flushed once when the batch exits (on success or failure).
This deduplicates invalidations so a query reruns at most once per key per batch,
even if you invalidate the same key many times.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity

  let runs = 0
  r.registerUnsafe(["todos"], () => {
    runs++
  })

  yield* r.withBatch(
    Effect.gen(function* () {
      yield* r.invalidate(["todos"])
      yield* r.invalidate(["todos"])
      yield* r.invalidate(["todos"])
      console.log(runs) // => 0 (held until the batch exits)
    })
  )

  console.log(runs) // => 1 (flushed once, coalesced)
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

**Note:** `withBatch` only affects the `invalidate` (Effect) form, which checks for a
pending batch in context. `invalidateUnsafe` always fires immediately and is not
batched.
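
The collect-then-flush behavior can be modeled in a few lines. The sketch below is a hypothetical, synchronous stand-in (`BatchingInvalidator` is an illustrative name) that assumes the semantics described above: keys invalidated inside a batch are held in a set and fired once each when the batch exits, whether or not the body threw. It does not model nested batches.

```typescript
// Hypothetical model of batched invalidation; not the library's implementation.
class BatchingInvalidator {
  private pending: Set<string> | undefined = undefined
  private readonly fire: (key: string) => void

  constructor(fire: (key: string) => void) {
    this.fire = fire
  }

  invalidate(keys: ReadonlyArray<string>): void {
    for (const key of keys) {
      if (this.pending !== undefined) {
        this.pending.add(key) // held until the batch exits
      } else {
        this.fire(key) // no batch active: fire immediately
      }
    }
  }

  withBatch<A>(body: () => A): A {
    const batch = new Set<string>()
    this.pending = batch
    try {
      return body()
    } finally {
      this.pending = undefined
      batch.forEach((key) => this.fire(key)) // flush once per distinct key
    }
  }
}

const fired: Array<string> = []
const invalidator = new BatchingInvalidator((key) => {
  fired.push(key)
})

invalidator.withBatch(() => {
  invalidator.invalidate(["todos"])
  invalidator.invalidate(["todos"])
  invalidator.invalidate(["todos", "users"])
  console.log(fired.length) // => 0 (held until the batch exits)
})
console.log(fired) // => [ 'todos', 'users' ]
```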

## Relationship to Atom

The Atom layer builds on this service for client/server state. Rather than
calling `Reactivity` directly in UI code, you typically attach `reactivityKeys`
to an atom and let writes invalidate reads automatically:

- `Atom.withReactivity` annotates a read atom with the keys it depends on — see
  the combinators reference at
  [/reactivity/atom-combinators/](https://effect.plants.sh/reactivity/atom-combinators/).
- `AtomRpc` and `AtomHttpApi` expose a `reactivityKeys` option so a mutation RPC/
  endpoint invalidates the matching query atoms on success — see
  [/reactivity/server-state/](https://effect.plants.sh/reactivity/server-state/).

Under the hood these use the same key shapes (arrays and namespaced records) and
the same `Hash`-based matching described above, so the mental model carries over.