Message storage and persistence
By default, a message sent to an entity is volatile. The client serializes the request, sends it over the wire to the runner that currently owns the entity’s shard, and waits for a reply. If that runner crashes — or the shard is reassigned — before the message is processed, the message is simply lost. For many request/response calls that is fine: the caller sees a failure and can retry.
For work that must survive a crash, mark the RPC as persisted. A persisted
message is written to a durable mailbox (a MessageStorage backing) before
it is handled. The owning runner then reads its mailbox for the shards it owns and
replays anything that was not yet completed. This is what makes the rest of the
durable-cluster features possible:
- At-least-once processing — a message stays in the mailbox until a terminal
reply (a
WithExit) is recorded, so a crash mid-handler means the message is redelivered, not dropped. - Deduplication — if the payload has a
PrimaryKey, a second send with the same key returns the original request’s id and last reply instead of enqueuing duplicate work. - Delayed delivery — a payload implementing
DeliverAtschedules when it becomes eligible for processing, which is how singletons and ClusterCron fire on a schedule. - Fire-and-forget — because the message is durable, the client can discard the reply (the entity will still process it after the client disconnects).
Enabling persistence
Section titled “Enabling persistence”Persistence is opt-in per RPC. Annotate the Rpc with ClusterSchema.Persisted
set to true:
import { Schema } from "effect"import { ClusterSchema } from "effect/unstable/cluster"import { Rpc } from "effect/unstable/rpc"
// A volatile message — lost if the owner dies before handling it.const Increment = Rpc.make("Increment", { payload: { amount: Schema.Number }, success: Schema.Number})
// A persisted message — written to the durable mailbox first, then replayed// after a crash for the shards the local runner owns.const ChargeCard = Rpc.make("ChargeCard", { payload: { cents: Schema.Number }, success: Schema.Void}).annotate(ClusterSchema.Persisted, true)ClusterSchema.Persisted is a Context.Reference<boolean> whose default is
false, so unannotated RPCs are volatile. (Two related annotations live in the
same module: ClusterSchema.WithTransaction wraps server handling in the storage
transaction, and ClusterSchema.Uninterruptible controls interrupt handling.)
Choosing a storage backing
Section titled “Choosing a storage backing”You pick the backing when you wire the cluster transport. With
NodeClusterSocket.layer the storage option selects the implementation:
import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"import { Layer } from "effect"import { PgClient } from "@effect/sql-pg"
// "sql" — durable. SqlMessageStorage is provided for you; you supply the// SqlClient. Migrations (cluster_messages, cluster_replies, cluster_migrations)// are created automatically on startup.const SqlLive = PgClient.layer({ database: "effect_cluster" // host / port / username / password as needed})
const ClusterLive = NodeClusterSocket.layer({ storage: "sql" }).pipe( Layer.provide(SqlLive))
NodeRuntime.runMain(Layer.launch(ClusterLive))The three values for storage:
"sql"(default) — providesSqlMessageStorage.layer(andSqlRunnerStorage.layerfor runner membership). Requires aSqlClientin context. Durable and multi-process safe."local"— providesMessageStorage.layerNoop. Nothing is persisted; persisted RPCs behave as volatile. Useful for short-lived single-process dev."byo"— provides nothing; you supply bothMessageStorageandRunnerStoragelayers. Use this to plug inMessageStorage.layerMemory(tests), a custom driver, or a non-default SQL prefix.
import { NodeClusterSocket } from "@effect/platform-node"import { Layer } from "effect"import { MessageStorage } from "effect/unstable/cluster"import { RunnerStorage } from "effect/unstable/cluster"
// "byo" — bring your own storage. Here, in-memory storage for an integration test.const TestStorage = Layer.mergeAll( MessageStorage.layerMemory, RunnerStorage.layerMemory)
const ClusterTest = NodeClusterSocket.layer({ storage: "byo" }).pipe( Layer.provide(TestStorage))The MessageStorage service
Section titled “The MessageStorage service”MessageStorage is a Context.Service (tag
"effect/cluster/MessageStorage"). It combines two responsibilities: durable
storage of requests/envelopes/replies, and process-local reply-handler
management that connects persisted replies to the fibers waiting for them on this
runner. The storage methods are listed first, then the constructors and layers.
import { Effect } from "effect"import { MessageStorage } from "effect/unstable/cluster"
const program = Effect.gen(function*() { const storage = yield* MessageStorage // storage.saveRequest, storage.repliesFor, storage.unprocessedMessages, ...})saveRequest
Section titled “saveRequest”Persists an OutgoingRequest (and its primary key / DeliverAt time) and returns
a SaveResult — Success for new work, or
Duplicate when the payload’s primary key matches an existing request.
// storage.saveRequest(outgoing)// => SaveResult.Success() (new request)// => SaveResult.Duplicate({ originalId, lastReceivedReply }) (deduped)saveEnvelope
Section titled “saveEnvelope”Persists a control OutgoingEnvelope (an AckChunk or Interrupt) rather than a
request. Returns void.
// storage.saveEnvelope(outgoingInterruptEnvelope) // => voidsaveReply
Section titled “saveReply”Persists a ReplyWithContext for a request. A terminal WithExit marks the
request processed; a streaming Chunk is appended and replayed until acknowledged.
Also notifies any locally registered reply handlers.
// storage.saveReply(replyWithContext) // => voidclearReplies
Section titled “clearReplies”Removes the stored replies for a request id, making the request unprocessed again so it can be replayed.
import { Snowflake } from "effect/unstable/cluster"// storage.clearReplies(Snowflake.Snowflake(123n)) // => voidrepliesFor
Section titled “repliesFor”Given outgoing requests, returns the decoded replies that still matter:
unacknowledged stream chunks and terminal WithExit replies. Used when resuming a
request to deliver what was already produced.
// storage.repliesFor([outgoing]) // => Array<Reply.Reply<R>>repliesForUnfiltered
Section titled “repliesForUnfiltered”Like repliesFor, but keyed by raw Snowflake request ids and returning encoded
replies without per-RPC filtering.
// storage.repliesForUnfiltered([Snowflake.Snowflake(123n)]) // => Array<Reply.Encoded>requestIdForPrimaryKey
Section titled “requestIdForPrimaryKey”Looks up the original request id for a primary key (entity address + RPC tag + payload key). This is the lookup behind duplicate detection.
// storage.requestIdForPrimaryKey({ address, tag: "ChargeCard", id: "order-1" })// => Option.some(Snowflake) | Option.none()registerReplyHandler / unregisterReplyHandler / unregisterShardReplyHandlers
Section titled “registerReplyHandler / unregisterReplyHandler / unregisterShardReplyHandlers”Process-local handler management. registerReplyHandler waits for replies to a
locally sent request; unregisterReplyHandler cancels one request’s handler;
unregisterShardReplyHandlers fails every handler for a shard (used when a shard is
reassigned away from this runner). All three are driven by Sharding.
// yield* storage.registerReplyHandler(outgoingOrIncomingRequest)// fails with EntityNotAssignedToRunner if the shard is unregistered firstunprocessedMessages / unprocessedMessagesById
Section titled “unprocessedMessages / unprocessedMessagesById”unprocessedMessages(shardIds) returns the incoming messages a runner still has to
process for the shards it owns: requests with no terminal reply (and no unacked
chunks), the latest AckChunk, and Interrupt envelopes for those requests.
unprocessedMessagesById does the same for specific request ids. Delayed-delivery
requests whose deliverAt is in the future are skipped until due.
import { ShardId } from "effect/unstable/cluster"// storage.unprocessedMessages([ShardId.make("default", 1)])// => Array<Message.Incoming<any>>resetShards / resetAddress / clearAddress
Section titled “resetShards / resetAddress / clearAddress”Recovery operations. resetShards / resetAddress mark mailbox messages
unprocessed again (so they are re-read) for a set of shards or one entity address;
clearAddress deletes all messages and replies for an address (used on entity
removal).
// storage.resetShards([ShardId.make("default", 1)]) // => void// storage.resetAddress(entityAddress) // => void// storage.clearAddress(entityAddress) // => voidThere is also withTransaction, which wraps an effect in the backing’s
transaction (a no-op for in-memory/no-op stores).
MessageStorage.make / makeEncoded
Section titled “MessageStorage.make / makeEncoded”Two constructors build the service. make takes a fully decoded storage object
(everything except the reply-handler methods, which it adds) and returns the
service. makeEncoded is the usual entry point for drivers: you implement the
lower-level Encoded contract (encoded envelopes
- replies) and it handles encode/decode, primary-key generation,
DeliverAtchecks, and duplicate decoding.
import { Effect } from "effect"import { MessageStorage } from "effect/unstable/cluster"
// Build a service from an encoded driver (needs a Snowflake.Generator in context).// const service = yield* MessageStorage.makeEncoded(myEncodedDriver)MessageStorage.noop / layerNoop
Section titled “MessageStorage.noop / layerNoop”noop is a MessageStorage service that persists nothing —
saveRequest always returns SaveResult.Success, reads return empty. layerNoop
provides it as a Layer. This backs the storage: "local" option.
import { MessageStorage } from "effect/unstable/cluster"
const NoStorage = MessageStorage.layerNoop // Layer<MessageStorage>MessageStorage.layerMemory + MemoryDriver
Section titled “MessageStorage.layerMemory + MemoryDriver”layerMemory provides an in-memory store and its inspectable backing,
MemoryDriver. It requires a ShardingConfig. The MemoryDriver service exposes
the internal maps (requests, requestsByPrimaryKey, unprocessed, journal)
so tests can assert on what was stored. MemoryEntry is the per-request record
(envelope, replies, lastReceivedChunk, deliverAt), and MemoryTransaction
is a Context.Reference<boolean> flipped to true inside withTransaction so
tests can confirm transactional wrapping.
import { Effect } from "effect"import { MessageStorage } from "effect/unstable/cluster"
const inspect = Effect.gen(function*() { const driver = yield* MessageStorage.MemoryDriver // driver.requests, driver.journal, driver.unprocessed, driver.requestsByPrimaryKey return driver.journal.length // => number of saved envelopes})
// Provide both MessageStorage and MemoryDriver (needs ShardingConfig).const TestLayer = MessageStorage.layerMemorySaveResult / SaveResultEncoded
Section titled “SaveResult / SaveResultEncoded”SaveResult is a Data.taggedEnum with two variants: Success (new entry) and
Duplicate (carries originalId and the decoded lastReceivedReply).
SaveResultEncoded is the driver-level form whose Duplicate carries an
encoded last reply. Match on the _tag after saveRequest:
import { Effect, Option } from "effect"import { MessageStorage } from "effect/unstable/cluster"
const handle = (storage: MessageStorage["Service"], outgoing: any) => Effect.gen(function*() { const result = yield* storage.saveRequest(outgoing) if (result._tag === "Duplicate") { // already enqueued — reuse the original id, replay its last reply result.originalId // => Snowflake result.lastReceivedReply // => Option<Reply.Reply<R>> } // _tag === "Success" => brand new request, will be processed })
// Driver-side constructors:// MessageStorage.SaveResult.Success()// MessageStorage.SaveResult.Duplicate({ originalId, lastReceivedReply })// MessageStorage.SaveResultEncoded.Duplicate({ originalId, lastReceivedReply })The Encoded driver interface
Section titled “The Encoded driver interface”Encoded is the contract a custom storage driver implements. It works entirely in
encoded terms: saveEnvelope({ envelope, primaryKey, deliverAt }), saveReply,
clearReplies, requestIdForPrimaryKey, repliesFor / repliesForUnfiltered
(both taking NonEmptyArray<string>), unprocessedMessages(shardIds, now) /
unprocessedMessagesById(ids, now), resetAddress, clearAddress, resetShards,
and withTransaction. Feed an implementation to makeEncoded to get a full
MessageStorage. EncodedUnprocessedOptions and EncodedRepliesOptions are cursor
shapes (existingShards/newShards or existingRequests/newRequests plus a
driver cursor) used for paginated reads.
import { Effect, Option } from "effect"import { MessageStorage } from "effect/unstable/cluster"
const driver: MessageStorage.Encoded = { saveEnvelope: ({ envelope, primaryKey, deliverAt }) => Effect.succeed(MessageStorage.SaveResultEncoded.Success()), saveReply: (_reply) => Effect.void, clearReplies: (_id) => Effect.void, requestIdForPrimaryKey: (_pk) => Effect.succeedNone, repliesFor: (_ids) => Effect.succeed([]), repliesForUnfiltered: (_ids) => Effect.succeed([]), unprocessedMessages: (_shards, _now) => Effect.succeed([]), unprocessedMessagesById: (_ids, _now) => Effect.succeed([]), resetAddress: (_addr) => Effect.void, clearAddress: (_addr) => Effect.void, resetShards: (_shards) => Effect.void, withTransaction: (effect) => effect}// const storage = yield* MessageStorage.makeEncoded(driver)SQL-backed storage (SqlMessageStorage)
Section titled “SQL-backed storage (SqlMessageStorage)”SqlMessageStorage is the production driver. It keeps two durable tables — a
messages table for requests and control envelopes, and a replies table for
chunk and final replies — plus a migrations table. Saving a request inserts into
messages; reading unprocessed messages claims due rows for the assigned shards
(using last_read as a ~10-minute lease so a stalled runner’s messages can be
redelivered); reading replies replays unacknowledged chunks and final exits.
SqlMessageStorage.make(options)
Section titled “SqlMessageStorage.make(options)”Builds the MessageStorage["Service"] directly, running migrations. The only
option is prefix (default "cluster"), which names the tables
<prefix>_messages, <prefix>_replies, and <prefix>_migrations. Requires a
SqlClient and a Snowflake.Generator in context. Use this when composing a
custom layer or supplying your own generator.
import { Effect } from "effect"import { SqlMessageStorage } from "effect/unstable/cluster"
// const storage = yield* SqlMessageStorage.make({ prefix: "orders" })// => MessageStorage service backed by orders_messages / orders_repliesReady-made layer using the default "cluster" prefix and the standard snowflake
generator. Still requires SqlClient and ShardingConfig. This is what
storage: "sql" provides.
import { SqlMessageStorage } from "effect/unstable/cluster"
const Storage = SqlMessageStorage.layer// Layer<MessageStorage, never, SqlClient | ShardingConfig>layerWith(options)
Section titled “layerWith(options)”Same as layer but with a custom table prefix. Pick a stable prefix before
deployment — changing it later points the runtime at a different (empty) set of
tables and migration history.
import { SqlMessageStorage } from "effect/unstable/cluster"
const Storage = SqlMessageStorage.layerWith({ prefix: "orders" })SqlRunnerStorage (durable runner membership)
Section titled “SqlRunnerStorage (durable runner membership)”A persisted mailbox is only half of durable cluster state — runner membership and
shard ownership must also be durable so a restarted process rejoins the same
shards. SqlRunnerStorage.make({ prefix }), SqlRunnerStorage.layer, and
SqlRunnerStorage.layerWith({ prefix }) provide that, creating <prefix>_runners
and <prefix>_locks tables (PostgreSQL/MySQL use advisory locks unless
ShardingConfig.shardLockDisableAdvisory is set). storage: "sql" wires this for
you. See Sharding and runners for the bigger
picture.
import { SqlRunnerStorage } from "effect/unstable/cluster"
const Runners = SqlRunnerStorage.layer// Layer<RunnerStorage, SqlError, SqlClient | ShardingConfig>Message and reply types (the wire/storage model)
Section titled “Message and reply types (the wire/storage model)”These are the shapes a mailbox actually stores. App code rarely constructs them,
but recognizing them helps when reading the tables or a custom driver. They live in
the Message, Reply, Envelope, and DeliverAt modules.
Message
Section titled “Message”Message.Incoming<R> is what a runner reads from storage/transport: either an
IncomingRequest<R> (a PartialRequest envelope whose payload is still encoded,
plus a respond callback and lastSentReply) or an IncomingEnvelope (a control
AckChunk/Interrupt). Message.IncomingLocal<R> is the decoded in-process form
(IncomingRequestLocal<R> | IncomingEnvelope). Message.Outgoing<R> is what a
caller produces: OutgoingRequest<R> (decoded payload + RPC + context +
lastReceivedReply) or OutgoingEnvelope (with OutgoingEnvelope.interrupt({ address, id, requestId })). Serialization helpers convert between them:
import { Effect } from "effect"import { Message } from "effect/unstable/cluster"
// const partial = yield* Message.serialize(outgoing) // => Envelope.Partial// const encoded = yield* Message.serializeEnvelope(outgoing) // => Envelope.Encoded (JSON)// const partialReq = yield* Message.serializeRequest(outgoingRequest) // => Envelope.PartialRequest// const local = yield* Message.deserializeLocal(outgoing, partial) // => Message.IncomingLocalconst _ = Message.incomingLocalFromOutgoing // Outgoing<R> -> IncomingLocal<R>A Reply.Reply<R> is either a terminal Reply.WithExit<R> (carries the final
Exit) or a streaming Reply.Chunk<R> (a sequenced, non-empty batch of success
values, replayed until acknowledged). Reply.Encoded is the JSON form
(WithExitEncoded | ChunkEncoded). Reply.ReplyWithContext<R> pairs a reply with
the RPC + context needed to serialize it (with ReplyWithContext.fromDefect /
.interrupt helpers). Reply.Reply(rpc) builds the transport codec.
import { Effect } from "effect"import { Reply } from "effect/unstable/cluster"
// const encoded = yield* Reply.serialize(replyWithContext) // => Reply.Encoded// const last = yield* Reply.serializeLastReceived(outgoing) // => Option<Reply.Encoded>Reply.isReply(undefined) // => falseEnvelope
Section titled “Envelope”Envelope.Envelope<R> is the transport message: Request<R> (request id, entity
address, RPC tag, decoded payload, headers, trace context), AckChunk (acks a
received stream chunk), or Interrupt. Build a request with
Envelope.makeRequest(...). Envelope.Partial / PartialJson decode/encode the
metadata while leaving the payload unknown; PartialArray batches them.
Envelope.primaryKey(envelope) / primaryKeyByAddress({ address, tag, id })
compute the dedup key (entityType/entityId/tag/id), and Envelope.isEnvelope
guards values.
import { Envelope } from "effect/unstable/cluster"
Envelope.primaryKeyByAddress({ address: { entityType: "Order", entityId: "order-1" } as any, tag: "ChargeCard", id: "charge-1"})// => "Order/order-1/ChargeCard/charge-1"
Envelope.isEnvelope({}) // => falseDeliverAt
Section titled “DeliverAt”A payload schedules its own delivery time by implementing the DeliverAt.symbol
method (returning a DateTime). MessageStorage reads it via
DeliverAt.toMillis(payload) when saving, storing it as deliver_at; the message
stays out of unprocessedMessages until that instant. DeliverAt.isDeliverAt
guards a value. This is the mechanism behind scheduled cron jobs — see
Singletons and ClusterCron.
import { DateTime, Schema } from "effect"import { DeliverAt } from "effect/unstable/cluster"import { PrimaryKey } from "effect"
// A persisted payload that asks to be delivered at a specific time, and// deduplicates on its key.class RunAt extends Schema.Class<RunAt>("RunAt")({ at: Schema.DateTimeUtc, key: Schema.String}) { [PrimaryKey.symbol]() { return this.key } [DeliverAt.symbol]() { return this.at }}
DeliverAt.toMillis(new RunAt({ at: DateTime.makeUnsafe("2030-01-01"), key: "k" }))// => 1893456000000 (epoch ms; null for payloads that don't implement DeliverAt)DeliverAt.toMillis({ no: "deliverAt" }) // => nullNext steps
Section titled “Next steps”- Entities — where the
ClusterSchema.Persistedannotation is applied to your RPCs. - Singletons and ClusterCron — how
DeliverAtand persisted messages power scheduled, exactly-one-runner work. - Sharding and runners — runner membership and shard ownership, the durable state that sits alongside the mailbox.