Key-based invalidation (Reactivity)
The Reactivity service (from effect/unstable/reactivity/Reactivity) provides
process-local, key-based invalidation: it connects writes to the dependent
reads that should re-run when those writes succeed.
It does not cache values. Instead it tracks a set of keys, lets you register query handlers against those keys, and re-runs the registered effects whenever a matching key is invalidated. That keeps queues, streams, UI subscriptions, and read models fresh after a successful mutation.
Mental model
- A query registers one or more keys, runs once immediately, and publishes each result to a queue or stream.
- Invalidating any registered key schedules the query to rerun.
- A mutation wraps an effect and invalidates keys only after it succeeds.
- Keys can be a flat array, or a record whose property names act as broad namespaces and whose ids address individual records.
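The bullets above can be pictured with a small, self-contained model. This is only a sketch in plain TypeScript, not the library's implementation: `MiniReactivity` and its methods are hypothetical names, everything is synchronous, keys are plain strings, and results are pushed to an array instead of a queue.

```typescript
// Hypothetical, synchronous model of key-based invalidation.
// It is NOT the Reactivity service; every name here is illustrative.
type Handler = () => void

class MiniReactivity {
  private readonly handlers = new Map<string, Set<Handler>>()

  // Register a handler under every key and return an unregister function.
  register(keys: Array<string>, handler: Handler): () => void {
    for (const key of keys) {
      const existing = this.handlers.get(key)
      if (existing === undefined) {
        this.handlers.set(key, new Set([handler]))
      } else {
        existing.add(handler)
      }
    }
    return () => {
      for (const key of keys) {
        this.handlers.get(key)?.delete(handler)
      }
    }
  }

  // Run every handler registered under any of the keys, once per call.
  invalidate(keys: Array<string>): void {
    const toRun = new Set<Handler>()
    for (const key of keys) {
      this.handlers.get(key)?.forEach((handler) => toRun.add(handler))
    }
    toRun.forEach((handler) => handler())
  }

  // A "query": run once immediately, then again on each matching invalidation.
  query<A>(keys: Array<string>, read: () => A): { results: Array<A>; stop: () => void } {
    const results: Array<A> = []
    const run = () => {
      results.push(read())
    }
    run()
    return { results, stop: this.register(keys, run) }
  }

  // A "mutation": invalidate only when the write returns without throwing.
  mutation<A>(keys: Array<string>, write: () => A): A {
    const value = write() // a throw here skips the invalidation below
    this.invalidate(keys)
    return value
  }
}

const model = new MiniReactivity()
const todoTable = ["Buy milk"]
const todoQuery = model.query(["todos"], () => todoTable.slice())

model.mutation(["todos"], () => {
  todoTable.push("Walk dog")
})
// todoQuery.results now holds two snapshots: one from before the write
// and one from the rerun after it.
```

A write that throws never reaches the `invalidate` call in this model, which mirrors the rule that mutations invalidate only on success.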
Common case: a query and a mutation
The pattern is: read data with Reactivity.query (or stream) keyed on the
data it depends on, then wrap your writes with Reactivity.mutation keyed on the
same keys. When the write succeeds, the query reruns and pushes a fresh result.

```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

// An in-memory "table" of todos for the example.
const todos = new Map<number, string>([[1, "Buy milk"]])

const program = Effect.gen(function* () {
  // A query keyed on the "todos" namespace. Runs immediately, then again
  // every time the "todos" key is invalidated. The result is a Dequeue.
  const results = yield* Reactivity.query(["todos"])(
    Effect.sync(() => Array.from(todos.values()))
  )

  // First (immediate) result.
  console.log(yield* Queue.take(results)) // => [ 'Buy milk' ]

  // A mutation keyed on the same "todos" key. On success it invalidates the
  // key, which reruns the query above.
  yield* Reactivity.mutation(["todos"])(
    Effect.sync(() => {
      todos.set(2, "Walk dog")
    })
  )

  // The rerun pushes the new snapshot onto the queue.
  console.log(yield* Queue.take(results)) // => [ 'Buy milk', 'Walk dog' ]
}).pipe(Effect.scoped)

// query/mutation/invalidate require the Reactivity service.
program.pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

Reactivity.query returns a Queue.Dequeue, so
the registration lives in a Scope
and is removed when the scope closes, hence the Effect.scoped above.
Keys: arrays vs. namespaced records
The keys argument accepts two shapes, and query, stream, mutation, and
invalidate all use the same matching rules.

```ts
// 1) A flat array of keys. Each element is matched independently.
Reactivity.invalidate(["todos", "users"])

// 2) A record of namespaces -> ids. A namespace key matches broadly; each id
// additionally registers a "<namespace>:<id>" key for record-level matching.
Reactivity.invalidate({ todos: [1, 2], users: ["alice"] })
```

With the record form, invalidating { todos: [] } (the namespace, no ids)
re-runs every query that registered the todos namespace, while
{ todos: [1] } also matches queries that scoped themselves to that specific id.
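To make the record form concrete, the sketch below flattens both key shapes into plain strings. It is an illustration under stated assumptions: `flattenKeys` is a hypothetical helper, ids are limited to strings and numbers, and the string format simply follows the "<namespace>:<id>" description above. The real service may represent keys differently internally.

```typescript
// Hypothetical helper that flattens either key shape into string keys.
// Assumption: ids are strings or numbers, and keys are compared as strings.
type Id = string | number
type Keys = Array<Id> | Record<string, Array<Id>>

function flattenKeys(keys: Keys): Array<string> {
  if (Array.isArray(keys)) {
    // Flat array: each element is its own key.
    return keys.map((key) => String(key))
  }
  const flat: Array<string> = []
  for (const namespace of Object.keys(keys)) {
    flat.push(namespace) // broad namespace key
    for (const id of keys[namespace] ?? []) {
      flat.push(`${namespace}:${id}`) // record-level key
    }
  }
  return flat
}

const fromArray = flattenKeys(["todos", "users"])
// => ["todos", "users"]
const fromRecord = flattenKeys({ todos: [1, 2], users: ["alice"] })
// => ["todos", "todos:1", "todos:2", "users", "users:alice"]
const namespaceOnly = flattenKeys({ todos: [] })
// => ["todos"]
```

In this model a query registered with { todos: [1] } listens on both "todos" and "todos:1", which is why invalidating the bare namespace still reaches it.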
How reruns behave
A few semantics worth internalizing:
- Runs once immediately. A query executes its effect right away and offers the first result before any invalidation.
- Coalescing. If an invalidation arrives while the query’s effect is still running, it is collapsed into a single follow-up run rather than queuing many runs.
- Mutations only invalidate on success. If the wrapped effect fails, the keys are not invalidated.
- Failures propagate. If a query’s effect fails, its queue/stream fails with the same Cause.
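The coalescing rule in the list above can be modeled with two flags. This is a minimal sketch, not the library's implementation: `CoalescingQuery` is a hypothetical class, and `start` and `finish` stand in for the beginning and end of an asynchronous run so the behaviour can be followed step by step.

```typescript
// Hypothetical model of rerun coalescing; not the library's implementation.
class CoalescingQuery {
  private running = false
  private dirty = false
  started = 0

  // Begin a run (the first call models the immediate initial run).
  start(): void {
    this.running = true
    this.started += 1
  }

  // Called when a registered key is invalidated.
  invalidate(): void {
    if (this.running) {
      this.dirty = true // remember at most one follow-up run
    } else {
      this.start()
    }
  }

  // Called when the current run completes.
  finish(): void {
    this.running = false
    if (this.dirty) {
      this.dirty = false
      this.start()
    }
  }
}

const coalescing = new CoalescingQuery()
coalescing.start() // immediate initial run
coalescing.invalidate() // three invalidations arrive mid-run...
coalescing.invalidate()
coalescing.invalidate()
coalescing.finish() // ...and collapse into one follow-up run
coalescing.finish()
// coalescing.started is 2: the initial run plus a single follow-up.
```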
```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  let attempts = 0
  const results = yield* Reactivity.query(["k"])(
    Effect.sync(() => ++attempts)
  )

  console.log(yield* Queue.take(results)) // => 1 (immediate run)

  yield* Reactivity.invalidate(["k"])
  console.log(yield* Queue.take(results)) // => 2 (rerun after invalidate)
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

Exhaustive reference
Everything below is exported from
effect/unstable/reactivity/Reactivity. The standalone accessors
(query, stream, mutation, invalidate) require the Reactivity service
in context; the service class exposes the same operations plus the Unsafe
variants and withBatch for coalescing.
layer
The default Layer providing an in-memory Reactivity service. Provide it once
at the edge of your program.

```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

const main = Reactivity.invalidate(["todos"])

main.pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
// => provides Reactivity, then resolves
```

make
An Effect that constructs the in-memory service value. layer is just
Layer.effect(Reactivity)(make); use make only if you need to build the
service manually.
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const service = yield* Reactivity.make
  // service: Reactivity (the same value layer provides)
}).pipe(Effect.runPromise)
```

query
Runs an effect as a query tied to the supplied keys and returns a
Queue.Dequeue of results. It emits the initial result immediately and a new
result after any registered key is invalidated. The registration is removed when
the enclosing Scope closes. Dual: query(keys)(effect) or
query(effect, keys).
```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  // Array form: keyed on the "users" namespace.
  const q1 = yield* Reactivity.query(["users"])(Effect.succeed("snapshot-A"))
  console.log(yield* Queue.take(q1)) // => "snapshot-A"

  // Record form: keyed on a specific user id.
  const q2 = yield* Reactivity.query({ users: [42] })(
    Effect.succeed({ id: 42, name: "Ada" })
  )
  console.log(yield* Queue.take(q2)) // => { id: 42, name: "Ada" }
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

stream
Same as query, but yields a Stream of rerun results
instead of a queue (it is query mapped through Stream.fromQueue). The Scope
requirement is folded into the stream’s lifecycle. Dual.
```ts
import { Effect, Stream } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  // Effect.sync defers Date.now() so each rerun reads a fresh timestamp.
  const updates = Reactivity.stream(["dashboard"])(Effect.sync(() => Date.now()))

  // Drive invalidations, take the first 2 emissions, then stop.
  yield* Effect.fork(
    Effect.gen(function* () {
      yield* Reactivity.invalidate(["dashboard"])
    })
  )

  const first2 = yield* Stream.take(updates, 2).pipe(Stream.runCollect)
  console.log(first2.length) // => 2 (initial run + one rerun)
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

mutation
Wraps an effect so the supplied keys are invalidated after the effect
succeeds (it is Effect.tap(effect, invalidate(keys))). If the effect fails,
nothing is invalidated. Dual: mutation(keys)(effect) or mutation(effect, keys).
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

const createTodo = (text: string) =>
  // Invalidates both the broad "todos" namespace and the new record's id.
  Reactivity.mutation({ todos: [text] })(
    Effect.sync(() => ({ id: text, text }))
  )

createTodo("Buy milk").pipe(
  Effect.provide(Reactivity.layer),
  Effect.runPromise
)
// => { id: "Buy milk", text: "Buy milk" }, and "todos" queries rerun
```

invalidate
Invalidates the supplied keys directly through the service. Use it when
invalidation is already part of your workflow and you do not want to wrap a
specific effect. Matching queries rerun immediately (or are collected until an
enclosing batch exits; see withBatch).
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  // Array form.
  yield* Reactivity.invalidate(["todos"])
  // Record form: invalidate the namespace AND a specific id.
  yield* Reactivity.invalidate({ todos: [1] })
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

The Reactivity service
Reactivity is a Context.Service class. The standalone accessors above just
delegate to a Reactivity instance, but accessing the service directly gives you
the Unsafe (non-Effect) variants and batch coalescing. Grab it with
Reactivity.use or by yielding it in a generator.
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const reactivity = yield* Reactivity.Reactivity
  yield* reactivity.invalidate(["todos"])
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

service.query / service.stream / service.mutation / service.invalidate
The method forms of the four accessors. They take (keys, effect) positionally
(no dual/data-last form) and behave identically to the standalone functions.
```ts
import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity

  const results = yield* r.query(["items"], Effect.succeed([1, 2, 3]))
  console.log(yield* Queue.take(results)) // => [1, 2, 3]

  yield* r.mutation(["items"], Effect.sync(() => {}))
  yield* r.invalidate(["items"])
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
```

service.registerUnsafe
Registers a plain () => void handler against the given keys and returns an
unregister function. This is the low-level primitive query is built on; use
it to bridge non-Effect subscribers. It is synchronous and does no scope
management, so you are responsible for calling the returned cleanup.
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity

  let runs = 0
  const unregister = r.registerUnsafe(["todos"], () => {
    runs++
  })

  r.invalidateUnsafe(["todos"])
  console.log(runs) // => 1

  unregister() // remove the handler
  r.invalidateUnsafe(["todos"])
  console.log(runs) // => 1 (handler no longer called)
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

service.invalidateUnsafe
The synchronous, non-Effect form of invalidate. It runs matching handlers
immediately and ignores any active batch. Prefer invalidate inside Effect code;
reach for this only from imperative callbacks.
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity
  r.invalidateUnsafe({ users: ["alice"] })
  // => synchronously runs handlers registered for "users" / "users:alice"
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

service.withBatch
Wraps an effect so that all invalidate calls inside it are collected and
coalesced, then flushed once when the batch exits (on success or failure).
This deduplicates invalidations so a query reruns at most once per key per batch,
even if you invalidate the same key many times.
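The collect-then-flush behaviour described above can be sketched without Effect. This is a simplified, synchronous model and not the library's implementation: `BatchingInvalidator` and its methods are hypothetical names, keys are plain strings, and the handling of nested batches is an assumption.

```typescript
// Hypothetical model of batched invalidation; not the library's implementation.
class BatchingInvalidator {
  private readonly handlers = new Map<string, Set<() => void>>()
  private pending: Set<string> | undefined = undefined

  register(key: string, handler: () => void): void {
    const existing = this.handlers.get(key)
    if (existing === undefined) {
      this.handlers.set(key, new Set([handler]))
    } else {
      existing.add(handler)
    }
  }

  invalidate(key: string): void {
    if (this.pending !== undefined) {
      this.pending.add(key) // inside a batch: record the key, run nothing yet
    } else {
      this.flush(key)
    }
  }

  // Collect invalidations while body runs, then flush each distinct key once.
  withBatch<A>(body: () => A): A {
    if (this.pending !== undefined) {
      return body() // assumption: a nested batch joins the outer one
    }
    const collected = new Set<string>()
    this.pending = collected
    try {
      return body()
    } finally {
      // Runs on success and on failure.
      this.pending = undefined
      collected.forEach((key) => this.flush(key))
    }
  }

  private flush(key: string): void {
    this.handlers.get(key)?.forEach((handler) => handler())
  }
}

const batching = new BatchingInvalidator()
let batchRuns = 0
batching.register("todos", () => {
  batchRuns += 1
})

batching.withBatch(() => {
  batching.invalidate("todos")
  batching.invalidate("todos")
  batching.invalidate("todos")
  // batchRuns is still 0 here: the key is only recorded.
})
// batchRuns is now 1: three invalidations flushed as one.
```

Using a Set for the pending keys is what gives the deduplication: repeated invalidations of the same key inside one batch collapse to a single entry.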
```ts
import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"

Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity

  let runs = 0
  r.registerUnsafe(["todos"], () => {
    runs++
  })

  yield* r.withBatch(
    Effect.gen(function* () {
      yield* r.invalidate(["todos"])
      yield* r.invalidate(["todos"])
      yield* r.invalidate(["todos"])
      console.log(runs) // => 0 (held until the batch exits)
    })
  )

  console.log(runs) // => 1 (flushed once, coalesced)
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
```

Relationship to Atom
The Atom layer builds on this service for client/server state. Rather than
calling Reactivity directly in UI code, you typically attach reactivityKeys
to an atom and let writes invalidate reads automatically:
- Atom.withReactivity annotates a read atom with the keys it depends on; see the combinators reference at /reactivity/atom-combinators/.
- AtomRpc and AtomHttpApi expose a reactivityKeys option so a mutation RPC/endpoint invalidates the matching query atoms on success; see /reactivity/server-state/.
Under the hood these use the same key shapes (arrays and namespaced records) and
the same Hash-based matching described above, so the mental model carries over.