Workers
The effect/unstable/workers modules move work onto a separate thread — a
browser Web Worker, a Node worker_thread, a Bun worker, a MessagePort, or
even a child process — while keeping Effect’s typed errors and interruption
intact. The same building blocks describe both sides of the boundary: the
main thread that sends input and consumes output, and the worker side
that runs a handler and replies.
These modules are deliberately low-level. They define a platform-neutral
protocol and the services a runtime adapter must implement; they do not
ship a high-level worker pool or a ready-to-use spawner. A platform package
(for example a NodeWorker adapter) wires the runtime-specific
postMessage/listen logic into a WorkerPlatform and
provides the Spawner that actually creates threads.
import { Effect } from "effect"import { Worker, WorkerError } from "effect/unstable/workers"
// `WorkerPlatform` is the main-thread service. A platform adapter provides it// together with a `Spawner` that knows how to create the native worker.const program = Effect.gen(function* () { const platform = yield* Worker.WorkerPlatform
// Spawn an Effect-based worker client for worker id 0. Input messages are // typed `I`, output messages `O`. const worker = yield* platform.spawn<string, number>(0)
// `send` queues a value to the worker. Buffered until `run` observes the // ready signal, so the worker MUST be running for messages to leave. yield* worker.send(21)
// `run` is long-lived: it installs the handler and keeps the message loop // alive (returns `Effect<never, ...>`), owning the port's cleanup scope. yield* worker.run((output: string) => Effect.log(`worker said: ${output}`) )}).pipe( // Worker failures surface as a typed `WorkerError`. Effect.catch((error: WorkerError.WorkerError) => Effect.logError(`worker failed: ${error.message}`) ))The model
Section titled “The model”Communication is split across two services that mirror each other:
| Side | Service | Yields | Role |
|---|---|---|---|
| Main thread | Worker.WorkerPlatform | Worker | send input, run a handler over output |
| Main thread | Worker.Spawner | SpawnerFn | locate/create the native worker for an id |
| Worker side | WorkerRunner.WorkerRunnerPlatform | WorkerRunner | receive requests, send replies |
send and run are intentionally separate. Calling send before the worker
is ready does not fail — the value is buffered and flushed once run
observes the platform ready signal. This means a worker you never run will
silently never deliver its buffered messages.
Under the hood both sides speak a tiny tagged protocol so the platform adapter can distinguish control frames from data frames:
- Client (
Worker.PlatformMessage):[0]= ready,[1, payload]= data. - Runner (
WorkerRunner.PlatformMessage):[0, payload]= request,[1]= close a port.
Incoming messages are handled concurrently inside run’s fiber set. If you
need ordering or back pressure, add a queue, semaphore, or protocol
acknowledgement yourself.
Constructing the platform
Section titled “Constructing the platform”You only build a platform when writing a runtime adapter. There are two entry points depending on how much the runtime already gives you.
If the adapter already exposes the low-level send/run protocol (i.e. it can
hand you raw PlatformMessage values), wrap it with
makeUnsafe:
import { Effect } from "effect"import { Worker } from "effect/unstable/workers"
const worker = Worker.makeUnsafe({ send: (message, _transfers) => Effect.void, // postMessage([0, message], transfers) run: (handler) => // Deliver `[0]` for ready, `[1, payload]` for each data frame. handler([1, 42]).pipe(Effect.as(undefined as never))})For most adapters you instead implement setup (create the port, scope its
cleanup) and listen (wire native events to emit) and let
makePlatform assemble the buffering, ready-handshake, and
scope management:
import { Effect } from "effect"import { Worker } from "effect/unstable/workers"
// `W` is the native worker type produced by the Spawner (e.g. a Web Worker).interface NativeWorker { postMessage(m: unknown, t?: unknown): void}
const platform = Worker.makePlatform<NativeWorker>()({ setup: ({ worker, scope }) => // Return the port object exposing `postMessage`; register cleanup on scope. Effect.succeed(worker), listen: ({ port, emit, deferred, scope }) => // Translate native message/error events into `emit(platformMessage)` / // `Deferred.fail(deferred, new WorkerError(...))`. Effect.void})The native-worker lookup is provided separately via the Spawner
service, typically with layerSpawner:
import { Layer } from "effect"import { Worker } from "effect/unstable/workers"
const SpawnerLive: Layer.Layer<Worker.Spawner> = Worker.layerSpawner( (id: number) => new Worker(new URL("./my-worker.ts", import.meta.url)))Transferring values
Section titled “Transferring values”Worker messages always go through structured clone. For large payloads or
MessagePorts you can instead transfer the backing resource so ownership
moves to the receiver and no copy is made. The Transferable
module integrates this with Schema: a wrapped schema reports its transferable
resources to a Collector during encoding, and the worker
platform passes the collected list as the postMessage transfer list.
import { Effect, Schema } from "effect"import { Transferable } from "effect/unstable/workers"
// A message whose `payload` field carries a Uint8Array by transfer, not copy.const Message = Schema.Struct({ id: Schema.Number, payload: Transferable.Uint8Array})
const encodeWithTransfers = Effect.gen(function* () { // Provide a fresh collector around each encode. const collector = yield* Transferable.makeCollector
const encoded = yield* Schema.encode(Message)({ id: 1, payload: new Uint8Array([1, 2, 3]) }).pipe(Effect.provideService(Transferable.Collector, collector))
// Read the resources to hand to `postMessage(encoded, transfers)`. const transfers = collector.readUnsafe() // => [ArrayBuffer] — the Uint8Array's backing buffer return { encoded, transfers }})Reference
Section titled “Reference”Worker (main thread)
Section titled “Worker (main thread)”From effect/unstable/workers (Worker). The client side: send input, run a
handler over output.
WorkerPlatform
Section titled “WorkerPlatform”Service that spawns Effect-based Worker clients for a numeric
worker id. spawn requires a Spawner in context and fails with
WorkerError.
import { Effect } from "effect"import { Worker } from "effect/unstable/workers"
Effect.gen(function* () { const platform = yield* Worker.WorkerPlatform const worker = yield* platform.spawn<number, number>(0) // => Worker<number, number>})Worker
Section titled “Worker”The Effect-based worker abstraction. send(message, transfers?) queues input
(buffered until ready); run(handler, { onSpawn? }) is long-lived, returns
Effect<never, E | WorkerError, R>, and runs onSpawn when the worker reports
readiness.
import { Effect } from "effect"import type { Worker } from "effect/unstable/workers"
declare const worker: Worker.Worker<string, number>
const useWorker = Effect.gen(function* () { yield* worker.send(10) // => void (queued) yield* worker.run((out) => Effect.log(out), { onSpawn: Effect.log("worker ready") }) // never returns normally — keeps the message loop alive})makeUnsafe
Section titled “makeUnsafe”Wraps platform-specific send/run functions into a Worker,
translating the ready frame ([0]) to the optional onSpawn effect and data
frames ([1, payload]) to the handler.
import { Effect } from "effect"import { Worker } from "effect/unstable/workers"
const worker = Worker.makeUnsafe({ send: () => Effect.void, run: (handler) => handler([1, "data"]).pipe(Effect.as(undefined as never))})// => Worker<any, any>Worker.PlatformMessage
Section titled “Worker.PlatformMessage”The client-side wire protocol carried between the worker and the platform
adapter: [0] signals readiness, [1, payload] carries data.
import type { Worker } from "effect/unstable/workers"
const ready: Worker.PlatformMessage = [0] // worker is readyconst data: Worker.PlatformMessage = [1, { result: 42 }] // a data frameSpawner
Section titled “Spawner”Service tag holding the SpawnerFn that creates or locates a
native worker for a given id. Required by WorkerPlatform.spawn.
import { Effect } from "effect"import { Worker } from "effect/unstable/workers"
Effect.gen(function* () { const spawn = yield* Worker.Spawner const native = spawn(0) // => the native worker for id 0})SpawnerFn
Section titled “SpawnerFn”The function type behind Spawner: (id: number) => W, where W
is the runtime’s native worker type.
import type { Worker } from "effect/unstable/workers"
const spawner: Worker.SpawnerFn<{ id: number }> = (id) => ({ id })// => { id: 0 } for spawner(0)layerSpawner
Section titled “layerSpawner”Builds a Layer that provides the Spawner service from a
SpawnerFn.
import { Layer } from "effect"import { Worker } from "effect/unstable/workers"
const SpawnerLive: Layer.Layer<Worker.Spawner> = Worker.layerSpawner( (id: number) => new Worker(new URL(`./w${id}.ts`, import.meta.url)))// => Layer<Spawner>makePlatform
Section titled “makePlatform”Curried constructor (makePlatform<W>()(options)) that turns runtime setup
and listen hooks into a WorkerPlatform service value.
It buffers sent messages until the ready handshake completes and scopes port
cleanup to each run. setup returns the port (any object with
postMessage); listen wires native events to emit and fails the provided
Deferred with a WorkerError on transport failure.
import { Effect } from "effect"import { Worker } from "effect/unstable/workers"
const platform = Worker.makePlatform<{ postMessage: (m: unknown) => void }>()({ setup: ({ worker }) => Effect.succeed(worker), listen: () => Effect.void})// => WorkerPlatform["Service"]WorkerRunner (worker side)
Section titled “WorkerRunner (worker side)”From effect/unstable/workers (WorkerRunner). The server side that runs
inside the worker-like runtime.
WorkerRunnerPlatform
Section titled “WorkerRunnerPlatform”Service that starts a platform-specific WorkerRunner for the
current runtime. start() fails with WorkerError.
import { Effect } from "effect"import { WorkerRunner } from "effect/unstable/workers"
Effect.gen(function* () { const platform = yield* WorkerRunner.WorkerRunnerPlatform const runner = yield* platform.start<number, string>() // => WorkerRunner<number, string>})WorkerRunner
Section titled “WorkerRunner”The runner abstraction inside the worker. run(handler) attaches a handler
keyed by portId (returning an Effect or void); send / sendUnsafe reply
to a port; the optional disconnects queue emits port ids on disconnect.
import { Effect } from "effect"import type { WorkerRunner } from "effect/unstable/workers"
declare const runner: WorkerRunner.WorkerRunner<number, string>
const serve = Effect.gen(function* () { yield* runner.run((portId, input) => // Parse the input and send the doubled number back to the originating port. runner.send(portId, Number(input) * 2) ) // => Effect<void, WorkerError, R>})WorkerRunner.PlatformMessage
Section titled “WorkerRunner.PlatformMessage”The runner-side wire protocol: [0, payload] delivers a request, [1] closes
a port.
import type { WorkerRunner } from "effect/unstable/workers"
const request: WorkerRunner.PlatformMessage<string> = [0, "hello"] // a requestconst close: WorkerRunner.PlatformMessage<string> = [1] // close the portTransferable
Section titled “Transferable”From effect/unstable/workers (Transferable). Schema-integrated transfer-list
collection.
Collector
Section titled “Collector”Service that accumulates Transferable objects during message encoding.
Exposes effectful and Unsafe variants of addAll, read, and clear.
import { Effect } from "effect"import { Transferable } from "effect/unstable/workers"
Effect.gen(function* () { const collector = yield* Transferable.Collector yield* collector.addAll([new ArrayBuffer(8)]) const items = yield* collector.read // => [ArrayBuffer(8)]})makeCollectorUnsafe
Section titled “makeCollectorUnsafe”Creates a fresh mutable Collector service synchronously. Use
when you are outside an Effect.
import { Transferable } from "effect/unstable/workers"
const collector = Transferable.makeCollectorUnsafe()collector.addAllUnsafe([new ArrayBuffer(4)])collector.readUnsafe() // => [ArrayBuffer(4)]collector.clearUnsafe() // => [ArrayBuffer(4)], collector now emptymakeCollector
Section titled “makeCollector”Effect that creates a fresh Collector service. Provide one
around each message encode.
import { Effect } from "effect"import { Transferable } from "effect/unstable/workers"
const program = Effect.gen(function* () { const collector = yield* Transferable.makeCollector // => Collector service})addAll
Section titled “addAll”Adds transferables to the Collector if one is present in
context; a no-op otherwise. Useful inside custom schema getters.
import { Effect } from "effect"import { Transferable } from "effect/unstable/workers"
const eff = Transferable.addAll([new ArrayBuffer(16)])// => Effect<void> — records into the ambient Collector, or does nothinggetterAddAll
Section titled “getterAddAll”Creates a SchemaGetter that records transferables derived from a value into
the ambient collector while passing the value through unchanged. The primitive
behind schema.
import { Transferable } from "effect/unstable/workers"
const getter = Transferable.getterAddAll((buf: ArrayBuffer) => [buf])// => Getter<ArrayBuffer, ArrayBuffer> that also collects `buf`Transferable
Section titled “Transferable”The schema interface returned by schema: a decodeTo wrapper whose
encode path records transferables while preserving the wrapped schema’s decoded
type.
import type { Schema } from "effect"import type { Transferable } from "effect/unstable/workers"
// Transferable<S> is the type of a schema wrapped with `Transferable.schema`.declare const t: Transferable.Transferable<Schema.Uint8Array>schema
Section titled “schema”Wraps a schema so encoding selects transferables from the encoded value and
records them in the ambient Collector. Dual: pipeable or
data-first.
import { Effect, Schema } from "effect"import { Transferable } from "effect/unstable/workers"
// Transfer the backing ArrayBuffer of any encoded Uint8Array field.const Bytes = Transferable.schema( Schema.Uint8Array, (u8) => [u8.buffer])
Effect.gen(function* () { const collector = yield* Transferable.makeCollector yield* Schema.encode(Bytes)(new Uint8Array([1, 2])).pipe( Effect.provideService(Transferable.Collector, collector) ) collector.readUnsafe() // => [ArrayBuffer] (the u8.buffer)})ImageData
Section titled “ImageData”Built-in Transferable schema for ImageData, transferring
its data.buffer (the pixel buffer).
import { Transferable } from "effect/unstable/workers"
const Img = Transferable.ImageData// encoding an ImageData collects [imageData.data.buffer]MessagePort
Section titled “MessagePort”Built-in Transferable schema for MessagePort, transferring
the port itself.
import { Transferable } from "effect/unstable/workers"
const Port = Transferable.MessagePort// encoding a MessagePort collects [port]Uint8Array
Section titled “Uint8Array”Built-in Transferable schema for Uint8Array, transferring
its backing ArrayBuffer.
import { Transferable } from "effect/unstable/workers"
const Bytes = Transferable.Uint8Array// encoding a Uint8Array collects [u8.buffer]WorkerError
Section titled “WorkerError”From effect/unstable/workers (WorkerError). The typed error model shared by
both sides. The outer WorkerError wraps one tagged
WorkerErrorReason and delegates message and cause
to it.
WorkerError
Section titled “WorkerError”The error exposed in worker effect error channels. Construct from a reason;
message and cause are forwarded from that reason. Built with
Schema.ErrorClass, so it is also a schema.
import { WorkerError } from "effect/unstable/workers"
const err = new WorkerError.WorkerError({ reason: new WorkerError.WorkerSendError({ message: "Failed to send message to worker", cause: new Error("DataCloneError") })})err.message // => "Failed to send message to worker"err._tag // => "WorkerError"WorkerErrorReason
Section titled “WorkerErrorReason”Union of the four tagged reasons, and a Schema.Union of their schemas for
encode/decode of structured diagnostics.
import { WorkerError } from "effect/unstable/workers"
// The schema union, e.g. for decoding a reason from another thread.const ReasonSchema = WorkerError.WorkerErrorReason// => Schema.Union<[WorkerSpawnError, WorkerSendError, WorkerReceiveError, WorkerUnknownError]>WorkerSpawnError
Section titled “WorkerSpawnError”Reason for failures while spawning or setting up a worker — often means the runner code is not executing in the expected worker context.
import { WorkerError } from "effect/unstable/workers"
const reason = new WorkerError.WorkerSpawnError({ message: "Worker failed to start"})reason._tag // => "WorkerSpawnError"WorkerSendError
Section titled “WorkerSendError”Reason for failures while sending a message — frequently a payload that is invalid for the structured-clone or transfer-list rules.
import { WorkerError } from "effect/unstable/workers"
const reason = new WorkerError.WorkerSendError({ message: "Failed to send message to worker"})reason._tag // => "WorkerSendError"WorkerReceiveError
Section titled “WorkerReceiveError”Reason for failures while receiving or handling an inbound message — inbound
event errors, messageerror, or worker exit.
import { WorkerError } from "effect/unstable/workers"
const reason = new WorkerError.WorkerReceiveError({ message: "messageerror event"})reason._tag // => "WorkerReceiveError"WorkerUnknownError
Section titled “WorkerUnknownError”Reason for an unclassified worker failure that does not fit another case.
import { WorkerError } from "effect/unstable/workers"
const reason = new WorkerError.WorkerUnknownError({ message: "Unexpected worker failure"})reason._tag // => "WorkerUnknownError"isWorkerError
Section titled “isWorkerError”Refinement returning true when a value is a WorkerError.
import { WorkerError } from "effect/unstable/workers"
const e = new WorkerError.WorkerError({ reason: new WorkerError.WorkerUnknownError({ message: "boom" })})WorkerError.isWorkerError(e) // => trueWorkerError.isWorkerError(new Error("x")) // => falseTypeId
Section titled “TypeId”The branding identifier ("~effect/workers/WorkerError") used by
isWorkerError.
import type { WorkerError } from "effect/unstable/workers"
// WorkerError.TypeId is the literal type used to brand WorkerError values.type Id = WorkerError.TypeId