Wire Formats: MsgPack, NDJSON, SSE
The effect/unstable/encoding group turns byte and text streams into typed
values and back. Three modules cover the wire formats Effect uses internally for
RPC, sockets, and HTTP streaming:
- Msgpack: compact binary framing (MessagePack) for RPC transports, socket streams, caches, or database columns.
- Ndjson: newline-delimited JSON, one complete JSON value per line, for logs and append-only streams.
- Sse: the Server-Sent Events (EventSource) wire format for unidirectional server-to-client HTTP streams.
The modules share up to three flavors of API (Sse, as documented on this page, has no duplex helpers):
- Low-level Channels (encode/decode) that you splice into a Stream with Stream.pipeThroughChannel. They operate on already-agreed value shapes.
- Schema-bound variants (encodeSchema/decodeSchema, …) that validate and transform each record against a Schema at the boundary.
- duplex helpers that wrap a bidirectional byte/text channel (e.g. a socket) so outgoing values are encoded and incoming bytes are decoded.
Common case
All three modules are wired into streams the same way: build a channel, then pipe your stream through it. The Msgpack and Ndjson schema variants are curried: call them with a schema first, then call the returned thunk to get the channel. In the examples on this page, the Sse schema variants take the schema directly.
import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const Event = Schema.Struct({
  id: Schema.Number,
  name: Schema.String
})

// decode: text -> validated values
const decoded = Stream.make('{"id":1,"name":"a"}\n{"id":2,"name":"b"}\n').pipe(
  Stream.pipeThroughChannel(Ndjson.decodeSchemaString(Event)()),
  Stream.runCollect
)
// => [{ id: 1, name: "a" }, { id: 2, name: "b" }]

// encode: values -> NDJSON strings
const encoded = Stream.make({ id: 1, name: "a" }, { id: 2, name: "b" }).pipe(
  Stream.pipeThroughChannel(Ndjson.encodeSchemaString(Event)()),
  Stream.runCollect
)
// => ['{"id":1,"name":"a"}\n{"id":2,"name":"b"}\n']

The schema round-trip pattern is identical across modules: encode runs
Schema encoding before serialization, decode runs Schema decoding after
parsing. Failures surface as SchemaError (validation) or the module’s own
*Error (serialization).
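The ordering described above can be sketched without the library. This is a minimal illustration in plain TypeScript, not the modules' implementation: `validate`, `encodeRecord`, `decodeRecord`, and the `Outcome` type are hypothetical stand-ins for Schema encoding/decoding and for the split between schema failures and serialization failures.

```typescript
interface Event {
  readonly id: number
  readonly name: string
}

// Hypothetical result type: `stage` says which step rejected the value.
type Outcome<A> =
  | { readonly ok: true; readonly value: A }
  | { readonly ok: false; readonly stage: "schema" | "serialization"; readonly reason: string }

// Stand-in for the schema step: checks the shape of an unknown value.
const validate = (u: unknown): Outcome<Event> => {
  if (typeof u === "object" && u !== null) {
    const r = u as Record<string, unknown>
    if (typeof r.id === "number" && typeof r.name === "string") {
      return { ok: true, value: { id: r.id, name: r.name } }
    }
  }
  return { ok: false, stage: "schema", reason: "expected { id: number, name: string }" }
}

// Encode: schema step first, then serialization.
const encodeRecord = (value: unknown): Outcome<string> => {
  const checked = validate(value)
  if (checked.ok === false) return checked
  try {
    return { ok: true, value: JSON.stringify(checked.value) + "\n" }
  } catch (cause) {
    return { ok: false, stage: "serialization", reason: String(cause) }
  }
}

// Decode: parsing first, then the schema step.
const decodeRecord = (line: string): Outcome<Event> => {
  let parsed: unknown
  try {
    parsed = JSON.parse(line)
  } catch (cause) {
    return { ok: false, stage: "serialization", reason: String(cause) }
  }
  return validate(parsed)
}
```

In this sketch, `decodeRecord("nope")` is rejected at the serialization stage, while `decodeRecord('{"id":"x"}')` parses fine and is rejected at the schema stage.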
MessagePack (Msgpack)
MessagePack is a compact binary format. Each value passed to encode becomes one
MessagePack frame; decode accepts arbitrary Uint8Array chunks and buffers
incomplete frames until enough bytes arrive. Import from
effect/unstable/encoding.
import { Msgpack } from "effect/unstable/encoding"

encode
Channel that packs non-empty chunks of values into MessagePack Uint8Array
frames. Fails with MsgPackError (kind: "Pack") if a value cannot be packed.
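To make "one MessagePack frame" concrete, here is a hand-rolled packer for a tiny subset of the format. The helper name `packSmall` is hypothetical, and this is not how the Msgpack module encodes values; it only illustrates the byte layout defined by the MessagePack specification for positive fixint, fixstr, fixarray, and fixmap.

```typescript
type Packable = number | string | Packable[] | { [key: string]: Packable }

// Packs a tiny subset of MessagePack (hypothetical helper, illustration only):
//   integers 0..127            -> positive fixint (the byte itself)
//   ASCII strings, < 32 chars  -> fixstr   (0xa0 | length, then the bytes)
//   arrays, < 16 items         -> fixarray (0x90 | length, then each item)
//   objects, < 16 keys         -> fixmap   (0x80 | size, then key/value pairs)
const packSmall = (value: Packable): number[] => {
  if (typeof value === "number") {
    if (Math.floor(value) !== value || value < 0 || value > 0x7f) {
      throw new Error("sketch only supports integers 0..127")
    }
    return [value]
  }
  if (typeof value === "string") {
    if (value.length > 31) throw new Error("sketch only supports short strings")
    const bytes: number[] = [0xa0 | value.length]
    for (let i = 0; i < value.length; i++) {
      const code = value.charCodeAt(i)
      if (code > 0x7f) throw new Error("sketch only supports ASCII")
      bytes.push(code)
    }
    return bytes
  }
  if (Array.isArray(value)) {
    if (value.length > 15) throw new Error("sketch only supports short arrays")
    let bytes: number[] = [0x90 | value.length]
    for (const item of value) bytes = bytes.concat(packSmall(item))
    return bytes
  }
  const keys = Object.keys(value)
  if (keys.length > 15) throw new Error("sketch only supports small maps")
  let bytes: number[] = [0x80 | keys.length]
  for (const key of keys) {
    bytes = bytes.concat(packSmall(key), packSmall(value[key]))
  }
  return bytes
}

// { hello: "world" } packs to 13 bytes: 0x81, 0xa5 "hello", 0xa5 "world"
const frame = new Uint8Array(packSmall({ hello: "world" }))
```

A frame like this is only meaningful once all of its bytes are present, which is why a decoder fed arbitrary transport chunks has to buffer partial frames.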

import { Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"

const bytes = Stream.make({ hello: "world" }, [1, 2, 3]).pipe(
  Stream.pipeThroughChannel(Msgpack.encode()),
  Stream.runCollect
)
// => [Uint8Array[...], Uint8Array[...]] // one frame per value

decode
Channel that unpacks Uint8Array chunks back into values, buffering partial
frames across chunks. Invalid data fails with MsgPackError (kind: "Unpack").

import { Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"

// round-trip through encode then decode
const values = Stream.make({ hello: "world" }).pipe(
  Stream.pipeThroughChannel(Msgpack.encode()),
  Stream.pipeThroughChannel(Msgpack.decode()),
  Stream.runCollect
)
// => [{ hello: "world" }]

encodeSchema
Schema-aware encoder: each value is encoded with the schema and then packed.
Curried: pass the schema, then call the returned thunk. Can fail with
SchemaError or MsgPackError.

import { Schema, Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"

const User = Schema.Struct({ id: Schema.Number, name: Schema.String })

const packed = Stream.make({ id: 1, name: "Ada" }).pipe(
  Stream.pipeThroughChannel(Msgpack.encodeSchema(User)()),
  Stream.runCollect
)
// => [Uint8Array[...]] // validated, then MessagePack-packed

decodeSchema
Schema-aware decoder: bytes are unpacked into unknown, then each value is
decoded with the schema. Fails with MsgPackError or SchemaError.

import { Schema, Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"

const User = Schema.Struct({ id: Schema.Number, name: Schema.String })

const users = Stream.make({ id: 1, name: "Ada" }).pipe(
  Stream.pipeThroughChannel(Msgpack.encodeSchema(User)()),
  Stream.pipeThroughChannel(Msgpack.decodeSchema(User)()),
  Stream.runCollect
)
// => [{ id: 1, name: "Ada" }]

duplex
Wraps a bidirectional byte channel (e.g. a socket) so outgoing values are packed
to MessagePack bytes before reaching it and incoming bytes are unpacked into
values. Use this for request/response over a Uint8Array transport.

import { Msgpack } from "effect/unstable/encoding"

// `socket` is a Channel<Uint8Array[], _, _, Uint8Array[], MsgPackError, _>
declare const socket: any

// values out -> bytes; bytes in -> values
const wrapped = Msgpack.duplex(socket)
// => Channel<unknown[], MsgPackError | _, _, unknown[], _, _>

duplexSchema
Schema-aware duplex: outgoing values are encoded with inputSchema then
packed; incoming bytes are unpacked then decoded with outputSchema. Takes an
{ inputSchema, outputSchema } options object (dual: data-first or data-last).

import { Schema } from "effect"
import { Msgpack } from "effect/unstable/encoding"

declare const socket: any
const Request = Schema.Struct({ method: Schema.String })
const Response = Schema.Struct({ status: Schema.Number })

const rpc = Msgpack.duplexSchema(socket, {
  inputSchema: Request,
  outputSchema: Response
})
// => Channel<Response[], MsgPackError | SchemaError | _, _, Request[], _, _, ...>

schema
Builds a Schema that stores a value as MessagePack bytes: decoding a
Uint8Array payload yields the target type, encoding the target type yields
bytes. Useful for DB columns or message envelopes that should hold packed values.

import { Schema } from "effect"
import { Msgpack } from "effect/unstable/encoding"

const Payload = Schema.Struct({ id: Schema.Number })
const Stored = Msgpack.schema(Payload) // Schema<{ id: number }, Uint8Array>

const bytes = Schema.encodeUnknownSync(Stored)({ id: 7 })
// => Uint8Array[...]
const value = Schema.decodeUnknownSync(Stored)(bytes)
// => { id: 7 }

transformation
The underlying SchemaTransformation between unknown and a MessagePack
Uint8Array that schema is built from. MessagePack codec failures become
InvalidValue schema issues. Use it directly when composing custom schema
pipelines with Schema.decodeTo.

import { Schema } from "effect"
import { Msgpack } from "effect/unstable/encoding"

// equivalent to Msgpack.schema(target):
const Bytes = Schema.instanceOf(Uint8Array)
const Stored = Bytes.pipe(
  Schema.decodeTo(Schema.Struct({ id: Schema.Number }), Msgpack.transformation)
)
// => decodes Uint8Array -> { id: number }, encodes back to bytes

MsgPackError
Data.TaggedError("MsgPackError") raised by the channels. The kind field is
"Pack" or "Unpack"; cause preserves the original error; message returns
the kind. Catch with Stream.catchTag / Effect.catchTag.

import { Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"

const enc = new TextEncoder()
const recovered = Stream.make(enc.encode("not-msgpack-but-also-not-a-frame")).pipe(
  Stream.pipeThroughChannel(Msgpack.decode()),
  Stream.catchTag("MsgPackError", (err) => Stream.succeed({ recovered: true, kind: err.kind })),
  Stream.runCollect
)
// => [{ recovered: true, kind: "Unpack" }] // when a frame is invalid
// note: bytes 0x00-0x7f are valid MessagePack positive fixints, so plain ASCII
// input like this may decode as integers rather than fail

NDJSON (Ndjson)
NDJSON represents a stream as one complete JSON value per line. Byte helpers
handle UTF-8 transport chunks; string helpers handle already-decoded text.
Encoders append a trailing newline as record framing. Import from
effect/unstable/encoding.

import { Ndjson } from "effect/unstable/encoding"

encodeString
Channel that serializes each value with JSON.stringify, joins them with newlines,
and appends a trailing newline, emitting NDJSON strings.
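The framing rule itself fits in a few lines of plain TypeScript. This is a sketch of the rule as described here, not the channel's implementation; `toNdjson` is a hypothetical name, and returning an empty string for an empty batch is an assumption of the sketch.

```typescript
// Serializes a batch of values as one NDJSON string: one JSON document
// per line, with a trailing newline after the last record.
const toNdjson = (values: ReadonlyArray<unknown>): string =>
  values.length === 0
    ? ""
    : values.map((value) => JSON.stringify(value)).join("\n") + "\n"

const chunk = toNdjson([{ a: 1 }, { a: 2 }])
// chunk === '{"a":1}\n{"a":2}\n'
```

Line framing is safe because JSON.stringify escapes newline characters inside strings, so a serialized record never contains a raw line break.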

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const out = Stream.make({ a: 1 }, { a: 2 }).pipe(
  Stream.pipeThroughChannel(Ndjson.encodeString()),
  Stream.runCollect
)
// => ['{"a":1}\n{"a":2}\n']

encode
Same as encodeString but emits UTF-8 bytes (Uint8Array). Use for binary
I/O such as sockets or file descriptors.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const out = Stream.make({ a: 1 }).pipe(
  Stream.pipeThroughChannel(Ndjson.encode()),
  Stream.runCollect
)
// => [Uint8Array of '{"a":1}\n']

encodeSchema
Schema-aware byte encoder: values are encoded with the schema (applying transformations such as date formatting) then written as UTF-8 NDJSON bytes.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const Row = Schema.Struct({ id: Schema.Number })

const out = Stream.make({ id: 1 }).pipe(
  Stream.pipeThroughChannel(Ndjson.encodeSchema(Row)()),
  Stream.runCollect
)
// => [Uint8Array of '{"id":1}\n']

encodeSchemaString
Schema-aware string encoder: schema-encodes then writes NDJSON strings instead of bytes.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const Row = Schema.Struct({ id: Schema.Number })

const out = Stream.make({ id: 1 }, { id: 2 }).pipe(
  Stream.pipeThroughChannel(Ndjson.encodeSchemaString(Row)()),
  Stream.runCollect
)
// => ['{"id":1}\n{"id":2}\n']

decodeString
Channel that splits incoming strings on newlines (lines may span chunks) and
JSON.parses each line into unknown. Failures raise NdjsonError
(kind: "Unpack").
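"Lines may span chunks" means the decoder has to hold back the unfinished tail of each chunk. The sketch below shows one way to do that in plain TypeScript. It is an illustration under stated assumptions, not the library's code: `makeLineDecoder` is a hypothetical name, errors are thrown rather than reported as NdjsonError, and text left in the buffer at end of input is not flushed.

```typescript
// Returns a stateful function that accepts text chunks and returns the
// values parsed from every line completed by that chunk.
const makeLineDecoder = (ignoreEmptyLines = false) => {
  let buffer = ""
  return (chunk: string): unknown[] => {
    buffer += chunk
    const lines = buffer.split("\n")
    // The last element is an unfinished line ("" if the chunk ended on "\n").
    buffer = lines.pop() ?? ""
    const out: unknown[] = []
    for (const line of lines) {
      if (line === "" && ignoreEmptyLines) continue
      out.push(JSON.parse(line)) // throws SyntaxError on invalid JSON
    }
    return out
  }
}

const feed = makeLineDecoder()
const first = feed('{"a":1}\n{"a"') // [{ a: 1 }], '{"a"' stays buffered
const second = feed(':2}\n')        // [{ a: 2 }]
```

In this sketch a blank line reaches JSON.parse("") and throws unless `ignoreEmptyLines` is set, which mirrors the behavior the option is described as controlling.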

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const values = Stream.make('{"a":1}\n{"a":2}\n').pipe(
  Stream.pipeThroughChannel(Ndjson.decodeString()),
  Stream.runCollect
)
// => [{ a: 1 }, { a: 2 }]

// skip blank lines instead of failing:
Ndjson.decodeString({ ignoreEmptyLines: true })

decode
Channel that decodes UTF-8 byte chunks (handling text decoding internally) and
parses them as NDJSON. Same ignoreEmptyLines option as decodeString.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const enc = new TextEncoder()
const values = Stream.make(enc.encode('{"a":1}\n')).pipe(
  Stream.pipeThroughChannel(Ndjson.decode()),
  Stream.runCollect
)
// => [{ a: 1 }]

decodeSchema
Schema-aware byte decoder: decodes UTF-8 bytes, parses each line, then decodes
each value with the schema. Fails with NdjsonError or SchemaError.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const Row = Schema.Struct({ id: Schema.Number })
const enc = new TextEncoder()

const rows = Stream.make(enc.encode('{"id":1}\n')).pipe(
  Stream.pipeThroughChannel(Ndjson.decodeSchema(Row)()),
  Stream.runCollect
)
// => [{ id: 1 }]

decodeSchemaString
Schema-aware string decoder: parses each line as JSON then decodes each value with
the schema. The string counterpart to decodeSchema.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const Row = Schema.Struct({ id: Schema.Number })

const rows = Stream.make('{"id":1}\n{"id":2}\n').pipe(
  Stream.pipeThroughChannel(Ndjson.decodeSchemaString(Row)()),
  Stream.runCollect
)
// => [{ id: 1 }, { id: 2 }]

duplex
Wraps a bidirectional byte channel: outgoing values become UTF-8 NDJSON
bytes, incoming bytes are parsed as NDJSON. Dual: accepts an optional
{ ignoreEmptyLines } either as a trailing argument or via the curried form.

import { Ndjson } from "effect/unstable/encoding"

declare const socket: any // Channel<Uint8Array[], _, _, Uint8Array[], NdjsonError, _>
const conn = Ndjson.duplex(socket, { ignoreEmptyLines: true })
// => Channel<unknown[], NdjsonError | _, _, unknown[], _, _>

duplexString
Wraps a bidirectional string channel with NDJSON encoding/decoding. Same
shape as duplex but over text rather than bytes.

import { Ndjson } from "effect/unstable/encoding"

declare const textChannel: any // Channel<string[], ...>
const conn = Ndjson.duplexString(textChannel)
// => Channel<unknown[], NdjsonError | _, _, unknown[], _, _>

duplexSchema
Schema-aware byte duplex: outgoing values are encoded with inputSchema;
incoming bytes are parsed and decoded with outputSchema. The options object also
accepts ignoreEmptyLines.

import { Schema } from "effect"
import { Ndjson } from "effect/unstable/encoding"

declare const socket: any
const Req = Schema.Struct({ q: Schema.String })
const Res = Schema.Struct({ hits: Schema.Number })

const conn = Ndjson.duplexSchema(socket, {
  inputSchema: Req,
  outputSchema: Res,
  ignoreEmptyLines: true
})
// => Channel<Res[], NdjsonError | SchemaError | _, _, Req[], _, _, ...>

duplexSchemaString
Schema-aware string duplex: the text counterpart to duplexSchema, wrapping a
bidirectional string channel.

import { Schema } from "effect"
import { Ndjson } from "effect/unstable/encoding"

declare const textChannel: any
const Req = Schema.Struct({ q: Schema.String })
const Res = Schema.Struct({ hits: Schema.Number })

const conn = Ndjson.duplexSchemaString(textChannel, {
  inputSchema: Req,
  outputSchema: Res
})
// => Channel<Res[], NdjsonError | SchemaError | _, _, Req[], _, _, ...>

NdjsonError
Data.TaggedError("NdjsonError") raised by the channels. kind is "Pack"
(encoding) or "Unpack" (decoding); cause holds the underlying exception;
message returns the kind.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"

const recovered = Stream.make("not-valid-json\n").pipe(
  Stream.pipeThroughChannel(Ndjson.decodeString()),
  Stream.catchTag("NdjsonError", (err) => Stream.succeed({ recovered: true, kind: err.kind })),
  Stream.runCollect
)
// => [{ recovered: true, kind: "Unpack" }]

Server-Sent Events (Sse)
SSE is line-oriented text, not a framed binary protocol. A blank line dispatches
an event; repeated data: lines are joined with newlines. retry: directives
are control messages: decoders surface them as Retry failures so callers can
reconnect with the requested delay. Import from effect/unstable/encoding.

import { Sse } from "effect/unstable/encoding"

The decoder handles UTF-8 byte order marks, CRLF and LF line endings, and
preserves the last event ID. The default event name is message; the encoder
omits the event: line for that default.
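The parsing rules above can be sketched as a small whole-text parser. This is a simplified illustration in plain TypeScript, not the Sse module's decoder: `parseSse` and `SseEvent` are hypothetical names, the whole input is assumed to be available at once, retry: lines are ignored rather than surfaced as Retry, and lone CR line endings are not handled.

```typescript
interface SseEvent {
  readonly id: string | undefined
  readonly event: string
  readonly data: string
}

const parseSse = (text: string): SseEvent[] => {
  const events: SseEvent[] = []
  let lastId: string | undefined = undefined // survives across events
  let eventName = ""
  let dataLines: string[] = []
  // Strip a leading byte order mark, then accept both CRLF and LF.
  const lines = text.replace(/^\uFEFF/, "").split(/\r\n|\n/)
  for (const line of lines) {
    if (line === "") {
      // A blank line dispatches the pending event, if it collected any data.
      if (dataLines.length > 0) {
        events.push({
          id: lastId,
          event: eventName === "" ? "message" : eventName,
          data: dataLines.join("\n")
        })
      }
      eventName = ""
      dataLines = []
      continue
    }
    const colon = line.indexOf(":")
    if (colon === 0) continue // a line starting with ":" is a comment
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? "" : line.slice(colon + 1)
    if (value.charAt(0) === " ") value = value.slice(1) // drop one leading space
    if (field === "event") eventName = value
    else if (field === "data") dataLines.push(value)
    else if (field === "id") lastId = value
  }
  return events
}

const parsed = parseSse("event: ping\r\ndata: a\r\ndata: b\r\n\r\nid: 7\ndata: c\n\n")
// parsed[0] is { id: undefined, event: "ping", data: "a\nb" }
// parsed[1] is { id: "7", event: "message", data: "c" }
```

The second event shows the default name: no event: field was sent, so the sketch falls back to "message".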
decode
Channel that parses SSE text chunks into Event values. Incomplete lines are
buffered across chunks. A retry: directive fails the channel with Retry.

import { Stream } from "effect"
import { Sse } from "effect/unstable/encoding"

const events = Stream.make("event: ping\ndata: hello\n\n").pipe(
  Stream.pipeThroughChannel(Sse.decode()),
  Stream.runCollect
)
// => [{ _tag: "Event", id: undefined, event: "ping", data: "hello" }]

decodeSchema
Channel that decodes each parsed event with a Schema over the untagged event
shape ({ id?, event, data }). Use this to validate/transform the raw SSE fields.

import { Schema, Stream } from "effect"
import { Sse } from "effect/unstable/encoding"

// schema sees { id?: string; event: string; data: string }
const schema = Schema.Struct({
  id: Schema.UndefinedOr(Schema.String),
  event: Schema.String,
  data: Schema.String
})

const events = Stream.make("data: hi\n\n").pipe(
  Stream.pipeThroughChannel(Sse.decodeSchema(schema)),
  Stream.runCollect
)
// => [{ id: undefined, event: "message", data: "hi" }]

decodeDataSchema
Channel that JSON-decodes each event’s data field with a Schema, preserving
the event name and optional id. Use when data carries a JSON payload.

import { Schema, Stream } from "effect"
import { Sse } from "effect/unstable/encoding"

const Payload = Schema.Struct({ count: Schema.Number })

const events = Stream.make('event: tick\ndata: {"count":3}\n\n').pipe(
  Stream.pipeThroughChannel(Sse.decodeDataSchema(Payload)),
  Stream.runCollect
)
// => [{ id: undefined, event: "tick", data: { count: 3 } }]

encode
Channel that renders Event values as SSE text. If the upstream fails with
Retry, the retry directive is written and the encoder completes.

import { Stream } from "effect"
import { Sse } from "effect/unstable/encoding"

const text = Stream.make<Sse.Event>({
  _tag: "Event",
  event: "ping",
  id: "1",
  data: "hello"
}).pipe(
  Stream.pipeThroughChannel(Sse.encode()),
  Stream.runCollect
)
// => ["id: 1\nevent: ping\ndata: hello\n\n"]

encodeSchema
Channel that schema-encodes domain values into the untagged SSE event shape,
transforms them to Event, and writes SSE text. The schema must produce
{ id?, event, data }.

import { Schema, Stream } from "effect"
import { Sse } from "effect/unstable/encoding"

const schema = Schema.Struct({
  id: Schema.UndefinedOr(Schema.String),
  event: Schema.String,
  data: Schema.String
})

const text = Stream.make({ id: undefined, event: "message", data: "hi" }).pipe(
  Stream.pipeThroughChannel(Sse.encodeSchema(schema)),
  Stream.runCollect
)
// => ["data: hi\n\n"] // "message" event name is omitted

makeParser
Creates a stateful, imperative parser. Call feed(chunk) with text; each parsed
Event or Retry is passed to your callback. Call reset() to clear buffered
state. The building block behind decode.

import { Sse } from "effect/unstable/encoding"

const events: Array<Sse.AnyEvent> = []
const parser = Sse.makeParser((event) => events.push(event))

parser.feed("event: ping\n")
parser.feed("data: hi\n\n")
// events => [{ _tag: "Event", id: undefined, event: "ping", data: "hi" }]
parser.reset() // clears buffered partial-line state

Parser
The interface returned by makeParser: { feed(chunk: string): void; reset(): void }.

import { Sse } from "effect/unstable/encoding"

const parser: Sse.Parser = Sse.makeParser(() => {})
// parser.feed("..."), parser.reset()

Encoder
The interface for rendering an AnyEvent (an Event or Retry) as SSE text:
{ write(event: AnyEvent): string }.

import { Sse } from "effect/unstable/encoding"

const myEncoder: Sse.Encoder = {
  write: (event) => (event._tag === "Event" ? `data: ${event.data}\n\n` : "")
}

encoder
The default Encoder implementation. Renders Event values as id / event /
data lines (omitting event: for the message default) and Retry values as
retry: directives.
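The rendering rules for Event values can be sketched in plain TypeScript as follows. `renderEvent` and `SseEvent` are hypothetical names and this is not the library's encoder; the sketch only reproduces the field order and the outputs documented for Sse.encoder on this page, and leaves Retry values out.

```typescript
interface SseEvent {
  readonly id: string | undefined
  readonly event: string
  readonly data: string
}

const renderEvent = (e: SseEvent): string => {
  let out = ""
  if (e.id !== undefined) out += "id: " + e.id + "\n"
  // "message" is the default event name, so the field can be omitted.
  if (e.event !== "message") out += "event: " + e.event + "\n"
  // Each line of the payload needs its own data: field.
  for (const line of e.data.split("\n")) out += "data: " + line + "\n"
  // The trailing blank line dispatches the event on the receiving side.
  return out + "\n"
}

const simple = renderEvent({ id: undefined, event: "message", data: "hi" })
// simple === "data: hi\n\n"
const multiline = renderEvent({ id: "7", event: "ping", data: "a\nb" })
// multiline === "id: 7\nevent: ping\ndata: a\ndata: b\n\n"
```

Splitting the payload per line matters because a raw newline inside a single data: field would end that field early.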

import { Sse } from "effect/unstable/encoding"

Sse.encoder.write({ _tag: "Event", event: "message", id: undefined, data: "hi" })
// => "data: hi\n\n"

Sse.encoder.write({ _tag: "Event", event: "ping", id: "7", data: "a\nb" })
// => "id: 7\nevent: ping\ndata: a\ndata: b\n\n" // multiline data split across lines

Event
The tagged model for an SSE message: { _tag: "Event"; event: string; id: string | undefined; data: string }. Sse.Event is both the TypeScript interface and the matching Schema.Struct (with _tag: Schema.tag("Event")).

import { Schema } from "effect"
import { Sse } from "effect/unstable/encoding"

const value: Sse.Event = {
  _tag: "Event",
  event: "message",
  id: undefined,
  data: "hello"
}

// Sse.Event is also a Schema:
Schema.decodeUnknownSync(Sse.Event)({
  _tag: "Event",
  event: "message",
  id: undefined,
  data: "hi"
})
// => { _tag: "Event", event: "message", id: undefined, data: "hi" }

EventEncoded
The untagged SSE payload shape ({ event; id; data }), available both as a
TypeScript interface and as a Schema.Struct. decodeSchema decodes events
through this shape; .fields is reused by decodeDataSchema.

import { Schema } from "effect"
import { Sse } from "effect/unstable/encoding"

// the schema used internally to validate raw SSE fields:
Schema.decodeUnknownSync(Sse.EventEncoded)({
  id: undefined,
  event: "ping",
  data: "x"
})
// => { id: undefined, event: "ping", data: "x" }

transformEvent
A SchemaTransformation from the untagged { id?, event, data } shape to the
tagged Event model. Used internally by encodeSchema; available for custom
schema pipelines.

import { Schema } from "effect"
import { Sse } from "effect/unstable/encoding"

// decode untagged -> tagged Event, encode tagged -> untagged
const toEvent = Sse.Event.pipe(Schema.decodeTo(Sse.EventEncoded, Sse.transformEvent))
// => Schema bridging EventEncoded <-> Event

Retry
Data.TaggedClass("Retry") with { duration: Duration; lastEventId: string | undefined }.
Decoders emit it as a failure to request reconnection; encoders serialize it as a
retry: line. Provides Retry.is (a type guard) and Retry.filter (separates
retries from events, returning a Result).

import { Duration } from "effect"
import { Sse } from "effect/unstable/encoding"

const retry = new Sse.Retry({
  duration: Duration.millis(3000),
  lastEventId: "42"
})

Sse.Retry.is(retry) // => true
Sse.encoder.write(retry) // => "retry: 3000\n\n"

When decoding, a retry: line fails the channel with Retry, which you can
catch to reconnect:

import { Stream } from "effect"
import { Sse } from "effect/unstable/encoding"

// r.duration is a Duration value, not a number of milliseconds
const handled = Stream.make("retry: 5000\n\n").pipe(
  Stream.pipeThroughChannel(Sse.decode()),
  Stream.catchIf(Sse.Retry.is, (r) => Stream.succeed({ reconnectAfter: r.duration })),
  Stream.runCollect
)
// => [{ reconnectAfter: Duration(5000ms) }]

AnyEvent
The union Event | Retry: every value an Encoder can render and a Parser
callback can receive.

import { Sse } from "effect/unstable/encoding"

const render = (event: Sse.AnyEvent): string => Sse.encoder.write(event)
// accepts both Event and Retry values