
Key-based invalidation (Reactivity)

The Reactivity service (from effect/unstable/reactivity/Reactivity) provides process-local, key-based invalidation: it connects writes to the dependent reads that should re-run when those writes succeed.

It does not cache values. Instead it tracks a set of keys, lets you register query handlers against those keys, and re-runs the registered effects whenever a matching key is invalidated. That keeps queues, streams, UI subscriptions, and read models fresh after a successful mutation.

Mental model

  • A query registers one or more keys, runs once immediately, and publishes each result to a queue or stream.
  • Invalidating any registered key schedules the query to rerun.
  • A mutation wraps an effect and invalidates keys only after it succeeds.
  • Keys can be a flat array, or a record whose property names act as broad namespaces and whose ids address individual records.

The pattern is: read data with Reactivity.query (or stream) keyed on the data it depends on, then wrap your writes with Reactivity.mutation keyed on the same keys. When the write succeeds, the query reruns and pushes a fresh result.

import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
// An in-memory "table" of todos for the example.
const todos = new Map<number, string>([[1, "Buy milk"]])
const program = Effect.gen(function* () {
  // A query keyed on the "todos" namespace. Runs immediately, then again
  // every time the "todos" key is invalidated. The result is a Dequeue.
  const results = yield* Reactivity.query(["todos"])(
    Effect.sync(() => Array.from(todos.values()))
  )
  // First (immediate) result.
  console.log(yield* Queue.take(results)) // => [ 'Buy milk' ]
  // A mutation keyed on the same "todos" key. On success it invalidates the
  // key, which reruns the query above.
  yield* Reactivity.mutation(["todos"])(
    Effect.sync(() => {
      todos.set(2, "Walk dog")
    })
  )
  // The rerun pushes the new snapshot onto the queue.
  console.log(yield* Queue.take(results)) // => [ 'Buy milk', 'Walk dog' ]
}).pipe(Effect.scoped)
// query/mutation/invalidate require the Reactivity service.
program.pipe(Effect.provide(Reactivity.layer), Effect.runPromise)

Reactivity.query yields a Queue.Dequeue and requires a Scope: the registration is tied to that scope and is removed when the scope closes, which is why the program above is wrapped in Effect.scoped.

The keys argument accepts two shapes, and query, stream, mutation, and invalidate all use the same matching rules.

// 1) A flat array of keys. Each element is matched independently.
Reactivity.invalidate(["todos", "users"])
// 2) A record of namespaces -> ids. A namespace key matches broadly; each id
// additionally registers a "<namespace>:<id>" key for record-level matching.
Reactivity.invalidate({ todos: [1, 2], users: ["alice"] })

With the record form, invalidating { todos: [] } (the namespace, no ids) re-runs every query that registered the todos namespace, while { todos: [1] } also matches queries that scoped themselves to that specific id.
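
To make the matching rule concrete, here is a small library-free sketch of how the two key shapes could expand into match strings. The names expandKeys, matches, and isArrayKeys are hypothetical, and String(id) stands in for the Hash-based matching the service is described as using; this is a model of the rule stated above, not the library's implementation.

```typescript
// Hypothetical model of the key-matching rule; NOT the library's code.
type ArrayKeys = ReadonlyArray<unknown>
type RecordKeys = Readonly<Record<string, ReadonlyArray<unknown>>>
type Keys = ArrayKeys | RecordKeys

const isArrayKeys = (keys: Keys): keys is ArrayKeys => Array.isArray(keys)

const expandKeys = (keys: Keys): Array<string> => {
  if (isArrayKeys(keys)) {
    // Flat array: each element is matched independently.
    return keys.map((key) => String(key))
  }
  const out: Array<string> = []
  for (const [namespace, ids] of Object.entries(keys)) {
    out.push(namespace) // broad namespace key
    for (const id of ids) out.push(`${namespace}:${String(id)}`) // record-level key
  }
  return out
}

// A registration and an invalidation match when their expanded keys overlap.
const matches = (registered: Keys, invalidated: Keys): boolean => {
  const set = new Set(expandKeys(registered))
  return expandKeys(invalidated).some((key) => set.has(key))
}

console.log(expandKeys({ todos: [1, 2], users: ["alice"] }))
// => [ 'todos', 'todos:1', 'todos:2', 'users', 'users:alice' ]
console.log(matches(["todos"], { todos: [] })) // => true
console.log(matches(["todos:1"], { todos: [1] })) // => true
console.log(matches(["todos:1"], { todos: [2] })) // => false
console.log(matches({ todos: [1] }, { todos: [2] })) // => true (shared "todos" key)
```

Under this model a record-form registration such as { todos: [1] } also carries the broad todos key, so any record-form invalidation of the todos namespace reaches it. Whether the library behaves exactly this way in every case is not confirmed by this page.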

A few semantics worth internalizing:

  • Runs once immediately. A query executes its effect right away and offers the first result before any invalidation.
  • Coalescing. If an invalidation arrives while the query’s effect is still running, it is collapsed into a single follow-up run rather than queuing many runs.
  • Mutations only invalidate on success. If the wrapped effect fails, the keys are not invalidated.
  • Failures propagate. If a query’s effect fails, its queue/stream fails with the same Cause.

import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  let attempts = 0
  const results = yield* Reactivity.query(["k"])(
    Effect.sync(() => ++attempts)
  )
  console.log(yield* Queue.take(results)) // => 1 (immediate run)
  yield* Reactivity.invalidate(["k"])
  console.log(yield* Queue.take(results)) // => 2 (rerun after invalidate)
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)
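
The coalescing rule listed above can be modeled with two flags. The following is a Promise-based sketch, assuming a hypothetical makeCoalescedRunner helper; it illustrates the "collapse into a single follow-up run" behavior and is not the library's implementation.

```typescript
// Hypothetical model of coalescing; NOT the library's code.
const makeCoalescedRunner = (task: () => Promise<void>) => {
  let running = false
  let pending = false

  const loop = async (): Promise<void> => {
    running = true
    do {
      pending = false
      await task()
    } while (pending) // at most one follow-up per burst of triggers
    running = false
  }

  return {
    // Called once per invalidation.
    trigger: (): Promise<void> | undefined => {
      if (running) {
        pending = true // remember that a rerun is needed, nothing more
        return undefined
      }
      return loop()
    }
  }
}

const demo = async (): Promise<number> => {
  let runs = 0
  const runner = makeCoalescedRunner(async () => {
    runs++
    await new Promise((resolve) => setTimeout(resolve, 10))
  })
  const first = runner.trigger() // starts run #1
  runner.trigger() // arrives mid-run
  runner.trigger() // arrives mid-run, collapsed with the previous trigger
  runner.trigger() // same
  await first
  return runs
}

demo().then((runs) => console.log(runs)) // => 2
```

Three invalidations arrive while the first run is in flight, yet the task executes only twice: the initial run plus one follow-up.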

Everything below is exported from effect/unstable/reactivity/Reactivity. The standalone accessors (query, stream, mutation, invalidate) require the Reactivity service in context; the service class exposes the same operations plus the Unsafe variants and withBatch for coalescing.

Reactivity.layer is the default Layer that provides an in-memory Reactivity service. Provide it once at the edge of your program.

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
const main = Reactivity.invalidate(["todos"])
main.pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
// => provides Reactivity, then resolves

Reactivity.make is an Effect that constructs the in-memory service value. layer is just Layer.effect(Reactivity)(make); use make only if you need to build the service manually.

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  const service = yield* Reactivity.make
  // service: Reactivity (the same value layer provides)
}).pipe(Effect.runPromise)

Reactivity.query runs an effect as a query tied to the supplied keys and returns a Queue.Dequeue of results. It emits the initial result immediately and a new result after any registered key is invalidated. The registration is removed when the enclosing Scope closes. Dual: query(keys)(effect) or query(effect, keys).

import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  // Array form: keyed on the "users" namespace.
  const q1 = yield* Reactivity.query(["users"])(Effect.succeed("snapshot-A"))
  console.log(yield* Queue.take(q1)) // => "snapshot-A"
  // Record form: keyed on a specific user id.
  const q2 = yield* Reactivity.query({ users: [42] })(
    Effect.succeed({ id: 42, name: "Ada" })
  )
  console.log(yield* Queue.take(q2)) // => { id: 42, name: "Ada" }
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)

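Reactivity.stream behaves like query, but returns a Stream of results instead of a queue (it is query mapped through Stream.fromQueue). The stream emits the initial result followed by one result per rerun, and the Scope requirement is folded into the stream’s lifecycle. Dual, like query.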

import { Effect, Stream } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
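  // Effect.sync defers Date.now() so each rerun reads a fresh timestamp.
  const updates = Reactivity.stream(["dashboard"])(Effect.sync(() => Date.now()))
  // Drive one invalidation in the background. The short sleep gives the
  // stream time to start and register its keys before the invalidation fires.
  yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.sleep("10 millis")
      yield* Reactivity.invalidate(["dashboard"])
    })
  )
  // Take the first 2 emissions, then stop.
  const first2 = yield* Stream.take(updates, 2).pipe(Stream.runCollect)
  console.log(first2.length) // => 2 (initial run + one rerun)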
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)

Reactivity.mutation wraps an effect so the supplied keys are invalidated after the effect succeeds (it is Effect.tap(effect, invalidate(keys))). If the effect fails, nothing is invalidated. Dual: mutation(keys)(effect) or mutation(effect, keys).

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
const createTodo = (text: string) =>
  // Invalidates both the broad "todos" namespace and the new record's id.
  Reactivity.mutation({ todos: [text] })(
    Effect.sync(() => ({ id: text, text }))
  )
createTodo("Buy milk").pipe(
  Effect.provide(Reactivity.layer),
  Effect.runPromise
)
)
// => { id: "Buy milk", text: "Buy milk" }, and "todos" queries rerun
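
The failure path is the part the example above does not exercise. As a library-free illustration, the sketch below models "invalidate only on success" with plain Promises; the mutation and demoMutation functions here are hypothetical stand-ins, not Reactivity.mutation itself.

```typescript
// Hypothetical Promise-based model of "invalidate only on success".
// It is NOT the library's implementation.
const mutation = async <A>(
  invalidate: (keys: ReadonlyArray<string>) => void,
  keys: ReadonlyArray<string>,
  write: () => Promise<A>
): Promise<A> => {
  const result = await write() // a rejection skips the invalidation below
  invalidate(keys)
  return result
}

const demoMutation = async (): Promise<Array<string>> => {
  const invalidated: Array<string> = []
  const record = (keys: ReadonlyArray<string>): void => {
    invalidated.push(...keys)
  }

  // Succeeds: "todos" is invalidated.
  await mutation(record, ["todos"], async () => "saved")

  // Fails: "users" is never invalidated.
  await mutation(record, ["users"], async () => {
    throw new Error("write failed")
  }).catch(() => undefined)

  return invalidated
}

demoMutation().then((keys) => console.log(keys)) // => [ 'todos' ]
```

Sequencing the invalidation after the write, rather than in a finalizer, is what keeps failed writes from triggering reruns.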

Reactivity.invalidate invalidates the supplied keys directly through the service. Use it when invalidation is already part of your workflow and you do not want to wrap a specific effect. Matching queries rerun immediately, unless an enclosing batch is active, in which case the invalidations are collected until the batch exits (see withBatch).

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  // Array form.
  yield* Reactivity.invalidate(["todos"])
  // Record form: invalidate the namespace AND a specific id.
  yield* Reactivity.invalidate({ todos: [1] })
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)

Reactivity is a Context.Service class. The standalone accessors above just delegate to a Reactivity instance, but accessing the service directly gives you the Unsafe (non-Effect) variants and batch coalescing. Grab it with Reactivity.use or by yielding it in a generator.

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  const reactivity = yield* Reactivity.Reactivity
  yield* reactivity.invalidate(["todos"])
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)

service.query / service.stream / service.mutation / service.invalidate


These are the method forms of the four accessors. query, stream, and mutation take (keys, effect) positionally (there is no dual/data-last form), and invalidate takes only keys. Otherwise they behave identically to the standalone functions.

import { Effect, Queue } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity
  const results = yield* r.query(["items"], Effect.succeed([1, 2, 3]))
  console.log(yield* Queue.take(results)) // => [1, 2, 3]
  yield* r.mutation(["items"], Effect.sync(() => {}))
  yield* r.invalidate(["items"])
}).pipe(Effect.scoped, Effect.provide(Reactivity.layer), Effect.runPromise)

registerUnsafe registers a plain () => void handler against the given keys and returns an unregister function. This is the low-level primitive query is built on; use it to bridge non-Effect subscribers. It is synchronous and does no scope management, so you are responsible for calling the returned cleanup function.

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity
  let runs = 0
  const unregister = r.registerUnsafe(["todos"], () => {
    runs++
  })
  r.invalidateUnsafe(["todos"])
  console.log(runs) // => 1
  unregister() // remove the handler
  r.invalidateUnsafe(["todos"])
  console.log(runs) // => 1 (handler no longer called)
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
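
One way to bridge a callback-based subscriber is to adapt registerUnsafe to a subscribe(listener) => unsubscribe shape. The sketch below assumes only the registerUnsafe signature described above; RegisterOnly, toSubscribe, and makeFakeService are hypothetical names, and the fake service exists solely so the example runs without the library.

```typescript
// Structural stand-in for the one service method this sketch needs.
// With the real library you would pass the value from `yield* Reactivity.Reactivity`.
interface RegisterOnly {
  readonly registerUnsafe: (
    keys: ReadonlyArray<unknown>,
    handler: () => void
  ) => () => void
}

// Hypothetical helper: adapts registerUnsafe to subscribe(listener) => unsubscribe.
const toSubscribe =
  (service: RegisterOnly, keys: ReadonlyArray<unknown>) =>
  (listener: () => void): (() => void) =>
    service.registerUnsafe(keys, listener)

// Minimal in-memory fake (array keys only), NOT the library's implementation.
const makeFakeService = () => {
  const handlers = new Map<string, Set<() => void>>()
  return {
    registerUnsafe: (keys: ReadonlyArray<unknown>, handler: () => void) => {
      for (const key of keys) {
        const name = String(key)
        const set = handlers.get(name) ?? new Set<() => void>()
        set.add(handler)
        handlers.set(name, set)
      }
      return () => {
        for (const key of keys) handlers.get(String(key))?.delete(handler)
      }
    },
    invalidateUnsafe: (keys: ReadonlyArray<unknown>) => {
      for (const key of keys) {
        handlers.get(String(key))?.forEach((handler) => handler())
      }
    }
  }
}

const fake = makeFakeService()
const subscribe = toSubscribe(fake, ["todos"])

let notifications = 0
const unsubscribe = subscribe(() => {
  notifications++
})
fake.invalidateUnsafe(["todos"]) // listener fires
unsubscribe()
fake.invalidateUnsafe(["todos"]) // listener already removed
console.log(notifications) // => 1
```

Because the adapter returns the unregister function directly, the caller keeps ownership of cleanup, matching the "you own the cleanup" contract described above.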

invalidateUnsafe is the synchronous, non-Effect form of invalidate. It runs matching handlers immediately and ignores any active batch. Prefer invalidate inside Effect code; reach for this only from imperative callbacks.

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity
  r.invalidateUnsafe({ users: ["alice"] })
  // => synchronously runs handlers registered for "users" / "users:alice"
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)

withBatch wraps an effect so that all invalidate calls inside it are collected and coalesced, then flushed once when the batch exits (on success or failure). This deduplicates invalidations so a query reruns at most once per key per batch, even if you invalidate the same key many times.

import { Effect } from "effect"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
Effect.gen(function* () {
  const r = yield* Reactivity.Reactivity
  let runs = 0
  r.registerUnsafe(["todos"], () => {
    runs++
  })
  yield* r.withBatch(
    Effect.gen(function* () {
      yield* r.invalidate(["todos"])
      yield* r.invalidate(["todos"])
      yield* r.invalidate(["todos"])
      console.log(runs) // => 0 (held until the batch exits)
    })
  )
  console.log(runs) // => 1 (flushed once, coalesced)
}).pipe(Effect.provide(Reactivity.layer), Effect.runPromise)
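
The collect-then-flush behavior can be pictured as a Set that is filled while a batch is active and drained when it exits. The following synchronous sketch uses a hypothetical makeBatcher helper; it is a model of the description above, not the library's implementation, and the handling of nested batches is an assumption.

```typescript
// Hypothetical model of batch coalescing; NOT the library's code.
const makeBatcher = (onInvalidate: (key: string) => void) => {
  let active: Set<string> | undefined

  const invalidate = (keys: ReadonlyArray<string>): void => {
    for (const key of keys) {
      if (active !== undefined) active.add(key) // held, deduplicated by the Set
      else onInvalidate(key) // no batch active: notify right away
    }
  }

  const withBatch = <A>(body: () => A): A => {
    // Assumption: a nested batch simply joins the outer one.
    if (active !== undefined) return body()
    const collected = new Set<string>()
    active = collected
    try {
      return body()
    } finally {
      // Flush once on exit, whether body returned or threw.
      active = undefined
      collected.forEach((key) => onInvalidate(key))
    }
  }

  return { invalidate, withBatch }
}

const seen: Array<string> = []
const batcher = makeBatcher((key) => {
  seen.push(key)
})

batcher.withBatch(() => {
  batcher.invalidate(["todos"])
  batcher.invalidate(["todos", "users"])
  batcher.invalidate(["todos"])
  console.log(seen.length) // => 0 (held until the batch exits)
})
console.log(seen) // => [ 'todos', 'users' ]
```

Four invalidations of two distinct keys produce exactly two notifications, one per key, delivered after the batch body finishes.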

The Atom layer builds on this service for client/server state. Rather than calling Reactivity directly in UI code, you typically attach reactivityKeys to an atom and let writes invalidate reads automatically:

  • Atom.withReactivity annotates a read atom with the keys it depends on; see the combinators reference at /reactivity/atom-combinators/.
  • AtomRpc and AtomHttpApi expose a reactivityKeys option so a mutation RPC/endpoint invalidates the matching query atoms on success; see /reactivity/server-state/.

Under the hood these use the same key shapes (arrays and namespaced records) and the same Hash-based matching described above, so the mental model carries over.