Advanced RPC
The everyday RPC surface — defining RPCs, wiring a client and server, and choosing a transport — is built on a handful of lower-level modules. You rarely touch them directly, but they become necessary when you:
- need to send startup data to a worker before the first request (config, credentials, transferable handles),
- want to wire a client and server in process with no bytes on the wire,
- are writing a custom transport or serializer and need the message protocol, or
- need to control how defects flow back to callers.
This page collects those surfaces. It is example-first for the things you call
(RpcWorker, the no-serialization channels, defect options) and a labelled
reference for the protocol data types (RpcMessage) that custom transports
exchange.
Worker bootstrap messages (RpcWorker)
Section titled “Worker bootstrap messages (RpcWorker)”A worker-backed RPC client sometimes needs to hand the worker a one-time
envelope of data before any RPC request is handled: per-worker configuration,
credentials, feature flags, a preloaded cache, or transferable handles
(MessagePort, ArrayBuffer, …). RpcWorker provides a typed channel for
exactly that.
The initial message is encoded with a schema’s JSON codec on the client side,
posted with the worker’s postMessage, and decoded with the same schema inside
the worker. It is separate from RpcSerialization — changing how
requests/responses are serialized does not change how this bootstrap payload is
encoded — and it crosses the structured-clone boundary, so any transferables
you collect are moved to the worker (the sender can no longer use them).
import { Effect, Layer, Schema } from "effect"import { RpcWorker } from "effect/unstable/rpc"
// The shape of the startup data both sides agree on.const Bootstrap = Schema.Struct({ tenantId: Schema.String, features: Schema.Array(Schema.String)})
// CLIENT SIDE: provide the typed startup data as a layer. The build effect runs// once; its result is encoded and stored in context for the worker client// runtime to post. This layer provides the `RpcWorker.InitialMessage` service.const InitialMessageLayer = RpcWorker.layerInitialMessage(Bootstrap, Effect.succeed({ tenantId: "acme", features: ["beta-ui", "fast-path"]}))
// WORKER SIDE: read and decode the bootstrap payload. Requires the server// `Protocol`, and fails with `NoSuchElementError` if no initial message was// sent or `Schema.SchemaError` if decoding fails.const program = Effect.gen(function* () { const boot = yield* RpcWorker.initialMessage(Bootstrap) yield* Effect.log(`booting tenant ${boot.tenantId}`) // => decoded as { tenantId: string; features: ReadonlyArray<string> }})RpcWorker.InitialMessage
Section titled “RpcWorker.InitialMessage”A Context.Service holding the encoded initial message paired with the
transferables that should be posted with it. The service value is an Effect
that, when run, yields readonly [data: unknown, transfers: ReadonlyArray<Transferable>].
You normally obtain it via layerInitialMessage rather than constructing it
by hand.
import { RpcWorker } from "effect/unstable/rpc"
RpcWorker.InitialMessage.key// => "effect/rpc/RpcWorker/InitialMessage"RpcWorker.layerInitialMessage
Section titled “RpcWorker.layerInitialMessage”layerInitialMessage(schema, build) (client side) builds a Layer providing
InitialMessage. It runs the build effect, encodes the value with the
schema’s JSON codec, collects transferables, and stores the result in context.
Schema encoding failures are turned into a defect (the build effect’s error
channel is never).
import { Effect, Schema } from "effect"import { RpcWorker } from "effect/unstable/rpc"
const Config = Schema.Struct({ region: Schema.String })
const layer = RpcWorker.layerInitialMessage( Config, Effect.succeed({ region: "eu-west-1" }))// => Layer<RpcWorker.InitialMessage, never, never>RpcWorker.makeInitialMessage
Section titled “RpcWorker.makeInitialMessage”makeInitialMessage(schema, effect) runs effect, encodes its result with the
schema’s JSON codec, and returns the encoded value together with the list of
transferables collected during encoding. Use this when you want the encoded
payload and transfer list directly (layerInitialMessage is built on top of
it). It can fail with Schema.SchemaError.
import { Effect, Schema } from "effect"import { RpcWorker } from "effect/unstable/rpc"
const Payload = Schema.Struct({ token: Schema.String })
const make = RpcWorker.makeInitialMessage( Payload, Effect.succeed({ token: "secret" }))// => Effect<readonly [data: unknown, transferables: ReadonlyArray<Transferable>], SchemaError, ...>RpcWorker.initialMessage
Section titled “RpcWorker.initialMessage”initialMessage(schema) (worker side) reads the protocol’s bootstrap payload
and decodes it with the supplied schema. It requires the server Protocol
service and fails with NoSuchElementError when no initial message is present,
or Schema.SchemaError when decoding fails.
import { Effect, Schema } from "effect"import { RpcWorker } from "effect/unstable/rpc"
const Bootstrap = Schema.Struct({ tenantId: Schema.String })
const boot = RpcWorker.initialMessage(Bootstrap)// => Effect<{ tenantId: string }, NoSuchElementError | SchemaError, Protocol>Streaming internals (RpcSchema)
Section titled “Streaming internals (RpcSchema)”A streaming RPC still has an ordinary RPC exit, but its success schema is
wrapped in an RpcSchema.Stream marker that records the stream element schema
and the stream error schema. Rpc.make(tag, { stream: true }) installs this
marker for you, so you rarely call RpcSchema.Stream directly — but transports
and the cluster/reactivity layers read it to encode and decode chunks.
The key semantic: for a streaming RPC the immediate exit succeeds with
void; the stream elements and stream errors travel separately as Chunk
responses (see the wire protocol below).
RpcSchema.Stream
Section titled “RpcSchema.Stream”Stream(success, error) builds the streaming success marker from a success
element schema and a stream error schema. The resulting schema describes a
Stream<A, E> value.
import { Schema } from "effect"import { RpcSchema } from "effect/unstable/rpc"
const marker = RpcSchema.Stream(Schema.Number, Schema.String)// => an RpcSchema.Stream<Schema.Number, Schema.String>// equivalent to what Rpc.make("name", { success: Schema.Number,// error: Schema.String, stream: true }) installs for youRpcSchema.isStreamSchema
Section titled “RpcSchema.isStreamSchema”A type guard that returns true for schemas created by RpcSchema.Stream.
Protocol code uses it to branch between one-shot and streaming responses, and to
reach the stored success / error schemas.
import { Schema } from "effect"import { RpcSchema } from "effect/unstable/rpc"
const streamed = RpcSchema.Stream(Schema.Number, Schema.String)RpcSchema.isStreamSchema(streamed) // => trueRpcSchema.isStreamSchema(Schema.Number) // => false
if (RpcSchema.isStreamSchema(streamed)) { streamed.success // => Schema.Number (the element schema) streamed.error // => Schema.String (the stream error schema)}RpcSchema.ClientAbort
Section titled “RpcSchema.ClientAbort”A Context.Service annotation that tags interruptions originating from a remote
client aborting — disconnecting or cancelling an in-flight request. The server
attaches RpcSchema.ClientAbort.annotation to the interrupt cause, so handler
finalizers can distinguish a client-initiated cancel from other interruptions.
import { Cause, Context, Effect, Option } from "effect"import { RpcSchema } from "effect/unstable/rpc"
// `Effect.onError` hands the finalizer the failure `Cause`, which carries the// interrupt annotation the server attaches when a client aborts.const handler = Effect.never.pipe( Effect.onError((cause) => Effect.gen(function* () { const annotations = Cause.annotations(cause) const aborted = Context.getOption(annotations, RpcSchema.ClientAbort) if (Option.isSome(aborted)) { // => Option.some(true) when the client disconnected/cancelled yield* Effect.log("client aborted the request") } }) ))The wire protocol (RpcMessage)
Section titled “The wire protocol (RpcMessage)”RpcMessage is the protocol vocabulary that sits below RpcClient and
RpcServer. If you are writing a custom transport or serializer, these are the
envelopes you move. Each message comes in two forms:
- Decoded messages carry branded ids (
RequestId), typed RPC tags, typed payloads, andHeaders. These flow through in-process channels. - Encoded (
*Encoded) messages use string ids andunknownpayloads that have already crossed the schema serialization boundary. These flow across transport boundaries.
A single request is identified by one RequestId from its first Request,
through any Chunk batches, to the terminal Exit, plus any Acks and
Interrupts. Transports must preserve request ids exactly across the
encoded/decoded boundary, or responses, acks, and interrupts will be routed to
the wrong call.
These are protocol data types, so the reference below is a labelled list rather than a runnable example per interface.
Client to server messages
Section titled “Client to server messages”The decoded union is FromClient = Request | Ack | Interrupt | Eof; the encoded
union is FromClientEncoded = RequestEncoded | AckEncoded | InterruptEncoded | Ping | Eof.
Request<A>/RequestEncoded— starts work. Carriesid,tag,payload,headers, and optionaltraceId/spanId/sampledtrace context. Decoded uses a brandedRequestIdand typedtag/payload; encoded uses a stringidandunknownpayload.Ack/AckEncoded— acknowledges a streamed response chunk for stream back pressure. It is not a call-completion signal. CarriesrequestId.Interrupt/InterruptEncoded— cancels an in-flight RPC. Decoded carriesrequestIdand the interrupting fiber ids (interruptors); encoded carries just the stringrequestId.Eof— the client has finished sending input for the current connection or request batch. It does not replace the terminalExitresponse.Ping— a client-to-server keepalive used by liveness-monitoring transports (encoded family only).
Server to client messages
Section titled “Server to client messages”The decoded union is FromServer = ResponseChunk | ResponseExit | ResponseDefect | ClientEnd;
the encoded union is FromServerEncoded = ResponseChunkEncoded | ResponseExitEncoded | ResponseDefectEncoded | Pong | ClientProtocolError.
ResponseChunk<A>/ResponseChunkEncoded(_tag: "Chunk") — a non-empty batch of stream chunk values for a request. Decoded also carries theclientId.ResponseExit<A>/ResponseExitEncoded(_tag: "Exit") — the terminal response carrying the RPC’sExit. The encodedexitusesExitEncoded<A, E>whose failure side is an array ofFail(typed error),Die(defect), orInterrupt(with optionalfiberId) entries.ResponseDefect/ResponseDefectEncoded(_tag: "Defect") — a connection-level (protocol) defect affecting the client connection, not tied to a single request’s exit.ClientEnd— the client connection has ended (decoded only; carriesclientId).ClientProtocolError— reports anRpcClientErrorto affected in-flight requests (encoded family only).Pong— keepalive response to aPing(encoded family only).
RpcMessage.RequestId
Section titled “RpcMessage.RequestId”A RequestId is a bigint branded as "~effect/rpc/RpcMessage/RequestId". The
RequestId(id) constructor coerces a bigint or a string into the branded
type — strings are parsed with BigInt.
import { RpcMessage } from "effect/unstable/rpc"
const a = RpcMessage.RequestId(42n)const b = RpcMessage.RequestId("42")a === b // => true (both are the branded bigint 42n)RpcMessage.constEof / constPing / constPong
Section titled “RpcMessage.constEof / constPing / constPong”Reusable singleton message values, so transports don’t reallocate them.
import { RpcMessage } from "effect/unstable/rpc"
RpcMessage.constEof // => { _tag: "Eof" }RpcMessage.constPing // => { _tag: "Ping" }RpcMessage.constPong // => { _tag: "Pong" }RpcMessage.ResponseDefectEncoded
Section titled “RpcMessage.ResponseDefectEncoded”A constructor that builds a transport-encoded Defect response, encoding the
input with Schema.Defect (so arbitrary thrown values survive the wire).
import { RpcMessage } from "effect/unstable/rpc"
RpcMessage.ResponseDefectEncoded(new Error("boom"))// => { _tag: "Defect", defect: <Schema.Defect-encoded Error> }RpcMessage.ResponseExitDieEncoded
Section titled “RpcMessage.ResponseExitDieEncoded”Builds an encoded terminal Exit response whose exit is a die (defect) for a
specific requestId. The defect is encoded with Schema.Defect. Use this to
fail a single request with a defect rather than tearing down the connection.
import { RpcMessage } from "effect/unstable/rpc"
RpcMessage.ResponseExitDieEncoded({ requestId: RpcMessage.RequestId(7n), defect: new Error("handler crashed")})// => { _tag: "Exit", requestId: "7", exit: { _tag: "Failure",// cause: [{ _tag: "Die", defect: <encoded Error> }] } }Custom and in-process channels
Section titled “Custom and in-process channels”RpcServer.makeNoSerialization and RpcClient.makeNoSerialization wire a server
and client (or an already-decoded channel) using decoded FromClient /
FromServer messages directly — no bytes, no serialization. This is how
RpcTest connects a generated client straight to
handlers, and it is the foundation for in-process or already-decoded transports.
The pattern is symmetrical: each side is created with a callback that delivers
its outgoing messages, and returns a write (server) / write (client)
function for delivering the other side’s messages back in.
import { Effect } from "effect"import { RpcClient, RpcServer } from "effect/unstable/rpc"import type { Rpc, RpcGroup } from "effect/unstable/rpc"
// `group` is your RpcGroup. Both halves are scoped Effects.const wire = <Rpcs extends Rpc.Any>(group: RpcGroup.RpcGroup<Rpcs>) => Effect.gen(function* () { // Created lazily so the server can reference the client's `write`. let client!: Effect.Success< ReturnType<typeof RpcClient.makeNoSerialization<Rpcs, never, false>> >
// Server: handlers run here; decoded responses are pushed to the client. const server = yield* RpcServer.makeNoSerialization(group, { onFromServer: (response) => client.write(response) })
// Client: requests are pushed to the server as decoded messages. client = yield* RpcClient.makeNoSerialization(group, { supportsAck: true, onFromClient: ({ message }) => server.write(0, message) })
return client.client })RpcServer.makeNoSerialization(group, options) returns an RpcServer with
write(clientId, message) and disconnect(clientId). The onFromServer
callback receives every decoded FromServer response. It accepts the same
tuning options as the higher-level server — concurrency, disableTracing,
disableClientAcks, spanPrefix, and disableFatalDefects (see
below).
RpcClient.makeNoSerialization(group, options) returns { client, write },
where client is the generated RPC client and write(message) delivers a
decoded FromServer message to it. The onFromClient callback receives each
outgoing FromClient message (plus its context and a discard flag). The
flatten option matches RpcTest and RpcClient.make.
Building a brand-new transport
Section titled “Building a brand-new transport”To build a genuinely new transport (not just in-process), implement the
Protocol services. Both are created through a make hook that buffers writes
until the run loop installs a writer:
RpcClient.Protocol.make(withRunClient) — you supply a function(write, clientIds) => Effect<Omit<Protocol["Service"], "run">>. Callwriteto push decoded-from-encoded server responses back to a client; return the rest of the protocol service (send,supportsAck,supportsTransferables).RpcServer.Protocol.make(withRun) — you supply a function(write) => Effect<Omit<Protocol["Service"], "run">>and returnsend,end,disconnects,clientIds,initialMessage, and the capability flags. Callwrite(clientId, data)to feed encoded client messages into the server.
import { Effect, Option, Queue } from "effect"import { RpcServer } from "effect/unstable/rpc"
const myServerProtocol = RpcServer.Protocol.make( Effect.fnUntraced(function* (writeRequest) { // ... wire up your transport, calling `writeRequest(clientId, encoded)` // for each incoming encoded `FromClientEncoded` message ... const disconnects = yield* Queue.make<number>() return { send: (_clientId, _response, _transferables) => Effect.void, end: (_clientId) => Effect.void, disconnects, // a Queue.Dequeue<number> clientIds: Effect.succeed(new Set<number>()), initialMessage: Effect.succeed(Option.none<unknown>()), supportsAck: true, supportsTransferables: false, supportsSpanPropagation: false } }))Defect handling and exit schema
Section titled “Defect handling and exit schema”By default a defect thrown in a handler (an unexpected error outside the
RPC’s typed error channel) is treated as fatal at the protocol level: it
becomes a connection-level Defect response rather than a normal request exit.
This is usually what you want — an unexpected crash should not look like an
ordinary result.
disableFatalDefects
Section titled “disableFatalDefects”The disableFatalDefects option (on RpcServer.make, makeNoSerialization,
layer, and layerHttp) flips that behaviour: when true, defects stay as
ordinary request exits (a Die in the encoded ExitEncoded cause) and the
connection survives. Use it when callers should observe per-request defects
instead of seeing the connection torn down.
import { RpcServer } from "effect/unstable/rpc"import type { Rpc, RpcGroup } from "effect/unstable/rpc"
// Defects become ordinary request exits, not connection-level protocol defects.const server = <Rpcs extends Rpc.Any>(group: RpcGroup.RpcGroup<Rpcs>) => RpcServer.makeNoSerialization(group, { onFromServer: () => undefined as any, disableFatalDefects: true })Rpc.DefectSchema and the per-RPC defect schema
Section titled “Rpc.DefectSchema and the per-RPC defect schema”Every Rpc carries a defectSchema (default Schema.Defect). You can override
it per RPC with the defect option on Rpc.make. The schema must satisfy the
Rpc.DefectSchema constraint: it must encode/decode without any services
(DecodingServices and EncodingServices are never), since defects are
serialized at the protocol layer with no context available.
import { Schema } from "effect"import { Rpc } from "effect/unstable/rpc"
// Constrain how defects are represented on the wire for this RPC.const Risky = Rpc.make("Risky", { success: Schema.String, defect: Schema.String // a DefectSchema: no encode/decode services})
Risky.defectSchema // => Schema.StringRpc.exitSchema
Section titled “Rpc.exitSchema”Rpc.exitSchema(rpc) builds the Schema.Exit used to encode and decode an
RPC’s results. The failure side folds together the RPC’s own error schema, any
middleware error schemas, and (for streaming RPCs) the stream error schema;
streaming RPCs use Schema.Void for the exit success value. The result is
cached per RPC definition. This is the exact schema the client and server use at
the Exit boundary.
import { Schema } from "effect"import { Rpc } from "effect/unstable/rpc"
const GetUser = Rpc.make("GetUser", { success: Schema.Struct({ id: Schema.String }), error: Schema.String})
const exit = Rpc.exitSchema(GetUser)// => Schema.Exit<success: {id:string}, error: string | (middleware errors),// defect: Schema.Defect>