Skip to content

Wire Formats: MsgPack, NDJSON, SSE

The effect/unstable/encoding group turns byte and text streams into typed values and back. Three modules cover the wire formats Effect uses internally for RPC, sockets, and HTTP streaming:

  • Msgpack — compact binary framing (MessagePack) for RPC transports, socket streams, caches, or database columns.
  • Ndjson — newline-delimited JSON, one complete JSON value per line, for logs and append-only streams.
  • Sse — the Server-Sent Events EventSource wire format for unidirectional server-to-client HTTP streams.

Each module exposes the same three flavors of API:

  1. Low-level Channels (encode / decode) that you splice into a Stream with Stream.pipeThroughChannel. They operate on already-agreed value shapes.
  2. Schema-bound variants (encodeSchema / decodeSchema, …) that validate and transform each record against a Schema at the boundary.
  3. duplex helpers that wrap a bidirectional byte/text channel (e.g. a socket) so outgoing values are encoded and incoming bytes are decoded.

All three modules are wired into streams the same way: build a channel, then pipe your stream through it. The schema variants are curried — call them with a schema first, then call the returned thunk to get the channel.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const Event = Schema.Struct({
id: Schema.Number,
name: Schema.String
})
// decode: text/bytes -> validated values
const decoded = Stream.make('{"id":1,"name":"a"}\n{"id":2,"name":"b"}\n').pipe(
Stream.pipeThroughChannel(Ndjson.decodeSchemaString(Event)()),
Stream.runCollect
)
// => [{ id: 1, name: "a" }, { id: 2, name: "b" }]
// encode: values -> NDJSON strings
const encoded = Stream.make({ id: 1, name: "a" }, { id: 2, name: "b" }).pipe(
Stream.pipeThroughChannel(Ndjson.encodeSchemaString(Event)()),
Stream.runCollect
)
// => ['{"id":1,"name":"a"}\n{"id":2,"name":"b"}\n']

The schema round-trip pattern is identical across modules: encode runs Schema encoding before serialization, decode runs Schema decoding after parsing. Failures surface as SchemaError (validation) or the module’s own *Error (serialization).


MessagePack is a compact binary format. Each value passed to encode becomes one MessagePack frame; decode accepts arbitrary Uint8Array chunks and buffers incomplete frames until enough bytes arrive. Import from effect/unstable/encoding.

import { Msgpack } from "effect/unstable/encoding"

Channel that packs non-empty chunks of values into MessagePack Uint8Array frames. Fails with MsgPackError (kind: "Pack") if a value cannot be packed.

import { Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"
const bytes = Stream.make({ hello: "world" }, [1, 2, 3]).pipe(
Stream.pipeThroughChannel(Msgpack.encode()),
Stream.runCollect
)
// => [Uint8Array[...], Uint8Array[...]] // one frame per value

Channel that unpacks Uint8Array chunks back into values, buffering partial frames across chunks. Invalid data fails with MsgPackError (kind: "Unpack").

import { Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"
// round-trip through encode then decode
const values = Stream.make({ hello: "world" }).pipe(
Stream.pipeThroughChannel(Msgpack.encode()),
Stream.pipeThroughChannel(Msgpack.decode()),
Stream.runCollect
)
// => [{ hello: "world" }]

Schema-aware encoder: each value is encoded with the schema and then packed. Curried — pass the schema, then call the returned thunk. Can fail with SchemaError or MsgPackError.

import { Schema, Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"
const User = Schema.Struct({ id: Schema.Number, name: Schema.String })
const packed = Stream.make({ id: 1, name: "Ada" }).pipe(
Stream.pipeThroughChannel(Msgpack.encodeSchema(User)()),
Stream.runCollect
)
// => [Uint8Array[...]] // validated, then MessagePack-packed

Schema-aware decoder: bytes are unpacked into unknown, then each value is decoded with the schema. Fails with MsgPackError or SchemaError.

import { Schema, Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"
const User = Schema.Struct({ id: Schema.Number, name: Schema.String })
const users = Stream.make({ id: 1, name: "Ada" }).pipe(
Stream.pipeThroughChannel(Msgpack.encodeSchema(User)()),
Stream.pipeThroughChannel(Msgpack.decodeSchema(User)()),
Stream.runCollect
)
// => [{ id: 1, name: "Ada" }]

Wraps a bidirectional byte channel (e.g. a socket) so outgoing values are packed to MessagePack bytes before reaching it and incoming bytes are unpacked into values. Use this for request/response over a Uint8Array transport.

import { Msgpack } from "effect/unstable/encoding"
// `socket` is a Channel<Uint8Array[], _, _, Uint8Array[], MsgPackError, _>
declare const socket: any
// values out -> bytes; bytes in -> values
const wrapped = Msgpack.duplex(socket)
// => Channel<unknown[], MsgPackError | _, _, unknown[], _, _>

Schema-aware duplex: outgoing values are encoded with inputSchema then packed; incoming bytes are unpacked then decoded with outputSchema. Takes an { inputSchema, outputSchema } options object (dual — data-first or data-last).

import { Schema } from "effect"
import { Msgpack } from "effect/unstable/encoding"
declare const socket: any
const Request = Schema.Struct({ method: Schema.String })
const Response = Schema.Struct({ status: Schema.Number })
const rpc = Msgpack.duplexSchema(socket, {
inputSchema: Request,
outputSchema: Response
})
// => Channel<Response[], MsgPackError | SchemaError | _, _, Request[], _, _, ...>

Builds a Schema that stores a value as MessagePack bytes: decoding a Uint8Array payload yields the target type, encoding the target type yields bytes. Useful for DB columns or message envelopes that should hold packed values.

import { Schema } from "effect"
import { Msgpack } from "effect/unstable/encoding"
const Payload = Schema.Struct({ id: Schema.Number })
const Stored = Msgpack.schema(Payload) // Schema<{ id: number }, Uint8Array>
const bytes = Schema.encodeUnknownSync(Stored)({ id: 7 })
// => Uint8Array[...]
const value = Schema.decodeUnknownSync(Stored)(bytes)
// => { id: 7 }

The underlying SchemaTransformation between unknown and a MessagePack Uint8Array that schema is built from. MessagePack codec failures become InvalidValue schema issues. Use it directly when composing custom schema pipelines with Schema.decodeTo.

import { Schema } from "effect"
import { Msgpack } from "effect/unstable/encoding"
// equivalent to Msgpack.schema(target):
const Bytes = Schema.instanceOf(Uint8Array)
const Stored = Bytes.pipe(
Schema.decodeTo(Schema.Struct({ id: Schema.Number }), Msgpack.transformation)
)
// => decodes Uint8Array -> { id: number }, encodes back to bytes

Data.TaggedError("MsgPackError") raised by the channels. The kind field is "Pack" or "Unpack"; cause preserves the original error; message returns the kind. Catch with Stream.catchTag / Effect.catchTag.

import { Stream } from "effect"
import { Msgpack } from "effect/unstable/encoding"
const enc = new TextEncoder()
const recovered = Stream.make(enc.encode("not-msgpack-but-also-not-a-frame")).pipe(
Stream.pipeThroughChannel(Msgpack.decode()),
Stream.catchTag("MsgPackError", (err) =>
Stream.succeed({ recovered: true, kind: err.kind })),
Stream.runCollect
)
// => [{ recovered: true, kind: "Unpack" }] // when a frame is invalid

NDJSON represents a stream as one complete JSON value per line. Byte helpers handle UTF-8 transport chunks; string helpers handle already-decoded text. Encoders append a trailing newline as record framing. Import from effect/unstable/encoding.

import { Ndjson } from "effect/unstable/encoding"

Channel that serializes each value with JSON.stringify, joins with newlines, and appends a trailing newline — emitting NDJSON strings.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const out = Stream.make({ a: 1 }, { a: 2 }).pipe(
Stream.pipeThroughChannel(Ndjson.encodeString()),
Stream.runCollect
)
// => ['{"a":1}\n{"a":2}\n']

Same as encodeString but emits UTF-8 bytes (Uint8Array). Use for binary I/O such as sockets or file descriptors.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const out = Stream.make({ a: 1 }).pipe(
Stream.pipeThroughChannel(Ndjson.encode()),
Stream.runCollect
)
// => [Uint8Array of '{"a":1}\n']

Schema-aware byte encoder: values are encoded with the schema (applying transformations such as date formatting) then written as UTF-8 NDJSON bytes.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const Row = Schema.Struct({ id: Schema.Number })
const out = Stream.make({ id: 1 }).pipe(
Stream.pipeThroughChannel(Ndjson.encodeSchema(Row)()),
Stream.runCollect
)
// => [Uint8Array of '{"id":1}\n']

Schema-aware string encoder: schema-encodes then writes NDJSON strings instead of bytes.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const Row = Schema.Struct({ id: Schema.Number })
const out = Stream.make({ id: 1 }, { id: 2 }).pipe(
Stream.pipeThroughChannel(Ndjson.encodeSchemaString(Row)()),
Stream.runCollect
)
// => ['{"id":1}\n{"id":2}\n']

Channel that splits incoming strings on newlines (lines may span chunks) and JSON.parses each line into unknown. Failures raise NdjsonError (kind: "Unpack").

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const values = Stream.make('{"a":1}\n{"a":2}\n').pipe(
Stream.pipeThroughChannel(Ndjson.decodeString()),
Stream.runCollect
)
// => [{ a: 1 }, { a: 2 }]
// skip blank lines instead of failing:
Ndjson.decodeString({ ignoreEmptyLines: true })

Channel that decodes UTF-8 byte chunks (handling text decoding internally) and parses them as NDJSON. Same ignoreEmptyLines option as decodeString.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const enc = new TextEncoder()
const values = Stream.make(enc.encode('{"a":1}\n')).pipe(
Stream.pipeThroughChannel(Ndjson.decode()),
Stream.runCollect
)
// => [{ a: 1 }]

Schema-aware byte decoder: decodes UTF-8 bytes, parses each line, then decodes each value with the schema. Fails with NdjsonError or SchemaError.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const Row = Schema.Struct({ id: Schema.Number })
const enc = new TextEncoder()
const rows = Stream.make(enc.encode('{"id":1}\n')).pipe(
Stream.pipeThroughChannel(Ndjson.decodeSchema(Row)()),
Stream.runCollect
)
// => [{ id: 1 }]

Schema-aware string decoder: parses each line as JSON then decodes each value with the schema. The string counterpart to decodeSchema.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const Row = Schema.Struct({ id: Schema.Number })
const rows = Stream.make('{"id":1}\n{"id":2}\n').pipe(
Stream.pipeThroughChannel(Ndjson.decodeSchemaString(Row)()),
Stream.runCollect
)
// => [{ id: 1 }, { id: 2 }]

Wraps a bidirectional byte channel: outgoing values become UTF-8 NDJSON bytes, incoming bytes are parsed as NDJSON. Dual — accepts an optional { ignoreEmptyLines } either as a trailing argument or via the curried form.

import { Ndjson } from "effect/unstable/encoding"
declare const socket: any // Channel<Uint8Array[], _, _, Uint8Array[], NdjsonError, _>
const conn = Ndjson.duplex(socket, { ignoreEmptyLines: true })
// => Channel<unknown[], NdjsonError | _, _, unknown[], _, _>

Wraps a bidirectional string channel with NDJSON encoding/decoding. Same shape as duplex but over text rather than bytes.

import { Ndjson } from "effect/unstable/encoding"
declare const textChannel: any // Channel<string[], ...>
const conn = Ndjson.duplexString(textChannel)
// => Channel<unknown[], NdjsonError | _, _, unknown[], _, _>

Schema-aware byte duplex: outgoing values are encoded with inputSchema; incoming bytes are parsed and decoded with outputSchema. Options object also accepts ignoreEmptyLines.

import { Schema } from "effect"
import { Ndjson } from "effect/unstable/encoding"
declare const socket: any
const Req = Schema.Struct({ q: Schema.String })
const Res = Schema.Struct({ hits: Schema.Number })
const conn = Ndjson.duplexSchema(socket, {
inputSchema: Req,
outputSchema: Res,
ignoreEmptyLines: true
})
// => Channel<Res[], NdjsonError | SchemaError | _, _, Req[], _, _, ...>

Schema-aware string duplex: the text counterpart to duplexSchema, wrapping a bidirectional string channel.

import { Schema } from "effect"
import { Ndjson } from "effect/unstable/encoding"
declare const textChannel: any
const Req = Schema.Struct({ q: Schema.String })
const Res = Schema.Struct({ hits: Schema.Number })
const conn = Ndjson.duplexSchemaString(textChannel, {
inputSchema: Req,
outputSchema: Res
})
// => Channel<Res[], NdjsonError | SchemaError | _, _, Req[], _, _, ...>

Data.TaggedError("NdjsonError") raised by the channels. kind is "Pack" (encoding) or "Unpack" (decoding); cause holds the underlying exception; message returns the kind.

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
const recovered = Stream.make("not-valid-json\n").pipe(
Stream.pipeThroughChannel(Ndjson.decodeString()),
Stream.catchTag("NdjsonError", (err) =>
Stream.succeed({ recovered: true, kind: err.kind })),
Stream.runCollect
)
// => [{ recovered: true, kind: "Unpack" }]

SSE is line-oriented text, not a framed binary protocol. A blank line dispatches an event; repeated data: lines are joined with newlines. retry: directives are control messages: decoders surface them as Retry failures so callers can reconnect with the requested delay. Import from effect/unstable/encoding.

import { Sse } from "effect/unstable/encoding"

The decoder handles UTF-8 byte order marks, CRLF and LF line endings, and preserves the last event ID. The default event name is message; the encoder omits the event: line for that default.

Channel that parses SSE text chunks into Event values. Incomplete lines are buffered across chunks. A retry: directive fails the channel with Retry.

import { Stream } from "effect"
import { Sse } from "effect/unstable/encoding"
const events = Stream.make("event: ping\ndata: hello\n\n").pipe(
Stream.pipeThroughChannel(Sse.decode()),
Stream.runCollect
)
// => [{ _tag: "Event", id: undefined, event: "ping", data: "hello" }]

Channel that decodes each parsed event with a Schema over the untagged event shape ({ id?, event, data }). Use this to validate/transform the raw SSE fields.

import { Schema, Stream } from "effect"
import { Sse } from "effect/unstable/encoding"
// schema sees { id?: string; event: string; data: string }
const schema = Schema.Struct({
id: Schema.UndefinedOr(Schema.String),
event: Schema.String,
data: Schema.String
})
const events = Stream.make("data: hi\n\n").pipe(
Stream.pipeThroughChannel(Sse.decodeSchema(schema)),
Stream.runCollect
)
// => [{ id: undefined, event: "message", data: "hi" }]

Channel that JSON-decodes each event’s data field with a Schema, preserving the event name and optional id. Use when data carries a JSON payload.

import { Schema, Stream } from "effect"
import { Sse } from "effect/unstable/encoding"
const Payload = Schema.Struct({ count: Schema.Number })
const events = Stream.make('event: tick\ndata: {"count":3}\n\n').pipe(
Stream.pipeThroughChannel(Sse.decodeDataSchema(Payload)),
Stream.runCollect
)
// => [{ id: undefined, event: "tick", data: { count: 3 } }]

Channel that renders Event values as SSE text. If the upstream fails with Retry, the retry directive is written and the encoder completes.

import { Stream } from "effect"
import { Sse } from "effect/unstable/encoding"
const text = Stream.make<Sse.Event>({
_tag: "Event",
event: "ping",
id: "1",
data: "hello"
}).pipe(
Stream.pipeThroughChannel(Sse.encode()),
Stream.runCollect
)
// => ["id: 1\nevent: ping\ndata: hello\n\n"]

Channel that schema-encodes domain values into the untagged SSE event shape, transforms them to Event, and writes SSE text. The schema must produce { id?, event, data }.

import { Schema, Stream } from "effect"
import { Sse } from "effect/unstable/encoding"
const schema = Schema.Struct({
id: Schema.UndefinedOr(Schema.String),
event: Schema.String,
data: Schema.String
})
const text = Stream.make({ id: undefined, event: "message", data: "hi" }).pipe(
Stream.pipeThroughChannel(Sse.encodeSchema(schema)),
Stream.runCollect
)
// => ["data: hi\n\n"] // "message" event name is omitted

Creates a stateful, imperative parser. Call feed(chunk) with text; each parsed Event or Retry is passed to your callback. Call reset() to clear buffered state. The building block behind decode.

import { Sse } from "effect/unstable/encoding"
const events: Array<Sse.AnyEvent> = []
const parser = Sse.makeParser((event) => events.push(event))
parser.feed("event: ping\n")
parser.feed("data: hi\n\n")
// events => [{ _tag: "Event", id: undefined, event: "ping", data: "hi" }]
parser.reset() // clears buffered partial-line state

The interface returned by makeParser: { feed(chunk: string): void; reset(): void }.

import { Sse } from "effect/unstable/encoding"
const parser: Sse.Parser = Sse.makeParser(() => {})
// parser.feed("..."), parser.reset()

The interface for rendering an AnyEvent (an Event or Retry) as SSE text: { write(event: AnyEvent): string }.

import { Sse } from "effect/unstable/encoding"
const myEncoder: Sse.Encoder = {
write: (event) => (event._tag === "Event" ? `data: ${event.data}\n\n` : "")
}

The default Encoder implementation. Renders Event values as id / event / data lines (omitting event: for the message default) and Retry values as retry: directives.

import { Sse } from "effect/unstable/encoding"
Sse.encoder.write({ _tag: "Event", event: "message", id: undefined, data: "hi" })
// => "data: hi\n\n"
Sse.encoder.write({ _tag: "Event", event: "ping", id: "7", data: "a\nb" })
// => "id: 7\nevent: ping\ndata: a\ndata: b\n\n" // multiline data split across lines

The tagged model for an SSE message: { _tag: "Event"; event: string; id: string | undefined; data: string }. Sse.Event is both the TypeScript interface and the matching Schema.Struct (with _tag: Schema.tag("Event")).

import { Schema } from "effect"
import { Sse } from "effect/unstable/encoding"
const value: Sse.Event = {
_tag: "Event",
event: "message",
id: undefined,
data: "hello"
}
// Sse.Event is also a Schema:
Schema.decodeUnknownSync(Sse.Event)({
_tag: "Event",
event: "message",
id: undefined,
data: "hi"
})
// => { _tag: "Event", event: "message", id: undefined, data: "hi" }

The untagged SSE payload shape ({ event; id; data }), available both as a TypeScript interface and as a Schema.Struct. decodeSchema decodes events through this shape; .fields is reused by decodeDataSchema.

import { Schema } from "effect"
import { Sse } from "effect/unstable/encoding"
// the schema used internally to validate raw SSE fields:
Schema.decodeUnknownSync(Sse.EventEncoded)({
id: undefined,
event: "ping",
data: "x"
})
// => { id: undefined, event: "ping", data: "x" }

A SchemaTransformation from the untagged { id?, event, data } shape to the tagged Event model. Used internally by encodeSchema; available for custom schema pipelines.

import { Schema } from "effect"
import { Sse } from "effect/unstable/encoding"
// decode untagged -> tagged Event, encode tagged -> untagged
const toEvent = Sse.Event.pipe(Schema.decodeTo(Sse.EventEncoded, Sse.transformEvent))
// => Schema bridging EventEncoded <-> Event

Data.TaggedClass("Retry") with { duration: Duration; lastEventId: string | undefined }. Decoders emit it as a failure to request reconnection; encoders serialize it as a retry: line. Provides Retry.is (a type guard) and Retry.filter (separates retries from events, returning a Result).

import { Duration } from "effect"
import { Sse } from "effect/unstable/encoding"
const retry = new Sse.Retry({
duration: Duration.millis(3000),
lastEventId: "42"
})
Sse.Retry.is(retry) // => true
Sse.encoder.write(retry) // => "retry: 3000\n\n"

When decoding, a retry: line fails the channel with Retry, which you can catch to reconnect:

import { Stream } from "effect"
import { Sse } from "effect/unstable/encoding"
const handled = Stream.make("retry: 5000\n\n").pipe(
Stream.pipeThroughChannel(Sse.decode()),
Stream.catchIf(Sse.Retry.is, (r) =>
Stream.succeed({ reconnectAfterMs: r.duration })),
Stream.runCollect
)
// => [{ reconnectAfterMs: Duration(5000ms) }]

The union Event | Retry — every value an Encoder can render and a Parser callback can receive.

import { Sse } from "effect/unstable/encoding"
const render = (event: Sse.AnyEvent): string => Sse.encoder.write(event)
// accepts both Event and Retry values