Skip to content

Encoding

Streams of bytes or text rarely arrive as the values you actually want to work with — they arrive as NDJSON lines, MessagePack frames, or a socket’s raw chunks. The Ndjson and Msgpack modules (under effect/unstable/encoding) provide Channels that decode raw input into structured values and encode values back out. You splice a channel into a stream with Stream.pipeThroughChannel, so decoding/encoding becomes just another stage in the pipeline.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
// Raw NDJSON text — one JSON object per line — as it might arrive from a file
// or socket.
const raw = Stream.make(
'{"timestamp":"2025-06-01T00:00:00Z","level":"info","message":"start"}\n' +
'{"timestamp":"2025-06-01T00:00:01Z","level":"error","message":"oops"}\n'
)
// `Ndjson.decodeString()` is a Channel that splits incoming strings on newlines
// and `JSON.parse`s each line. Splice it in with `Stream.pipeThroughChannel`.
export const decoded = raw.pipe(
Stream.pipeThroughChannel(Ndjson.decodeString()),
Stream.runCollect
)
// => [{ timestamp, level, message }, { ... }]

A raw JSON.parse gives you unknown. To get a validated, typed value — and to apply transformations such as parsing an ISO timestamp into a DateTime.Utc — use the *SchemaString variants. They decode each line, parse the JSON, and validate it against a Schema in a single channel.

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
// `DateTimeUtcFromString` decodes the ISO-8601 string into a `DateTime.Utc`.
class LogEntry extends Schema.Class<LogEntry>("LogEntry")({
timestamp: Schema.DateTimeUtcFromString,
level: Schema.Literals(["info", "warn", "error"]),
message: Schema.String
}) {}
const raw = Stream.make(
'{"timestamp":"2025-06-01T00:00:00Z","level":"info","message":"start"}\n' +
'{"timestamp":"2025-06-01T00:00:01Z","level":"error","message":"oops"}\n'
)
// Each emitted value is a fully-validated `LogEntry`, with `timestamp` already a
// `DateTime.Utc`. A line that fails validation surfaces as an `NdjsonError`.
export const decodedTyped = raw.pipe(
Stream.pipeThroughChannel(Ndjson.decodeSchemaString(LogEntry)()),
Stream.runCollect
)

The encode channels go the other way — values in, NDJSON lines out. Ndjson.encodeString() serialises whatever you give it; encodeSchemaString runs each value through the schema first, applying any encoding transformations (such as formatting the DateTime back to an ISO string).

import { DateTime, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
declare const LogEntry: typeof import("./log-entry.js").LogEntry
const entries = Stream.make(
new LogEntry({
timestamp: DateTime.makeUnsafe("2025-06-01T00:00:00Z"),
level: "info",
message: "start"
})
)
// Encode each `LogEntry` through the schema, then serialise to an NDJSON line.
// The resulting stream emits ready-to-write strings.
export const encoded = entries.pipe(
Stream.pipeThroughChannel(Ndjson.encodeSchemaString(LogEntry)()),
Stream.runCollect
)

For binary I/O (TCP sockets, file descriptors) use the non-string variants: Ndjson.decode() consumes Uint8Array chunks and Ndjson.encode() produces them, handling text encoding internally. Msgpack works identically — swap Ndjson for Msgpack and use Msgpack.decode() / Msgpack.encode() (and their Schema variants) for the compact binary format.

Because decode and encode are both just channels, the common decode → transform → re-encode shape reads as one linear pipeline:

import { Schema, Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
declare const LogEntry: typeof import("./log-entry.js").LogEntry
const ndjsonInput = Stream.make(
'{"timestamp":"2025-06-01T00:00:00Z","level":"info","message":"ok"}\n' +
'{"timestamp":"2025-06-01T00:00:01Z","level":"error","message":"fail"}\n' +
'{"timestamp":"2025-06-01T00:00:02Z","level":"warn","message":"slow"}\n'
)
export const errorsOnly = ndjsonInput.pipe(
// 1. Decode each line into a validated LogEntry.
Stream.pipeThroughChannel(Ndjson.decodeSchemaString(LogEntry)()),
// 2. Keep only error-level entries (ordinary stream transform).
Stream.filter((entry: InstanceType<typeof LogEntry>) => entry.level === "error"),
// 3. Re-encode the survivors back to NDJSON strings.
Stream.pipeThroughChannel(Ndjson.encodeSchemaString(LogEntry)()),
Stream.runCollect
)

A malformed line raises Ndjson.NdjsonError, whose kind field is "Unpack" for decode failures and "Pack" for encode failures, with the underlying exception in cause. Catch it with Stream.catchTag (see Error handling):

import { Stream } from "effect"
import { Ndjson } from "effect/unstable/encoding"
export const recovered = Stream.make("not-valid-json\n").pipe(
Stream.pipeThroughChannel(Ndjson.decodeString()),
Stream.catchTag("NdjsonError", (err) =>
Stream.succeed({ recovered: true, kind: err.kind })
),
Stream.runCollect
)

NDJSON files often contain blank lines; pass { ignoreEmptyLines: true } to decodeString to skip them instead of raising an error.