Advanced RPC

The everyday RPC surface — defining RPCs, wiring a client and server, and choosing a transport — is built on a handful of lower-level modules. You rarely touch them directly, but they become necessary when you:

  • need to send startup data to a worker before the first request (config, credentials, transferable handles),
  • want to wire a client and server in process with no bytes on the wire,
  • are writing a custom transport or serializer and need the message protocol, or
  • need to control how defects flow back to callers.

This page collects those surfaces. It is example-first for the things you call (RpcWorker, the no-serialization channels, defect options) and a labelled reference for the protocol data types (RpcMessage) that custom transports exchange.

A worker-backed RPC client sometimes needs to hand the worker a one-time envelope of data before any RPC request is handled: per-worker configuration, credentials, feature flags, a preloaded cache, or transferable handles (MessagePort, ArrayBuffer, …). RpcWorker provides a typed channel for exactly that.

The initial message is encoded with a schema’s JSON codec on the client side, posted with the worker’s postMessage, and decoded with the same schema inside the worker. It is separate from RpcSerialization — changing how requests/responses are serialized does not change how this bootstrap payload is encoded — and it crosses the structured-clone boundary, so any transferables you collect are moved to the worker (the sender can no longer use them).
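The "moved, not copied" behaviour of transferables can be seen without a worker. The sketch below is standalone and does not use RpcWorker; it assumes a runtime that provides the global structuredClone (modern browsers, Node 17+), which applies the same structured-clone transfer rules as postMessage.

```typescript
// Standalone sketch: structuredClone with a transfer list follows the same
// structured-clone rules as a worker's postMessage.
const original = new ArrayBuffer(8)
new Uint8Array(original)[0] = 42

// Transfer the buffer instead of copying it.
const received = structuredClone(original, { transfer: [original] })

// The sender's buffer is detached after the transfer.
const senderByteLength = original.byteLength

// The receiver owns the bytes, with the data intact.
const receivedFirstByte = new Uint8Array(received)[0]
```

After the transfer, senderByteLength is 0 and receivedFirstByte is 42, which is why a bootstrap payload should only list handles the client no longer needs.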

import { Effect, Layer, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"

// The shape of the startup data both sides agree on.
const Bootstrap = Schema.Struct({
  tenantId: Schema.String,
  features: Schema.Array(Schema.String)
})

// CLIENT SIDE: provide the typed startup data as a layer. The build effect runs
// once; its result is encoded and stored in context for the worker client
// runtime to post. This layer provides the `RpcWorker.InitialMessage` service.
const InitialMessageLayer = RpcWorker.layerInitialMessage(Bootstrap, Effect.succeed({
  tenantId: "acme",
  features: ["beta-ui", "fast-path"]
}))

// WORKER SIDE: read and decode the bootstrap payload. Requires the server
// `Protocol`, and fails with `NoSuchElementError` if no initial message was
// sent or `Schema.SchemaError` if decoding fails.
const program = Effect.gen(function* () {
  const boot = yield* RpcWorker.initialMessage(Bootstrap)
  yield* Effect.log(`booting tenant ${boot.tenantId}`)
  // => decoded as { tenantId: string; features: ReadonlyArray<string> }
})

RpcWorker.InitialMessage is a Context.Service that holds the encoded initial message together with the transferables that should be posted with it. The service value is an Effect that, when run, yields readonly [data: unknown, transfers: ReadonlyArray<Transferable>]. You normally obtain it via layerInitialMessage rather than constructing it by hand.

import { RpcWorker } from "effect/unstable/rpc"
RpcWorker.InitialMessage.key
// => "effect/rpc/RpcWorker/InitialMessage"

layerInitialMessage(schema, build) (client side) builds a Layer providing InitialMessage. It runs the build effect, encodes the value with the schema’s JSON codec, collects transferables, and stores the result in context. Schema encoding failures are turned into a defect (the build effect’s error channel is never).

import { Effect, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"
const Config = Schema.Struct({ region: Schema.String })
const layer = RpcWorker.layerInitialMessage(
  Config,
  Effect.succeed({ region: "eu-west-1" })
)
// => Layer<RpcWorker.InitialMessage, never, never>

makeInitialMessage(schema, effect) runs effect, encodes its result with the schema’s JSON codec, and returns the encoded value together with the list of transferables collected during encoding. Use this when you want the encoded payload and transfer list directly (layerInitialMessage is built on top of it). It can fail with Schema.SchemaError.

import { Effect, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"
const Payload = Schema.Struct({ token: Schema.String })
const make = RpcWorker.makeInitialMessage(
  Payload,
  Effect.succeed({ token: "secret" })
)
// => Effect<readonly [data: unknown, transferables: ReadonlyArray<Transferable>], SchemaError, ...>

initialMessage(schema) (worker side) reads the protocol’s bootstrap payload and decodes it with the supplied schema. It requires the server Protocol service and fails with NoSuchElementError when no initial message is present, or Schema.SchemaError when decoding fails.

import { Effect, Schema } from "effect"
import { RpcWorker } from "effect/unstable/rpc"
const Bootstrap = Schema.Struct({ tenantId: Schema.String })
const boot = RpcWorker.initialMessage(Bootstrap)
// => Effect<{ tenantId: string }, NoSuchElementError | SchemaError, Protocol>

A streaming RPC still has an ordinary RPC exit, but its success schema is wrapped in an RpcSchema.Stream marker that records the stream element schema and the stream error schema. Rpc.make(tag, { stream: true }) installs this marker for you, so you rarely call RpcSchema.Stream directly — but transports and the cluster/reactivity layers read it to encode and decode chunks.

The key semantic: for a streaming RPC the immediate exit succeeds with void. The stream elements travel separately as Chunk responses, while a stream error surfaces in the failure side of the terminal Exit (see the wire protocol below).
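To make the split between elements and the exit concrete, here is a minimal standalone sketch of a consumer for one streaming request. The StreamResponse type and collectStream function are hypothetical, simplified stand-ins written for this illustration; they are not the library's RpcMessage types, which carry more fields.

```typescript
// Simplified, hypothetical message shapes for illustration only.
type StreamResponse<A> =
  | { readonly _tag: "Chunk"; readonly values: ReadonlyArray<A> }
  | { readonly _tag: "Exit"; readonly exit: { readonly _tag: "Success" } | { readonly _tag: "Failure"; readonly cause: unknown } }

// Collect stream elements from Chunk batches until the terminal Exit arrives.
function collectStream<A>(
  responses: Iterable<StreamResponse<A>>
): { elements: Array<A>; done: boolean; failed: boolean } {
  const elements: Array<A> = []
  for (const response of responses) {
    if (response._tag === "Chunk") {
      elements.push(...response.values)
      continue
    }
    // The Exit carries no success value: the elements were already delivered.
    return { elements, done: true, failed: response.exit._tag === "Failure" }
  }
  // No Exit seen yet: the stream is still open.
  return { elements, done: false, failed: false }
}

const result = collectStream<number>([
  { _tag: "Chunk", values: [1, 2] },
  { _tag: "Chunk", values: [3] },
  { _tag: "Exit", exit: { _tag: "Success" } }
])
```

Here result.elements ends up as [1, 2, 3] and result.done is true; the successful exit itself contributes no value.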

Stream(success, error) builds the streaming success marker from a success element schema and a stream error schema. The resulting schema describes a Stream<A, E> value.

import { Schema } from "effect"
import { RpcSchema } from "effect/unstable/rpc"
const marker = RpcSchema.Stream(Schema.Number, Schema.String)
// => an RpcSchema.Stream<Schema.Number, Schema.String>
// equivalent to what Rpc.make("name", { success: Schema.Number,
// error: Schema.String, stream: true }) installs for you

RpcSchema.isStreamSchema is a type guard that returns true for schemas created by RpcSchema.Stream. Protocol code uses it to branch between one-shot and streaming responses, and to reach the stored success / error schemas.

import { Schema } from "effect"
import { RpcSchema } from "effect/unstable/rpc"
const streamed = RpcSchema.Stream(Schema.Number, Schema.String)
RpcSchema.isStreamSchema(streamed) // => true
RpcSchema.isStreamSchema(Schema.Number) // => false
if (RpcSchema.isStreamSchema(streamed)) {
  streamed.success // => Schema.Number (the element schema)
  streamed.error // => Schema.String (the stream error schema)
}

RpcSchema.ClientAbort is a Context.Service annotation that tags interruptions caused by a remote client aborting, that is, disconnecting or cancelling an in-flight request. The server attaches RpcSchema.ClientAbort.annotation to the interrupt cause, so handler finalizers can distinguish a client-initiated cancel from other interruptions.

import { Cause, Context, Effect, Option } from "effect"
import { RpcSchema } from "effect/unstable/rpc"

// `Effect.onError` hands the finalizer the failure `Cause`, which carries the
// interrupt annotation the server attaches when a client aborts.
const handler = Effect.never.pipe(
  Effect.onError((cause) =>
    Effect.gen(function* () {
      const annotations = Cause.annotations(cause)
      const aborted = Context.getOption(annotations, RpcSchema.ClientAbort)
      if (Option.isSome(aborted)) {
        // => Option.some(true) when the client disconnected/cancelled
        yield* Effect.log("client aborted the request")
      }
    })
  )
)

RpcMessage is the protocol vocabulary that sits below RpcClient and RpcServer. If you are writing a custom transport or serializer, these are the envelopes you move. Each message comes in two forms:

  • Decoded messages carry branded ids (RequestId), typed RPC tags, typed payloads, and Headers. These flow through in-process channels.
  • Encoded (*Encoded) messages use string ids and unknown payloads that have already crossed the schema serialization boundary. These flow across transport boundaries.

A single request is identified by one RequestId from its first Request, through any Chunk batches, to the terminal Exit, plus any Acks and Interrupts. Transports must preserve request ids exactly across the encoded/decoded boundary, or responses, acks, and interrupts will be routed to the wrong call.
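The routing requirement can be illustrated with a small standalone table of pending calls. Everything in this sketch (Pending, pending, route) is hypothetical and exists only for the illustration; the one assumption taken from this page is that decoded ids are bigints and encoded ids are strings.

```typescript
// Hypothetical client-side routing table keyed by decoded request id.
type Pending = { readonly received: Array<string> }

const pending = new Map<bigint, Pending>()
pending.set(BigInt(7), { received: [] })
pending.set(BigInt(8), { received: [] })

// Encoded messages carry the id as a string; BigInt parses it back losslessly.
function route(encoded: { readonly requestId: string; readonly note: string }): boolean {
  const entry = pending.get(BigInt(encoded.requestId))
  if (entry === undefined) return false // unknown id: nothing to deliver to
  entry.received.push(encoded.note)
  return true
}

// An id that survives the string round trip reaches its call.
const delivered = route({ requestId: String(BigInt(7)), note: "chunk for 7" })

// An id the transport altered no longer matches any pending call.
const misrouted = route({ requestId: "9", note: "lost" })
```

A transport that converts ids through a lossy type, or rewrites them, ends up in the second case: the message has no call to land on.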

These are protocol data types, so the reference below is a labelled list rather than a runnable example per interface.

The decoded union is FromClient = Request | Ack | Interrupt | Eof; the encoded union is FromClientEncoded = RequestEncoded | AckEncoded | InterruptEncoded | Ping | Eof.

  • Request<A> / RequestEncoded — starts work. Carries id, tag, payload, headers, and optional traceId / spanId / sampled trace context. Decoded uses a branded RequestId and typed tag/payload; encoded uses a string id and unknown payload.
  • Ack / AckEncoded — acknowledges a streamed response chunk for stream back pressure. It is not a call-completion signal. Carries requestId.
  • Interrupt / InterruptEncoded — cancels an in-flight RPC. Decoded carries requestId and the interrupting fiber ids (interruptors); encoded carries just the string requestId.
  • Eof — the client has finished sending input for the current connection or request batch. It does not replace the terminal Exit response.
  • Ping — a client-to-server keepalive used by liveness-monitoring transports (encoded family only).
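As a rough picture of what an Ack is for, the sketch below gates chunk delivery on acknowledgements. AckGatedSender is a hypothetical class written for this page and is only one possible back-pressure policy (one unacknowledged chunk at a time); it does not claim to match how the library schedules chunks internally.

```typescript
// Hypothetical sender that keeps at most one chunk in flight until it is acked.
class AckGatedSender<A> {
  private awaitingAck = false
  private readonly queue: Array<ReadonlyArray<A>> = []
  readonly sent: Array<ReadonlyArray<A>> = []

  // Queue a chunk and send it immediately if nothing is in flight.
  offer(chunk: ReadonlyArray<A>): void {
    this.queue.push(chunk)
    this.flush()
  }

  // The client acknowledged the in-flight chunk: release the next one.
  ack(): void {
    this.awaitingAck = false
    this.flush()
  }

  private flush(): void {
    if (this.awaitingAck) return
    const next = this.queue.shift()
    if (next === undefined) return
    this.sent.push(next)
    this.awaitingAck = true
  }
}

const sender = new AckGatedSender<number>()
sender.offer([1, 2])
sender.offer([3])
const sentBeforeAck = sender.sent.length // the second chunk is held back
sender.ack()
const sentAfterAck = sender.sent.length // the ack released the second chunk
```

Note that the Ack only unblocks the next chunk. The call still completes through its terminal Exit, not through an Ack.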

The decoded union is FromServer = ResponseChunk | ResponseExit | ResponseDefect | ClientEnd; the encoded union is FromServerEncoded = ResponseChunkEncoded | ResponseExitEncoded | ResponseDefectEncoded | Pong | ClientProtocolError.

  • ResponseChunk<A> / ResponseChunkEncoded (_tag: "Chunk") — a non-empty batch of stream chunk values for a request. Decoded also carries the clientId.
  • ResponseExit<A> / ResponseExitEncoded (_tag: "Exit") — the terminal response carrying the RPC’s Exit. The encoded exit uses ExitEncoded<A, E> whose failure side is an array of Fail (typed error), Die (defect), or Interrupt (with optional fiberId) entries.
  • ResponseDefect / ResponseDefectEncoded (_tag: "Defect") — a connection-level (protocol) defect affecting the client connection, not tied to a single request’s exit.
  • ClientEnd — the client connection has ended (decoded only; carries clientId).
  • ClientProtocolError — reports an RpcClientError to affected in-flight requests (encoded family only).
  • Pong — keepalive response to a Ping (encoded family only).
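A custom transport or client often needs to tell typed failures, defects, and interrupts apart in an encoded exit. The following standalone sketch uses local, simplified types (CauseEntry, EncodedExit) that mirror the entry names listed above; the field names are assumptions for illustration, and summarize is a hypothetical helper, not a library function.

```typescript
// Simplified local types mirroring the entries named above (assumed shapes).
type CauseEntry =
  | { readonly _tag: "Fail"; readonly error: unknown }
  | { readonly _tag: "Die"; readonly defect: unknown }
  | { readonly _tag: "Interrupt"; readonly fiberId?: number }

type EncodedExit =
  | { readonly _tag: "Success"; readonly value: unknown }
  | { readonly _tag: "Failure"; readonly cause: ReadonlyArray<CauseEntry> }

// Classify an exit, treating a defect as more significant than a typed failure.
function summarize(exit: EncodedExit): "success" | "defect" | "typed failure" | "interrupted" {
  if (exit._tag === "Success") return "success"
  if (exit.cause.some((entry) => entry._tag === "Die")) return "defect"
  if (exit.cause.some((entry) => entry._tag === "Fail")) return "typed failure"
  return "interrupted"
}

const typed = summarize({ _tag: "Failure", cause: [{ _tag: "Fail", error: "not found" }] })
const crashed = summarize({
  _tag: "Failure",
  cause: [{ _tag: "Fail", error: "not found" }, { _tag: "Die", defect: "boom" }]
})
const cancelled = summarize({ _tag: "Failure", cause: [{ _tag: "Interrupt" }] })
```

Because the failure side is an array, a single exit can hold several entries at once, so the classification has to pick a precedence; the order used here is just one reasonable choice.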

A RequestId is a bigint branded as "~effect/rpc/RpcMessage/RequestId". The RequestId(id) constructor coerces a bigint or a string into the branded type — strings are parsed with BigInt.

import { RpcMessage } from "effect/unstable/rpc"
const a = RpcMessage.RequestId(42n)
const b = RpcMessage.RequestId("42")
a === b // => true (both are the branded bigint 42n)

RpcMessage.constEof / constPing / constPong

Reusable singleton message values, so transports don’t reallocate them.

import { RpcMessage } from "effect/unstable/rpc"
RpcMessage.constEof // => { _tag: "Eof" }
RpcMessage.constPing // => { _tag: "Ping" }
RpcMessage.constPong // => { _tag: "Pong" }

RpcMessage.ResponseDefectEncoded is a constructor that builds a transport-encoded Defect response, encoding the input with Schema.Defect (so arbitrary thrown values survive the wire).

import { RpcMessage } from "effect/unstable/rpc"
RpcMessage.ResponseDefectEncoded(new Error("boom"))
// => { _tag: "Defect", defect: <Schema.Defect-encoded Error> }

RpcMessage.ResponseExitDieEncoded builds an encoded terminal Exit response whose exit is a die (defect) for a specific requestId. The defect is encoded with Schema.Defect. Use this to fail a single request with a defect rather than tearing down the connection.

import { RpcMessage } from "effect/unstable/rpc"
RpcMessage.ResponseExitDieEncoded({
  requestId: RpcMessage.RequestId(7n),
  defect: new Error("handler crashed")
})
// => { _tag: "Exit", requestId: "7", exit: { _tag: "Failure",
//      cause: [{ _tag: "Die", defect: <encoded Error> }] } }

RpcServer.makeNoSerialization and RpcClient.makeNoSerialization wire a server and client (or an already-decoded channel) using decoded FromClient / FromServer messages directly — no bytes, no serialization. This is how RpcTest connects a generated client straight to handlers, and it is the foundation for in-process or already-decoded transports.

The pattern is symmetrical: each side is created with a callback that receives its outgoing messages, and each returns a write function for delivering the other side’s messages back in. The server’s write takes a client id and a message; the client’s write takes only the message.

import { Effect } from "effect"
import { RpcClient, RpcServer } from "effect/unstable/rpc"
import type { Rpc, RpcGroup } from "effect/unstable/rpc"

// `group` is your RpcGroup. Both halves are scoped Effects.
const wire = <Rpcs extends Rpc.Any>(group: RpcGroup.RpcGroup<Rpcs>) =>
  Effect.gen(function* () {
    // Created lazily so the server can reference the client's `write`.
    let client!: Effect.Success<
      ReturnType<typeof RpcClient.makeNoSerialization<Rpcs, never, false>>
    >

    // Server: handlers run here; decoded responses are pushed to the client.
    const server = yield* RpcServer.makeNoSerialization(group, {
      onFromServer: (response) => client.write(response)
    })

    // Client: requests are pushed to the server as decoded messages.
    client = yield* RpcClient.makeNoSerialization(group, {
      supportsAck: true,
      onFromClient: ({ message }) => server.write(0, message)
    })

    return client.client
  })

RpcServer.makeNoSerialization(group, options) returns an RpcServer with write(clientId, message) and disconnect(clientId). The onFromServer callback receives every decoded FromServer response. It accepts the same tuning options as the higher-level server — concurrency, disableTracing, disableClientAcks, spanPrefix, and disableFatalDefects (see below).

RpcClient.makeNoSerialization(group, options) returns { client, write }, where client is the generated RPC client and write(message) delivers a decoded FromServer message to it. The onFromClient callback receives each outgoing FromClient message (plus its context and a discard flag). The flatten option matches RpcTest and RpcClient.make.

To build a genuinely new transport (not just in-process), implement the Protocol services. Both are created through a make hook that buffers writes until the run loop installs a writer:

  • RpcClient.Protocol.make (withRunClient) — you supply a function (write, clientIds) => Effect<Omit<Protocol["Service"], "run">>. Call write to deliver server responses, decoded from their wire form, to a client; return the rest of the protocol service (send, supportsAck, supportsTransferables).
  • RpcServer.Protocol.make (withRun) — you supply a function (write) => Effect<Omit<Protocol["Service"], "run">> and return send, end, disconnects, clientIds, initialMessage, and the capability flags. Call write(clientId, data) to feed encoded client messages into the server.

import { Effect, Option, Queue } from "effect"
import { RpcServer } from "effect/unstable/rpc"

const myServerProtocol = RpcServer.Protocol.make(
  Effect.fnUntraced(function* (writeRequest) {
    // ... wire up your transport, calling `writeRequest(clientId, encoded)`
    // for each incoming encoded `FromClientEncoded` message ...
    const disconnects = yield* Queue.make<number>()
    return {
      send: (_clientId, _response, _transferables) => Effect.void,
      end: (_clientId) => Effect.void,
      disconnects, // a Queue.Dequeue<number>
      clientIds: Effect.succeed(new Set<number>()),
      initialMessage: Effect.succeed(Option.none<unknown>()),
      supportsAck: true,
      supportsTransferables: false,
      supportsSpanPropagation: false
    }
  })
)
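The "buffer writes until the run loop installs a writer" idea can be sketched in plain TypeScript. makeBufferedWrite below is a hypothetical helper for illustration, not the library's withRun implementation: it only shows why a transport can call write before the server or client loop has started without losing messages.

```typescript
// Hypothetical helper: buffers writes until a writer is installed, then flushes in order.
function makeBufferedWrite<M>() {
  const buffer: Array<M> = []
  let writer: ((message: M) => void) | undefined

  // Safe to call at any time: messages wait in the buffer until `run` is called.
  const write = (message: M): void => {
    if (writer === undefined) buffer.push(message)
    else writer(message)
  }

  // Install the real writer and replay everything that arrived early.
  const run = (install: (message: M) => void): void => {
    writer = install
    for (const message of buffer.splice(0)) install(message)
  }

  return { write, run }
}

const seen: Array<string> = []
const channel = makeBufferedWrite<string>()
channel.write("early-1")
channel.write("early-2")
channel.run((message) => {
  seen.push(message)
})
channel.write("late")
```

After this runs, seen holds "early-1", "early-2", "late" in that order: early writes are replayed first, and later writes go straight through.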

By default a defect thrown in a handler (an unexpected error outside the RPC’s typed error channel) is treated as fatal at the protocol level: it becomes a connection-level Defect response rather than a normal request exit. This is usually what you want — an unexpected crash should not look like an ordinary result.

The disableFatalDefects option (on RpcServer.make, makeNoSerialization, layer, and layerHttp) flips that behaviour: when true, defects stay as ordinary request exits (a Die in the encoded ExitEncoded cause) and the connection survives. Use it when callers should observe per-request defects instead of seeing the connection torn down.

import { Effect } from "effect"
import { RpcServer } from "effect/unstable/rpc"
import type { Rpc, RpcGroup } from "effect/unstable/rpc"

// Defects become ordinary request exits, not connection-level protocol defects.
const server = <Rpcs extends Rpc.Any>(group: RpcGroup.RpcGroup<Rpcs>) =>
  RpcServer.makeNoSerialization(group, {
    // Placeholder: a real transport would forward each response to its client.
    onFromServer: () => Effect.void,
    disableFatalDefects: true
  })
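The difference between the two modes comes down to which message a crashing handler produces. The standalone sketch below models that choice with simplified, assumed message shapes based on the encoded examples earlier on this page; respondToDefect is a hypothetical function for illustration and is not part of RpcServer.

```typescript
// Simplified, assumed shapes of the two possible responses to a handler defect.
type DefectResponse =
  | { readonly _tag: "Defect"; readonly defect: unknown }
  | {
    readonly _tag: "Exit"
    readonly requestId: string
    readonly exit: {
      readonly _tag: "Failure"
      readonly cause: ReadonlyArray<{ readonly _tag: "Die"; readonly defect: unknown }>
    }
  }

// Hypothetical model of the policy: fatal by default, per-request when disabled.
function respondToDefect(
  requestId: string,
  defect: unknown,
  disableFatalDefects: boolean
): DefectResponse {
  return disableFatalDefects
    ? { _tag: "Exit", requestId, exit: { _tag: "Failure", cause: [{ _tag: "Die", defect }] } }
    : { _tag: "Defect", defect }
}

const fatal = respondToDefect("7", "boom", false)
const perRequest = respondToDefect("7", "boom", true)
```

In the default mode the response has no requestId at all, so every in-flight call on the connection is affected; with disableFatalDefects only request "7" sees the Die.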

Rpc.DefectSchema and the per-RPC defect schema

Every Rpc carries a defectSchema (default Schema.Defect). You can override it per RPC with the defect option on Rpc.make. The schema must satisfy the Rpc.DefectSchema constraint: it must encode/decode without any services (DecodingServices and EncodingServices are never), since defects are serialized at the protocol layer with no context available.

import { Schema } from "effect"
import { Rpc } from "effect/unstable/rpc"
// Constrain how defects are represented on the wire for this RPC.
const Risky = Rpc.make("Risky", {
  success: Schema.String,
  defect: Schema.String // a DefectSchema: no encode/decode services
})
Risky.defectSchema // => Schema.String

Rpc.exitSchema(rpc) builds the Schema.Exit used to encode and decode an RPC’s results. The failure side folds together the RPC’s own error schema, any middleware error schemas, and (for streaming RPCs) the stream error schema; streaming RPCs use Schema.Void for the exit success value. The result is cached per RPC definition. This is the exact schema the client and server use at the Exit boundary.

import { Schema } from "effect"
import { Rpc } from "effect/unstable/rpc"
const GetUser = Rpc.make("GetUser", {
  success: Schema.Struct({ id: Schema.String }),
  error: Schema.String
})
const exit = Rpc.exitSchema(GetUser)
// => Schema.Exit<success: {id:string}, error: string | (middleware errors),
//      defect: Schema.Defect>