Entities
An entity is a sharded, addressable actor. You give it a stable type name and an
RPC protocol; the cluster then routes every message for a given entity id
to a single live instance. While that instance is active it can hold in-memory state
(a Ref, an STM TxRef, an open connection); when it sits idle long enough it is
passivated — stopped and recreated on demand the next time a message arrives.
This page builds a small counter entity end to end: define its messages, implement its handlers, call it from a client, and assemble both a production and a test cluster.
A complete entity
Section titled “A complete entity”import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"import { Effect, Layer, Ref, Schema } from "effect"import { ClusterSchema, Entity, TestRunner } from "effect/unstable/cluster"import { Rpc } from "effect/unstable/rpc"import type { SqlClient } from "effect/unstable/sql"
// --- 1. The protocol -------------------------------------------------------// Each message is an Rpc with a typed payload and success schema.const Increment = Rpc.make("Increment", { payload: { amount: Schema.Number }, success: Schema.Number})
const GetCount = Rpc.make("GetCount", { success: Schema.Number}) // By default messages are volatile: they are sent over the network and lost if // the runner dies before processing. Annotating the Rpc with // `ClusterSchema.Persisted` saves it to durable mailbox storage first, so it // survives a crash and is retried. .annotate(ClusterSchema.Persisted, true)
// --- 2. The entity ---------------------------------------------------------// `Entity.make` pairs a stable type name with the array of Rpcs it handles.const Counter = Entity.make("Counter", [Increment, GetCount])
// --- 3. The handlers -------------------------------------------------------// `toLayer` registers the entity with Sharding. The build effect runs once per// active entity instance, so anything created here (the Ref) is that instance's// private, in-memory state.const CounterLayer = Counter.toLayer( Effect.gen(function*() { const count = yield* Ref.make(0)
return Counter.of({ Increment: ({ payload }) => Ref.updateAndGet(count, (n) => n + payload.amount), GetCount: () => Ref.get(count).pipe( // Handlers for one entity run sequentially by default, which is what // gives you lock-free access to `count`. `Rpc.fork` opts a read-only // handler out of that ordering so it can run concurrently. Rpc.fork ) }) }), // If the entity receives no messages for this long, it is passivated. The next // message recreates it (with a fresh count of 0). { maxIdleTime: "5 minutes" })
// --- 4. Calling the entity -------------------------------------------------// `Counter.client` yields a function from entity id to a typed RPC client.const useCounter = Effect.gen(function*() { const clientFor = yield* Counter.client const counter = clientFor("counter-123")
const afterIncrement = yield* counter.Increment({ amount: 1 }) const current = yield* counter.GetCount() yield* Effect.log(`after increment: ${afterIncrement}, current: ${current}`)})
// --- 5. Wiring a cluster ---------------------------------------------------// In production, `NodeClusterSocket.layer` provides the socket transport. By// default it persists messages to SQL, so it needs a SqlClient.declare const SqlClientLayer: Layer.Layer<SqlClient.SqlClient>
const ClusterLayer = NodeClusterSocket.layer().pipe( Layer.provide(SqlClientLayer))
// Merge every entity layer, then provide the cluster transport + storage.const EntitiesLayer = Layer.mergeAll(CounterLayer)
const ProductionLayer = EntitiesLayer.pipe( Layer.provide(ClusterLayer))
// `Layer.launch` keeps the runner alive to serve messages until interrupted.Layer.launch(ProductionLayer).pipe(NodeRuntime.runMain)How it fits together
Section titled “How it fits together”-
Define the protocol with RPCs. Each
Rpc.makedeclares a message name, itspayloadschema, and itssuccessschema (and optionally anerrorschema). Because the protocol is plain Schema, messages serialize for the wire and for storage automatically. -
Create the entity with
Entity.make("Type", [...rpcs]). The type name must be stable and unique across your deployment — it participates in routing, so renaming it moves work to different shards. -
Implement handlers with
entity.toLayer(build, options). Thebuildeffect produces the handler record viaentity.of({ ... }). It runs once per active instance, which is where you allocate per-id state. -
Send messages through
entity.client. You get a function(id) => client; the returned client exposes one method per Rpc, returning anEffect. The cluster routes the call to the instance that ownsid. -
Provide a cluster layer that supplies the transport and storage, then
Layer.launchit.
State and concurrency
Section titled “State and concurrency”The build effect runs per active entity instance, so state declared inside it — the
Ref above — belongs to a single id and a single owner. By default, all handlers
for one entity instance run sequentially, which is what makes lock-free state safe.
Reach for Ref for simple values, or an STM
TxRef when a handler needs to coordinate several pieces of state
atomically.
Wrap a read-only handler with Rpc.fork (as GetCount does) to let it run
concurrently with the sequential ones. You can also tune ordering and back pressure
with toLayer options such as concurrency and mailboxCapacity.
Volatile vs. persisted messages
Section titled “Volatile vs. persisted messages”By default messages are volatile: sent directly over the network, and lost if the
owning runner dies before processing. Annotating an Rpc with ClusterSchema.Persisted
writes the message to durable mailbox storage first, so it survives crashes and is
retried after failover.
Running entities in tests
Section titled “Running entities in tests”You don’t need sockets or a database to exercise an entity. TestRunner.layer
assembles a single in-process cluster with in-memory message and runner storage — the
same entity runtime model, no network.
import { Layer } from "effect"import { TestRunner } from "effect/unstable/cluster"
// `EntitiesLayer` is the merged entity layers from the example above.const TestLayer = EntitiesLayer.pipe( // `provideMerge` keeps the cluster services (Sharding, MessageStorage, ...) in // the result so a test can also inspect storage directly. Layer.provideMerge(TestRunner.layer))Provide TestLayer to a test that resolves Counter.client and asserts on the
results. Because the in-memory storage is scoped to the layer, each test gets a fresh,
isolated cluster. See Testing for the it.effect harness used to run
effects in tests.
Reading the current address
Section titled “Reading the current address”Inside a handler you sometimes need the entity’s own identity — for logging, metrics,
or routing decisions. Yield Entity.CurrentAddress to read the
EntityAddress (its type, id, and shard) currently being processed.
import { Effect } from "effect"import { Entity } from "effect/unstable/cluster"
const handler = Effect.gen(function*() { const address = yield* Entity.CurrentAddress yield* Effect.log(`handling ${address.entityType}/${address.entityId}`)})Next steps
Section titled “Next steps”- For one cluster-wide background process rather than many addressable ids, use
Singletonfromeffect/unstable/cluster— see the Cluster overview. - The entity protocol is ordinary RPC; the same
Rpc.makedefinitions can back an HTTP API too. - Durable mailbox storage is backed by SQL.