Sockets
A Socket is a full-duplex byte connection — a WebSocket, a TCP stream, or any
transport that lets you read and write frames. Instead of exposing a raw
send/onmessage API, the Socket module models a connection as something you
turn into a Channel: you feed it a stream of outbound chunks, and it
yields a stream of inbound chunks. Because it is a Channel, backpressure,
interruption, and resource cleanup are all handled by Effect — when the
surrounding scope closes, the connection closes.
A socket exchanges three kinds of values:
- strings — sent as text data frames.
Uint8Array— sent as binary data frames.CloseEvent— requests an orderly close with a code and optional reason.
Incoming data can be consumed as raw frames (runRaw, preserving string vs
binary), as bytes (run, encoding text as UTF-8), or as strings (runString,
decoding binary with TextDecoder).
Connecting a WebSocket client
Section titled “Connecting a WebSocket client”makeWebSocket builds a Socket from a URL. It needs a WebSocketConstructor
service so it knows how to create the underlying socket; in the browser, or in
Node/Bun where WebSocket is a global, provide
layerWebSocketConstructorGlobal.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const program = Effect.gen(function*() { // `makeWebSocket` returns an Effect<Socket, never, WebSocketConstructor>. // The connection is not opened until you `run` it. const socket = yield* Socket.makeWebSocket("wss://echo.websocket.org")
// Acquire a scoped writer, send a frame, and print every reply as a string. yield* Effect.scoped(Effect.gen(function*() { const write = yield* socket.writer yield* socket.runString((message) => { console.log("received:", message) }, { onOpen: write("hello") }) }))})
// Provide the constructor (browser / Node 22+ / Bun all have a global WebSocket).program.pipe( Effect.provide(Socket.layerWebSocketConstructorGlobal), Effect.runPromise)run/runString/runRaw block (as an Effect) for the lifetime of the
connection. They complete when the peer closes cleanly and fail with a
SocketError otherwise. The onOpen option lets you run startup writes only
once the connection is actually open — writes issued before that are gated until
the socket opens.
Sending and receiving via a Channel
Section titled “Sending and receiving via a Channel”The Channel adapters are the idiomatic way to drive a socket, because they let
you wire a Stream of outbound chunks straight into the connection and consume
inbound chunks as a stream. Socket.toChannel produces a binary channel
(incoming strings encoded to UTF-8); Socket.toChannelString produces a string
channel (incoming bytes decoded).
import { Array, Channel, Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const echo = Effect.gen(function*() { const socket = yield* Socket.makeWebSocket("wss://echo.websocket.org")
// A channel: outbound = NonEmptyReadonlyArray<Uint8Array | string | CloseEvent>, // inbound = NonEmptyReadonlyArray<string>. const channel = Socket.toChannelString<never>(socket)
// Drive it: send one batch of frames, log each decoded reply. yield* Channel.fromArray([Array.of("ping")]).pipe( Channel.pipeTo(channel), Channel.runForEach((messages) => Effect.sync(() => console.log("got:", messages)) ) )})
echo.pipe(Effect.provide(Socket.layerWebSocketConstructorGlobal), Effect.runPromise)If the Socket is provided as a service rather than constructed inline, use
Socket.makeChannel() to pull it straight from the environment, or
Socket.makeWebSocketChannel(url) to build a channel from a URL in one step
(requiring only WebSocketConstructor).
Wrapping an existing transport
Section titled “Wrapping an existing transport”To adapt a non-WebSocket transport, build a Socket directly with
Socket.make from a runRaw read loop and a scoped writer. Higher-level
read loops (run, runString) are derived for you. If your transport is
expressed as web streams, Socket.fromTransformStream adapts an
InputTransformStream (a readable/writable pair).
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
// Adapt a WHATWG TransformStream-style transport.const socket = Socket.fromTransformStream( Effect.succeed<Socket.InputTransformStream>({ readable: new ReadableStream<Uint8Array>(), writable: new WritableStream<Uint8Array>() }))Accepting connections with SocketServer
Section titled “Accepting connections with SocketServer”SocketServer is the server side: a service exposing the bound address and a
run accept loop that hands every accepted connection to your handler as a
Socket. Concrete transports are provided by platform layers — for example
NodeSocketServer.layer (TCP) and NodeSocketServer.layerWebSocket from
@effect/platform-node.
import { Array, Channel, Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"import * as SocketServer from "effect/unstable/socket/SocketServer"
// Echo server: every connection's inbound stream is piped back out.const echoServer = Effect.gen(function*() { const server = yield* SocketServer.SocketServer
// `address` reflects the *bound* address — important when listening on port 0. if (server.address._tag === "TcpAddress") { yield* Effect.log(`listening on ${server.address.hostname}:${server.address.port}`) }
// `run` never returns normally; it loops until interrupted or scope close. yield* server.run((socket) => Effect.gen(function*() { const write = yield* socket.writer // Echo each inbound string frame back to the client. yield* socket.runString((message) => write(message)) }) )})The handler receives a generic Socket, so you choose the framing: raw frames,
bytes, strings, or a higher-level protocol channel. Connection-level failures
surface in the handler; only server open/run failures become
SocketServerError.
Error handling
Section titled “Error handling”Every socket failure is wrapped in a single SocketError, whose reason is one
of four tagged errors. The most common is SocketCloseError, used to model the
close event itself — by default any close code fails the run, but you can
treat some codes as clean completion with the closeCodeIsError option.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const program = Effect.gen(function*() { // Treat code 1000 (normal closure) as a clean finish, not an error. const socket = yield* Socket.makeWebSocket("wss://echo.websocket.org", { closeCodeIsError: (code) => code !== 1000 }) yield* socket.runString((m) => console.log(m))}).pipe( // `SocketError` is tagged, so `Effect.catchTag` narrows it. Effect.catchTag("SocketError", (error) => Effect.logError(`socket failed: ${error.message}`) ))Because the reason is a discriminated union, you can branch on error.reason._tag
to distinguish read, write, open, and close failures.
Socket reference
Section titled “Socket reference”All of the following are exported from effect/unstable/socket/Socket.
Socket (service)
Section titled “Socket (service)”Context.Service tag for a bidirectional socket transport. Yield it to read and
write frames through the Effect environment.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const useSocket = Effect.gen(function*() { const socket = yield* Socket.Socket // => Socket return socket})Socket (interface)
Section titled “Socket (interface)”The connection itself. Members:
run(handler, options?)— read loop deliveringUint8Arrayframes (text frames encoded to UTF-8). ReturnsEffect<void, SocketError | E, R>.runString(handler, options?)— read loop deliveringstringframes (binary frames decoded withTextDecoder).runRaw(handler, options?)— read loop deliveringstring | Uint8Array, preserving the transport’s frame type.writer— aEffect<(chunk: Uint8Array | string | CloseEvent) => Effect<void, SocketError>, never, Scope>; acquire it in scope, then call it to send frames or request a close.
Each run* accepts { onOpen?: Effect<void> } to run an effect once the
connection opens (e.g. an initial handshake write).
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const drive = (socket: Socket.Socket) => Effect.scoped(Effect.gen(function*() { const write = yield* socket.writer yield* socket.run((bytes) => { console.log("bytes in:", bytes.length) }, { onOpen: write(new Uint8Array([1, 2, 3])) }) }))isSocket
Section titled “isSocket”Type guard for Socket values.
import * as Socket from "effect/unstable/socket/Socket"
declare const value: unknownSocket.isSocket(value) // => booleanBuilds a Socket from a runRaw read loop and a scoped writer. The run
(bytes) and runString loops are derived automatically unless you supply your
own.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const socket: Socket.Socket = Socket.make({ runRaw: (handler, opts) => Effect.gen(function*() { if (opts?.onOpen) yield* opts.onOpen handler("frame from somewhere") }), writer: Effect.succeed((chunk) => Effect.sync(() => console.log("out:", chunk)))})toChannel
Section titled “toChannel”Converts a Socket into a binary channel: inbound frames become
NonEmptyReadonlyArray<Uint8Array> (text frames UTF-8 encoded); outbound is
NonEmptyReadonlyArray<Uint8Array | string | CloseEvent>.
import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socketconst channel = Socket.toChannel<never>(socket)// => Channel<NonEmptyReadonlyArray<Uint8Array>, SocketError, ...>toChannelString
Section titled “toChannelString”Converts a Socket into a string channel, decoding binary frames with an
optional encoding. Dual: call it directly with the socket, or data-last with
just the encoding to get a Socket -> Channel function for pipe.
import { pipe } from "effect"import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socketconst utf16 = Socket.toChannelString(socket, "utf-16le")// => Channel<NonEmptyReadonlyArray<string>, SocketError, ...>
// Data-last form:const utf16b = pipe(socket, Socket.toChannelString("utf-16le"))toChannelMap
Section titled “toChannelMap”The most general adapter: map each incoming frame (string | Uint8Array) to a
value of your choice. toChannel and toChannelString are built on it.
import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socket// Tag each frame with its byte/char length.const channel = Socket.toChannelMap(socket, (data) => data.length)// => Channel<NonEmptyReadonlyArray<number>, SocketError, ...>toChannelWith
Section titled “toChannelWith”Like toChannel but lets you fix the upstream (outbound) error type IE via a
type argument, returning a Socket -> Channel function.
import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socketconst toCh = Socket.toChannelWith<Error>()const channel = toCh(socket) // outbound error channel includes ErrormakeChannel
Section titled “makeChannel”Builds a binary channel from the Socket service in the environment — the
service-style counterpart of toChannel.
import * as Socket from "effect/unstable/socket/Socket"
const channel = Socket.makeChannel()// => Channel<..., requires Socket service>makeWebSocket
Section titled “makeWebSocket”Creates a Socket backed by a WebSocket URL (or an Effect<string> URL),
requiring a WebSocketConstructor. Options: protocols, openTimeout,
closeCodeIsError.
import * as Socket from "effect/unstable/socket/Socket"
const socket = Socket.makeWebSocket("wss://example.com/ws", { protocols: ["chat"], openTimeout: "5 seconds"})// => Effect<Socket, never, WebSocketConstructor>fromWebSocket
Section titled “fromWebSocket”Builds a Socket from a scoped Effect that acquires a globalThis.WebSocket.
Use it when you need to construct the socket yourself (custom headers, an
existing instance) rather than from a URL.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const socket = Socket.fromWebSocket( Effect.acquireRelease( Effect.sync(() => new WebSocket("wss://example.com")), (ws) => Effect.sync(() => ws.close(1000)) ))// => Effect<Socket, never, never>makeWebSocketChannel
Section titled “makeWebSocketChannel”One-step binary channel from a WebSocket URL, requiring WebSocketConstructor.
import * as Socket from "effect/unstable/socket/Socket"
const channel = Socket.makeWebSocketChannel("wss://example.com/ws")// => Channel<NonEmptyReadonlyArray<Uint8Array>, SocketError, ..., WebSocketConstructor>layerWebSocket
Section titled “layerWebSocket”A Layer providing the Socket service backed by a WebSocket URL. Compose it
with layerWebSocketConstructorGlobal.
import { Layer } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const SocketLive = Socket.layerWebSocket("wss://example.com/ws").pipe( Layer.provide(Socket.layerWebSocketConstructorGlobal))// => Layer<Socket>WebSocket (service)
Section titled “WebSocket (service)”Context service holding the active globalThis.WebSocket while a
WebSocket-backed run is handling events — useful inside a handler to inspect or
control the raw socket.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const readyState = Socket.WebSocket.use((ws) => ws.readyState)// => Effect<number, never, WebSocket>WebSocketConstructor (service)
Section titled “WebSocketConstructor (service)”Context service for constructing WebSocket instances from a URL and optional
protocols. Required by makeWebSocket and friends.
import { Layer } from "effect"import * as Socket from "effect/unstable/socket/Socket"
// Custom constructor (e.g. the `ws` package in Node).const layer = Layer.succeed(Socket.WebSocketConstructor)( (url, protocols) => new WebSocket(url, protocols))layerWebSocketConstructorGlobal
Section titled “layerWebSocketConstructorGlobal”Layer providing WebSocketConstructor from globalThis.WebSocket — the right
choice in the browser, Bun, and recent Node.
import * as Socket from "effect/unstable/socket/Socket"
Socket.layerWebSocketConstructorGlobal// => Layer<WebSocketConstructor>fromTransformStream
Section titled “fromTransformStream”Builds a Socket from a scoped InputTransformStream (a readable/writable
pair). Strings written out are UTF-8 encoded; CloseEvent values use close-code
classification.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const socket = Socket.fromTransformStream( Effect.succeed<Socket.InputTransformStream>({ readable: new ReadableStream<Uint8Array>(), writable: new WritableStream<Uint8Array>() }))// => Effect<Socket, never, never>InputTransformStream
Section titled “InputTransformStream”The shape fromTransformStream consumes: a readable of bytes/strings and a
writable of bytes.
import * as Socket from "effect/unstable/socket/Socket"
declare const pair: Socket.InputTransformStreampair.readable // ReadableStream<Uint8Array | string>pair.writable // WritableStream<Uint8Array>CloseEvent
Section titled “CloseEvent”A close request carrying a code (default 1000) and optional reason. Send
one through a writer to close the connection.
import * as Socket from "effect/unstable/socket/Socket"
const ev = new Socket.CloseEvent(1001, "going away")ev.toString() // => "1001: going away"isCloseEvent
Section titled “isCloseEvent”Type guard for CloseEvent values.
import * as Socket from "effect/unstable/socket/Socket"
Socket.isCloseEvent(new Socket.CloseEvent()) // => truedefaultCloseCodeIsError
Section titled “defaultCloseCodeIsError”The default close-code classifier: returns true for every code, so by default
all closes fail a run. Override via the closeCodeIsError option.
import * as Socket from "effect/unstable/socket/Socket"
Socket.defaultCloseCodeIsError(1000) // => trueSendQueueCapacity
Section titled “SendQueueCapacity”Context.Reference for the outbound send-queue capacity, defaulting to 16.
import { Effect } from "effect"import * as Socket from "effect/unstable/socket/Socket"
const program = Effect.void.pipe( Effect.provideService(Socket.SendQueueCapacity, 64))Socket errors
Section titled “Socket errors”SocketError
Section titled “SocketError”The single tagged error (_tag: "SocketError") that wraps every socket failure;
its reason field holds the specific cause and its message mirrors the
reason’s message.
import * as Socket from "effect/unstable/socket/Socket"
const err = new Socket.SocketError({ reason: new Socket.SocketReadError({ cause: new Error("boom") })})err._tag // => "SocketError"err.reason._tag // => "SocketReadError"SocketReadError
Section titled “SocketReadError”Reason for a failure while reading from the socket. Carries the underlying
cause.
import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketReadError({ cause: new Error("recv failed") }).message// => "An error occurred during Read"SocketWriteError
Section titled “SocketWriteError”Reason for a failure while writing to the socket. Carries the underlying
cause.
import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketWriteError({ cause: new Error("send failed") }).message// => "An error occurred during Write"SocketOpenError
Section titled “SocketOpenError”Reason for a failure while opening, with kind: "Unknown" | "Timeout".
import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketOpenError({ kind: "Timeout", cause: new Error() }).message// => 'timeout waiting for "open"'SocketCloseError
Section titled “SocketCloseError”Reason representing a close event, carrying code and optional closeReason.
The static SocketCloseError.filterClean(isClean) separates clean closes from
real failures and powers the closeCodeIsError option.
import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketCloseError({ code: 1001, closeReason: "bye" }).message// => "1001: bye"SocketErrorReason
Section titled “SocketErrorReason”Both the runtime Schema.Union and the type-level union of the four reasons
above (SocketReadError | SocketWriteError | SocketOpenError | SocketCloseError).
import * as Socket from "effect/unstable/socket/Socket"
Socket.SocketErrorReason // Schema.Union of the four reasonsisSocketError
Section titled “isSocketError”Type guard for SocketError values. Equivalent to the static
SocketError.is.
import * as Socket from "effect/unstable/socket/Socket"
declare const u: unknownSocket.isSocketError(u) // => booleanSocketServer reference
Section titled “SocketServer reference”Exported from effect/unstable/socket/SocketServer.
SocketServer (service)
Section titled “SocketServer (service)”Context.Service exposing the bound address and a run(handler) accept loop.
run returns Effect<never, SocketServerError, R> — it stays alive until
interrupted or its scope closes. Each accepted connection is passed to the
handler as a Socket.
import { Effect } from "effect"import * as SocketServer from "effect/unstable/socket/SocketServer"
const program = Effect.gen(function*() { const server = yield* SocketServer.SocketServer yield* server.run((socket) => socket.runString((line) => Effect.log(`recv: ${line}`)) )})SocketServerError
Section titled “SocketServerError”Tagged error (_tag: "SocketServerError") wrapping a server-level failure
reason and exposing its cause.
import * as SocketServer from "effect/unstable/socket/SocketServer"
const err = new SocketServer.SocketServerError({ reason: new SocketServer.SocketServerOpenError({ cause: new Error("EADDRINUSE") })})err.message // => "Open"SocketServerOpenError
Section titled “SocketServerOpenError”Reason for a failure while opening/binding the server (e.g. a port in use).
import * as SocketServer from "effect/unstable/socket/SocketServer"
new SocketServer.SocketServerOpenError({ cause: new Error() }).message // => "Open"SocketServerUnknownError
Section titled “SocketServerUnknownError”Reason for an uncategorized server failure.
import * as SocketServer from "effect/unstable/socket/SocketServer"
new SocketServer.SocketServerUnknownError({ cause: new Error() }).message // => "Unknown"SocketServerErrorReason
Section titled “SocketServerErrorReason”Type-level union of the two server reasons:
SocketServerOpenError | SocketServerUnknownError.
Address
Section titled “Address”The bound address of a server: either a TcpAddress or a UnixAddress.
Discriminate on _tag.
import * as SocketServer from "effect/unstable/socket/SocketServer"
const describe = (a: SocketServer.Address) => a._tag === "TcpAddress" ? `${a.hostname}:${a.port}` : a.pathTcpAddress
Section titled “TcpAddress”A TCP address: { _tag: "TcpAddress", hostname, port }. Read it from
server.address after binding — when you listen on port 0, this reflects the
actual ephemeral port.
import * as SocketServer from "effect/unstable/socket/SocketServer"
const addr: SocketServer.TcpAddress = { _tag: "TcpAddress", hostname: "127.0.0.1", port: 0}UnixAddress
Section titled “UnixAddress”A Unix-domain socket address: { _tag: "UnixAddress", path }.
import * as SocketServer from "effect/unstable/socket/SocketServer"
const addr: SocketServer.UnixAddress = { _tag: "UnixAddress", path: "/tmp/app.sock" }