Encoding
Streams of bytes or text rarely arrive as the values you actually want to work
with — they arrive as NDJSON lines, MessagePack frames, or a socket’s raw chunks.
The Ndjson and Msgpack modules (under effect/unstable/encoding) provide
Channels that decode raw input into structured values and encode values back
out. You splice a channel into a stream with Stream.pipeThroughChannel, so
decoding/encoding becomes just another stage in the pipeline.
import { Schema, Stream } from "effect"import { Ndjson } from "effect/unstable/encoding"
// Raw NDJSON text — one JSON object per line — as it might arrive from a file// or socket.const raw = Stream.make( '{"timestamp":"2025-06-01T00:00:00Z","level":"info","message":"start"}\n' + '{"timestamp":"2025-06-01T00:00:01Z","level":"error","message":"oops"}\n')
// `Ndjson.decodeString()` is a Channel that splits incoming strings on newlines// and `JSON.parse`s each line. Splice it in with `Stream.pipeThroughChannel`.export const decoded = raw.pipe( Stream.pipeThroughChannel(Ndjson.decodeString()), Stream.runCollect)// => [{ timestamp, level, message }, { ... }]Decoding with schema validation
Section titled “Decoding with schema validation”A raw JSON.parse gives you unknown. To get a validated, typed value —
and to apply transformations such as parsing an ISO timestamp into a
DateTime.Utc — use the *SchemaString variants. They decode each line, parse
the JSON, and validate it against a Schema in a single channel.
import { Schema, Stream } from "effect"import { Ndjson } from "effect/unstable/encoding"
// `DateTimeUtcFromString` decodes the ISO-8601 string into a `DateTime.Utc`.class LogEntry extends Schema.Class<LogEntry>("LogEntry")({ timestamp: Schema.DateTimeUtcFromString, level: Schema.Literals(["info", "warn", "error"]), message: Schema.String}) {}
const raw = Stream.make( '{"timestamp":"2025-06-01T00:00:00Z","level":"info","message":"start"}\n' + '{"timestamp":"2025-06-01T00:00:01Z","level":"error","message":"oops"}\n')
// Each emitted value is a fully-validated `LogEntry`, with `timestamp` already a// `DateTime.Utc`. A line that fails validation surfaces as an `NdjsonError`.export const decodedTyped = raw.pipe( Stream.pipeThroughChannel(Ndjson.decodeSchemaString(LogEntry)()), Stream.runCollect)Encoding values back out
Section titled “Encoding values back out”The encode channels go the other way — values in, NDJSON lines out.
Ndjson.encodeString() serialises whatever you give it; encodeSchemaString
runs each value through the schema first, applying any encoding transformations
(such as formatting the DateTime back to an ISO string).
import { DateTime, Stream } from "effect"import { Ndjson } from "effect/unstable/encoding"
declare const LogEntry: typeof import("./log-entry.js").LogEntry
const entries = Stream.make( new LogEntry({ timestamp: DateTime.makeUnsafe("2025-06-01T00:00:00Z"), level: "info", message: "start" }))
// Encode each `LogEntry` through the schema, then serialise to an NDJSON line.// The resulting stream emits ready-to-write strings.export const encoded = entries.pipe( Stream.pipeThroughChannel(Ndjson.encodeSchemaString(LogEntry)()), Stream.runCollect)For binary I/O (TCP sockets, file descriptors) use the non-string variants:
Ndjson.decode() consumes Uint8Array chunks and Ndjson.encode() produces
them, handling text encoding internally. Msgpack works identically — swap
Ndjson for Msgpack and use Msgpack.decode() / Msgpack.encode() (and their
Schema variants) for the compact binary format.
A realistic pipeline
Section titled “A realistic pipeline”Because decode and encode are both just channels, the common decode → transform → re-encode shape reads as one linear pipeline:
import { Schema, Stream } from "effect"import { Ndjson } from "effect/unstable/encoding"
declare const LogEntry: typeof import("./log-entry.js").LogEntry
const ndjsonInput = Stream.make( '{"timestamp":"2025-06-01T00:00:00Z","level":"info","message":"ok"}\n' + '{"timestamp":"2025-06-01T00:00:01Z","level":"error","message":"fail"}\n' + '{"timestamp":"2025-06-01T00:00:02Z","level":"warn","message":"slow"}\n')
export const errorsOnly = ndjsonInput.pipe( // 1. Decode each line into a validated LogEntry. Stream.pipeThroughChannel(Ndjson.decodeSchemaString(LogEntry)()), // 2. Keep only error-level entries (ordinary stream transform). Stream.filter((entry: InstanceType<typeof LogEntry>) => entry.level === "error"), // 3. Re-encode the survivors back to NDJSON strings. Stream.pipeThroughChannel(Ndjson.encodeSchemaString(LogEntry)()), Stream.runCollect)Handling decode errors
Section titled “Handling decode errors”A malformed line raises Ndjson.NdjsonError, whose kind field is "Unpack"
for decode failures and "Pack" for encode failures, with the underlying
exception in cause. Catch it with Stream.catchTag (see
Error handling):
import { Stream } from "effect"import { Ndjson } from "effect/unstable/encoding"
export const recovered = Stream.make("not-valid-json\n").pipe( Stream.pipeThroughChannel(Ndjson.decodeString()), Stream.catchTag("NdjsonError", (err) => Stream.succeed({ recovered: true, kind: err.kind }) ), Stream.runCollect)NDJSON files often contain blank lines; pass { ignoreEmptyLines: true } to
decodeString to skip them instead of raising an error.