# Message storage and persistence

By default, a message sent to an [entity](https://effect.plants.sh/cluster/entities/) is **volatile**. The
client serializes the request, sends it over the wire to the runner that currently
owns the entity's shard, and waits for a reply. If that runner crashes — or the
shard is reassigned — before the message is processed, the message is simply lost.
For many request/response calls that is fine: the caller sees a failure and can
retry.

For work that must *survive a crash*, mark the RPC as **persisted**. A persisted
message is written to a durable **mailbox** (a `MessageStorage` backing) *before*
it is handled. The owning runner then reads its mailbox for the shards it owns and
replays anything that was not yet completed. This is what makes the rest of the
durable-cluster features possible:

- **At-least-once processing** — a message stays in the mailbox until a terminal
  reply (a `WithExit`) is recorded, so a crash mid-handler means the message is
  redelivered, not dropped.
- **Deduplication** — if the payload has a [`PrimaryKey`](https://effect.plants.sh/schema/), a second send
  with the same key returns the *original* request's id and last reply instead of
  enqueuing duplicate work.
- **Delayed delivery** — a payload implementing [`DeliverAt`](#deliverat) schedules
  *when* it becomes eligible for processing, which is how [singletons and
  ClusterCron](https://effect.plants.sh/cluster/singletons-and-cron/) fire on a schedule.
- **Fire-and-forget** — because the message is durable, the client can discard the
  reply (the entity will still process the message after the client disconnects).

**Persistence needs storage:** Persisted messages require a `MessageStorage`
implementation that can actually save the mailbox. The default Node layer uses
**SQL** (`SqlMessageStorage`). The `local` storage option is a *no-op* store: it
satisfies the type but does not persist anything, so persisted delivery degrades
to volatile.
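The at-least-once guarantee described above can be illustrated with a toy model. This is a conceptual sketch in plain TypeScript, not the library's implementation: `ToyMailbox`, `enqueue`, `recordExit`, and `pending` are hypothetical names, and a real mailbox lives in durable storage rather than in a `Map`.

```ts
// Conceptual model only: none of these names come from effect/unstable/cluster.
type ToyEntry = { readonly id: number; readonly payload: string; done: boolean }

class ToyMailbox {
  private readonly entries = new Map<number, ToyEntry>()

  // Write the message to the mailbox *before* any handler runs.
  enqueue(id: number, payload: string): void {
    if (!this.entries.has(id)) this.entries.set(id, { id, payload, done: false })
  }

  // Recording a terminal reply is what takes a message out of the replay set.
  recordExit(id: number): void {
    const entry = this.entries.get(id)
    if (entry) entry.done = true
  }

  // What a (re)started runner would read: every message without a terminal reply.
  pending(): Array<ToyEntry> {
    return Array.from(this.entries.values()).filter((entry) => !entry.done)
  }
}

const toyMailbox = new ToyMailbox()
toyMailbox.enqueue(1, "charge order-1")
toyMailbox.enqueue(2, "charge order-2")

// Message 1 completes; imagine the runner crashes while handling message 2.
toyMailbox.recordExit(1)

// After a restart, only message 2 is still pending and gets replayed.
const replayedIds = toyMailbox.pending().map((entry) => entry.id)
```

Because a message leaves the replay set only when its terminal reply is recorded, a handler can run more than once for the same message, which is why handlers for persisted RPCs should be safe to repeat.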

## Enabling persistence

Persistence is opt-in per RPC. Annotate the `Rpc` with `ClusterSchema.Persisted`
set to `true`:

```ts
import { Schema } from "effect"
import { ClusterSchema } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"

// A volatile message — lost if the owner dies before handling it.
const Increment = Rpc.make("Increment", {
  payload: { amount: Schema.Number },
  success: Schema.Number
})

// A persisted message — written to the durable mailbox first, then replayed
// after a crash for the shards the local runner owns.
const ChargeCard = Rpc.make("ChargeCard", {
  payload: { cents: Schema.Number },
  success: Schema.Void
}).annotate(ClusterSchema.Persisted, true)
```

`ClusterSchema.Persisted` is a `Context.Reference<boolean>` whose default is
`false`, so unannotated RPCs are volatile. (Two related annotations live in the
same module: `ClusterSchema.WithTransaction` wraps server handling in the storage
transaction, and `ClusterSchema.Uninterruptible` controls interrupt handling.)

### Choosing a storage backing

You pick the backing when you wire the cluster transport. With
`NodeClusterSocket.layer` the `storage` option selects the implementation:

```ts
import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"
import { Layer } from "effect"
import { PgClient } from "@effect/sql-pg"

// "sql": durable. SqlMessageStorage is provided for you; you supply the
// SqlClient. The tables (cluster_messages, cluster_replies, cluster_migrations)
// are created by migrations that run automatically on startup.
const SqlLive = PgClient.layer({
  database: "effect_cluster"
  // host / port / username / password as needed
})

const ClusterLive = NodeClusterSocket.layer({ storage: "sql" }).pipe(
  Layer.provide(SqlLive)
)

NodeRuntime.runMain(Layer.launch(ClusterLive))
```

The three values for `storage`:

- **`"sql"`** (default) — provides `SqlMessageStorage.layer` (and
  `SqlRunnerStorage.layer` for runner membership). Requires a `SqlClient` in
  context. Durable and multi-process safe.
- **`"local"`** — provides `MessageStorage.layerNoop`. Nothing is persisted;
  persisted RPCs behave as volatile. Useful for short-lived single-process dev.
- **`"byo"`** — provides nothing; *you* supply both `MessageStorage` and
  `RunnerStorage` layers. Use this to plug in `MessageStorage.layerMemory` (tests),
  a custom driver, or a non-default SQL prefix.

```ts
import { NodeClusterSocket } from "@effect/platform-node"
import { Layer } from "effect"
import { MessageStorage, RunnerStorage } from "effect/unstable/cluster"

// "byo" — bring your own storage. Here, in-memory storage for an integration test.
const TestStorage = Layer.mergeAll(
  MessageStorage.layerMemory,
  RunnerStorage.layerMemory
)

const ClusterTest = NodeClusterSocket.layer({ storage: "byo" }).pipe(
  Layer.provide(TestStorage)
)
```

**Note:** Most apps never touch the `MessageStorage` service directly; `Sharding` drives it.
The reference below documents the API so you can recognize stored shapes, write a
custom driver, or inspect a mailbox in a test.

## The MessageStorage service

`MessageStorage` is a `Context.Service` (tag
`"effect/cluster/MessageStorage"`). It combines two responsibilities: durable
storage of requests/envelopes/replies, and *process-local* reply-handler
management that connects persisted replies to the fibers waiting for them on this
runner. The storage methods are listed first, then the constructors and layers.

```ts
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

const program = Effect.gen(function*() {
  const storage = yield* MessageStorage.MessageStorage
  // storage.saveRequest, storage.repliesFor, storage.unprocessedMessages, ...
})
```

### saveRequest

Persists an `OutgoingRequest` (and its primary key / `DeliverAt` time) and returns
a [`SaveResult`](#saveresult--saveresultencoded) — `Success` for new work, or
`Duplicate` when the payload's primary key matches an existing request.

```ts
// storage.saveRequest(outgoing)
// => SaveResult.Success()                              (new request)
// => SaveResult.Duplicate({ originalId, lastReceivedReply })   (deduped)
```
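To make the `Success` / `Duplicate` split concrete, here is a minimal sketch of save-time deduplication in plain TypeScript. It is an illustration under assumptions, not the library's code: `ToySaveResult`, `toySaveRequest`, and the in-memory `Map` are hypothetical stand-ins, and ids are plain numbers instead of snowflakes.

```ts
// Hypothetical shapes that mirror the Success / Duplicate split described above.
type ToySaveResult =
  | { readonly _tag: "Success" }
  | {
    readonly _tag: "Duplicate"
    readonly originalId: number
    readonly lastReceivedReply: string | undefined
  }

type ToyStoredRequest = { readonly id: number; lastReply: string | undefined }

const toyByPrimaryKey = new Map<string, ToyStoredRequest>()
let toyNextId = 1

// primaryKey is undefined for payloads that define no key: those never dedupe.
const toySaveRequest = (primaryKey: string | undefined): ToySaveResult => {
  const existing = primaryKey === undefined ? undefined : toyByPrimaryKey.get(primaryKey)
  if (existing !== undefined) {
    // Same key seen before: hand back the original id instead of enqueuing again.
    return { _tag: "Duplicate", originalId: existing.id, lastReceivedReply: existing.lastReply }
  }
  const stored: ToyStoredRequest = { id: toyNextId++, lastReply: undefined }
  if (primaryKey !== undefined) toyByPrimaryKey.set(primaryKey, stored)
  return { _tag: "Success" }
}

const firstSave = toySaveRequest("Order/order-1/ChargeCard/charge-1")
const secondSave = toySaveRequest("Order/order-1/ChargeCard/charge-1")
const keylessSave = toySaveRequest(undefined)
```

In this model the first save succeeds, the second reports a duplicate pointing at the first request's id, and a payload without a key is always treated as new work.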

### saveEnvelope

Persists a control `OutgoingEnvelope` (an `AckChunk` or `Interrupt`) rather than a
request. Returns `void`.

```ts
// storage.saveEnvelope(outgoingInterruptEnvelope) // => void
```

### saveReply

Persists a `ReplyWithContext` for a request. A terminal `WithExit` marks the
request processed; a streaming `Chunk` is appended and replayed until acknowledged.
Also notifies any locally registered reply handlers.

```ts
// storage.saveReply(replyWithContext) // => void
```

### clearReplies

Removes the stored replies for a request id, making the request unprocessed again
so it can be replayed.

```ts
import { Snowflake } from "effect/unstable/cluster"
// storage.clearReplies(Snowflake.Snowflake(123n)) // => void
```

### repliesFor

Given outgoing requests, returns the *decoded* replies that still matter:
unacknowledged stream chunks and terminal `WithExit` replies. Used when resuming a
request to deliver what was already produced.

```ts
// storage.repliesFor([outgoing]) // => Array<Reply.Reply<R>>
```
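The phrase "replies that still matter" can be read as a simple filter. The sketch below models it in plain TypeScript under stated assumptions: `ToyReply` and `toyRepliesThatMatter` are hypothetical, and acknowledgement is reduced to a boolean flag, whereas the real storage tracks it through `AckChunk` envelopes.

```ts
// Hypothetical reply shapes; the real ones are Reply.Chunk and Reply.WithExit.
type ToyReply =
  | { readonly _tag: "Chunk"; readonly sequence: number; readonly acked: boolean }
  | { readonly _tag: "WithExit" }

// Keep terminal replies and any stream chunk the caller has not acknowledged yet.
const toyRepliesThatMatter = (stored: ReadonlyArray<ToyReply>): Array<ToyReply> =>
  stored.filter((reply) => reply._tag === "WithExit" || !reply.acked)

const toyStoredReplies: Array<ToyReply> = [
  { _tag: "Chunk", sequence: 0, acked: true }, // already seen by the caller
  { _tag: "Chunk", sequence: 1, acked: false }, // must be redelivered on resume
  { _tag: "WithExit" } // final result, always relevant
]

const toyResumedReplies = toyRepliesThatMatter(toyStoredReplies)
```

Here the acknowledged chunk is dropped, while the unacknowledged chunk and the terminal reply are returned to the resuming caller.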

### repliesForUnfiltered

Like `repliesFor`, but keyed by raw `Snowflake` request ids and returning *encoded*
replies without per-RPC filtering.

```ts
// storage.repliesForUnfiltered([Snowflake.Snowflake(123n)]) // => Array<Reply.Encoded>
```

### requestIdForPrimaryKey

Looks up the original request id for a primary key (entity address + RPC tag +
payload key). This is the lookup behind duplicate detection.

```ts
// storage.requestIdForPrimaryKey({ address, tag: "ChargeCard", id: "order-1" })
// => Option.some(Snowflake) | Option.none()
```

### registerReplyHandler / unregisterReplyHandler / unregisterShardReplyHandlers

Process-local handler management. `registerReplyHandler` waits for replies to a
locally sent request; `unregisterReplyHandler` cancels one request's handler;
`unregisterShardReplyHandlers` fails every handler for a shard (used when a shard is
reassigned away from this runner). All three are driven by `Sharding`.

```ts
// yield* storage.registerReplyHandler(outgoingOrIncomingRequest)
// fails with EntityNotAssignedToRunner if the shard is unregistered first
```
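The difference between the three operations is easier to see in a small model. The following is a hedged sketch in plain TypeScript with callbacks instead of effects; every name prefixed with `toy` is hypothetical, and only the error name `EntityNotAssignedToRunner` is taken from the text above.

```ts
// Conceptual model only; the real handlers are Effect-based and driven by Sharding.
type ToyReplyHandler = {
  readonly shardId: number
  readonly onReply: (reply: string) => void
  readonly onFailure: (reason: string) => void
}

// Process-local state: held in memory on one runner, never in the durable store.
const toyReplyHandlers = new Map<number, ToyReplyHandler>() // keyed by request id

const toyRegisterReplyHandler = (requestId: number, handler: ToyReplyHandler): void => {
  toyReplyHandlers.set(requestId, handler)
}

// In this model, cancelling one request's handler does not notify it.
const toyUnregisterReplyHandler = (requestId: number): void => {
  toyReplyHandlers.delete(requestId)
}

// Fail every handler on a shard, for example when the shard moves to another runner.
const toyUnregisterShardReplyHandlers = (shardId: number): void => {
  for (const [requestId, handler] of Array.from(toyReplyHandlers.entries())) {
    if (handler.shardId === shardId) {
      toyReplyHandlers.delete(requestId)
      handler.onFailure("EntityNotAssignedToRunner")
    }
  }
}

// Route a saved reply to the waiting handler, if one is still registered.
const toyNotifyReply = (requestId: number, reply: string): void => {
  const handler = toyReplyHandlers.get(requestId)
  if (handler) handler.onReply(reply)
}

const toyEvents: Array<string> = []
const toyHandlerFor = (requestId: number, shardId: number): ToyReplyHandler => ({
  shardId,
  onReply: (reply) => { toyEvents.push(`${requestId}:reply:${reply}`) },
  onFailure: (reason) => { toyEvents.push(`${requestId}:failed:${reason}`) }
})

toyRegisterReplyHandler(1, toyHandlerFor(1, 7))
toyRegisterReplyHandler(2, toyHandlerFor(2, 7))
toyRegisterReplyHandler(3, toyHandlerFor(3, 8))

toyUnregisterReplyHandler(2) // request 2 is cancelled silently
toyUnregisterShardReplyHandlers(7) // request 1 fails; request 3 is on shard 8
toyNotifyReply(3, "done") // request 3 still receives its reply
toyNotifyReply(1, "late") // ignored: handler 1 is gone
```

The durable replies themselves are unaffected by any of this; only the in-memory link between a stored reply and a waiting caller on this runner changes.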

### unprocessedMessages / unprocessedMessagesById

`unprocessedMessages(shardIds)` returns the incoming messages a runner still has to
process for the shards it owns: requests with no terminal reply (and no unacked
chunks), the latest `AckChunk`, and `Interrupt` envelopes for those requests.
`unprocessedMessagesById` does the same for specific request ids. Delayed-delivery
requests whose `deliverAt` is in the future are skipped until due.

```ts
import { ShardId } from "effect/unstable/cluster"
// storage.unprocessedMessages([ShardId.make("default", 1)])
// => Array<Message.Incoming<any>>
```
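The selection rule can be sketched as three filters: shard ownership, no terminal reply, and delivery time reached. This is an illustrative model in plain TypeScript, not the driver's query; `ToyMailboxRow` and `toyUnprocessedMessages` are hypothetical names, and control envelopes (`AckChunk`, `Interrupt`) are left out for brevity.

```ts
// Hypothetical row shape; real rows also carry the envelope, headers, and so on.
type ToyMailboxRow = {
  readonly id: number
  readonly shardId: number
  readonly hasTerminalReply: boolean // true once a WithExit has been stored
  readonly deliverAt: number | null // epoch ms; null means "deliver immediately"
}

const toyUnprocessedMessages = (
  rows: ReadonlyArray<ToyMailboxRow>,
  shardIds: ReadonlyArray<number>,
  now: number
): Array<number> => {
  const owned = new Set(shardIds)
  return rows
    .filter((row) => owned.has(row.shardId)) // only shards this runner owns
    .filter((row) => !row.hasTerminalReply) // finished requests are never re-read
    .filter((row) => row.deliverAt === null || row.deliverAt <= now) // skip until due
    .map((row) => row.id)
}

const toyRows: Array<ToyMailboxRow> = [
  { id: 1, shardId: 1, hasTerminalReply: false, deliverAt: null },
  { id: 2, shardId: 1, hasTerminalReply: true, deliverAt: null },
  { id: 3, shardId: 1, hasTerminalReply: false, deliverAt: 5000 },
  { id: 4, shardId: 2, hasTerminalReply: false, deliverAt: null }
]

const toyDueEarly = toyUnprocessedMessages(toyRows, [1], 1000)
const toyDueLater = toyUnprocessedMessages(toyRows, [1], 5000)
```

At time `1000` only request 1 qualifies; once the clock reaches `5000`, the delayed request 3 becomes eligible as well, while request 4 stays invisible because its shard belongs to another runner.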

### resetShards / resetAddress / clearAddress

Recovery operations. `resetShards` / `resetAddress` mark mailbox messages
unprocessed again (so they are re-read) for a set of shards or one entity address;
`clearAddress` deletes all messages and replies for an address (used on entity
removal).

```ts
// storage.resetShards([ShardId.make("default", 1)]) // => void
// storage.resetAddress(entityAddress)               // => void
// storage.clearAddress(entityAddress)               // => void
```

There is also `withTransaction`, which wraps an effect in the backing's
transaction (a no-op for in-memory/no-op stores).

### MessageStorage.make / makeEncoded

Two constructors build the service. `make` takes a fully *decoded* storage object
(everything except the reply-handler methods, which it adds) and returns the
service. `makeEncoded` is the usual entry point for drivers: you implement the
lower-level [`Encoded`](#the-encoded-driver-interface) contract (encoded envelopes
+ replies) and it handles encode/decode, primary-key generation, `DeliverAt`
checks, and duplicate decoding.

```ts
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

// Build a service from an encoded driver (needs a Snowflake.Generator in context).
// const service = yield* MessageStorage.makeEncoded(myEncodedDriver)
```

### MessageStorage.noop / layerNoop

`noop` is a `MessageStorage` service that persists nothing —
`saveRequest` always returns `SaveResult.Success`, reads return empty. `layerNoop`
provides it as a `Layer`. This backs the `storage: "local"` option.

```ts
import { MessageStorage } from "effect/unstable/cluster"

const NoStorage = MessageStorage.layerNoop // Layer<MessageStorage>
```

### MessageStorage.layerMemory + MemoryDriver

`layerMemory` provides an in-memory store *and* its inspectable backing,
`MemoryDriver`. It requires a `ShardingConfig`. The `MemoryDriver` service exposes
the internal maps (`requests`, `requestsByPrimaryKey`, `unprocessed`, `journal`)
so tests can assert on what was stored. `MemoryEntry` is the per-request record
(`envelope`, `replies`, `lastReceivedChunk`, `deliverAt`), and `MemoryTransaction`
is a `Context.Reference<boolean>` flipped to `true` inside `withTransaction` so
tests can confirm transactional wrapping.

```ts
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

const inspect = Effect.gen(function*() {
  const driver = yield* MessageStorage.MemoryDriver
  // driver.requests, driver.journal, driver.unprocessed, driver.requestsByPrimaryKey
  return driver.journal.length // => number of saved envelopes
})

// Provide both MessageStorage and MemoryDriver (needs ShardingConfig).
const TestLayer = MessageStorage.layerMemory
```

### SaveResult / SaveResultEncoded

`SaveResult` is a `Data.taggedEnum` with two variants: `Success` (new entry) and
`Duplicate` (carries `originalId` and the decoded `lastReceivedReply`).
`SaveResultEncoded` is the driver-level form whose `Duplicate` carries an
*encoded* last reply. Match on the `_tag` after `saveRequest`:

```ts
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

const handle = (storage: MessageStorage.MessageStorage["Service"], outgoing: any) =>
  Effect.gen(function*() {
    const result = yield* storage.saveRequest(outgoing)
    if (result._tag === "Duplicate") {
      // already enqueued — reuse the original id, replay its last reply
      result.originalId // => Snowflake
      result.lastReceivedReply // => Option<Reply.Reply<R>>
    }
    // _tag === "Success" => brand new request, will be processed
  })

// Driver-side constructors:
// MessageStorage.SaveResult.Success()
// MessageStorage.SaveResult.Duplicate({ originalId, lastReceivedReply })
// MessageStorage.SaveResultEncoded.Duplicate({ originalId, lastReceivedReply })
```

### The Encoded driver interface

`Encoded` is the contract a custom storage driver implements. It works entirely in
*encoded* terms: `saveEnvelope({ envelope, primaryKey, deliverAt })`, `saveReply`,
`clearReplies`, `requestIdForPrimaryKey`, `repliesFor` / `repliesForUnfiltered`
(both taking `NonEmptyArray<string>`), `unprocessedMessages(shardIds, now)` /
`unprocessedMessagesById(ids, now)`, `resetAddress`, `clearAddress`, `resetShards`,
and `withTransaction`. Feed an implementation to `makeEncoded` to get a full
`MessageStorage`. `EncodedUnprocessedOptions` and `EncodedRepliesOptions` are cursor
shapes (`existingShards`/`newShards` or `existingRequests`/`newRequests` plus a
driver `cursor`) used for paginated reads.

```ts
import { Effect } from "effect"
import { MessageStorage } from "effect/unstable/cluster"

const driver: MessageStorage.Encoded = {
  saveEnvelope: ({ envelope, primaryKey, deliverAt }) =>
    Effect.succeed(MessageStorage.SaveResultEncoded.Success()),
  saveReply: (_reply) => Effect.void,
  clearReplies: (_id) => Effect.void,
  requestIdForPrimaryKey: (_pk) => Effect.succeedNone,
  repliesFor: (_ids) => Effect.succeed([]),
  repliesForUnfiltered: (_ids) => Effect.succeed([]),
  unprocessedMessages: (_shards, _now) => Effect.succeed([]),
  unprocessedMessagesById: (_ids, _now) => Effect.succeed([]),
  resetAddress: (_addr) => Effect.void,
  clearAddress: (_addr) => Effect.void,
  resetShards: (_shards) => Effect.void,
  withTransaction: (effect) => effect
}
// const storage = yield* MessageStorage.makeEncoded(driver)
```

## SQL-backed storage (SqlMessageStorage)

`SqlMessageStorage` is the production driver. It keeps two durable tables — a
`messages` table for requests and control envelopes, and a `replies` table for
chunk and final replies — plus a migrations table. Saving a request inserts into
`messages`; reading unprocessed messages claims due rows for the assigned shards
(using `last_read` as a ~10-minute lease so a stalled runner's messages can be
redelivered); reading replies replays unacknowledged chunks and final exits.
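The lease idea can be sketched without SQL. The model below is an assumption-laden illustration in plain TypeScript: the names are hypothetical, the 10-minute constant is taken from the approximate figure in the text, and the real driver expresses this as a query over the `last_read` column rather than a loop.

```ts
// Assumed lease length, based on the "~10-minute" figure mentioned above.
const TOY_LEASE_MS = 10 * 60 * 1000

// Hypothetical row: lastRead is null until some runner has claimed the message.
type ToyLeasedRow = { readonly id: number; lastRead: number | null }

// Claim every row that was never read or whose lease has expired,
// stamping it with the current time so other readers skip it for a while.
const toyClaimRows = (rows: Array<ToyLeasedRow>, now: number): Array<number> => {
  const claimed: Array<number> = []
  for (const row of rows) {
    if (row.lastRead === null || now - row.lastRead >= TOY_LEASE_MS) {
      row.lastRead = now
      claimed.push(row.id)
    }
  }
  return claimed
}

const toyLeasedRows: Array<ToyLeasedRow> = [{ id: 1, lastRead: null }]

const toyFirstClaim = toyClaimRows(toyLeasedRows, 0) // never read: claimed
const toyWithinLease = toyClaimRows(toyLeasedRows, 5 * 60 * 1000) // lease still held
const toyAfterLease = toyClaimRows(toyLeasedRows, 10 * 60 * 1000) // lease expired: claimed again
```

If the runner that made the first claim stalls without recording a terminal reply, the message becomes claimable again once the lease runs out, which is the redelivery path the paragraph above describes.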

### SqlMessageStorage.make(options)

Builds the `MessageStorage["Service"]` directly, running migrations. The only
option is `prefix` (default `"cluster"`), which names the tables
`<prefix>_messages`, `<prefix>_replies`, and `<prefix>_migrations`. Requires a
`SqlClient` and a `Snowflake.Generator` in context. Use this when composing a
custom layer or supplying your own generator.

```ts
import { Effect } from "effect"
import { SqlMessageStorage } from "effect/unstable/cluster"

// const storage = yield* SqlMessageStorage.make({ prefix: "orders" })
// => MessageStorage service backed by orders_messages / orders_replies
```

### SqlMessageStorage.layer

Ready-made layer using the default `"cluster"` prefix and the standard snowflake
generator. Still requires `SqlClient` and `ShardingConfig`. This is what
`storage: "sql"` provides.

```ts
import { SqlMessageStorage } from "effect/unstable/cluster"

const Storage = SqlMessageStorage.layer
// Layer<MessageStorage, never, SqlClient | ShardingConfig>
```

### SqlMessageStorage.layerWith(options)

Same as `layer` but with a custom table `prefix`. Pick a stable prefix *before*
deployment — changing it later points the runtime at a different (empty) set of
tables and migration history.

```ts
import { SqlMessageStorage } from "effect/unstable/cluster"

const Storage = SqlMessageStorage.layerWith({ prefix: "orders" })
```

**Caution:** `make` runs migrations on startup, creating `<prefix>_messages`,
`<prefix>_replies`, and `<prefix>_migrations`. Keep the migration table alongside
the message tables, and never change the prefix after go-live.
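To see why a changed prefix looks like an empty mailbox, it helps to spell out the derived table names. The helper below is purely illustrative: `toyTableNames` is a hypothetical function, and only the `<prefix>_messages` / `<prefix>_replies` / `<prefix>_migrations` pattern comes from the text above.

```ts
// Hypothetical helper that mirrors the naming pattern described above.
const toyTableNames = (prefix: string = "cluster") => ({
  messages: `${prefix}_messages`,
  replies: `${prefix}_replies`,
  migrations: `${prefix}_migrations`
})

const toyDefaultTables = toyTableNames()
const toyOrderTables = toyTableNames("orders")

// The two sets share no table, so switching prefixes leaves old rows behind.
const toySharedTables = Object.values(toyDefaultTables).filter(
  (table) => Object.values(toyOrderTables).indexOf(table) !== -1
)
```

Since the migrations table is also prefixed, a new prefix starts its migration history from scratch instead of continuing the old one.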

### SqlRunnerStorage (durable runner membership)

A persisted mailbox is only half of durable cluster state — runner membership and
shard ownership must also be durable so a restarted process rejoins the same
shards. `SqlRunnerStorage.make({ prefix })`, `SqlRunnerStorage.layer`, and
`SqlRunnerStorage.layerWith({ prefix })` provide that, creating `<prefix>_runners`
and `<prefix>_locks` tables (PostgreSQL/MySQL use advisory locks unless
`ShardingConfig.shardLockDisableAdvisory` is set). `storage: "sql"` wires this for
you. See [Sharding and runners](https://effect.plants.sh/cluster/sharding-and-runners/) for the bigger
picture.

```ts
import { SqlRunnerStorage } from "effect/unstable/cluster"

const Runners = SqlRunnerStorage.layer
// Layer<RunnerStorage, SqlError, SqlClient | ShardingConfig>
```

## Message and reply types (the wire/storage model)

These are the shapes a mailbox actually stores. App code rarely constructs them,
but recognizing them helps when reading the tables or a custom driver. They live in
the `Message`, `Reply`, `Envelope`, and `DeliverAt` modules.

### Message

`Message.Incoming<R>` is what a runner reads from storage/transport: either an
`IncomingRequest<R>` (a `PartialRequest` envelope whose payload is still encoded,
plus a `respond` callback and `lastSentReply`) or an `IncomingEnvelope` (a control
`AckChunk`/`Interrupt`). `Message.IncomingLocal<R>` is the decoded in-process form
(`IncomingRequestLocal<R>` | `IncomingEnvelope`). `Message.Outgoing<R>` is what a
caller produces: `OutgoingRequest<R>` (decoded payload + RPC + context +
`lastReceivedReply`) or `OutgoingEnvelope` (with `OutgoingEnvelope.interrupt({
address, id, requestId })`). Serialization helpers convert between them:

```ts
import { Effect } from "effect"
import { Message } from "effect/unstable/cluster"

// const partial   = yield* Message.serialize(outgoing)        // => Envelope.Partial
// const encoded   = yield* Message.serializeEnvelope(outgoing) // => Envelope.Encoded (JSON)
// const partialReq = yield* Message.serializeRequest(outgoingRequest) // => Envelope.PartialRequest
// const local     = yield* Message.deserializeLocal(outgoing, partial) // => Message.IncomingLocal
const _ = Message.incomingLocalFromOutgoing // Outgoing<R> -> IncomingLocal<R>
```

### Reply

A `Reply.Reply<R>` is either a terminal `Reply.WithExit<R>` (carries the final
`Exit`) or a streaming `Reply.Chunk<R>` (a sequenced, non-empty batch of success
values, replayed until acknowledged). `Reply.Encoded` is the JSON form
(`WithExitEncoded | ChunkEncoded`). `Reply.ReplyWithContext<R>` pairs a reply with
the RPC + context needed to serialize it (with `ReplyWithContext.fromDefect` /
`.interrupt` helpers). `Reply.Reply(rpc)` builds the transport codec.

```ts
import { Effect } from "effect"
import { Reply } from "effect/unstable/cluster"

// const encoded = yield* Reply.serialize(replyWithContext)      // => Reply.Encoded
// const last    = yield* Reply.serializeLastReceived(outgoing)  // => Option<Reply.Encoded>
Reply.isReply(undefined) // => false
```

### Envelope

`Envelope.Envelope<R>` is the transport message: `Request<R>` (request id, entity
address, RPC tag, decoded payload, headers, trace context), `AckChunk` (acks a
received stream chunk), or `Interrupt`. Build a request with
`Envelope.makeRequest(...)`. `Envelope.Partial` / `PartialJson` decode/encode the
metadata while leaving the payload `unknown`; `PartialArray` batches them.
`Envelope.primaryKey(envelope)` / `primaryKeyByAddress({ address, tag, id })`
compute the dedup key (`entityType/entityId/tag/id`), and `Envelope.isEnvelope`
guards values.

```ts
import { Envelope } from "effect/unstable/cluster"

Envelope.primaryKeyByAddress({
  address: { entityType: "Order", entityId: "order-1" } as any,
  tag: "ChargeCard",
  id: "charge-1"
})
// => "Order/order-1/ChargeCard/charge-1"

Envelope.isEnvelope({}) // => false
```

### DeliverAt

A payload schedules its own delivery time by implementing the `DeliverAt.symbol`
method (returning a `DateTime`). `MessageStorage` reads it via
`DeliverAt.toMillis(payload)` when saving, storing it as `deliver_at`; the message
stays out of `unprocessedMessages` until that instant. `DeliverAt.isDeliverAt`
guards a value. This is the mechanism behind scheduled cron jobs — see
[Singletons and ClusterCron](https://effect.plants.sh/cluster/singletons-and-cron/).

```ts
import { DateTime, PrimaryKey, Schema } from "effect"
import { DeliverAt } from "effect/unstable/cluster"

// A persisted payload that asks to be delivered at a specific time, and
// deduplicates on its key.
class RunAt extends Schema.Class<RunAt>("RunAt")({
  at: Schema.DateTimeUtc,
  key: Schema.String
}) {
  [PrimaryKey.symbol]() {
    return this.key
  }
  [DeliverAt.symbol]() {
    return this.at
  }
}

DeliverAt.toMillis(new RunAt({ at: DateTime.makeUnsafe("2030-01-01"), key: "k" }))
// => 1893456000000  (epoch ms; null for payloads that don't implement DeliverAt)
DeliverAt.toMillis({ no: "deliverAt" }) // => null
```

## Next steps

- [Entities](https://effect.plants.sh/cluster/entities/) — where the `ClusterSchema.Persisted` annotation
  is applied to your RPCs.
- [Singletons and ClusterCron](https://effect.plants.sh/cluster/singletons-and-cron/) — how `DeliverAt` and
  persisted messages power scheduled, exactly-one-runner work.
- [Sharding and runners](https://effect.plants.sh/cluster/sharding-and-runners/) — runner membership and
  shard ownership, the durable state that sits alongside the mailbox.