# Advanced RPC

The everyday RPC surface — [defining RPCs](https://effect.plants.sh/rpc/defining-rpcs/), wiring a
[client and server](https://effect.plants.sh/rpc/client-and-server/), and choosing a
[transport](https://effect.plants.sh/rpc/transports-and-serialization/) — is built on a handful of
lower-level modules. You rarely touch them directly, but they become necessary
when you:

- need to send **startup data** to a worker before the first request (config,
  credentials, transferable handles),
- want to wire a client and server **in process** with no bytes on the wire,
- are writing a **custom transport or serializer** and need the message
  protocol, or
- need to control how **defects** flow back to callers.

This page collects those surfaces. It is example-first for the things you call
(`RpcWorker`, the no-serialization channels, defect options) and a labelled
reference for the protocol data types (`RpcMessage`) that custom transports
exchange.

## Worker bootstrap messages (RpcWorker)

A worker-backed RPC client sometimes needs to hand the worker a one-time
envelope of data *before* any RPC request is handled: per-worker configuration,
credentials, feature flags, a preloaded cache, or transferable handles
(`MessagePort`, `ArrayBuffer`, ...). `RpcWorker` provides a typed channel for
exactly that.

The initial message is encoded with a schema's JSON codec on the client side,
posted with the worker's `postMessage`, and decoded with the same schema inside
the worker. It is **separate from `RpcSerialization`** — changing how
requests/responses are serialized does not change how this bootstrap payload is
encoded — and it crosses the structured-clone boundary, so any transferables
you collect are *moved* to the worker (the sender can no longer use them).

```ts
import { Effect, Layer, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"

// The shape of the startup data both sides agree on.
const Bootstrap = Schema.Struct({
  tenantId: Schema.String,
  features: Schema.Array(Schema.String)
})

// CLIENT SIDE: provide the typed startup data as a layer. The build effect runs
// once; its result is encoded and stored in context for the worker client
// runtime to post. This layer provides the `RpcWorker.InitialMessage` service.
const InitialMessageLayer = RpcWorker.layerInitialMessage(Bootstrap, Effect.succeed({
  tenantId: "acme",
  features: ["beta-ui", "fast-path"]
}))

// WORKER SIDE: read and decode the bootstrap payload. Requires the server
// `Protocol`, and fails with `NoSuchElementError` if no initial message was
// sent or `Schema.SchemaError` if decoding fails.
const program = Effect.gen(function* () {
  const boot = yield* RpcWorker.initialMessage(Bootstrap)
  yield* Effect.log(`booting tenant ${boot.tenantId}`)
  // => decoded as { tenantId: string; features: ReadonlyArray<string> }
})
```
**Worker client protocol:** The worker transport itself (the layers that spawn
workers and route encoded messages) is covered on the
[Transports and serialization](https://effect.plants.sh/rpc/transports-and-serialization/) page.
`RpcWorker` only handles the bootstrap envelope that rides alongside it.
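
The move semantics of transferables are a property of structured cloning itself, so they can be observed without any RPC code. The sketch below is not `RpcWorker` usage: it assumes a runtime that provides the standard `structuredClone` global (modern browsers, Node.js 17 or later) and uses a made-up payload to show what happens to a transferred `ArrayBuffer` on the sending side.

```ts
// A 16-byte buffer standing in for a transferable handle in a bootstrap payload.
// The payload shape is hypothetical.
const buffer = new ArrayBuffer(16)
const payload = { tenantId: "acme", buffer }

// Listing `buffer` under `transfer` moves it into the clone instead of copying it.
const received = structuredClone(payload, { transfer: [buffer] })

console.log(buffer.byteLength) // 0: the sender's buffer is now detached
console.log(received.buffer.byteLength) // 16: the receiver owns the bytes
```

If the sender still needs the data after startup, copy it before building the initial message rather than transferring the original.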

### RpcWorker.InitialMessage

A `Context.Service` holding the encoded initial message paired with the
transferables that should be posted with it. The service value is an `Effect`
that, when run, yields `readonly [data: unknown, transfers: ReadonlyArray<Transferable>]`.
You normally obtain it via `layerInitialMessage` rather than constructing it
by hand.

```ts
import { RpcWorker } from "effect/unstable/rpc"

RpcWorker.InitialMessage.key
// => "effect/rpc/RpcWorker/InitialMessage"
```

### RpcWorker.layerInitialMessage

`layerInitialMessage(schema, build)` (client side) builds a `Layer` providing
`InitialMessage`. It runs the `build` effect, encodes the value with the
schema's JSON codec, collects transferables, and stores the result in context.
Schema encoding failures are turned into a defect (the build effect's error
channel is `never`).

```ts
import { Effect, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"

const Config = Schema.Struct({ region: Schema.String })

const layer = RpcWorker.layerInitialMessage(
  Config,
  Effect.succeed({ region: "eu-west-1" })
)
// => Layer<RpcWorker.InitialMessage, never, never>
```

### RpcWorker.makeInitialMessage

`makeInitialMessage(schema, effect)` runs `effect`, encodes its result with the
schema's JSON codec, and returns the encoded value together with the list of
transferables collected during encoding. Use this when you want the encoded
payload and transfer list directly (`layerInitialMessage` is built on top of
it). It can fail with `Schema.SchemaError`.

```ts
import { Effect, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"

const Payload = Schema.Struct({ token: Schema.String })

const make = RpcWorker.makeInitialMessage(
  Payload,
  Effect.succeed({ token: "secret" })
)
// => Effect<readonly [data: unknown, transferables: ReadonlyArray<Transferable>], SchemaError, ...>
```

### RpcWorker.initialMessage

`initialMessage(schema)` (worker side) reads the protocol's bootstrap payload
and decodes it with the supplied schema. It requires the server `Protocol`
service and fails with `NoSuchElementError` when no initial message is present,
or `Schema.SchemaError` when decoding fails.

```ts
import { Effect, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"

const Bootstrap = Schema.Struct({ tenantId: Schema.String })

const boot = RpcWorker.initialMessage(Bootstrap)
// => Effect<{ tenantId: string }, NoSuchElementError | SchemaError, Protocol>
```

## Streaming internals (RpcSchema)

A streaming RPC still has an ordinary RPC exit, but its success schema is
wrapped in an `RpcSchema.Stream` marker that records the stream element schema
and the stream error schema. `Rpc.make(tag, { stream: true })` installs this
marker for you, so you rarely call `RpcSchema.Stream` directly — but transports
and the cluster/reactivity layers read it to encode and decode chunks.

The key semantic: for a streaming RPC the **immediate exit succeeds with
`void`**; the stream elements and stream errors travel separately as `Chunk`
responses (see [the wire protocol](#the-wire-protocol-rpcmessage) below).

### RpcSchema.Stream

`Stream(success, error)` builds the streaming success marker from a success
*element* schema and a stream *error* schema. The resulting schema describes a
`Stream<A, E>` value.

```ts
import { Schema } from "effect"
import { RpcSchema } from "effect/unstable/rpc"

const marker = RpcSchema.Stream(Schema.Number, Schema.String)
// => an RpcSchema.Stream<Schema.Number, Schema.String>
// equivalent to what Rpc.make("name", { success: Schema.Number,
//   error: Schema.String, stream: true }) installs for you
```

### RpcSchema.isStreamSchema

A type guard that returns `true` for schemas created by `RpcSchema.Stream`.
Protocol code uses it to branch between one-shot and streaming responses, and to
reach the stored `success` / `error` schemas.

```ts
import { Schema } from "effect"
import { RpcSchema } from "effect/unstable/rpc"

const streamed = RpcSchema.Stream(Schema.Number, Schema.String)
RpcSchema.isStreamSchema(streamed) // => true
RpcSchema.isStreamSchema(Schema.Number) // => false

if (RpcSchema.isStreamSchema(streamed)) {
  streamed.success // => Schema.Number (the element schema)
  streamed.error // => Schema.String  (the stream error schema)
}
```

### RpcSchema.ClientAbort

A `Context.Service` annotation that tags interruptions originating from a remote
client aborting — disconnecting or cancelling an in-flight request. The server
attaches `RpcSchema.ClientAbort.annotation` to the interrupt cause, so handler
finalizers can distinguish a client-initiated cancel from other interruptions.

```ts
import { Cause, Context, Effect, Option } from "effect"
import { RpcSchema } from "effect/unstable/rpc"

// `Effect.onError` hands the finalizer the failure `Cause`, which carries the
// interrupt annotation the server attaches when a client aborts.
const handler = Effect.never.pipe(
  Effect.onError((cause) =>
    Effect.gen(function* () {
      const annotations = Cause.annotations(cause)
      const aborted = Context.getOption(annotations, RpcSchema.ClientAbort)
      if (Option.isSome(aborted)) {
        // => Option.some(true) when the client disconnected/cancelled
        yield* Effect.log("client aborted the request")
      }
    })
  )
)
```

## The wire protocol (RpcMessage)

`RpcMessage` is the protocol vocabulary that sits below `RpcClient` and
`RpcServer`. If you are writing a custom transport or serializer, these are the
envelopes you move. Each message comes in two forms:

- **Decoded** messages carry branded ids (`RequestId`), typed RPC tags, typed
  payloads, and `Headers`. These flow through in-process channels.
- **Encoded** (`*Encoded`) messages use string ids and `unknown` payloads that
  have already crossed the schema serialization boundary. These flow across
  transport boundaries.

A single request is identified by one `RequestId` from its first `Request`,
through any `Chunk` batches, to the terminal `Exit`, plus any `Ack`s and
`Interrupt`s. Transports **must preserve request ids exactly** across the
encoded/decoded boundary, or responses, acks, and interrupts will be routed to
the wrong call.

These are protocol data types, so the reference below is a labelled list rather
than a runnable example per interface.
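
One point that is easy to show in isolation is why ids must be preserved exactly. The sketch below uses two hypothetical helpers (`encodeRequestId` and `decodeRequestId` are not part of `RpcMessage`) to model the string/bigint boundary, and contrasts them with a lossy decoder that passes through `number`, as a careless custom serializer might.

```ts
// Hypothetical helpers: the encoded form is a string, the decoded form a bigint.
const encodeRequestId = (id: bigint): string => id.toString()
const decodeRequestId = (encoded: string): bigint => BigInt(encoded)

// Lossy variant for comparison: going through `number` rounds large ids.
const decodeViaNumber = (encoded: string): bigint => BigInt(Number(encoded))

// 2^53 + 1 cannot be represented exactly as a double.
const id = BigInt("9007199254740993")
const exact = decodeRequestId(encodeRequestId(id))
const lossy = decodeViaNumber(encodeRequestId(id))

console.log(exact === id) // true
console.log(lossy === id) // false: the id was rounded to 9007199254740992
```

A response carrying the rounded id would not match the pending call that is waiting for it, which is the misrouting described above.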

### Client to server messages

The decoded union is `FromClient = Request | Ack | Interrupt | Eof`; the encoded
union is `FromClientEncoded = RequestEncoded | AckEncoded | InterruptEncoded | Ping | Eof`.

- **`Request<A>` / `RequestEncoded`** — starts work. Carries `id`, `tag`,
  `payload`, `headers`, and optional `traceId` / `spanId` / `sampled` trace
  context. Decoded uses a branded `RequestId` and typed `tag`/`payload`; encoded
  uses a string `id` and `unknown` payload.
- **`Ack` / `AckEncoded`** — acknowledges a streamed response chunk for stream
  back pressure. It is **not** a call-completion signal. Carries `requestId`.
- **`Interrupt` / `InterruptEncoded`** — cancels an in-flight RPC. Decoded
  carries `requestId` and the interrupting fiber ids (`interruptors`); encoded
  carries just the string `requestId`.
- **`Eof`** — the client has finished sending input for the current connection
  or request batch. It does **not** replace the terminal `Exit` response.
- **`Ping`** — a client-to-server keepalive used by liveness-monitoring
  transports (encoded family only).
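
The semantics in this list can be summarised as a small state machine. The following is a simplified model in plain TypeScript, not the library's types or server logic: the message shapes are local mirrors of the fields listed above (the `headers` shape is an assumption), and `ConnectionState`, `makeState`, and `handle` are hypothetical names.

```ts
// Local, simplified mirror of the encoded client messages.
type FromClientEncoded =
  | {
      readonly _tag: "Request"
      readonly id: string
      readonly tag: string
      readonly payload: unknown
      readonly headers: ReadonlyArray<readonly [string, string]> // assumed shape
    }
  | { readonly _tag: "Ack"; readonly requestId: string }
  | { readonly _tag: "Interrupt"; readonly requestId: string }
  | { readonly _tag: "Eof" }
  | { readonly _tag: "Ping" }

interface ConnectionState {
  readonly inFlight: Set<string>
  readonly acks: Map<string, number>
  inputEnded: boolean
}

const makeState = (): ConnectionState => ({
  inFlight: new Set<string>(),
  acks: new Map<string, number>(),
  inputEnded: false
})

// Returns a reply only for Ping; every other message just updates state.
const handle = (
  state: ConnectionState,
  message: FromClientEncoded
): { readonly _tag: "Pong" } | undefined => {
  switch (message._tag) {
    case "Request":
      state.inFlight.add(message.id)
      return undefined
    case "Ack":
      // Back pressure only: the request stays in flight.
      state.acks.set(message.requestId, (state.acks.get(message.requestId) ?? 0) + 1)
      return undefined
    case "Interrupt":
      // In this model, cancelling simply drops the request.
      state.inFlight.delete(message.requestId)
      return undefined
    case "Eof":
      // No more input, but pending requests still await their own Exit.
      state.inputEnded = true
      return undefined
    case "Ping":
      return { _tag: "Pong" }
  }
}

const state = makeState()
handle(state, { _tag: "Request", id: "1", tag: "GetUser", payload: {}, headers: [] })
handle(state, { _tag: "Ack", requestId: "1" })
handle(state, { _tag: "Eof" })
const pong = handle(state, { _tag: "Ping" })
```

After these four messages, request `"1"` is still in flight: neither the `Ack` nor the `Eof` completed it.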

### Server to client messages

The decoded union is `FromServer = ResponseChunk | ResponseExit | ResponseDefect | ClientEnd`;
the encoded union is `FromServerEncoded = ResponseChunkEncoded | ResponseExitEncoded | ResponseDefectEncoded | Pong | ClientProtocolError`.

- **`ResponseChunk<A>` / `ResponseChunkEncoded`** (`_tag: "Chunk"`) — a
  non-empty batch of stream chunk values for a request. Decoded also carries the
  `clientId`.
- **`ResponseExit<A>` / `ResponseExitEncoded`** (`_tag: "Exit"`) — the terminal
  response carrying the RPC's `Exit`. The encoded `exit` uses `ExitEncoded<A, E>`
  whose failure side is an array of `Fail` (typed error), `Die` (defect), or
  `Interrupt` (with optional `fiberId`) entries.
- **`ResponseDefect` / `ResponseDefectEncoded`** (`_tag: "Defect"`) — a
  connection-level (protocol) defect affecting the client connection, not tied
  to a single request's exit.
- **`ClientEnd`** — the client connection has ended (decoded only; carries
  `clientId`).
- **`ClientProtocolError`** — reports an `RpcClientError` to affected in-flight
  requests (encoded family only).
- **`Pong`** — keepalive response to a `Ping` (encoded family only).
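
To see how the three response kinds differ in scope, here is a simplified client-side model in plain TypeScript. It is a sketch rather than the library's client: the message types are local mirrors, the `values` field name and the `Success` exit shape are assumptions, and `calls`, `startCall`, and `onServerMessage` are hypothetical names.

```ts
// Local, simplified mirror of three server-to-client messages.
type ExitEncoded =
  | { readonly _tag: "Success"; readonly value: unknown } // assumed shape
  | { readonly _tag: "Failure"; readonly cause: ReadonlyArray<unknown> }

type FromServerEncoded =
  | { readonly _tag: "Chunk"; readonly requestId: string; readonly values: ReadonlyArray<unknown> }
  | { readonly _tag: "Exit"; readonly requestId: string; readonly exit: ExitEncoded }
  | { readonly _tag: "Defect"; readonly defect: unknown }

interface PendingCall {
  readonly values: Array<unknown>
  status: "running" | "exited" | "connection-defect"
}

const calls = new Map<string, PendingCall>()

const startCall = (requestId: string): void => {
  calls.set(requestId, { values: [], status: "running" })
}

const onServerMessage = (message: FromServerEncoded): void => {
  switch (message._tag) {
    case "Chunk": {
      // Stream elements arrive in batches, routed by request id.
      const call = calls.get(message.requestId)
      if (call) call.values.push(...message.values)
      return
    }
    case "Exit": {
      // Terminal for exactly one request.
      const call = calls.get(message.requestId)
      if (call) call.status = "exited"
      return
    }
    case "Defect": {
      // Connection-level: affects every call that is still running.
      calls.forEach((call) => {
        if (call.status === "running") call.status = "connection-defect"
      })
      return
    }
  }
}

startCall("1")
startCall("2")
onServerMessage({ _tag: "Chunk", requestId: "1", values: [1, 2] })
onServerMessage({ _tag: "Chunk", requestId: "1", values: [3] })
onServerMessage({ _tag: "Exit", requestId: "1", exit: { _tag: "Success", value: undefined } })
onServerMessage({ _tag: "Defect", defect: "socket closed" })
```

Call `"1"` collected `[1, 2, 3]` from its chunks and then exited with a `void` success, as a streaming RPC would; call `"2"` never received its own `Exit` and was failed by the connection-level defect.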

### RpcMessage.RequestId

A `RequestId` is a `bigint` branded as `"~effect/rpc/RpcMessage/RequestId"`. The
`RequestId(id)` constructor coerces a `bigint` or a `string` into the branded
type — strings are parsed with `BigInt`.

```ts
import { RpcMessage } from "effect/unstable/rpc"

const a = RpcMessage.RequestId(42n)
const b = RpcMessage.RequestId("42")
a === b // => true (both are the branded bigint 42n)
```

### RpcMessage.constEof / constPing / constPong

Reusable singleton message values, so transports don't reallocate them.

```ts
import { RpcMessage } from "effect/unstable/rpc"

RpcMessage.constEof // => { _tag: "Eof" }
RpcMessage.constPing // => { _tag: "Ping" }
RpcMessage.constPong // => { _tag: "Pong" }
```

### RpcMessage.ResponseDefectEncoded

A constructor that builds a transport-encoded `Defect` response, encoding the
input with `Schema.Defect` (so arbitrary thrown values survive the wire).

```ts
import { RpcMessage } from "effect/unstable/rpc"

RpcMessage.ResponseDefectEncoded(new Error("boom"))
// => { _tag: "Defect", defect: <Schema.Defect-encoded Error> }
```

### RpcMessage.ResponseExitDieEncoded

Builds an encoded terminal `Exit` response whose exit is a *die* (defect) for a
specific `requestId`. The defect is encoded with `Schema.Defect`. Use this to
fail a single request with a defect rather than tearing down the connection.

```ts
import { RpcMessage } from "effect/unstable/rpc"

RpcMessage.ResponseExitDieEncoded({
  requestId: RpcMessage.RequestId(7n),
  defect: new Error("handler crashed")
})
// => { _tag: "Exit", requestId: "7", exit: { _tag: "Failure",
//      cause: [{ _tag: "Die", defect: <encoded Error> }] } }
```

## Custom and in-process channels

`RpcServer.makeNoSerialization` and `RpcClient.makeNoSerialization` wire a server
and client (or an already-decoded channel) using **decoded `FromClient` /
`FromServer` messages directly — no bytes, no serialization**. This is how
[`RpcTest`](https://effect.plants.sh/rpc/client-and-server/) connects a generated client straight to
handlers, and it is the foundation for in-process or already-decoded transports.

The pattern is symmetrical: each side is created with a callback that receives
*its* outgoing messages, and each returns a `write` function for feeding the
*other* side's messages back in (`server.write(clientId, message)` on the
server, `client.write(message)` on the client).

```ts
import { Effect } from "effect"
import { RpcClient, RpcServer } from "effect/unstable/rpc"
import type { Rpc, RpcGroup } from "effect/unstable/rpc"

// `group` is your RpcGroup. Both halves are scoped Effects.
const wire = <Rpcs extends Rpc.Any>(group: RpcGroup.RpcGroup<Rpcs>) =>
  Effect.gen(function* () {
    // Declared up front and assigned below, so the server callback can
    // reference the client's `write`.
    let client!: Effect.Success<
      ReturnType<typeof RpcClient.makeNoSerialization<Rpcs, never, false>>
    >

    // Server: handlers run here; decoded responses are pushed to the client.
    const server = yield* RpcServer.makeNoSerialization(group, {
      onFromServer: (response) => client.write(response)
    })

    // Client: requests are pushed to the server as decoded messages.
    client = yield* RpcClient.makeNoSerialization(group, {
      supportsAck: true,
      onFromClient: ({ message }) => server.write(0, message)
    })

    return client.client
  })
```

`RpcServer.makeNoSerialization(group, options)` returns an `RpcServer` with
`write(clientId, message)` and `disconnect(clientId)`. The `onFromServer`
callback receives every decoded `FromServer` response. The constructor accepts
the same tuning options as the higher-level server: `concurrency`,
`disableTracing`, `disableClientAcks`, `spanPrefix`, and `disableFatalDefects`
(see [below](#defect-handling-and-exit-schema)).

`RpcClient.makeNoSerialization(group, options)` returns `{ client, write }`,
where `client` is the generated RPC client and `write(message)` delivers a
decoded `FromServer` message to it. The `onFromClient` callback receives each
outgoing `FromClient` message (plus its `context` and a `discard` flag). The
`flatten` option matches `RpcTest` and `RpcClient.make`.

### Building a brand-new transport

To build a genuinely new transport (not just in-process), implement the
`Protocol` services. Both are created through a `make` hook that buffers writes
until the run loop installs a writer:

- **`RpcClient.Protocol.make`** (`withRunClient`) — you supply a function
  `(write, clientIds) => Effect<Omit<Protocol["Service"], "run">>`. Call `write`
  to push encoded server responses back to a client; return the
  rest of the protocol service (`send`, `supportsAck`, `supportsTransferables`).
- **`RpcServer.Protocol.make`** (`withRun`) — you supply a function
  `(write) => Effect<Omit<Protocol["Service"], "run">>` and return `send`, `end`,
  `disconnects`, `clientIds`, `initialMessage`, and the capability flags. Call
  `write(clientId, data)` to feed encoded client messages into the server.

```ts
import { Effect, Option, Queue } from "effect"
import { RpcServer } from "effect/unstable/rpc"

const myServerProtocol = RpcServer.Protocol.make(
  Effect.fnUntraced(function* (writeRequest) {
    // ... wire up your transport, calling `writeRequest(clientId, encoded)`
    // for each incoming encoded `FromClientEncoded` message ...
    const disconnects = yield* Queue.make<number>()
    return {
      send: (_clientId, _response, _transferables) => Effect.void,
      end: (_clientId) => Effect.void,
      disconnects, // a Queue.Dequeue<number>
      clientIds: Effect.succeed(new Set<number>()),
      initialMessage: Effect.succeed(Option.none<unknown>()),
      supportsAck: true,
      supportsTransferables: false,
      supportsSpanPropagation: false
    }
  })
)
```
**Tip:** Study the built-in protocols in the source
(`RpcServer.makeProtocolSocketServer`, `RpcClient.makeProtocolHttp`, and the
worker protocol) for complete, real implementations of these hooks.
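
The buffering behaviour mentioned at the start of this section (writes are held until the run loop installs a writer) can be pictured with a small synchronous model. This is a sketch of the idea only, not the library's implementation: `makeBufferedWriter` and its `write` / `run` pair are hypothetical names, and the real hooks work with Effects rather than plain callbacks.

```ts
type Writer<A> = (message: A) => void

const makeBufferedWriter = <A>() => {
  const buffer: Array<A> = []
  let writer: Writer<A> | undefined

  // Before a writer is installed, messages are queued instead of dropped.
  const write: Writer<A> = (message) => {
    if (writer) writer(message)
    else buffer.push(message)
  }

  // Installs the real writer and flushes anything buffered, in arrival order.
  const run = (next: Writer<A>): void => {
    writer = next
    for (const message of buffer.splice(0)) next(message)
  }

  return { write, run }
}

const delivered: Array<string> = []
const channel = makeBufferedWriter<string>()

channel.write("a") // buffered: no writer yet
channel.write("b") // buffered
channel.run((message) => delivered.push(message)) // flushes "a", "b"
channel.write("c") // delivered immediately
```

The point of the pattern is that a transport can start receiving data before the RPC runtime is ready to consume it without losing or reordering messages.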

## Defect handling and exit schema

By default a **defect** thrown in a handler (an unexpected error outside the
RPC's typed error channel) is treated as *fatal* at the protocol level: it
becomes a connection-level `Defect` response rather than a normal request exit.
This is usually what you want — an unexpected crash should not look like an
ordinary result.

### disableFatalDefects

The `disableFatalDefects` option (on `RpcServer.make`, `makeNoSerialization`,
`layer`, and `layerHttp`) flips that behaviour: when `true`, defects stay as
ordinary request *exits* (a `Die` in the encoded `ExitEncoded` cause) and the
connection survives. Use it when callers should observe per-request defects
instead of seeing the connection torn down.

```ts
import { Effect } from "effect"
import { RpcServer } from "effect/unstable/rpc"
import type { Rpc, RpcGroup } from "effect/unstable/rpc"

// Defects become ordinary request exits, not connection-level protocol defects.
const server = <Rpcs extends Rpc.Any>(group: RpcGroup.RpcGroup<Rpcs>) =>
  RpcServer.makeNoSerialization(group, {
    // Responses are discarded here; a real wiring would forward them to a client.
    onFromServer: () => Effect.void,
    disableFatalDefects: true
  })
```
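
To make the two outcomes concrete, the sketch below models the choice in plain TypeScript. It is not the server's code: `responseForDefect` is a hypothetical function, the message shapes mirror the encoded `Defect` and `Exit` responses shown earlier on this page, and the defect is left unencoded (the real constructors encode it with `Schema.Defect`).

```ts
type DefectResponse =
  | { readonly _tag: "Defect"; readonly defect: unknown }
  | {
      readonly _tag: "Exit"
      readonly requestId: string
      readonly exit: {
        readonly _tag: "Failure"
        readonly cause: ReadonlyArray<{ readonly _tag: "Die"; readonly defect: unknown }>
      }
    }

// Chooses the response a handler defect turns into, depending on the option.
const responseForDefect = (options: {
  readonly requestId: string
  readonly defect: unknown
  readonly disableFatalDefects: boolean
}): DefectResponse =>
  options.disableFatalDefects
    ? {
        // Per-request: only this call observes the defect.
        _tag: "Exit",
        requestId: options.requestId,
        exit: { _tag: "Failure", cause: [{ _tag: "Die", defect: options.defect }] }
      }
    : // Connection-level: not tied to a single request.
      { _tag: "Defect", defect: options.defect }

const fatal = responseForDefect({ requestId: "7", defect: "boom", disableFatalDefects: false })
const perRequest = responseForDefect({ requestId: "7", defect: "boom", disableFatalDefects: true })

console.log(fatal._tag) // "Defect"
console.log(perRequest._tag) // "Exit"
```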

### Rpc.DefectSchema and the per-RPC defect schema

Every `Rpc` carries a `defectSchema` (default `Schema.Defect`). You can override
it per RPC with the `defect` option on `Rpc.make`. The schema must satisfy the
`Rpc.DefectSchema` constraint: it must encode/decode **without any services**
(`DecodingServices` and `EncodingServices` are `never`), since defects are
serialized at the protocol layer with no context available.

```ts
import { Schema } from "effect"
import { Rpc } from "effect/unstable/rpc"

// Constrain how defects are represented on the wire for this RPC.
const Risky = Rpc.make("Risky", {
  success: Schema.String,
  defect: Schema.String // a DefectSchema: no encode/decode services
})

Risky.defectSchema // => Schema.String
```

### Rpc.exitSchema

`Rpc.exitSchema(rpc)` builds the `Schema.Exit` used to encode and decode an
RPC's results. The failure side folds together the RPC's own error schema, any
middleware error schemas, and (for streaming RPCs) the stream error schema;
streaming RPCs use `Schema.Void` for the exit success value. The result is
cached per RPC definition. This is the exact schema the client and server use at
the `Exit` boundary.

```ts
import { Schema } from "effect"
import { Rpc } from "effect/unstable/rpc"

const GetUser = Rpc.make("GetUser", {
  success: Schema.Struct({ id: Schema.String }),
  error: Schema.String
})

const exit = Rpc.exitSchema(GetUser)
// => Schema.Exit<success: {id:string}, error: string | (middleware errors),
//      defect: Schema.Defect>
```
**Client-side transport failures:** A failed *transport* (not a handler error)
surfaces to callers as `RpcClientError`, which is separate from the RPC's own
error channel. See
[Client and server](https://effect.plants.sh/rpc/client-and-server/) for how
`RpcClientError` appears in a client's error type.