Sockets

A Socket is a full-duplex byte connection — a WebSocket, a TCP stream, or any transport that lets you read and write frames. Instead of exposing a raw send/onmessage API, the Socket module models a connection as something you turn into a Channel: you feed it a stream of outbound chunks, and it yields a stream of inbound chunks. Because it is a Channel, backpressure, interruption, and resource cleanup are all handled by Effect — when the surrounding scope closes, the connection closes.

A socket exchanges three kinds of values:

  • strings — sent as text data frames.
  • Uint8Array — sent as binary data frames.
  • CloseEvent — requests an orderly close with a code and optional reason.

Incoming data can be consumed as raw frames (runRaw, preserving string vs binary), as bytes (run, encoding text as UTF-8), or as strings (runString, decoding binary with TextDecoder).
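The three read modes differ only in how a frame is normalized before the handler sees it. A minimal sketch of that normalization in plain TypeScript, assuming UTF-8 on both sides; `Frame`, `toBytes`, and `toText` are illustrative names and not part of the Socket module:

```typescript
// A frame as delivered by the transport: text or binary.
type Frame = string | Uint8Array

const encoder = new TextEncoder()
const decoder = new TextDecoder() // UTF-8 by default

// Roughly what a bytes-oriented read loop hands to its handler.
const toBytes = (frame: Frame): Uint8Array =>
  typeof frame === "string" ? encoder.encode(frame) : frame

// Roughly what a string-oriented read loop hands to its handler.
const toText = (frame: Frame): string =>
  typeof frame === "string" ? frame : decoder.decode(frame)

console.log(toBytes("h\u00e9").length) // 3: "h" is one byte, "é" is two in UTF-8
console.log(toText(new Uint8Array([104, 105]))) // "hi"
```

A raw read loop would skip both conversions and pass the frame through unchanged.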

makeWebSocket builds a Socket from a URL. It needs a WebSocketConstructor service so it knows how to create the underlying socket; in the browser, or in Node/Bun where WebSocket is a global, provide layerWebSocketConstructorGlobal.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const program = Effect.gen(function*() {
  // `makeWebSocket` returns an Effect<Socket, never, WebSocketConstructor>.
  // The connection is not opened until you `run` it.
  const socket = yield* Socket.makeWebSocket("wss://echo.websocket.org")
  // Acquire a scoped writer, send a frame, and print every reply as a string.
  yield* Effect.scoped(Effect.gen(function*() {
    const write = yield* socket.writer
    yield* socket.runString((message) => {
      console.log("received:", message)
    }, {
      // `onOpen` is documented as Effect<void>, so a failed startup write
      // is turned into a defect here instead of a typed SocketError.
      onOpen: Effect.orDie(write("hello"))
    })
  }))
})
// Provide the constructor (browser / Node 22+ / Bun all have a global WebSocket).
program.pipe(
  Effect.provide(Socket.layerWebSocketConstructorGlobal),
  Effect.runPromise
)

run, runString, and runRaw each return an Effect that stays running for the lifetime of the connection. It completes when the connection closes with a code that closeCodeIsError classifies as clean, and fails with a SocketError otherwise. The onOpen option lets you run startup writes only once the connection is actually open; writes issued before that are gated until the socket opens.

The Channel adapters are the idiomatic way to drive a socket, because they let you wire a Stream of outbound chunks straight into the connection and consume inbound chunks as a stream. Socket.toChannel produces a binary channel (incoming strings encoded to UTF-8); Socket.toChannelString produces a string channel (incoming bytes decoded).

import { Array, Channel, Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const echo = Effect.gen(function*() {
  const socket = yield* Socket.makeWebSocket("wss://echo.websocket.org")
  // A channel: outbound = NonEmptyReadonlyArray<Uint8Array | string | CloseEvent>,
  // inbound = NonEmptyReadonlyArray<string>.
  const channel = Socket.toChannelString<never>(socket)
  // Drive it: send one batch of frames, log each decoded reply.
  yield* Channel.fromArray([Array.of("ping")]).pipe(
    Channel.pipeTo(channel),
    Channel.runForEach((messages) =>
      Effect.sync(() => console.log("got:", messages))
    )
  )
})
echo.pipe(Effect.provide(Socket.layerWebSocketConstructorGlobal), Effect.runPromise)

If the Socket is provided as a service rather than constructed inline, use Socket.makeChannel() to pull it straight from the environment, or Socket.makeWebSocketChannel(url) to build a channel from a URL in one step (requiring only WebSocketConstructor).

To adapt a non-WebSocket transport, build a Socket directly with Socket.make from a runRaw read loop and a scoped writer. Higher-level read loops (run, runString) are derived for you. If your transport is expressed as web streams, Socket.fromTransformStream adapts an InputTransformStream (a readable/writable pair).

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
// Adapt a WHATWG TransformStream-style transport.
const socket = Socket.fromTransformStream(
  Effect.succeed<Socket.InputTransformStream>({
    readable: new ReadableStream<Uint8Array>(),
    writable: new WritableStream<Uint8Array>()
  })
)

SocketServer is the server side: a service exposing the bound address and a run accept loop that hands every accepted connection to your handler as a Socket. Concrete transports are provided by platform layers — for example NodeSocketServer.layer (TCP) and NodeSocketServer.layerWebSocket from @effect/platform-node.

import { Effect } from "effect"
import * as SocketServer from "effect/unstable/socket/SocketServer"
// Echo server: every connection's inbound stream is piped back out.
const echoServer = Effect.gen(function*() {
  const server = yield* SocketServer.SocketServer
  // `address` reflects the *bound* address, which matters when listening on port 0.
  if (server.address._tag === "TcpAddress") {
    yield* Effect.log(`listening on ${server.address.hostname}:${server.address.port}`)
  }
  // `run` never returns normally; it loops until interrupted or scope close.
  yield* server.run((socket) =>
    Effect.gen(function*() {
      const write = yield* socket.writer
      // Echo each inbound string frame back to the client.
      yield* socket.runString((message) => write(message))
    })
  )
})

The handler receives a generic Socket, so you choose the framing: raw frames, bytes, strings, or a higher-level protocol channel. Connection-level failures surface in the handler; only server open/run failures become SocketServerError.
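Choosing the framing matters most on stream transports such as TCP, where chunk boundaries carry no meaning. As one illustration, here is a minimal sketch of newline-delimited framing over string chunks in plain TypeScript. `makeLineSplitter` is a hypothetical helper, not part of the Socket module; it could be called from inside a string-oriented handler.

```typescript
// Hypothetical helper: turns arbitrary string chunks into complete lines.
const makeLineSplitter = () => {
  let buffer = ""
  return (chunk: string): Array<string> => {
    buffer += chunk
    const parts = buffer.split("\n")
    // The last element is an incomplete line ("" if the chunk ended on "\n").
    buffer = parts.pop() ?? ""
    return parts
  }
}

const split = makeLineSplitter()
console.log(split("hel"))     // []
console.log(split("lo\nwor")) // ["hello"]
console.log(split("ld\n"))    // ["world"]
```

Each connection needs its own splitter, since the buffer holds the unfinished tail of that connection's stream.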

Every socket failure is wrapped in a single SocketError, whose reason is one of four tagged errors. The most common is SocketCloseError, used to model the close event itself — by default any close code fails the run, but you can treat some codes as clean completion with the closeCodeIsError option.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const program = Effect.gen(function*() {
  // Treat code 1000 (normal closure) as a clean finish, not an error.
  const socket = yield* Socket.makeWebSocket("wss://echo.websocket.org", {
    closeCodeIsError: (code) => code !== 1000
  })
  yield* socket.runString((m) => console.log(m))
}).pipe(
  // `SocketError` is tagged, so `Effect.catchTag` narrows it.
  Effect.catchTag("SocketError", (error) =>
    Effect.logError(`socket failed: ${error.message}`)
  )
)

Because the reason is a discriminated union, you can branch on error.reason._tag to distinguish read, write, open, and close failures.
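Branching on the tag can be sketched as an exhaustive switch. To keep the example self-contained, the `Reason` type below is a structural stand-in that models only the fields named on this page (kind, code, closeReason, cause); it is an assumption, not the library's own type, and `describeReason` is an illustrative name.

```typescript
// Structural stand-in for the four documented reasons (assumed shape).
type Reason =
  | { readonly _tag: "SocketReadError"; readonly cause: unknown }
  | { readonly _tag: "SocketWriteError"; readonly cause: unknown }
  | { readonly _tag: "SocketOpenError"; readonly kind: "Unknown" | "Timeout"; readonly cause: unknown }
  | { readonly _tag: "SocketCloseError"; readonly code: number; readonly closeReason?: string }

// The switch is exhaustive, so the compiler flags any reason left unhandled.
const describeReason = (reason: Reason): string => {
  switch (reason._tag) {
    case "SocketReadError":
      return "read failed"
    case "SocketWriteError":
      return "write failed"
    case "SocketOpenError":
      return reason.kind === "Timeout" ? "open timed out" : "open failed"
    case "SocketCloseError":
      return `closed with code ${reason.code}`
  }
}

console.log(describeReason({ _tag: "SocketCloseError", code: 1006 }))
// closed with code 1006
```

In a real handler the argument would be error.reason from a caught SocketError.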


All of the following are exported from effect/unstable/socket/Socket.

Socket.Socket is the Context.Service tag for a bidirectional socket transport. Yield it to read and write frames through the Effect environment.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const useSocket = Effect.gen(function*() {
  const socket = yield* Socket.Socket // => Socket
  return socket
})

The Socket interface describes the connection itself. Members:

  • run(handler, options?): read loop delivering Uint8Array frames (text frames encoded to UTF-8). Returns Effect<void, SocketError | E, R>.
  • runString(handler, options?): read loop delivering string frames (binary frames decoded with TextDecoder).
  • runRaw(handler, options?): read loop delivering string | Uint8Array, preserving the transport’s frame type.
  • writer: an Effect<(chunk: Uint8Array | string | CloseEvent) => Effect<void, SocketError>, never, Scope>; acquire it in scope, then call it to send frames or request a close.

Each run* accepts { onOpen?: Effect<void> } to run an effect once the connection opens (e.g. an initial handshake write).

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const drive = (socket: Socket.Socket) =>
  Effect.scoped(Effect.gen(function*() {
    const write = yield* socket.writer
    yield* socket.run((bytes) => {
      console.log("bytes in:", bytes.length)
    }, {
      // `onOpen` is documented as Effect<void>, so a failed write becomes a defect.
      onOpen: Effect.orDie(write(new Uint8Array([1, 2, 3])))
    })
  }))

Socket.isSocket is a type guard for Socket values.

import * as Socket from "effect/unstable/socket/Socket"
declare const value: unknown
Socket.isSocket(value) // => boolean

Socket.make builds a Socket from a runRaw read loop and a scoped writer. The run (bytes) and runString loops are derived automatically unless you supply your own.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const socket: Socket.Socket = Socket.make({
  runRaw: (handler, opts) =>
    Effect.gen(function*() {
      if (opts?.onOpen) yield* opts.onOpen
      // The handler may return an Effect; run it rather than dropping it.
      const result = handler("frame from somewhere")
      if (Effect.isEffect(result)) yield* result
    }),
  writer: Effect.succeed((chunk) => Effect.sync(() => console.log("out:", chunk)))
})

Socket.toChannel converts a Socket into a binary channel: inbound frames become NonEmptyReadonlyArray<Uint8Array> (text frames UTF-8 encoded); outbound is NonEmptyReadonlyArray<Uint8Array | string | CloseEvent>.

import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socket
const channel = Socket.toChannel<never>(socket)
// => Channel<NonEmptyReadonlyArray<Uint8Array>, SocketError, ...>

Socket.toChannelString converts a Socket into a string channel, decoding binary frames with an optional encoding. It is dual: call it directly with the socket, or data-last with just the encoding to get a Socket -> Channel function for pipe.

import { pipe } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socket
const utf16 = Socket.toChannelString(socket, "utf-16le")
// => Channel<NonEmptyReadonlyArray<string>, SocketError, ...>
// Data-last form:
const utf16b = pipe(socket, Socket.toChannelString("utf-16le"))

Socket.toChannelMap is the most general adapter: it maps each incoming frame (string | Uint8Array) to a value of your choice. toChannel and toChannelString are built on it.

import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socket
// Tag each frame with its byte/char length.
const channel = Socket.toChannelMap(socket, (data) => data.length)
// => Channel<NonEmptyReadonlyArray<number>, SocketError, ...>
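The mapping function is ordinary code, so it can be written and checked apart from any connection. A minimal sketch of a mapper that keeps the text/binary distinction as a discriminated union follows. `TaggedFrame` and `tagFrame` are illustrative names; passing `tagFrame` as the second argument to toChannelMap is an assumption based on the signature described above.

```typescript
// Hypothetical output type: the frame plus an explicit discriminator.
type TaggedFrame =
  | { readonly kind: "text"; readonly value: string }
  | { readonly kind: "binary"; readonly value: Uint8Array }

// Maps a raw frame to a tagged value without converting its payload.
const tagFrame = (data: string | Uint8Array): TaggedFrame =>
  typeof data === "string"
    ? { kind: "text", value: data }
    : { kind: "binary", value: data }

console.log(tagFrame("ping").kind)                  // "text"
console.log(tagFrame(new Uint8Array([1, 2])).kind)  // "binary"
```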

Socket.toChannelWith is like toChannel but lets you fix the upstream (outbound) error type IE via a type argument, returning a Socket -> Channel function.

import * as Socket from "effect/unstable/socket/Socket"
declare const socket: Socket.Socket
const toCh = Socket.toChannelWith<Error>()
const channel = toCh(socket) // outbound error channel includes Error

Socket.makeChannel builds a binary channel from the Socket service in the environment, the service-style counterpart of toChannel.

import * as Socket from "effect/unstable/socket/Socket"
const channel = Socket.makeChannel()
// => Channel<..., requires Socket service>

Socket.makeWebSocket creates a Socket backed by a WebSocket URL (or an Effect<string> URL), requiring a WebSocketConstructor. Options: protocols, openTimeout, closeCodeIsError.

import * as Socket from "effect/unstable/socket/Socket"
const socket = Socket.makeWebSocket("wss://example.com/ws", {
  protocols: ["chat"],
  openTimeout: "5 seconds"
})
// => Effect<Socket, never, WebSocketConstructor>

Socket.fromWebSocket builds a Socket from a scoped Effect that acquires a globalThis.WebSocket. Use it when you need to construct the socket yourself (custom headers, an existing instance) rather than from a URL.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const socket = Socket.fromWebSocket(
  Effect.acquireRelease(
    Effect.sync(() => new WebSocket("wss://example.com")),
    (ws) => Effect.sync(() => ws.close(1000))
  )
)
// => Effect<Socket, never, never>

Socket.makeWebSocketChannel builds a binary channel from a WebSocket URL in one step, requiring WebSocketConstructor.

import * as Socket from "effect/unstable/socket/Socket"
const channel = Socket.makeWebSocketChannel("wss://example.com/ws")
// => Channel<NonEmptyReadonlyArray<Uint8Array>, SocketError, ..., WebSocketConstructor>

Socket.layerWebSocket is a Layer providing the Socket service backed by a WebSocket URL. Compose it with layerWebSocketConstructorGlobal.

import { Layer } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const SocketLive = Socket.layerWebSocket("wss://example.com/ws").pipe(
  Layer.provide(Socket.layerWebSocketConstructorGlobal)
)
// => Layer<Socket>

Socket.WebSocket is a Context service holding the active globalThis.WebSocket while a WebSocket-backed run is handling events, which is useful inside a handler to inspect or control the raw socket.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
// Read the raw socket's readyState by yielding the service.
const readyState = Effect.gen(function*() {
  const ws = yield* Socket.WebSocket
  return ws.readyState
})
// => Effect<number, never, WebSocket>

Socket.WebSocketConstructor is a Context service for constructing WebSocket instances from a URL and optional protocols. It is required by makeWebSocket and friends.

import { Layer } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
// Custom constructor; swap in another implementation here (e.g. the `ws` package in Node).
const layer = Layer.succeed(Socket.WebSocketConstructor)(
  (url, protocols) => new WebSocket(url, protocols)
)

Socket.layerWebSocketConstructorGlobal is a Layer providing WebSocketConstructor from globalThis.WebSocket, the right choice in the browser, Bun, and recent Node.

import * as Socket from "effect/unstable/socket/Socket"
Socket.layerWebSocketConstructorGlobal
// => Layer<WebSocketConstructor>

Socket.fromTransformStream builds a Socket from a scoped InputTransformStream (a readable/writable pair). Strings written out are UTF-8 encoded; CloseEvent values use close-code classification.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const socket = Socket.fromTransformStream(
  Effect.succeed<Socket.InputTransformStream>({
    readable: new ReadableStream<Uint8Array>(),
    writable: new WritableStream<Uint8Array>()
  })
)
// => Effect<Socket, never, never>

Socket.InputTransformStream is the shape fromTransformStream consumes: a readable of bytes/strings and a writable of bytes.

import * as Socket from "effect/unstable/socket/Socket"
declare const pair: Socket.InputTransformStream
pair.readable // ReadableStream<Uint8Array | string>
pair.writable // WritableStream<Uint8Array>

Socket.CloseEvent is a close request carrying a code (default 1000) and optional reason. Send one through a writer to close the connection.

import * as Socket from "effect/unstable/socket/Socket"
const ev = new Socket.CloseEvent(1001, "going away")
ev.toString() // => "1001: going away"

Socket.isCloseEvent is a type guard for CloseEvent values.

import * as Socket from "effect/unstable/socket/Socket"
Socket.isCloseEvent(new Socket.CloseEvent()) // => true

Socket.defaultCloseCodeIsError is the default close-code classifier: it returns true for every code, so by default all closes fail a run. Override it via the closeCodeIsError option.

import * as Socket from "effect/unstable/socket/Socket"
Socket.defaultCloseCodeIsError(1000) // => true
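A custom classifier is just a predicate over the numeric close code. As a minimal sketch, the hypothetical predicate below treats 1000 (normal closure) and 1001 (going away) as clean and everything else as an error. The choice of clean codes is an assumption for illustration, not a recommendation from the library.

```typescript
// Hypothetical set of close codes that should end a run without an error.
const cleanCloseCodes: ReadonlySet<number> = new Set([1000, 1001])

// Returns true when the close code should fail the run.
const closeCodeIsError = (code: number): boolean => !cleanCloseCodes.has(code)

console.log(closeCodeIsError(1000)) // false
console.log(closeCodeIsError(1006)) // true (abnormal closure)
```

A predicate with this shape is what the closeCodeIsError option of makeWebSocket expects, going by the option's description above.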

Socket.SendQueueCapacity is a Context.Reference for the outbound send-queue capacity, defaulting to 16.

import { Effect } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
const program = Effect.void.pipe(
  Effect.provideService(Socket.SendQueueCapacity, 64)
)

Socket.SocketError is the single tagged error (_tag: "SocketError") that wraps every socket failure; its reason field holds the specific cause and its message mirrors the reason’s message.

import * as Socket from "effect/unstable/socket/Socket"
const err = new Socket.SocketError({
  reason: new Socket.SocketReadError({ cause: new Error("boom") })
})
err._tag // => "SocketError"
err.reason._tag // => "SocketReadError"

Socket.SocketReadError is the reason for a failure while reading from the socket. It carries the underlying cause.

import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketReadError({ cause: new Error("recv failed") }).message
// => "An error occurred during Read"

Socket.SocketWriteError is the reason for a failure while writing to the socket. It carries the underlying cause.

import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketWriteError({ cause: new Error("send failed") }).message
// => "An error occurred during Write"

Socket.SocketOpenError is the reason for a failure while opening, with kind: "Unknown" | "Timeout".

import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketOpenError({ kind: "Timeout", cause: new Error() }).message
// => 'timeout waiting for "open"'

Socket.SocketCloseError is the reason representing a close event, carrying code and optional closeReason. The static SocketCloseError.filterClean(isClean) separates clean closes from real failures and powers the closeCodeIsError option.

import * as Socket from "effect/unstable/socket/Socket"
new Socket.SocketCloseError({ code: 1001, closeReason: "bye" }).message
// => "1001: bye"

Socket.SocketErrorReason is both the runtime Schema.Union and the type-level union of the four reasons above (SocketReadError | SocketWriteError | SocketOpenError | SocketCloseError).

import * as Socket from "effect/unstable/socket/Socket"
Socket.SocketErrorReason // Schema.Union of the four reasons

Socket.isSocketError is a type guard for SocketError values, equivalent to the static SocketError.is.

import * as Socket from "effect/unstable/socket/Socket"
declare const u: unknown
Socket.isSocketError(u) // => boolean

Exported from effect/unstable/socket/SocketServer.

SocketServer.SocketServer is the Context.Service exposing the bound address and a run(handler) accept loop. run returns Effect<never, SocketServerError, R>; it stays alive until interrupted or its scope closes. Each accepted connection is passed to the handler as a Socket.

import { Effect } from "effect"
import * as SocketServer from "effect/unstable/socket/SocketServer"
const program = Effect.gen(function*() {
  const server = yield* SocketServer.SocketServer
  yield* server.run((socket) =>
    socket.runString((line) => Effect.log(`recv: ${line}`))
  )
})

SocketServer.SocketServerError is the tagged error (_tag: "SocketServerError") wrapping a server-level failure reason and exposing its cause.

import * as SocketServer from "effect/unstable/socket/SocketServer"
const err = new SocketServer.SocketServerError({
  reason: new SocketServer.SocketServerOpenError({ cause: new Error("EADDRINUSE") })
})
err.message // => "Open"

SocketServer.SocketServerOpenError is the reason for a failure while opening or binding the server (e.g. a port in use).

import * as SocketServer from "effect/unstable/socket/SocketServer"
new SocketServer.SocketServerOpenError({ cause: new Error() }).message // => "Open"

SocketServer.SocketServerUnknownError is the reason for an uncategorized server failure.

import * as SocketServer from "effect/unstable/socket/SocketServer"
new SocketServer.SocketServerUnknownError({ cause: new Error() }).message // => "Unknown"

Type-level union of the two server reasons: SocketServerOpenError | SocketServerUnknownError.

SocketServer.Address is the bound address of a server: either a TcpAddress or a UnixAddress. Discriminate on _tag.

import * as SocketServer from "effect/unstable/socket/SocketServer"
const describe = (a: SocketServer.Address) =>
  a._tag === "TcpAddress" ? `${a.hostname}:${a.port}` : a.path

SocketServer.TcpAddress is a TCP address: { _tag: "TcpAddress", hostname, port }. Read it from server.address after binding; when you listen on port 0, this reflects the actual ephemeral port.

import * as SocketServer from "effect/unstable/socket/SocketServer"
const addr: SocketServer.TcpAddress = {
  _tag: "TcpAddress",
  hostname: "127.0.0.1",
  port: 0
}

SocketServer.UnixAddress is a Unix-domain socket address: { _tag: "UnixAddress", path }.

import * as SocketServer from "effect/unstable/socket/SocketServer"
const addr: SocketServer.UnixAddress = { _tag: "UnixAddress", path: "/tmp/app.sock" }
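One common use of the bound address is pointing a test client at a server that listened on port 0. A minimal sketch of deriving a ws:// URL from an address follows. The local `Address` types are structural stand-ins for the shapes documented above, and `toWebSocketUrl` is a hypothetical helper, not part of SocketServer.

```typescript
// Structural stand-ins mirroring the documented address shapes.
type TcpAddress = { readonly _tag: "TcpAddress"; readonly hostname: string; readonly port: number }
type UnixAddress = { readonly _tag: "UnixAddress"; readonly path: string }
type Address = TcpAddress | UnixAddress

// Hypothetical helper: a ws:// URL for a TCP address, undefined for Unix sockets.
const toWebSocketUrl = (address: Address): string | undefined => {
  if (address._tag !== "TcpAddress") return undefined
  // IPv6 literals must be bracketed inside a URL authority.
  const host = address.hostname.includes(":") ? `[${address.hostname}]` : address.hostname
  return `ws://${host}:${address.port}`
}

console.log(toWebSocketUrl({ _tag: "TcpAddress", hostname: "127.0.0.1", port: 8080 }))
// ws://127.0.0.1:8080
```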