
Message storage and persistence

By default, a message sent to an entity is volatile. The client serializes the request, sends it over the wire to the runner that currently owns the entity’s shard, and waits for a reply. If that runner crashes — or the shard is reassigned — before the message is processed, the message is simply lost. For many request/response calls that is fine: the caller sees a failure and can retry.

For work that must survive a crash, mark the RPC as persisted. A persisted message is written to a durable mailbox (a MessageStorage backing) before it is handled. The owning runner then reads its mailbox for the shards it owns and replays anything that was not yet completed. This is what makes the rest of the durable-cluster features possible:

  • At-least-once processing — a message stays in the mailbox until a terminal reply (a WithExit) is recorded, so a crash mid-handler means the message is redelivered, not dropped.
  • Deduplication — if the payload has a PrimaryKey, a second send with the same key returns the original request’s id and last reply instead of enqueuing duplicate work.
  • Delayed delivery — a payload implementing DeliverAt schedules when it becomes eligible for processing, which is how singletons and ClusterCron fire on a schedule.
  • Fire-and-forget — because the message is durable, the client can discard the reply (the entity will still process it after the client disconnects).
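Here is a minimal in-memory sketch of the first two guarantees. It does not use the real MessageStorage API. ToyMailbox, send, complete and pending are hypothetical names, and a "crash" is modelled simply as a reader calling pending() on the same stored state.

```typescript
// Hypothetical toy model of a persisted mailbox; not the MessageStorage API.
interface Entry {
  readonly id: number
  readonly payload: unknown
  // Set once a terminal (WithExit-like) reply is recorded.
  exit?: { readonly value: unknown }
}

class ToyMailbox {
  private nextId = 1
  private readonly entries = new Map<number, Entry>()
  private readonly byPrimaryKey = new Map<string, number>()

  // Persist before handling; a repeated primary key returns the original id
  // instead of enqueuing duplicate work.
  send(payload: unknown, primaryKey?: string): { id: number; duplicate: boolean } {
    if (primaryKey !== undefined) {
      const existing = this.byPrimaryKey.get(primaryKey)
      if (existing !== undefined) return { id: existing, duplicate: true }
    }
    const id = this.nextId++
    this.entries.set(id, { id, payload })
    if (primaryKey !== undefined) this.byPrimaryKey.set(primaryKey, id)
    return { id, duplicate: false }
  }

  // Only a terminal reply takes a message out of the pending set.
  complete(id: number, value: unknown): void {
    const entry = this.entries.get(id)
    if (entry !== undefined) entry.exit = { value }
  }

  // What a restarted owner would replay: every message without a terminal reply.
  pending(): number[] {
    return Array.from(this.entries.values())
      .filter((entry) => entry.exit === undefined)
      .map((entry) => entry.id)
  }
}

const mailbox = new ToyMailbox()
const first = mailbox.send({ cents: 500 }, "order-1")
const again = mailbox.send({ cents: 500 }, "order-1") // same key: deduplicated
const other = mailbox.send({ cents: 900 })
mailbox.complete(first.id, undefined)
// "Crash" before `other` was handled: it is still pending and will be redelivered.
console.log(mailbox.pending().join(",")) // 2
```

The key design point is that completion is driven by the recorded reply, not by the handler returning: until `complete` runs, the message keeps showing up in `pending()`.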

Persistence is opt-in per RPC. Annotate the Rpc with ClusterSchema.Persisted set to true:

```typescript
import { Schema } from "effect"
import { ClusterSchema } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"

// A volatile message: lost if the owner dies before handling it.
const Increment = Rpc.make("Increment", {
  payload: { amount: Schema.Number },
  success: Schema.Number
})

// A persisted message: written to the durable mailbox first, then replayed
// after a crash for the shards the local runner owns.
const ChargeCard = Rpc.make("ChargeCard", {
  payload: { cents: Schema.Number },
  success: Schema.Void
}).annotate(ClusterSchema.Persisted, true)
```

ClusterSchema.Persisted is a Context.Reference<boolean> whose default is false, so unannotated RPCs are volatile. (Two related annotations live in the same module: ClusterSchema.WithTransaction wraps server handling in the storage transaction, and ClusterSchema.Uninterruptible controls interrupt handling.)

You pick the backing when you wire the cluster transport. With NodeClusterSocket.layer, the storage option selects the implementation:

```typescript
import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"
import { PgClient } from "@effect/sql-pg"
import { Layer } from "effect"

// "sql": durable. SqlMessageStorage is provided for you; you supply the
// SqlClient. Migrations (cluster_messages, cluster_replies, cluster_migrations)
// are created automatically on startup.
const SqlLive = PgClient.layer({
  database: "effect_cluster"
  // host / port / username / password as needed
})

const ClusterLive = NodeClusterSocket.layer({ storage: "sql" }).pipe(
  Layer.provide(SqlLive)
)

NodeRuntime.runMain(Layer.launch(ClusterLive))
```

The three values for storage:

  • "sql" (default) — provides SqlMessageStorage.layer (and SqlRunnerStorage.layer for runner membership). Requires a SqlClient in context. Durable and multi-process safe.
  • "local" — provides MessageStorage.layerNoop. Nothing is persisted; persisted RPCs behave as volatile. Useful for short-lived single-process dev.
  • "byo" — provides nothing; you supply both MessageStorage and RunnerStorage layers. Use this to plug in MessageStorage.layerMemory (tests), a custom driver, or a non-default SQL prefix.

```typescript
import { NodeClusterSocket } from "@effect/platform-node"
import { Layer } from "effect"
import { MessageStorage, RunnerStorage } from "effect/unstable/cluster"

// "byo": bring your own storage. Here, in-memory storage for an integration test.
const TestStorage = Layer.mergeAll(
  MessageStorage.layerMemory,
  RunnerStorage.layerMemory
)

const ClusterTest = NodeClusterSocket.layer({ storage: "byo" }).pipe(
  Layer.provide(TestStorage)
)
```

MessageStorage is a Context.Service (tag "effect/cluster/MessageStorage"). It combines two responsibilities: durable storage of requests/envelopes/replies, and process-local reply-handler management that connects persisted replies to the fibers waiting for them on this runner. The storage methods are listed first, then the constructors and layers.

```typescript
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

const program = Effect.gen(function*() {
  const storage = yield* MessageStorage
  // storage.saveRequest, storage.repliesFor, storage.unprocessedMessages, ...
})
```

saveRequest persists an OutgoingRequest (together with its primary key and DeliverAt time) and returns SaveResult.Success for new work, or SaveResult.Duplicate when the payload’s primary key matches an existing request.

```typescript
// storage.saveRequest(outgoing)
// => SaveResult.Success()                                    (new request)
// => SaveResult.Duplicate({ originalId, lastReceivedReply }) (deduped)
```

saveEnvelope persists a control OutgoingEnvelope (an AckChunk or Interrupt) rather than a request. It returns void.

```typescript
// storage.saveEnvelope(outgoingInterruptEnvelope) // => void
```

saveReply persists a ReplyWithContext for a request. A terminal WithExit marks the request processed; a streaming Chunk is appended and replayed until acknowledged. saveReply also notifies any locally registered reply handlers.

```typescript
// storage.saveReply(replyWithContext) // => void
```

clearReplies removes the stored replies for a request id, making the request unprocessed again so it can be replayed.

```typescript
import { Snowflake } from "effect/unstable/cluster"

// storage.clearReplies(Snowflake.Snowflake(123n)) // => void
```

repliesFor takes outgoing requests and returns the decoded replies that still matter: unacknowledged stream chunks and terminal WithExit replies. It is used when resuming a request, to deliver what was already produced.

```typescript
// storage.repliesFor([outgoing]) // => Array<Reply.Reply<R>>
```
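The "replies that still matter" rule can be sketched as a filter, assuming each stored chunk carries a sequence number and the caller knows the highest sequence it has acknowledged. StoredReply, lastAckedSequence and repliesThatMatter are hypothetical names for illustration, not the Reply module's types.

```typescript
// Hypothetical stored-reply shapes; not the Reply module's real types.
type StoredReply =
  | { readonly _tag: "WithExit"; readonly exit: "success" | "failure" }
  | { readonly _tag: "Chunk"; readonly sequence: number; readonly values: readonly unknown[] }

// Keep every terminal exit, plus chunks newer than the last acknowledged one.
function repliesThatMatter(
  replies: readonly StoredReply[],
  lastAckedSequence: number | undefined
): StoredReply[] {
  return replies.filter(
    (reply) =>
      reply._tag === "WithExit" ||
      lastAckedSequence === undefined ||
      reply.sequence > lastAckedSequence
  )
}

const stored: StoredReply[] = [
  { _tag: "Chunk", sequence: 0, values: [1, 2] },
  { _tag: "Chunk", sequence: 1, values: [3] },
  { _tag: "WithExit", exit: "success" }
]

// Chunk 0 was acknowledged, so only chunk 1 and the exit are replayed.
console.log(repliesThatMatter(stored, 0).map((r) => r._tag).join(",")) // Chunk,WithExit
```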

repliesForUnfiltered is like repliesFor, but it is keyed by raw Snowflake request ids and returns encoded replies without per-RPC filtering.

```typescript
// storage.repliesForUnfiltered([Snowflake.Snowflake(123n)]) // => Array<Reply.Encoded>
```

requestIdForPrimaryKey looks up the original request id for a primary key (entity address + RPC tag + payload key). This is the lookup behind duplicate detection.

```typescript
// storage.requestIdForPrimaryKey({ address, tag: "ChargeCard", id: "order-1" })
// => Option.some(Snowflake) | Option.none()
```

registerReplyHandler / unregisterReplyHandler / unregisterShardReplyHandlers


Process-local handler management. registerReplyHandler waits for replies to a locally sent request; unregisterReplyHandler cancels one request’s handler; unregisterShardReplyHandlers fails every handler for a shard (used when a shard is reassigned away from this runner). All three are driven by Sharding.

```typescript
// yield* storage.registerReplyHandler(outgoingOrIncomingRequest)
// fails with EntityNotAssignedToRunner if the shard is unregistered first
```
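The handler lifecycle can be pictured as a per-runner map from request id to a waiting callback. This is a hypothetical sketch: ReplyHandlers, HandlerResult and the callback style are illustrative only (Sharding drives the real methods with Effect fibers), and it assumes each registered handler remembers the shard its request belongs to.

```typescript
// Hypothetical outcome delivered to a local waiter.
type HandlerResult =
  | { readonly _tag: "Reply"; readonly value: unknown }
  | { readonly _tag: "EntityNotAssignedToRunner"; readonly shard: number }

interface Handler {
  readonly shard: number
  readonly onResult: (result: HandlerResult) => void
}

class ReplyHandlers {
  private readonly handlers = new Map<string, Handler>()

  register(requestId: string, shard: number, onResult: (r: HandlerResult) => void): void {
    this.handlers.set(requestId, { shard, onResult })
  }

  // Called after a reply is persisted: wake the local waiter, if there is one.
  deliver(requestId: string, value: unknown): void {
    const handler = this.handlers.get(requestId)
    if (handler === undefined) return // no local waiter; the reply stays in storage
    this.handlers.delete(requestId)
    handler.onResult({ _tag: "Reply", value })
  }

  // Cancel a single request's handler without notifying it.
  unregister(requestId: string): void {
    this.handlers.delete(requestId)
  }

  // The shard moved to another runner: fail every waiter for that shard.
  unregisterShard(shard: number): void {
    Array.from(this.handlers.entries()).forEach(([id, handler]) => {
      if (handler.shard !== shard) return
      this.handlers.delete(id)
      handler.onResult({ _tag: "EntityNotAssignedToRunner", shard })
    })
  }
}

const seen: string[] = []
const handlers = new ReplyHandlers()
handlers.register("req-1", 1, (r) => seen.push(`req-1:${r._tag}`))
handlers.register("req-2", 2, (r) => seen.push(`req-2:${r._tag}`))
handlers.unregisterShard(1) // shard 1 reassigned away from this runner
handlers.deliver("req-2", 42)
console.log(seen.join(", ")) // req-1:EntityNotAssignedToRunner, req-2:Reply
```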

unprocessedMessages / unprocessedMessagesById


unprocessedMessages(shardIds) returns the incoming messages a runner still has to process for the shards it owns: requests with no terminal reply (and no unacked chunks), the latest AckChunk, and Interrupt envelopes for those requests. unprocessedMessagesById does the same for specific request ids. Delayed-delivery requests whose deliverAt is in the future are skipped until due.

```typescript
import { ShardId } from "effect/unstable/cluster"

// storage.unprocessedMessages([ShardId.make("default", 1)])
// => Array<Message.Incoming<any>>
```
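The three request-level conditions (owned shard, no terminal reply, delivery time reached) can be sketched as a chain of filters. StoredRequest and unprocessedFor are hypothetical names, and this sketch deliberately leaves out stream chunks and the AckChunk/Interrupt envelopes that the real method also returns.

```typescript
// Hypothetical row shape; the real driver also tracks chunks and control envelopes.
interface StoredRequest {
  readonly id: string
  readonly shardId: number
  readonly hasTerminalReply: boolean
  readonly deliverAt: number | null // epoch ms; null means "deliver immediately"
}

function unprocessedFor(
  requests: readonly StoredRequest[],
  ownedShards: ReadonlySet<number>,
  now: number
): string[] {
  return requests
    .filter((row) => ownedShards.has(row.shardId)) // only shards this runner owns
    .filter((row) => !row.hasTerminalReply) // not finished yet
    .filter((row) => row.deliverAt === null || row.deliverAt <= now) // due now
    .map((row) => row.id)
}

const requests: StoredRequest[] = [
  { id: "a", shardId: 1, hasTerminalReply: false, deliverAt: null },
  { id: "b", shardId: 1, hasTerminalReply: true, deliverAt: null }, // already done
  { id: "c", shardId: 2, hasTerminalReply: false, deliverAt: null }, // not our shard
  { id: "d", shardId: 1, hasTerminalReply: false, deliverAt: 5000 }, // not due yet
  { id: "e", shardId: 1, hasTerminalReply: false, deliverAt: 500 }
]

console.log(unprocessedFor(requests, new Set([1]), 1000).join(",")) // a,e
```

Once the clock passes 5000, the same call would also return "d", which is how delayed delivery surfaces without any extra scheduler state.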

resetShards, resetAddress, and clearAddress are recovery operations. resetShards / resetAddress mark mailbox messages unprocessed again (so they are re-read) for a set of shards or one entity address; clearAddress deletes all messages and replies for an address (used on entity removal).

```typescript
// storage.resetShards([ShardId.make("default", 1)]) // => void
// storage.resetAddress(entityAddress)               // => void
// storage.clearAddress(entityAddress)               // => void
```

There is also withTransaction, which wraps an effect in the backing’s transaction (a no-op for in-memory/no-op stores).

Two constructors build the service. make takes a fully decoded storage object (everything except the reply-handler methods, which it adds) and returns the service. makeEncoded is the usual entry point for drivers: you implement the lower-level Encoded contract (encoded envelopes + replies), and it handles encode/decode, primary-key generation, DeliverAt checks, and duplicate decoding.

```typescript
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

// Build a service from an encoded driver (needs a Snowflake.Generator in context).
// const service = yield* MessageStorage.makeEncoded(myEncodedDriver)
```

MessageStorage.noop is a service that persists nothing: saveRequest always returns SaveResult.Success, and reads return empty results. layerNoop provides it as a Layer; this is what backs the storage: "local" option.

```typescript
import { MessageStorage } from "effect/unstable/cluster"

const NoStorage = MessageStorage.layerNoop // Layer<MessageStorage>
```

layerMemory provides an in-memory store and its inspectable backing, MemoryDriver. It requires a ShardingConfig. The MemoryDriver service exposes the internal maps (requests, requestsByPrimaryKey, unprocessed, journal) so tests can assert on what was stored. MemoryEntry is the per-request record (envelope, replies, lastReceivedChunk, deliverAt), and MemoryTransaction is a Context.Reference<boolean> flipped to true inside withTransaction so tests can confirm transactional wrapping.

```typescript
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

const inspect = Effect.gen(function*() {
  const driver = yield* MessageStorage.MemoryDriver
  // driver.requests, driver.journal, driver.unprocessed, driver.requestsByPrimaryKey
  return driver.journal.length // => number of saved envelopes
})

// Provide both MessageStorage and MemoryDriver (needs ShardingConfig).
const TestLayer = MessageStorage.layerMemory
```

SaveResult is a Data.taggedEnum with two variants: Success (new entry) and Duplicate (carries originalId and the decoded lastReceivedReply). SaveResultEncoded is the driver-level form whose Duplicate carries an encoded last reply. Match on the _tag after saveRequest:

```typescript
import { Effect } from "effect"
import { Message, MessageStorage } from "effect/unstable/cluster"

const handle = (
  storage: MessageStorage["Service"],
  outgoing: Message.OutgoingRequest<any>
) =>
  Effect.gen(function*() {
    const result = yield* storage.saveRequest(outgoing)
    if (result._tag === "Duplicate") {
      // Already enqueued: reuse the original id and replay its last reply.
      result.originalId // => Snowflake
      result.lastReceivedReply // => Option<Reply.Reply<R>>
      return
    }
    // _tag === "Success" => brand-new request, will be processed
  })

// Driver-side constructors:
// MessageStorage.SaveResult.Success()
// MessageStorage.SaveResult.Duplicate({ originalId, lastReceivedReply })
// MessageStorage.SaveResultEncoded.Duplicate({ originalId, lastReceivedReply })
```
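Because SaveResult is a tagged union, an exhaustive switch on _tag is a natural way to consume it. The sketch below mirrors only the shape: ToySaveResult and describeSave are hypothetical, strings stand in for Snowflake ids, and `undefined` stands in for Option.none().

```typescript
// Hypothetical mirror of SaveResult's shape (not the real Data.taggedEnum).
type ToySaveResult<A> =
  | { readonly _tag: "Success" }
  | {
      readonly _tag: "Duplicate"
      readonly originalId: string
      readonly lastReceivedReply: A | undefined
    }

function describeSave(result: ToySaveResult<string>): string {
  switch (result._tag) {
    case "Success":
      return "new request enqueued"
    case "Duplicate":
      return result.lastReceivedReply === undefined
        ? `duplicate of ${result.originalId}, no reply yet`
        : `duplicate of ${result.originalId}, last reply: ${result.lastReceivedReply}`
  }
}

console.log(describeSave({ _tag: "Success" })) // new request enqueued
console.log(describeSave({ _tag: "Duplicate", originalId: "123", lastReceivedReply: "ok" }))
// duplicate of 123, last reply: ok
```

The compiler checks that both variants are handled, so adding a variant later would surface every call site that needs updating.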

Encoded is the contract a custom storage driver implements. It works entirely in encoded terms: saveEnvelope({ envelope, primaryKey, deliverAt }), saveReply, clearReplies, requestIdForPrimaryKey, repliesFor / repliesForUnfiltered (both taking NonEmptyArray<string>), unprocessedMessages(shardIds, now) / unprocessedMessagesById(ids, now), resetAddress, clearAddress, resetShards, and withTransaction. Feed an implementation to makeEncoded to get a full MessageStorage. EncodedUnprocessedOptions and EncodedRepliesOptions are cursor shapes (existingShards/newShards or existingRequests/newRequests plus a driver cursor) used for paginated reads.

```typescript
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

// A do-nothing driver that only shows the shape of the Encoded contract.
const driver: MessageStorage.Encoded = {
  saveEnvelope: ({ envelope, primaryKey, deliverAt }) =>
    Effect.succeed(MessageStorage.SaveResultEncoded.Success()),
  saveReply: (_reply) => Effect.void,
  clearReplies: (_id) => Effect.void,
  requestIdForPrimaryKey: (_pk) => Effect.succeedNone,
  repliesFor: (_ids) => Effect.succeed([]),
  repliesForUnfiltered: (_ids) => Effect.succeed([]),
  unprocessedMessages: (_shards, _now) => Effect.succeed([]),
  unprocessedMessagesById: (_ids, _now) => Effect.succeed([]),
  resetAddress: (_addr) => Effect.void,
  clearAddress: (_addr) => Effect.void,
  resetShards: (_shards) => Effect.void,
  withTransaction: (effect) => effect
}

// const storage = yield* MessageStorage.makeEncoded(driver)
```

SqlMessageStorage is the production driver. It keeps two durable tables — a messages table for requests and control envelopes, and a replies table for chunk and final replies — plus a migrations table. Saving a request inserts into messages; reading unprocessed messages claims due rows for the assigned shards (using last_read as a ~10-minute lease so a stalled runner’s messages can be redelivered); reading replies replays unacknowledged chunks and final exits.
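The lease idea can be sketched in memory, assuming a row is claimable when it was never read or its last_read stamp is older than the lease. LEASE_MS, MessageRow and claimDue are hypothetical; the real driver expresses this in SQL against the shared table, and the exact comparison it uses may differ.

```typescript
// Assumed lease length, matching the "~10-minute" figure described above.
const LEASE_MS = 10 * 60 * 1000

// Hypothetical in-memory stand-in for a messages row.
interface MessageRow {
  readonly id: string
  lastRead: number | null // epoch ms of the last claim; null = never claimed
}

// Claim rows that were never read or whose lease expired, stamping last_read.
function claimDue(messageRows: MessageRow[], now: number): string[] {
  const claimed: string[] = []
  for (const row of messageRows) {
    if (row.lastRead === null || now - row.lastRead >= LEASE_MS) {
      row.lastRead = now
      claimed.push(row.id)
    }
  }
  return claimed
}

const now = 1700000000000
const messageRows: MessageRow[] = [
  { id: "a", lastRead: null }, // never claimed
  { id: "b", lastRead: now - 60 * 1000 }, // claimed 1 minute ago: lease still held
  { id: "c", lastRead: now - 11 * 60 * 1000 } // claimed 11 minutes ago: lease expired
]

console.log(claimDue(messageRows, now).join(",")) // a,c
console.log(claimDue(messageRows, now + 1000).length) // 0
```

Row "c" illustrates the recovery path: a runner that stalled after claiming it no longer blocks redelivery once the lease lapses.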

SqlMessageStorage.make builds the MessageStorage["Service"] directly and runs migrations. The only option is prefix (default "cluster"), which names the tables <prefix>_messages, <prefix>_replies, and <prefix>_migrations. It requires a SqlClient and a Snowflake.Generator in context. Use it when composing a custom layer or supplying your own generator.

```typescript
import { Effect } from "effect"
import { SqlMessageStorage } from "effect/unstable/cluster"

// const storage = yield* SqlMessageStorage.make({ prefix: "orders" })
// => MessageStorage service backed by orders_messages / orders_replies
```
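The prefix rule is plain string composition, which is also why changing it later points at a different set of tables. messageTableNames is a hypothetical helper that spells the rule out. It is not part of the module.

```typescript
// Hypothetical helper spelling out the documented naming rule for the message tables.
function messageTableNames(prefix: string = "cluster") {
  return {
    messages: `${prefix}_messages`,
    replies: `${prefix}_replies`,
    migrations: `${prefix}_migrations`
  }
}

console.log(messageTableNames().messages) // cluster_messages
console.log(messageTableNames("orders").replies) // orders_replies
```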

SqlMessageStorage.layer is a ready-made layer using the default "cluster" prefix and the standard snowflake generator. It still requires SqlClient and ShardingConfig. This is what storage: "sql" provides.

```typescript
import { SqlMessageStorage } from "effect/unstable/cluster"

const Storage = SqlMessageStorage.layer
// Layer<MessageStorage, never, SqlClient | ShardingConfig>
```

SqlMessageStorage.layerWith is the same as layer but takes a custom table prefix. Pick a stable prefix before deployment: changing it later points the runtime at a different (empty) set of tables and migration history.

```typescript
import { SqlMessageStorage } from "effect/unstable/cluster"

const Storage = SqlMessageStorage.layerWith({ prefix: "orders" })
```

SqlRunnerStorage (durable runner membership)


A persisted mailbox is only half of durable cluster state — runner membership and shard ownership must also be durable so a restarted process rejoins the same shards. SqlRunnerStorage.make({ prefix }), SqlRunnerStorage.layer, and SqlRunnerStorage.layerWith({ prefix }) provide that, creating <prefix>_runners and <prefix>_locks tables (PostgreSQL/MySQL use advisory locks unless ShardingConfig.shardLockDisableAdvisory is set). storage: "sql" wires this for you. See Sharding and runners for the bigger picture.

```typescript
import { SqlRunnerStorage } from "effect/unstable/cluster"

const Runners = SqlRunnerStorage.layer
// Layer<RunnerStorage, SqlError, SqlClient | ShardingConfig>
```

Message and reply types (the wire/storage model)


These are the shapes a mailbox actually stores. App code rarely constructs them, but recognizing them helps when reading the tables or a custom driver. They live in the Message, Reply, Envelope, and DeliverAt modules.

Message.Incoming<R> is what a runner reads from storage/transport: either an IncomingRequest<R> (a PartialRequest envelope whose payload is still encoded, plus a respond callback and lastSentReply) or an IncomingEnvelope (a control AckChunk/Interrupt). Message.IncomingLocal<R> is the decoded in-process form (IncomingRequestLocal<R> | IncomingEnvelope). Message.Outgoing<R> is what a caller produces: OutgoingRequest<R> (decoded payload + RPC + context + lastReceivedReply) or OutgoingEnvelope (with OutgoingEnvelope.interrupt({ address, id, requestId })). Serialization helpers convert between them:

```typescript
import { Effect } from "effect"
import { Message } from "effect/unstable/cluster"

// const partial = yield* Message.serialize(outgoing)                  // => Envelope.Partial
// const encoded = yield* Message.serializeEnvelope(outgoing)          // => Envelope.Encoded (JSON)
// const partialReq = yield* Message.serializeRequest(outgoingRequest) // => Envelope.PartialRequest
// const local = yield* Message.deserializeLocal(outgoing, partial)    // => Message.IncomingLocal
const _ = Message.incomingLocalFromOutgoing // Outgoing<R> -> IncomingLocal<R>
```

A Reply.Reply<R> is either a terminal Reply.WithExit<R> (carries the final Exit) or a streaming Reply.Chunk<R> (a sequenced, non-empty batch of success values, replayed until acknowledged). Reply.Encoded is the JSON form (WithExitEncoded | ChunkEncoded). Reply.ReplyWithContext<R> pairs a reply with the RPC + context needed to serialize it (with ReplyWithContext.fromDefect / .interrupt helpers). Reply.Reply(rpc) builds the transport codec.

```typescript
import { Effect } from "effect"
import { Reply } from "effect/unstable/cluster"

// const encoded = yield* Reply.serialize(replyWithContext)    // => Reply.Encoded
// const last = yield* Reply.serializeLastReceived(outgoing)   // => Option<Reply.Encoded>
Reply.isReply(undefined) // => false
```

Envelope.Envelope<R> is the transport message: Request<R> (request id, entity address, RPC tag, decoded payload, headers, trace context), AckChunk (acks a received stream chunk), or Interrupt. Build a request with Envelope.makeRequest(...). Envelope.Partial / PartialJson decode/encode the metadata while leaving the payload unknown; PartialArray batches them. Envelope.primaryKey(envelope) / primaryKeyByAddress({ address, tag, id }) compute the dedup key (entityType/entityId/tag/id), and Envelope.isEnvelope guards values.

```typescript
import { Envelope } from "effect/unstable/cluster"

Envelope.primaryKeyByAddress({
  address: { entityType: "Order", entityId: "order-1" } as any,
  tag: "ChargeCard",
  id: "charge-1"
})
// => "Order/order-1/ChargeCard/charge-1"

Envelope.isEnvelope({}) // => false
```
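To see why the RPC tag belongs in the key, here is a hypothetical re-creation of the documented layout (entityType/entityId/tag/id). toyPrimaryKey is illustrative only; real code should call Envelope.primaryKeyByAddress. Under this layout, the same payload key sent through two different RPCs produces two different keys, so the second is not treated as a duplicate of the first.

```typescript
// Hypothetical re-creation of the documented key layout; use Envelope.primaryKeyByAddress in real code.
interface ToyAddress {
  readonly entityType: string
  readonly entityId: string
}

function toyPrimaryKey(address: ToyAddress, tag: string, id: string): string {
  return `${address.entityType}/${address.entityId}/${tag}/${id}`
}

const order: ToyAddress = { entityType: "Order", entityId: "order-1" }
console.log(toyPrimaryKey(order, "ChargeCard", "charge-1")) // Order/order-1/ChargeCard/charge-1
// Same payload key under a different RPC tag yields a different key.
console.log(toyPrimaryKey(order, "Refund", "charge-1")) // Order/order-1/Refund/charge-1
```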

A payload schedules its own delivery time by implementing the DeliverAt.symbol method (returning a DateTime). MessageStorage reads it via DeliverAt.toMillis(payload) when saving, storing it as deliver_at; the message stays out of unprocessedMessages until that instant. DeliverAt.isDeliverAt guards a value. This is the mechanism behind scheduled cron jobs — see Singletons and ClusterCron.

```typescript
import { DateTime, PrimaryKey, Schema } from "effect"
import { DeliverAt } from "effect/unstable/cluster"

// A persisted payload that asks to be delivered at a specific time, and
// deduplicates on its key.
class RunAt extends Schema.Class<RunAt>("RunAt")({
  at: Schema.DateTimeUtc,
  key: Schema.String
}) {
  [PrimaryKey.symbol]() {
    return this.key
  }
  [DeliverAt.symbol]() {
    return this.at
  }
}

DeliverAt.toMillis(new RunAt({ at: DateTime.makeUnsafe("2030-01-01"), key: "k" }))
// => 1893456000000 (epoch ms; null for payloads that don't implement DeliverAt)
DeliverAt.toMillis({ no: "deliverAt" }) // => null
```

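The protocol underneath is a symbol-keyed method plus a guard. This dependency-free sketch mirrors the documented contract (epoch ms, or null when the method is absent) and the "stay out of unprocessedMessages until due" rule. deliverAtSymbol, hasDeliverAt, toMillis, isDue and RunAtToy are hypothetical stand-ins, and a plain Date replaces Effect's DateTime.

```typescript
// Hypothetical stand-in for DeliverAt.symbol; real code uses the module's symbol
// and returns a DateTime rather than a Date.
const deliverAtSymbol: unique symbol = Symbol.for("example/DeliverAt")

interface HasDeliverAt {
  [deliverAtSymbol](): Date
}

function hasDeliverAt(u: unknown): u is HasDeliverAt {
  return (
    typeof u === "object" &&
    u !== null &&
    typeof (u as { [deliverAtSymbol]?: unknown })[deliverAtSymbol] === "function"
  )
}

// Mirrors the documented contract: epoch ms, or null when not implemented.
function toMillis(payload: unknown): number | null {
  return hasDeliverAt(payload) ? payload[deliverAtSymbol]().getTime() : null
}

// A message is eligible once its delivery time has passed (or if it has none).
function isDue(payload: unknown, now: number): boolean {
  const at = toMillis(payload)
  return at === null || at <= now
}

class RunAtToy {
  constructor(readonly at: Date) {}
  [deliverAtSymbol](): Date {
    return this.at
  }
}

const job = new RunAtToy(new Date("2030-01-01T00:00:00Z"))
console.log(toMillis(job)) // 1893456000000
console.log(toMillis({ no: "deliverAt" })) // null
console.log(isDue(job, Date.UTC(2029, 11, 31))) // false
```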
  • Entities — where the ClusterSchema.Persisted annotation is applied to your RPCs.
  • Singletons and ClusterCron — how DeliverAt and persisted messages power scheduled, exactly-one-runner work.
  • Sharding and runners — runner membership and shard ownership, the durable state that sits alongside the mailbox.