Atom combinators & integrations

Combinators are pipeable transformations that take an atom and return a new atom. They never mutate the source: Atom.map(source, f) produces a derived atom, leaving source untouched. Most combinators preserve writability where it makes sense — if you map a Writable, you get a Writable back whose write input is forwarded to the source. A few combinators that subscribe to changes intentionally drop serialization metadata (their return type is Atom.WithoutSerializable<A>).

Everything here lives in the Atom module:

import { Atom } from "effect/unstable/reactivity"
// or, namespace style straight from the module file:
import * as Atom from "effect/unstable/reactivity/Atom"

(The effect/unstable/reactivity barrel re-exports Atom, AsyncResult, AtomRegistry, and Reactivity as namespaces — that is what the { Atom } imports below resolve to.)

Pipe an atom through one or more combinators. The result is just another atom you read and write like any other.

import { Atom } from "effect/unstable/reactivity"
// A writable source.
const count = Atom.make(0)
// Derive a mapped view (it stays writable, because `count` is writable).
const doubled = count.pipe(Atom.map((n) => n * 2))
// reading `doubled` => 0; set(count, 3) => doubled reads 6
// Map preserves writability: writes flow back to `count`.
const asString = count.pipe(Atom.map((n) => `count is ${n}`))
// asString stays Writable<string, number>: set(asString, 10) writes 10 to `count`
// Async source mapped on its success channel.
const user = Atom.make(fetchUser) // Atom<AsyncResult<User>> (fetchUser declares no error type)
const userName = user.pipe(Atom.mapResult((u) => u.name))
// Initial/Failure pass through unchanged; Success<User> => Success<string>
declare const fetchUser: import("effect").Effect.Effect<User>
interface User {
  readonly name: string
}

Caching, refresh timing, and persistence are all combinators too:

import { Atom } from "effect/unstable/reactivity"
import { Duration } from "effect"
const search = Atom.make(searchEffect).pipe(
  // keep the result cached for 30s after the last reader leaves
  Atom.setIdleTTL(Duration.seconds(30)),
  // stale-while-revalidate: serve cached data, refetch in the background
  Atom.swr({ staleTime: Duration.minutes(1) }),
  // refetch when the tab regains focus
  Atom.refreshOnWindowFocus
)
declare const searchEffect: import("effect").Effect.Effect<ReadonlyArray<string>>

By default an atom is lazy (recomputed only while observed) and not kept alive (disposed shortly after its last reader leaves, releasing finalizers). These combinators tune that behavior.

Atom.keepAlive returns a copy that stays cached and mounted even when nothing is subscribed. Useful for expensive computations or long-lived subscriptions you don’t want torn down between reads.

import { Atom } from "effect/unstable/reactivity"
const config = Atom.make(loadConfig).pipe(Atom.keepAlive)
// => config is never disposed by the registry; its scope stays open
declare const loadConfig: import("effect").Effect.Effect<unknown>

Atom.autoDispose undoes keepAlive on a copied atom, restoring the default “dispose when unused” behavior.

import { Atom } from "effect/unstable/reactivity"
const base = Atom.make(0).pipe(Atom.keepAlive)
const ephemeral = base.pipe(Atom.autoDispose)
// => ephemeral.keepAlive === false again

Atom.setLazy sets whether the atom is lazy. Lazy atoms defer recomputation while they have no active listeners or non-lazy dependents; passing false makes the atom eager.

import { Atom } from "effect/unstable/reactivity"
const eager = Atom.make(metricsStream).pipe(Atom.setLazy(false))
// => eager keeps consuming the stream even without direct listeners
declare const metricsStream: import("effect").Stream.Stream<number>

Atom.setIdleTTL returns a copy with an idle time-to-live. A finite duration disposes the atom that long after the last reader leaves; an infinite duration behaves like keepAlive.

import { Atom } from "effect/unstable/reactivity"
import { Duration } from "effect"
const cached = Atom.make(fetchData).pipe(Atom.setIdleTTL(Duration.seconds(30)))
// => cached survives 30s of inactivity before disposal
const forever = Atom.make(fetchData).pipe(Atom.setIdleTTL(Duration.infinity))
// => forever.keepAlive === true (infinite TTL == keepAlive)
declare const fetchData: import("effect").Effect.Effect<unknown>

Atom.withLabel attaches a diagnostic label (name plus a captured stack frame) for inspection and debugging. It does not change read/write behavior.

import { Atom } from "effect/unstable/reactivity"
const todos = Atom.make(fetchTodos).pipe(Atom.withLabel("todos"))
// => todos.label === ["todos", "<stack frame>"]
declare const fetchTodos: import("effect").Effect.Effect<ReadonlyArray<unknown>>

Atom.initialValue pairs an atom with a value, producing a readonly [Atom<A>, A] tuple that you pass to an AtomRegistry so the atom starts seeded before its first rebuild.

import { Atom } from "effect/unstable/reactivity"
import * as Registry from "effect/unstable/reactivity/AtomRegistry"
const counter = Atom.make(0)
const registry = Registry.make({
  initialValues: [counter.pipe(Atom.initialValue(42))]
})
// => registry.get(counter) reads 42 before any recompute

These build a new atom whose value is computed from a source. Writable sources stay writable: writes are forwarded to the source unchanged.

Atom.transform is the low-level primitive behind map. You receive the read AtomContext and the source atom and return the derived value, so you can subscribe, register finalizers, refresh, etc. The initialValueTarget option controls which atom receives preloaded initial values.

import { Atom } from "effect/unstable/reactivity"
const source = Atom.make(1)
// Re-implement `map` with `transform`:
const tripled = source.pipe(Atom.transform((get) => get(source) * 3))
// => set(source, 4) makes tripled read 12

Atom.map maps the current value with a pure function. Writable sources keep their original write input type, so you can write through the mapped atom.

import { Atom } from "effect/unstable/reactivity"
const celsius = Atom.make(20)
const fahrenheit = celsius.pipe(Atom.map((c) => c * 9 / 5 + 32))
// read: fahrenheit => 68
// write: set(fahrenheit, 100) writes 100 to celsius (write input is the source's)

Atom.mapResult maps the success value inside an AsyncResult atom. Initial and Failure states pass through untouched; writable sources keep their write input type.

import { Atom } from "effect/unstable/reactivity"
const user = Atom.make(fetchUser) // Atom<AsyncResult<{ id: number; name: string }>>
const name = user.pipe(Atom.mapResult((u) => u.name))
// Success({ id, name }) => Success(name)
// Initial / Failure / waiting flags are preserved
declare const fetchUser: import("effect").Effect.Effect<{
  readonly id: number
  readonly name: string
}>

Combinators for AsyncResult atoms (built from Atom.make(effect) etc.) that control fallbacks, timing, and refresh policy. See /reactivity/async-result/ for the result shape.

Atom.withFallback shows a fallback AsyncResult atom (marked waiting) while the primary atom is still Initial. Once the primary produces any non-initial result, the primary wins.

import { Atom, AsyncResult } from "effect/unstable/reactivity"
const remote = Atom.make(fetchProfile)
const cached = Atom.make(AsyncResult.success(defaultProfile))
const profile = remote.pipe(Atom.withFallback(cached))
// while remote is Initial => waiting(Success(defaultProfile))
// once remote resolves => remote's result
declare const defaultProfile: { readonly name: string }
declare const fetchProfile: import("effect").Effect.Effect<{ readonly name: string }>
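
The selection rule behind withFallback can be modelled as a pure function. The following is a minimal sketch over a simplified stand-in for AsyncResult; the `Result` type and `pickWithFallback` are hypothetical names used for illustration and are not the library's implementation.

```typescript
// Simplified stand-in for AsyncResult (illustration only, not the library type).
type Result<A, E> =
  | { readonly _tag: "Initial"; readonly waiting: boolean }
  | { readonly _tag: "Failure"; readonly error: E; readonly waiting: boolean }
  | { readonly _tag: "Success"; readonly value: A; readonly waiting: boolean }

// Hypothetical model of the rule: while the primary is Initial, expose the
// fallback marked as waiting; any other primary state wins outright.
function pickWithFallback<A, E>(
  primary: Result<A, E>,
  fallback: Result<A, E>
): Result<A, E> {
  if (primary._tag === "Initial") {
    return { ...fallback, waiting: true }
  }
  return primary
}

const fallbackProfile: Result<string, string> = {
  _tag: "Success",
  value: "Guest",
  waiting: false
}

// Primary has produced nothing yet: the fallback is shown, flagged as waiting.
const whileLoading = pickWithFallback<string, string>(
  { _tag: "Initial", waiting: true },
  fallbackProfile
)

// Primary failed: the failure is surfaced and the fallback is no longer used.
const afterFailure = pickWithFallback<string, string>(
  { _tag: "Failure", error: "offline", waiting: false },
  fallbackProfile
)
```

Note that a Failure from the primary counts as a non-initial result, so in this model the fallback only covers the period before the first result arrives.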

Atom.debounce publishes source changes only after the source has stopped changing for the given duration. The source's current value is used immediately as the starting value; the pending timer is cleared on disposal. (Returns Atom.WithoutSerializable.)

import { Atom } from "effect/unstable/reactivity"
import { Duration } from "effect"
const query = Atom.make("")
const debouncedQuery = query.pipe(Atom.debounce(Duration.millis(300)))
// rapid set(query, ...) calls => debouncedQuery only updates 300ms after the last one

Atom.withRefresh reads the source and schedules a refresh of it after the given duration; the scheduled refresh is canceled on disposal. Use it for simple polling.

import { Atom } from "effect/unstable/reactivity"
import { Duration } from "effect"
const clock = Atom.make(fetchServerTime).pipe(Atom.withRefresh(Duration.seconds(10)))
// => re-fetches server time roughly every 10s while observed
declare const fetchServerTime: import("effect").Effect.Effect<number>

Atom.swr adds stale-while-revalidate refresh to an AsyncResult atom. While the cached value is fresh within staleTime, automatic revalidation on read is skipped; manual refresh calls always force a refetch.

Options:

  • staleTime: how long a value is considered fresh.
  • revalidateOnMount: set false to serve stale data on first mount without a background refresh (defaults to revalidating).
  • revalidateOnFocus: true respects staleTime on focus; "always" forces a refetch on focus.
  • focusSignal: the signal atom that drives focus revalidation (e.g. Atom.windowFocusSignal).
import { Atom } from "effect/unstable/reactivity"
import { Duration } from "effect"
const posts = Atom.make(fetchPosts).pipe(
  Atom.swr({
    staleTime: Duration.minutes(2),
    revalidateOnFocus: true,
    focusSignal: Atom.windowFocusSignal
  })
)
// read within 2min => returns cached data, no refetch
// read when stale => returns cached data immediately AND triggers a background refresh
// tab refocused => revalidates if data is stale
declare const fetchPosts: import("effect").Effect.Effect<ReadonlyArray<unknown>>
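
The freshness rules for swr can be summarized as a small decision function. This is a sketch of the rules as stated in the text, not the library's code: `Trigger`, `SwrPolicy`, and `shouldRefetch` are hypothetical names, durations are plain milliseconds, revalidateOnMount is left out, and treating a value as stale exactly when its age reaches staleTime is an assumption.

```typescript
// What prompted the check: a normal read, a focus signal, or an explicit refresh.
type Trigger = "read" | "focus" | "manualRefresh"

interface SwrPolicy {
  readonly staleTimeMs: number
  readonly revalidateOnFocus: boolean | "always"
}

// Hypothetical decision function modelling the rules in the text.
// Assumption: a value counts as stale once its age reaches staleTimeMs.
function shouldRefetch(policy: SwrPolicy, ageMs: number, trigger: Trigger): boolean {
  const isStale = ageMs >= policy.staleTimeMs
  switch (trigger) {
    case "manualRefresh":
      return true // manual refresh always forces a refetch
    case "focus":
      if (policy.revalidateOnFocus === "always") return true
      return policy.revalidateOnFocus === true && isStale
    case "read":
      return isStale // fresh values skip automatic revalidation
  }
}

// Two-minute stale time, focus revalidation that respects staleness.
const postsPolicy: SwrPolicy = { staleTimeMs: 120000, revalidateOnFocus: true }

const freshRead = shouldRefetch(postsPolicy, 30000, "read") // false
const staleRead = shouldRefetch(postsPolicy, 150000, "read") // true
const freshFocus = shouldRefetch(postsPolicy, 30000, "focus") // false
const manualRefresh = shouldRefetch(postsPolicy, 30000, "manualRefresh") // true
```

In every case the cached data is still what the reader sees first; the function only decides whether a background refetch is started.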

Atom.withReactivity refreshes the atom whenever one of the given invalidation keys changes in the default Reactivity runtime. Keys can be a flat array or a record of { table: [...ids] }. Pair it with Reactivity.mutation or the reactivityKeys option on a runtime’s fn (runtime.fn(..., { reactivityKeys })) so writes invalidate the right reads.

import { Atom } from "effect/unstable/reactivity"
const todos = Atom.make(fetchTodos).pipe(Atom.withReactivity(["todos"]))
// => any invalidation of the "todos" key refreshes this atom
declare const fetchTodos: import("effect").Effect.Effect<ReadonlyArray<unknown>>

Show a provisional value immediately, run the real mutation, then reconcile — refreshing the source on success or rolling back on failure.

Atom.optimistic wraps an atom in a writable optimistic atom. What you write to it are transition atoms (atoms holding an AsyncResult). While a transition is a waiting Success, its value is shown optimistically; when a transition settles successfully the source is refreshed, and a failure rolls back to the latest source value.

import { Atom, AsyncResult } from "effect/unstable/reactivity"
const likeCount = Atom.make(10)
const optimisticLikes = likeCount.pipe(Atom.optimistic)
// A transition that immediately shows 11, then resolves.
const transition = Atom.make(
  AsyncResult.success(11, { waiting: true }) // pending optimistic value
)
// set(optimisticLikes, transition):
// pending => optimisticLikes reads 11 immediately
// success => likeCount is refreshed, optimisticLikes follows the source
// failure => optimisticLikes rolls back to the latest likeCount

Atom.optimisticFn is the ergonomic form: it builds an AtomResultFn that applies an optimistic update via a reducer before running the underlying mutation fn. The reducer computes the provisional value from (current, update); the wrapped function completes the transition (or you can set intermediate values through the provided callback).

import { Atom } from "effect/unstable/reactivity"
import { Effect } from "effect"
const count = Atom.make(0)
const optimisticCount = count.pipe(Atom.optimistic)
// Underlying mutation: persist the increment server-side.
const increment = Atom.fn((by: number) => persist(by))
const incrementOptimistic = optimisticCount.pipe(
  Atom.optimisticFn({
    reducer: (current, by: number) => current + by, // provisional value shown now
    fn: increment
  })
)
// set(incrementOptimistic, 1):
// 1. optimisticCount immediately reads current + 1 (optimistic)
// 2. `increment` runs
// 3. on success => `count` is refreshed and optimisticCount follows it
//    on failure => optimisticCount rolls back to the latest `count`
declare const persist: (by: number) => Effect.Effect<void>

Atom.batch runs synchronous atom updates as a single batch. Stale nodes are rebuilt and listeners notified only after the callback returns, so dependents observe one final state instead of intermediate flickers.

import { Atom } from "effect/unstable/reactivity"
import * as Registry from "effect/unstable/reactivity/AtomRegistry"
const first = Atom.make("Ada")
const last = Atom.make("Lovelace")
const registry = Registry.make()
Atom.batch(() => {
  registry.set(first, "Grace")
  registry.set(last, "Hopper")
})
// => dependents recompute once, seeing both updates together
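
To make the "one final state" guarantee concrete, here is a minimal sketch of deferred notification in a toy store. `TinyStore` is a hypothetical class written for this illustration; it is not how the registry is implemented, it only shows why listeners inside a batch never see the intermediate "Grace Lovelace" state.

```typescript
// Hypothetical miniature store, used only to illustrate batching.
class TinyStore {
  private readonly values = new Map<string, string>()
  private readonly listeners = new Set<() => void>()
  private batchDepth = 0
  private pending = false

  subscribe(listener: () => void): void {
    this.listeners.add(listener)
  }

  get(key: string): string | undefined {
    return this.values.get(key)
  }

  set(key: string, value: string): void {
    this.values.set(key, value)
    if (this.batchDepth > 0) {
      this.pending = true // defer notification until the batch ends
    } else {
      this.notify()
    }
  }

  batch(run: () => void): void {
    this.batchDepth++
    try {
      run()
    } finally {
      this.batchDepth--
      // Only the outermost batch flushes, and only if something changed.
      if (this.batchDepth === 0 && this.pending) {
        this.pending = false
        this.notify()
      }
    }
  }

  private notify(): void {
    this.listeners.forEach((listener) => listener())
  }
}

const store = new TinyStore()
store.set("first", "Ada")
store.set("last", "Lovelace")

// Record every full name a listener observes.
const seen: Array<string> = []
store.subscribe(() => {
  seen.push(`${store.get("first")} ${store.get("last")}`)
})

store.batch(() => {
  store.set("first", "Grace")
  store.set("last", "Hopper")
})
// seen contains a single entry: "Grace Hopper"
```

Without the batch call, the same two writes would notify the listener twice, once with the mixed "Grace Lovelace" state.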

A signal is an atom that increments to broadcast “something happened”. Combinators turn signal changes into atom refreshes.

Atom.windowFocusSignal is a browser-only Atom<number> that increments whenever the document becomes visible (via the visibilitychange event). The listener is removed on disposal.

import { Atom } from "effect/unstable/reactivity"
// Atom<number>; increments each time the tab is refocused.
const focus = Atom.windowFocusSignal

Atom.makeRefreshOnSignal builds a combinator that refreshes an atom whenever the supplied signal atom changes; normal source updates are still forwarded. (Returns Atom.WithoutSerializable.)

import { Atom } from "effect/unstable/reactivity"
const refreshOnSignal = Atom.makeRefreshOnSignal(mySignal)
const data = Atom.make(fetchData).pipe(refreshOnSignal)
// => every time mySignal increments, `data` is refreshed
declare const mySignal: import("effect/unstable/reactivity/Atom").Atom<number>
declare const fetchData: import("effect").Effect.Effect<unknown>

Atom.refreshOnWindowFocus is makeRefreshOnSignal applied to windowFocusSignal: it refreshes the atom whenever the tab regains focus. Browser-only.

import { Atom } from "effect/unstable/reactivity"
const dashboard = Atom.make(fetchDashboard).pipe(Atom.refreshOnWindowFocus)
// => refetches whenever the user returns to the tab
declare const fetchDashboard: import("effect").Effect.Effect<unknown>

Atom.kvs creates a Writable atom backed by a KeyValueStore entry. Values are encoded and decoded with the supplied Schema. You must provide an AtomRuntime that includes the KeyValueStore service.

  • sync mode (default): the atom exposes the decoded value directly and writes defaultValue() when the key is missing.
  • async mode: the atom exposes an AsyncResult of the decoded value.
import { Atom } from "effect/unstable/reactivity"
import { KeyValueStore } from "effect/unstable/persistence"
import { Schema } from "effect"
// A runtime providing browser localStorage as the KeyValueStore.
const runtime = Atom.runtime(
  KeyValueStore.layerStorage(() => localStorage)
)
// sync mode: the value is "light" | "dark" directly.
const theme = Atom.kvs({
  runtime,
  key: "theme",
  schema: Schema.Literals(["light", "dark"]),
  defaultValue: () => "light" as const
})
// read theme => "light" (or the persisted value)
// set(theme, "dark") => writes "dark" to localStorage AND updates the atom
// The schema is declared before the atom that uses it.
const Settings = Schema.Struct({ notifications: Schema.Boolean })
// async mode: the value is AsyncResult<{ notifications: boolean }>.
const settings = Atom.kvs({
  runtime,
  key: "settings",
  schema: Settings,
  defaultValue: () => ({ notifications: true }),
  mode: "async"
})
// read settings => AsyncResult<{ notifications: boolean }>

Atom.searchParam creates a Writable atom mapped to a URL query parameter. Without a schema the value is a string; with a Schema.Codec<_, string> the value is an Option<Type>. Writes are debounced (~500ms) and pushed via history.pushState; popstate / pushstate events update the atom.

import { Atom } from "effect/unstable/reactivity"
import { Schema } from "effect"
// String param: ?q=...
const q = Atom.searchParam("q")
// read q => "" or the current ?q value
// set(q, "effect") => URL becomes ?q=effect
// Typed param via a string codec: ?page=3
const page = Atom.searchParam("page", { schema: Schema.NumberFromString })
// read page => Option<number> (Option.some(3) when ?page=3)
// set(page, Option.some(5)) => ?page=5; Option.none() clears the param
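
The mapping between a query string and typed values can be sketched with the standard URLSearchParams API. This is only a model of the read and write sides, under stated assumptions: the helper names are hypothetical, `undefined` stands in for Option.none(), a missing or non-numeric value is assumed to decode to "none", and the debouncing and history handling are left out.

```typescript
// Read a raw string parameter; a missing parameter reads as "".
function readParam(search: string, name: string): string {
  return new URLSearchParams(search).get(name) ?? ""
}

// Read a numeric parameter. `undefined` stands in for Option.none() here.
// Assumption: a missing or non-numeric value decodes to "none".
function readNumberParam(search: string, name: string): number | undefined {
  const raw = new URLSearchParams(search).get(name)
  if (raw === null || raw.trim() === "") return undefined
  const parsed = Number(raw)
  return Number.isNaN(parsed) ? undefined : parsed
}

// Produce the next query string; `undefined` removes the parameter.
function writeParam(search: string, name: string, value: string | undefined): string {
  const params = new URLSearchParams(search)
  if (value === undefined) {
    params.delete(name)
  } else {
    params.set(name, value)
  }
  return params.toString()
}

const currentSearch = "?q=effect&page=3"
const queryText = readParam(currentSearch, "q") // "effect"
const pageNumber = readNumberParam(currentSearch, "page") // 3
const nextSearch = writeParam(currentSearch, "page", "5") // "q=effect&page=5"
const clearedSearch = writeParam(currentSearch, "page", undefined) // "q=effect"
```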

These APIs underpin server-side rendering: serialize atom state on the server, ship it to the client, and rehydrate. The atom-ref hydration helpers and the React integration build on top of them — see /reactivity/atom-ref/ (Hydration) and /reactivity/server-state/.

Atom.serializable attaches serialization metadata (a stable key plus a Schema) to an atom. The schema is converted to a JSON codec for synchronous encode/decode; the key also becomes the atom’s label if it has none. Import Schema from the core effect package.

import { Atom } from "effect/unstable/reactivity"
import { Schema } from "effect"
const count = Atom.make(0).pipe(
  Atom.serializable({ key: "count", schema: Schema.Number })
)
// => count now carries { key: "count", encode, decode } metadata
// used to dehydrate/rehydrate this atom across the network
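
Why the metadata needs both a stable key and an encode/decode pair is easiest to see in a round trip. The sketch below is a simplified model rather than the library's hydration code: `SerializableMeta`, `dehydrate`, and `hydrate` are hypothetical names, and a Date is used because it does not survive plain JSON unchanged.

```typescript
// Hypothetical shape mirroring the metadata described above.
interface SerializableMeta<A> {
  readonly key: string
  readonly encode: (value: A) => unknown // must produce JSON-safe data
  readonly decode: (encoded: unknown) => A
}

const lastSeenMeta: SerializableMeta<Date> = {
  key: "lastSeen",
  encode: (date) => date.toISOString(),
  decode: (encoded) => new Date(String(encoded))
}

// Server side: store the encoded value under the stable key.
function dehydrate<A>(
  meta: SerializableMeta<A>,
  value: A,
  into: Record<string, unknown>
): void {
  into[meta.key] = meta.encode(value)
}

// Client side: look the key up and decode it, if present.
function hydrate<A>(
  meta: SerializableMeta<A>,
  from: Record<string, unknown>
): A | undefined {
  return meta.key in from ? meta.decode(from[meta.key]) : undefined
}

const serverState: Record<string, unknown> = {}
dehydrate(lastSeenMeta, new Date("2024-01-02T03:04:05.000Z"), serverState)

// The payload travels to the client as text.
const wire = JSON.stringify(serverState)

// The client finds the value by key and gets a real Date back.
const restored = hydrate(lastSeenMeta, JSON.parse(wire))
```

The key is what lets the client match a payload entry to the right atom, which is why it has to be stable across server and client builds.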

Atom.isSerializable is a type guard: it returns true when an atom carries Serializable metadata.

import { Atom } from "effect/unstable/reactivity"
import { Schema } from "effect"
const plain = Atom.make(0)
const tagged = plain.pipe(Atom.serializable({ key: "x", schema: Schema.Number }))
Atom.isSerializable(plain) // => false
Atom.isSerializable(tagged) // => true

Serializable<S> is the interface describing the attached metadata (key, encode, decode); SerializableTypeId is the literal marker used by isSerializable. You normally interact with these through serializable rather than directly.

import { Atom } from "effect/unstable/reactivity"
Atom.SerializableTypeId // => "~effect-atom/atom/Atom/Serializable"
// Read the metadata off a serializable atom:
declare const tagged: Atom.Atom<number> & Atom.Serializable<typeof import("effect").Schema.Number>
tagged[Atom.SerializableTypeId].key // => the stable serialization key

Atom.withServerValue overrides what an atom reads on the server. The override receives a get function for reading other atoms and returns the server-side value, which is handy when a browser-only atom needs a deterministic value during SSR.

import { Atom } from "effect/unstable/reactivity"
const focus = Atom.windowFocusSignal.pipe(
  Atom.withServerValue(() => 0) // no `window` on the server
)
// on the client => listens to visibilitychange
// on the server (via getServerValue) => 0

Atom.withServerValueInitial is a convenience for AsyncResult atoms: it sets the server-side value to AsyncResult.initial(true) (i.e. “waiting”), so SSR renders a loading state instead of running the effect on the server.

import { Atom } from "effect/unstable/reactivity"
const data = Atom.make(fetchData).pipe(Atom.withServerValueInitial)
// server read => AsyncResult.initial(true) (waiting); client read => runs the effect
declare const fetchData: import("effect").Effect.Effect<unknown>

Atom.getServerValue reads an atom from a registry, using its server-side override (from withServerValue) when present and falling back to a normal registry.get otherwise. Nested reads resolve against the same registry.

import { Atom } from "effect/unstable/reactivity"
import * as Registry from "effect/unstable/reactivity/AtomRegistry"
const registry = Registry.make()
const focus = Atom.windowFocusSignal.pipe(Atom.withServerValue(() => 0))
const value = Atom.getServerValue(focus, registry)
// => 0 (uses the server override instead of touching `window`)

Atom.ServerValueTypeId is the literal marker ("~effect-atom/atom/Atom/ServerValue") attached by withServerValue and checked by getServerValue. Rarely referenced directly.

import { Atom } from "effect/unstable/reactivity"
Atom.ServerValueTypeId // => "~effect-atom/atom/Atom/ServerValue"