
Workers

The effect/unstable/workers modules move work off the main thread and into a separate execution context (a browser Web Worker, a Node worker_thread, a Bun worker, a MessagePort, or even a child process) while keeping Effect’s typed errors and interruption intact. The same building blocks describe both sides of the boundary: the main thread that sends input and consumes output, and the worker side that runs a handler and replies.

These modules are deliberately low-level. They define a platform-neutral protocol and the services a runtime adapter must implement; they do not ship a high-level worker pool or a ready-to-use spawner. A platform package (for example a NodeWorker adapter) wires the runtime-specific postMessage/listen logic into a WorkerPlatform and provides the Spawner that actually creates threads.

import { Effect } from "effect"
import { Worker, WorkerError } from "effect/unstable/workers"

// `WorkerPlatform` is the main-thread service. A platform adapter provides it
// together with a `Spawner` that knows how to create the native worker.
const program = Effect.gen(function* () {
  const platform = yield* Worker.WorkerPlatform
  // Spawn an Effect-based worker client for worker id 0. In this example the
  // worker's output messages are `string` and its input messages are `number`.
  const worker = yield* platform.spawn<string, number>(0)
  // `send` queues a value for the worker. It is buffered until `run` observes
  // the ready signal, so `run` must be active for messages to leave.
  yield* worker.send(21)
  // `run` is long-lived: it installs the handler and keeps the message loop
  // alive (returns `Effect<never, ...>`), owning the port's cleanup scope.
  yield* worker.run((output: string) =>
    Effect.log(`worker said: ${output}`)
  )
}).pipe(
  // Worker failures surface as a typed `WorkerError`.
  Effect.catch((error: WorkerError.WorkerError) =>
    Effect.logError(`worker failed: ${error.message}`)
  )
)

Communication is split across two sides that mirror each other, with the following services:

| Side | Service | Yields | Role |
| --- | --- | --- | --- |
| Main thread | Worker.WorkerPlatform | Worker | send input, run a handler over output |
| Main thread | Worker.Spawner | SpawnerFn | locate/create the native worker for an id |
| Worker side | WorkerRunner.WorkerRunnerPlatform | WorkerRunner | receive requests, send replies |

send and run are intentionally separate. Calling send before the worker is ready does not fail — the value is buffered and flushed once run observes the platform ready signal. This means a worker you never run will silently never deliver its buffered messages.
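The buffering behaviour can be modeled in a few lines of plain TypeScript. This is only a sketch of the idea, not the library's implementation: `BufferedSender`, `markReady`, and `deliver` are hypothetical names, and `deliver` stands in for the native postMessage call.

```typescript
// Hypothetical model of "buffer until ready"; not the library's code.
class BufferedSender<I> {
  private ready = false
  private readonly pending: Array<I> = []
  private readonly deliver: (message: I) => void

  constructor(deliver: (message: I) => void) {
    this.deliver = deliver
  }

  // Before the ready signal, messages are queued instead of delivered.
  send(message: I): void {
    if (this.ready) {
      this.deliver(message)
    } else {
      this.pending.push(message)
    }
  }

  // Called once the ready frame is observed: flush the queue in FIFO order.
  markReady(): void {
    this.ready = true
    for (const message of this.pending.splice(0)) {
      this.deliver(message)
    }
  }
}

const delivered: Array<number> = []
const sender = new BufferedSender<number>((n) => {
  delivered.push(n)
})

sender.send(1)
sender.send(2)
// Nothing has been delivered yet because the ready signal was never seen.
const deliveredBeforeReady = delivered.length

sender.markReady()
sender.send(3)
// `delivered` now holds 1, 2, 3 in the order they were sent.
```

If `markReady` is never called, the two queued values stay in `pending` forever, which is the silent non-delivery described above.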

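Under the hood both sides speak a tiny tagged protocol so the platform adapter can distinguish control frames from data frames. On the client side, [0] signals readiness and [1, payload] carries data; on the runner side, [0, payload] delivers a request and [1] closes a port.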
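As an illustration, the frame shapes documented on this page can be written as tuple unions and told apart by their first element. The type and function names below (`ClientFrame`, `RunnerFrame`, `describeClientFrame`, `describeRunnerFrame`) are local to this sketch and are not exports of the library.

```typescript
// Local models of the frames described on this page (illustrative names).
type ClientFrame<O> = readonly [0] | readonly [1, O] // ready | data
type RunnerFrame<I> = readonly [0, I] | readonly [1] // request | close

// The numeric tag in position 0 is enough to discriminate the frame kind.
const describeClientFrame = <O>(frame: ClientFrame<O>): string =>
  frame[0] === 0 ? "ready" : `data:${JSON.stringify(frame[1])}`

const describeRunnerFrame = <I>(frame: RunnerFrame<I>): string =>
  frame[0] === 0 ? `request:${JSON.stringify(frame[1])}` : "close"

const clientReady = describeClientFrame([0])
const clientData = describeClientFrame([1, { result: 42 }])
const runnerRequest = describeRunnerFrame([0, "hello"])
const runnerClose = describeRunnerFrame([1])
```

Note that the tags are not symmetric: 0 means "ready" for frames arriving at the client but "request" for frames arriving at the runner.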

Incoming messages are handled concurrently inside run’s fiber set. If you need ordering or back pressure, add a queue, semaphore, or protocol acknowledgement yourself.

You only build a platform when writing a runtime adapter. There are two entry points depending on how much the runtime already gives you.

If the adapter already exposes the low-level send/run protocol (i.e. it can hand you raw PlatformMessage values), wrap it with makeUnsafe:

import { Effect } from "effect"
import { Worker } from "effect/unstable/workers"

const worker = Worker.makeUnsafe({
  // A real adapter would call postMessage([0, message], transfers) here.
  send: (message, _transfers) => Effect.void,
  run: (handler) =>
    // Deliver `[0]` for ready, `[1, payload]` for each data frame.
    handler([1, 42]).pipe(Effect.as(undefined as never))
})

For most adapters you instead implement setup (create the port, scope its cleanup) and listen (wire native events to emit) and let makePlatform assemble the buffering, ready-handshake, and scope management:

import { Effect } from "effect"
import { Worker } from "effect/unstable/workers"

// `W` is the native worker type produced by the Spawner (e.g. a Web Worker).
interface NativeWorker {
  postMessage(m: unknown, t?: unknown): void
}

const platform = Worker.makePlatform<NativeWorker>()({
  setup: ({ worker, scope }) =>
    // Return the port object exposing `postMessage`; register cleanup on scope.
    Effect.succeed(worker),
  listen: ({ port, emit, deferred, scope }) =>
    // Translate native message/error events into `emit(platformMessage)` /
    // `Deferred.fail(deferred, new WorkerError(...))`.
    Effect.void
})

The native-worker lookup is provided separately via the Spawner service, typically with layerSpawner:

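import { Layer } from "effect"
import { Worker } from "effect/unstable/workers"

// The imported `Worker` namespace shadows the global Web Worker constructor,
// so the native constructor is reached through `globalThis`.
const SpawnerLive: Layer.Layer<Worker.Spawner> = Worker.layerSpawner(
  (id: number) =>
    new globalThis.Worker(new URL("./my-worker.ts", import.meta.url))
)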

By default, worker messages are copied with the structured clone algorithm. For large payloads or MessagePorts you can instead transfer the backing resource, so ownership moves to the receiver and no copy is made. The Transferable module integrates this with Schema: a wrapped schema reports its transferable resources to a Collector during encoding, and the worker platform passes the collected list as the postMessage transfer list.
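The difference between copying and transferring can be observed without a worker at all. The sketch below uses the global `structuredClone` function (available in modern browsers and Node 17+) as a stand-in for postMessage, on the assumption that both apply the same clone and transfer rules; the variable names are illustrative.

```typescript
// Build a Uint8Array over an explicit ArrayBuffer so the buffer can be
// named in the transfer list.
const backing = new ArrayBuffer(3)
const original = new Uint8Array(backing)
original.set([1, 2, 3])

// Copy: the bytes are duplicated and the source stays usable.
const copied = structuredClone(original)
const lengthAfterCopy = original.byteLength

// Transfer: ownership of the backing buffer moves to the clone, and the
// source view is left detached with a byte length of zero.
const moved = structuredClone(original, { transfer: [backing] })
const lengthAfterTransfer = original.byteLength
```

After the transfer, `original` can no longer be read, which is why a transfer list should only name resources the sender is finished with.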

import { Effect, Schema } from "effect"
import { Transferable } from "effect/unstable/workers"

// A message whose `payload` field carries a Uint8Array by transfer, not copy.
const Message = Schema.Struct({
  id: Schema.Number,
  payload: Transferable.Uint8Array
})

const encodeWithTransfers = Effect.gen(function* () {
  // Provide a fresh collector around each encode.
  const collector = yield* Transferable.makeCollector
  const encoded = yield* Schema.encode(Message)({
    id: 1,
    payload: new Uint8Array([1, 2, 3])
  }).pipe(Effect.provideService(Transferable.Collector, collector))
  // Read the resources to hand to `postMessage(encoded, transfers)`.
  const transfers = collector.readUnsafe()
  // => [ArrayBuffer] (the Uint8Array's backing buffer)
  return { encoded, transfers }
})
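To make the mechanism concrete, here is a library-free model of the collection step. It is a sketch under stated assumptions: `SketchCollector`, `makeSketchCollector`, and `encodeWithSketchCollector` are hypothetical names, and the real Collector is an Effect service rather than a plain object.

```typescript
// Hypothetical, simplified model of a transfer-list collector.
interface SketchCollector {
  addAll(items: Iterable<ArrayBufferLike>): void
  read(): ReadonlyArray<ArrayBufferLike>
  clear(): ReadonlyArray<ArrayBufferLike>
}

const makeSketchCollector = (): SketchCollector => {
  let items: Array<ArrayBufferLike> = []
  return {
    addAll: (more) => {
      for (const item of more) items.push(item)
    },
    read: () => items,
    // Return what was collected and start over with an empty list.
    clear: () => {
      const previous = items
      items = []
      return previous
    }
  }
}

// The "encoder" passes the value through unchanged and, as a side effect,
// records which resource should travel in the transfer list.
const encodeWithSketchCollector = (
  message: { id: number; payload: Uint8Array },
  collector: SketchCollector
): { id: number; payload: Uint8Array } => {
  collector.addAll([message.payload.buffer])
  return message
}

const sketchCollector = makeSketchCollector()
const sketchPayload = new Uint8Array([1, 2, 3])
const sketchEncoded = encodeWithSketchCollector(
  { id: 1, payload: sketchPayload },
  sketchCollector
)
// The collected list is what would be passed as postMessage's second argument.
const sketchTransfers = sketchCollector.read()
```

Using a fresh collector per message keeps one message's transferables from leaking into the next message's transfer list.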

From effect/unstable/workers (Worker). The client side: send input, run a handler over output.

WorkerPlatform

Service that spawns Effect-based Worker clients for a numeric worker id. spawn requires a Spawner in context and fails with WorkerError.

import { Effect } from "effect"
import { Worker } from "effect/unstable/workers"

Effect.gen(function* () {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn<number, number>(0)
  // => Worker<number, number>
})

Worker

The Effect-based worker abstraction. send(message, transfers?) queues input (buffered until ready); run(handler, { onSpawn? }) is long-lived, returns Effect<never, E | WorkerError, R>, and runs onSpawn when the worker reports readiness.

import { Effect } from "effect"
import type { Worker } from "effect/unstable/workers"

declare const worker: Worker.Worker<string, number>

const useWorker = Effect.gen(function* () {
  yield* worker.send(10) // => void (queued)
  yield* worker.run((out) => Effect.log(out), {
    onSpawn: Effect.log("worker ready")
  })
  // never returns normally; `run` keeps the message loop alive
})

makeUnsafe

Wraps platform-specific send/run functions into a Worker, translating the ready frame ([0]) to the optional onSpawn effect and data frames ([1, payload]) to the handler.

import { Effect } from "effect"
import { Worker } from "effect/unstable/workers"

const worker = Worker.makeUnsafe({
  send: () => Effect.void,
  run: (handler) => handler([1, "data"]).pipe(Effect.as(undefined as never))
})
// => Worker<any, any>

PlatformMessage

The client-side wire protocol carried between the worker and the platform adapter: [0] signals readiness, [1, payload] carries data.

import type { Worker } from "effect/unstable/workers"
const ready: Worker.PlatformMessage = [0] // worker is ready
const data: Worker.PlatformMessage = [1, { result: 42 }] // a data frame

Spawner

Service tag holding the SpawnerFn that creates or locates a native worker for a given id. Required by WorkerPlatform.spawn.

import { Effect } from "effect"
import { Worker } from "effect/unstable/workers"

Effect.gen(function* () {
  const spawn = yield* Worker.Spawner
  const native = spawn(0) // => the native worker for id 0
})

SpawnerFn

The function type behind Spawner: (id: number) => W, where W is the runtime’s native worker type.

import type { Worker } from "effect/unstable/workers"
const spawner: Worker.SpawnerFn<{ id: number }> = (id) => ({ id })
// => { id: 0 } for spawner(0)

layerSpawner

Builds a Layer that provides the Spawner service from a SpawnerFn.

import { Layer } from "effect"
import { Worker } from "effect/unstable/workers"

// `Worker` here is the imported namespace, so the native Web Worker
// constructor is reached through `globalThis`.
const SpawnerLive: Layer.Layer<Worker.Spawner> = Worker.layerSpawner(
  (id: number) =>
    new globalThis.Worker(new URL(`./w${id}.ts`, import.meta.url))
)
// => Layer<Spawner>

makePlatform

Curried constructor (makePlatform<W>()(options)) that turns runtime setup and listen hooks into a WorkerPlatform service value. It buffers sent messages until the ready handshake completes and scopes port cleanup to each run. setup returns the port (any object with postMessage); listen wires native events to emit and fails the provided Deferred with a WorkerError on transport failure.

import { Effect } from "effect"
import { Worker } from "effect/unstable/workers"

const platform = Worker.makePlatform<{ postMessage: (m: unknown) => void }>()({
  setup: ({ worker }) => Effect.succeed(worker),
  listen: () => Effect.void
})
// => WorkerPlatform["Service"]

From effect/unstable/workers (WorkerRunner). The server side that runs inside the worker-like runtime.

WorkerRunnerPlatform

Service that starts a platform-specific WorkerRunner for the current runtime. start() fails with WorkerError.

import { Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"

Effect.gen(function* () {
  const platform = yield* WorkerRunner.WorkerRunnerPlatform
  const runner = yield* platform.start<number, string>()
  // => WorkerRunner<number, string>
})

WorkerRunner

The runner abstraction inside the worker. run(handler) attaches a handler that receives the portId and the input and returns an Effect or void; send / sendUnsafe reply to a port; the optional disconnects queue emits port ids on disconnect.

import { Effect } from "effect"
import type { WorkerRunner } from "effect/unstable/workers"

declare const runner: WorkerRunner.WorkerRunner<number, string>

const serve = Effect.gen(function* () {
  yield* runner.run((portId, input) =>
    // Parse the input and send the doubled number back to the originating port.
    runner.send(portId, Number(input) * 2)
  )
  // => Effect<void, WorkerError, R>
})

PlatformMessage

The runner-side wire protocol: [0, payload] delivers a request, [1] closes a port.

import type { WorkerRunner } from "effect/unstable/workers"
const request: WorkerRunner.PlatformMessage<string> = [0, "hello"] // a request
const close: WorkerRunner.PlatformMessage<string> = [1] // close the port
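A runner has to react to both frame kinds for every port it serves. The following is a minimal, library-free sketch of that bookkeeping; `RunnerMessage`, `makePortTracker`, and the reply format are assumptions made for the example, and the real runner reports disconnects through its optional disconnects queue instead.

```typescript
// Local model of the runner-side frames: request or close.
type RunnerMessage<I> = readonly [0, I] | readonly [1]

// Hypothetical tracker that routes requests and records closed ports.
const makePortTracker = <I>(
  onRequest: (portId: number, input: I) => void
) => {
  const openPorts = new Set<number>()
  const closedPorts: Array<number> = []
  return {
    receive: (portId: number, message: RunnerMessage<I>): void => {
      if (message[0] === 0) {
        // Request frame: remember the port and hand the payload to the handler.
        openPorts.add(portId)
        onRequest(portId, message[1])
      } else {
        // Close frame: forget the port and record the disconnect.
        openPorts.delete(portId)
        closedPorts.push(portId)
      }
    },
    open: (): Array<number> => Array.from(openPorts),
    disconnected: (): ReadonlyArray<number> => closedPorts
  }
}

const replies: Array<string> = []
const tracker = makePortTracker<string>((portId, input) => {
  // Mirror the doubling handler used in the WorkerRunner example.
  replies.push(`${portId}:${Number(input) * 2}`)
})

tracker.receive(1, [0, "21"])
tracker.receive(2, [0, "4"])
tracker.receive(1, [1])
```

After these three frames, port 2 is still open, port 1 has been recorded as disconnected, and two replies were produced.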

From effect/unstable/workers (Transferable). Schema-integrated transfer-list collection.

Collector

Service that accumulates Transferable objects during message encoding. Exposes effectful and Unsafe variants of addAll, read, and clear.

import { Effect } from "effect"
import { Transferable } from "effect/unstable/workers"

Effect.gen(function* () {
  const collector = yield* Transferable.Collector
  yield* collector.addAll([new ArrayBuffer(8)])
  const items = yield* collector.read
  // => [ArrayBuffer(8)]
})

makeCollectorUnsafe

Creates a fresh mutable Collector service synchronously. Use it when you are outside an Effect.

import { Transferable } from "effect/unstable/workers"
const collector = Transferable.makeCollectorUnsafe()
collector.addAllUnsafe([new ArrayBuffer(4)])
collector.readUnsafe() // => [ArrayBuffer(4)]
collector.clearUnsafe() // => [ArrayBuffer(4)], collector now empty

makeCollector

Effect that creates a fresh Collector service. Provide one around each message encode.

import { Effect } from "effect"
import { Transferable } from "effect/unstable/workers"

const program = Effect.gen(function* () {
  const collector = yield* Transferable.makeCollector
  // => Collector service
})

addAll

Adds transferables to the Collector if one is present in context; a no-op otherwise. Useful inside custom schema getters.

import { Effect } from "effect"
import { Transferable } from "effect/unstable/workers"
const eff = Transferable.addAll([new ArrayBuffer(16)])
// => Effect<void> — records into the ambient Collector, or does nothing

getterAddAll

Creates a SchemaGetter that records transferables derived from a value into the ambient collector while passing the value through unchanged. The primitive behind schema.

import { Transferable } from "effect/unstable/workers"
const getter = Transferable.getterAddAll((buf: ArrayBuffer) => [buf])
// => Getter<ArrayBuffer, ArrayBuffer> that also collects `buf`

Transferable

The schema interface returned by schema: a decodeTo wrapper whose encode path records transferables while preserving the wrapped schema’s decoded type.

import type { Schema } from "effect"
import type { Transferable } from "effect/unstable/workers"
// Transferable<S> is the type of a schema wrapped with `Transferable.schema`.
declare const t: Transferable.Transferable<Schema.Uint8Array>

schema

Wraps a schema so encoding selects transferables from the encoded value and records them in the ambient Collector. Dual: pipeable or data-first.

import { Effect, Schema } from "effect"
import { Transferable } from "effect/unstable/workers"

// Transfer the backing ArrayBuffer of any encoded Uint8Array field.
const Bytes = Transferable.schema(
  Schema.Uint8Array,
  (u8) => [u8.buffer]
)

Effect.gen(function* () {
  const collector = yield* Transferable.makeCollector
  yield* Schema.encode(Bytes)(new Uint8Array([1, 2])).pipe(
    Effect.provideService(Transferable.Collector, collector)
  )
  collector.readUnsafe() // => [ArrayBuffer] (the u8.buffer)
})

ImageData

Built-in Transferable schema for ImageData, transferring its data.buffer (the pixel buffer).

import { Transferable } from "effect/unstable/workers"
const Img = Transferable.ImageData
// encoding an ImageData collects [imageData.data.buffer]

MessagePort

Built-in Transferable schema for MessagePort, transferring the port itself.

import { Transferable } from "effect/unstable/workers"
const Port = Transferable.MessagePort
// encoding a MessagePort collects [port]

Uint8Array

Built-in Transferable schema for Uint8Array, transferring its backing ArrayBuffer.

import { Transferable } from "effect/unstable/workers"
const Bytes = Transferable.Uint8Array
// encoding a Uint8Array collects [u8.buffer]

From effect/unstable/workers (WorkerError). The typed error model shared by both sides. The outer WorkerError wraps one tagged WorkerErrorReason and delegates message and cause to it.

WorkerError

The error exposed in worker effect error channels. Construct it from a reason; message and cause are forwarded from that reason. It is built with Schema.ErrorClass, so it is also a schema.

import { WorkerError } from "effect/unstable/workers"

const err = new WorkerError.WorkerError({
  reason: new WorkerError.WorkerSendError({
    message: "Failed to send message to worker",
    cause: new Error("DataCloneError")
  })
})
err.message // => "Failed to send message to worker"
err._tag // => "WorkerError"
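The wrapper-plus-reason shape can be modeled in plain TypeScript to show how delegation and reason-based handling fit together. Everything here is a hypothetical stand-in (`SketchReason`, `SketchWorkerError`, `describeFailure`, and the phase labels); only the four tag names are taken from this page.

```typescript
// Hypothetical stand-in for a tagged reason; the tags match the ones
// documented on this page.
interface SketchReason {
  readonly _tag:
    | "WorkerSpawnError"
    | "WorkerSendError"
    | "WorkerReceiveError"
    | "WorkerUnknownError"
  readonly message: string
  readonly cause?: unknown
}

// The outer error stores one reason and forwards message and cause to it.
class SketchWorkerError {
  readonly _tag = "WorkerError"
  readonly reason: SketchReason

  constructor(reason: SketchReason) {
    this.reason = reason
  }

  get message(): string {
    return this.reason.message
  }

  get cause(): unknown {
    return this.reason.cause
  }
}

// Illustrative labels keyed by reason tag; the Record type forces every
// tag to be covered.
const phaseByTag: Record<SketchReason["_tag"], string> = {
  WorkerSpawnError: "spawn",
  WorkerSendError: "send",
  WorkerReceiveError: "receive",
  WorkerUnknownError: "unknown"
}

const describeFailure = (error: SketchWorkerError): string =>
  `${phaseByTag[error.reason._tag]} failure: ${error.message}`

const sketchCause = new Error("DataCloneError")
const sketchError = new SketchWorkerError({
  _tag: "WorkerSendError",
  message: "Failed to send message to worker",
  cause: sketchCause
})
const sketchDescription = describeFailure(sketchError)
```

Callers can match on the single outer tag when they only care that a worker failed, and inspect `reason._tag` when the failure phase matters.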

WorkerErrorReason

Union of the four tagged reasons, and a Schema.Union of their schemas for encode/decode of structured diagnostics.

import { WorkerError } from "effect/unstable/workers"
// The schema union, e.g. for decoding a reason from another thread.
const ReasonSchema = WorkerError.WorkerErrorReason
// => Schema.Union<[WorkerSpawnError, WorkerSendError, WorkerReceiveError, WorkerUnknownError]>

WorkerSpawnError

Reason for failures while spawning or setting up a worker. This often means the runner code is not executing in the expected worker context.

import { WorkerError } from "effect/unstable/workers"

const reason = new WorkerError.WorkerSpawnError({
  message: "Worker failed to start"
})
reason._tag // => "WorkerSpawnError"

WorkerSendError

Reason for failures while sending a message, frequently caused by a payload that is invalid under the structured-clone or transfer-list rules.

import { WorkerError } from "effect/unstable/workers"

const reason = new WorkerError.WorkerSendError({
  message: "Failed to send message to worker"
})
reason._tag // => "WorkerSendError"

WorkerReceiveError

Reason for failures while receiving or handling an inbound message: inbound event errors, messageerror, or worker exit.

import { WorkerError } from "effect/unstable/workers"

const reason = new WorkerError.WorkerReceiveError({
  message: "messageerror event"
})
reason._tag // => "WorkerReceiveError"

WorkerUnknownError

Reason for an unclassified worker failure that does not fit another case.

import { WorkerError } from "effect/unstable/workers"

const reason = new WorkerError.WorkerUnknownError({
  message: "Unexpected worker failure"
})
reason._tag // => "WorkerUnknownError"

isWorkerError

Refinement returning true when a value is a WorkerError.

import { WorkerError } from "effect/unstable/workers"

const e = new WorkerError.WorkerError({
  reason: new WorkerError.WorkerUnknownError({ message: "boom" })
})
WorkerError.isWorkerError(e) // => true
WorkerError.isWorkerError(new Error("x")) // => false

TypeId

The branding identifier ("~effect/workers/WorkerError") used by isWorkerError.

import type { WorkerError } from "effect/unstable/workers"
// WorkerError.TypeId is the literal type used to brand WorkerError values.
type Id = WorkerError.TypeId
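For readers unfamiliar with brand-based refinements, the general pattern looks roughly like the sketch below. It is an assumption about the technique in general, not a copy of how isWorkerError is implemented; `SketchTypeId`, `SketchBrandedError`, and `isSketchBrandedError` are hypothetical names.

```typescript
// Hypothetical brand key, analogous in spirit to a TypeId string.
const SketchTypeId = "~sketch/WorkerError"

class SketchBrandedError {
  // Every instance carries the brand as an own property.
  readonly [SketchTypeId] = SketchTypeId
  readonly message: string

  constructor(message: string) {
    this.message = message
  }
}

// The guard checks for the brand key instead of using `instanceof`.
const isSketchBrandedError = (u: unknown): u is SketchBrandedError =>
  typeof u === "object" && u !== null && SketchTypeId in u

const brandedCheck = isSketchBrandedError(new SketchBrandedError("boom"))
const plainErrorCheck = isSketchBrandedError(new Error("x"))
const nullCheck = isSketchBrandedError(null)
```

A key-based check like this keeps working when two copies of a class exist at runtime, a situation where `instanceof` would return false.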