# Workers

The `effect/unstable/workers` modules move work onto a **separate thread** — a
browser Web Worker, a Node `worker_thread`, a Bun worker, a `MessagePort`, or
even a child process — while keeping Effect's typed errors and interruption
intact. The same building blocks describe both sides of the boundary: the
**main thread** that sends input and consumes output, and the **worker side**
that runs a handler and replies.

These modules are deliberately **low-level**. They define a platform-neutral
protocol and the services a runtime adapter must implement; they do **not**
ship a high-level worker pool or a ready-to-use spawner. A platform package
(for example a `NodeWorker` adapter) wires the runtime-specific
`postMessage`/listen logic into a [`WorkerPlatform`](#workerplatform) and
provides the [`Spawner`](#spawner) that actually creates threads.

**Caution:** The `effect/unstable/workers` modules are **unstable**.

```ts
import { Effect } from "effect"
import { Worker, WorkerError } from "effect/unstable/workers"

// `WorkerPlatform` is the main-thread service. A platform adapter provides it
// together with a `Spawner` that knows how to create the native worker.
const program = Effect.gen(function* () {
  const platform = yield* Worker.WorkerPlatform

  // Spawn an Effect-based worker client for worker id 0. The type parameters
  // are `<O, I>`: this worker outputs `string` and accepts `number` input.
  const worker = yield* platform.spawn<string, number>(0)

  // `send` queues a value to the worker. Buffered until `run` observes the
  // ready signal, so the worker MUST be running for messages to leave.
  yield* worker.send(21)

  // `run` is long-lived: it installs the handler and keeps the message loop
  // alive (returns `Effect<never, ...>`), owning the port's cleanup scope.
  yield* worker.run((output: string) =>
    Effect.log(`worker said: ${output}`)
  )
}).pipe(
  // Worker failures surface as a typed `WorkerError`.
  Effect.catch((error: WorkerError.WorkerError) =>
    Effect.logError(`worker failed: ${error.message}`)
  )
)
```

## The model

Communication is split across two services that mirror each other:

| Side | Service | Yields | Role |
| --- | --- | --- | --- |
| Main thread | [`Worker.WorkerPlatform`](#workerplatform) | [`Worker`](#worker) | send input, run a handler over output |
| Main thread | [`Worker.Spawner`](#spawner) | `SpawnerFn` | locate/create the native worker for an id |
| Worker side | [`WorkerRunner.WorkerRunnerPlatform`](#workerrunnerplatform) | [`WorkerRunner`](#workerrunner) | receive requests, send replies |

`send` and `run` are intentionally separate. Calling `send` before the worker
is ready does **not** fail — the value is buffered and flushed once `run`
observes the platform ready signal. This means **a worker you never `run` will
silently never deliver its buffered messages**.
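
The buffering behaviour can be pictured with a small, library-free model. This is only a sketch of the idea: `makeBufferedSender`, `markReady`, and the other names are hypothetical and do not come from `effect/unstable/workers`, and the real platform may implement the handshake differently.

```ts
// Hypothetical model of "buffer until ready"; not the library's implementation.
function makeBufferedSender<I>(post: (message: I) => void) {
  const buffer: Array<I> = []
  let ready = false

  return {
    // Before the ready signal, messages are queued instead of posted.
    send(message: I): void {
      if (ready) {
        post(message)
      } else {
        buffer.push(message)
      }
    },
    // Called when the ready frame is observed: flush the queue in send order.
    markReady(): void {
      ready = true
      for (const message of buffer.splice(0)) {
        post(message)
      }
    }
  }
}

// `delivered` stands in for what the native worker would actually receive.
const delivered: Array<number> = []
const bufferedSender = makeBufferedSender<number>((message) => {
  delivered.push(message)
})

bufferedSender.send(1)
bufferedSender.send(2)
const beforeReady = delivered.length // 0: nothing has left the buffer yet

bufferedSender.markReady()
bufferedSender.send(3)
// `delivered` is now [1, 2, 3]
```

If `markReady` is never called, `delivered` stays empty, which mirrors what happens to a worker that is never `run`.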

Under the hood both sides speak a tiny tagged protocol so the platform adapter
can distinguish control frames from data frames:

- Client ([`Worker.PlatformMessage`](#workerplatformmessage)): `[0]` = ready,
  `[1, payload]` = data.
- Runner ([`WorkerRunner.PlatformMessage`](#workerrunnerplatformmessage)):
  `[0, payload]` = request, `[1]` = close a port.
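
As a rough illustration of the two frame shapes, the sketch below models them as tuple unions and branches on the leading tag. `ClientFrame`, `RunnerFrame`, and the two `describe…` helpers are local stand-ins written for this example; they are not imported from the library.

```ts
// Local stand-ins for the frame shapes listed above (illustrative names).
type ClientFrame<O> = readonly [0] | readonly [1, O]
type RunnerFrame<I> = readonly [0, I] | readonly [1]

// Main thread: tell the ready control frame apart from a data frame.
function describeClientFrame<O>(frame: ClientFrame<O>): string {
  if (frame[0] === 0) {
    return "ready"
  }
  return `data: ${JSON.stringify(frame[1])}`
}

// Worker side: tell a request apart from a close-port control frame.
function describeRunnerFrame<I>(frame: RunnerFrame<I>): string {
  if (frame[0] === 0) {
    return `request: ${JSON.stringify(frame[1])}`
  }
  return "close"
}

const frameLog = [
  describeClientFrame([0]),
  describeClientFrame([1, { result: 42 }]),
  describeRunnerFrame([0, "hello"]),
  describeRunnerFrame([1])
]
// frameLog: ["ready", 'data: {"result":42}', 'request: "hello"', "close"]
```

Note that tag `0` means "ready" in one direction and "request" in the other, so an adapter has to know which side of the boundary a frame came from.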

Incoming messages are handled **concurrently** inside `run`'s fiber set. If you
need ordering or back pressure, add a queue, semaphore, or protocol
acknowledgement yourself.
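
One way to add a protocol acknowledgement is a bounded in-flight window: the sender posts at most `maxInFlight` messages and waits for an acknowledgement before posting more. The sketch below is a library-free model under that assumption; `makeAckedSender` and its methods are hypothetical, and the acknowledgement message itself would be something your own protocol defines.

```ts
// Hypothetical acknowledgement-based flow control; not part of the library.
interface AckedSender<I> {
  enqueue(message: I): void
  ack(): void
}

function makeAckedSender<I>(
  post: (message: I) => void,
  maxInFlight: number
): AckedSender<I> {
  const waiting: Array<I> = []
  let inFlight = 0

  // Post queued messages while the in-flight window has room.
  const pump = (): void => {
    while (inFlight < maxInFlight && waiting.length > 0) {
      const next = waiting.shift() as I
      inFlight++
      post(next)
    }
  }

  return {
    enqueue(message) {
      waiting.push(message)
      pump()
    },
    // The receiver acknowledged one message, which frees a slot in the window.
    ack() {
      if (inFlight > 0) inFlight--
      pump()
    }
  }
}

const posted: Array<string> = []
const ackedSender = makeAckedSender<string>((message) => {
  posted.push(message)
}, 1)

ackedSender.enqueue("a")
ackedSender.enqueue("b")
ackedSender.enqueue("c")
const afterEnqueue = [...posted] // ["a"]: "b" and "c" wait for an ack

ackedSender.ack()
const afterFirstAck = [...posted] // ["a", "b"]
```

With a window of 1 the receiver also sees messages strictly in order, at the cost of one round trip per message.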

## Constructing the platform

You only build a platform when writing a runtime adapter. There are two entry
points depending on how much the runtime already gives you.

If the adapter already exposes the low-level send/run protocol (i.e. it can
hand you raw [`PlatformMessage`](#workerplatformmessage) values), wrap it with
[`makeUnsafe`](#makeunsafe):

```ts
const worker = Worker.makeUnsafe({
  send: (message, _transfers) => Effect.void, // postMessage([0, message], transfers)
  run: (handler) =>
    // Deliver `[0]` for ready, `[1, payload]` for each data frame, then stay
    // alive: `run` is typed to never complete.
    handler([1, 42]).pipe(Effect.andThen(Effect.never))
})
```

For most adapters you instead implement `setup` (create the port, scope its
cleanup) and `listen` (wire native events to `emit`) and let
[`makePlatform`](#makeplatform) assemble the buffering, ready-handshake, and
scope management:

```ts
// `W` is the native worker type produced by the Spawner (e.g. a Web Worker).
interface NativeWorker {
  postMessage(m: unknown, t?: unknown): void
}

const platform = Worker.makePlatform<NativeWorker>()({
  setup: ({ worker, scope }) =>
    // Return the port object exposing `postMessage`; register cleanup on scope.
    Effect.succeed(worker),
  listen: ({ port, emit, deferred, scope }) =>
    // Translate native message/error events into `emit(platformMessage)` /
    // `Deferred.fail(deferred, new WorkerError(...))`.
    Effect.void
})
```

The native-worker lookup is provided separately via the [`Spawner`](#spawner)
service, typically with [`layerSpawner`](#layerspawner):

```ts
const SpawnerLive: Layer.Layer<Worker.Spawner> = Worker.layerSpawner(
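  // `Worker` is the Effect module here, so reach the native constructor via `globalThis`.
  (id: number) => new globalThis.Worker(new URL("./my-worker.ts", import.meta.url))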
)
```

## Transferring values

Worker messages always go through structured clone. For large payloads or
`MessagePort`s you can instead **transfer** the backing resource so ownership
moves to the receiver and no copy is made. The [`Transferable`](#transferable)
module integrates this with `Schema`: a wrapped schema reports its transferable
resources to a [`Collector`](#collector) during encoding, and the worker
platform passes the collected list as the `postMessage` transfer list.

```ts
// A message whose `payload` field carries a Uint8Array by transfer, not copy.
const Message = Schema.Struct({
  id: Schema.Number,
  payload: Transferable.Uint8Array
})

const encodeWithTransfers = Effect.gen(function* () {
  // Provide a fresh collector around each encode.
  const collector = yield* Transferable.makeCollector

  const encoded = yield* Schema.encode(Message)({
    id: 1,
    payload: new Uint8Array([1, 2, 3])
  }).pipe(Effect.provideService(Transferable.Collector, collector))

  // Read the resources to hand to `postMessage(encoded, transfers)`.
  const transfers = collector.readUnsafe()
  // => [ArrayBuffer] — the Uint8Array's backing buffer
  return { encoded, transfers }
})
```

<Aside type="caution">
Transfer annotations do not make a value structured-cloneable — they only avoid
the copy. After `postMessage`, transferred buffers are **detached** from the
sender. Collecting a typed array's `.buffer` transfers the whole backing buffer,
including bytes used by other views.
</Aside>
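
The detachment and whole-buffer behaviour can be observed without spawning a worker, because the global `structuredClone` accepts a transfer list and applies the same transfer rules as `postMessage`. The sketch below assumes a runtime that provides `structuredClone` (modern browsers, Node 17 or later); the variable names are illustrative.

```ts
// Two views over one 8-byte buffer.
const backing = new ArrayBuffer(8)
const firstHalf = new Uint8Array(backing, 0, 4)
const secondHalf = new Uint8Array(backing, 4, 4)
firstHalf.set([1, 2, 3, 4])

// `backing` is the same object as `firstHalf.buffer`, so transferring it
// moves the WHOLE 8-byte buffer, not just the four bytes behind `firstHalf`.
const received = structuredClone(firstHalf, { transfer: [backing] })

const receivedBytes = Array.from(received) // [1, 2, 3, 4]
const receivedBufferSize = received.buffer.byteLength // 8

// On the sending side the buffer is detached, so every view reports 0 bytes.
const senderSizes = [
  backing.byteLength,
  firstHalf.byteLength,
  secondHalf.byteLength
] // [0, 0, 0]
```

`secondHalf` was never mentioned in the message, yet it is unusable afterwards. Copy the bytes you want to keep before sending when a buffer is shared between views.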

---

# Reference

## Worker (main thread)

From `effect/unstable/workers` (`Worker`). The client side: send input, run a
handler over output.

### WorkerPlatform

Service that spawns Effect-based [`Worker`](#worker) clients for a numeric
worker id. `spawn` requires a [`Spawner`](#spawner) in context and fails with
[`WorkerError`](#workererror).

```ts
Effect.gen(function* () {
  const platform = yield* Worker.WorkerPlatform
  const worker = yield* platform.spawn<number, number>(0)
  // => Worker<number, number>
})
```

### Worker

The Effect-based worker abstraction. `send(message, transfers?)` queues input
(buffered until ready); `run(handler, { onSpawn? })` is long-lived, returns
`Effect<never, E | WorkerError, R>`, and runs `onSpawn` when the worker reports
readiness.

```ts
declare const worker: Worker.Worker<string, number>

const useWorker = Effect.gen(function* () {
  yield* worker.send(10) // => void (queued)
  yield* worker.run((out) => Effect.log(out), {
    onSpawn: Effect.log("worker ready")
  })
  // never returns normally — keeps the message loop alive
})
```

### makeUnsafe

Wraps platform-specific `send`/`run` functions into a [`Worker`](#worker),
translating the ready frame (`[0]`) to the optional `onSpawn` effect and data
frames (`[1, payload]`) to the handler.

```ts
const worker = Worker.makeUnsafe({
  send: () => Effect.void,
  run: (handler) => handler([1, "data"]).pipe(Effect.andThen(Effect.never))
})
// => Worker<any, any>
```

### Worker.PlatformMessage

The client-side wire protocol carried between the worker and the platform
adapter: `[0]` signals readiness, `[1, payload]` carries data.

```ts
const ready: Worker.PlatformMessage = [0] // worker is ready
const data: Worker.PlatformMessage = [1, { result: 42 }] // a data frame
```

### Spawner

Service tag holding the [`SpawnerFn`](#spawnerfn) that creates or locates a
native worker for a given id. Required by `WorkerPlatform.spawn`.

```ts
Effect.gen(function* () {
  const spawn = yield* Worker.Spawner
  const native = spawn(0) // => the native worker for id 0
})
```

### SpawnerFn

The function type behind [`Spawner`](#spawner): `(id: number) => W`, where `W`
is the runtime's native worker type.

```ts
const spawner: Worker.SpawnerFn<{ id: number }> = (id) => ({ id })
// => { id: 0 } for spawner(0)
```
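
Because a spawner may either create or locate a worker, one possible design is to wrap a creating function so that a repeated id returns the worker created earlier. This is a sketch under that assumption only: `SpawnerLike` is a local stand-in for the `(id: number) => W` shape, and `cachingSpawner` is a hypothetical helper, not a library function.

```ts
// Local stand-in for the spawner function shape described above.
type SpawnerLike<W> = (id: number) => W

// Wrap a creating spawner so repeated ids reuse the same native worker.
function cachingSpawner<W>(create: SpawnerLike<W>): SpawnerLike<W> {
  const cache = new Map<number, W>()
  return (id) => {
    if (cache.has(id)) {
      return cache.get(id) as W
    }
    const created = create(id)
    cache.set(id, created)
    return created
  }
}

let createCalls = 0
const spawnOnce = cachingSpawner((id) => {
  createCalls++
  return { id } // a plain object stands in for a native worker
})

const sameWorker = spawnOnce(0) === spawnOnce(0) // true: id 0 is created once
const otherWorker = spawnOnce(1) // a second id triggers a second creation
// createCalls is now 2
```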

### layerSpawner

Builds a `Layer` that provides the [`Spawner`](#spawner) service from a
[`SpawnerFn`](#spawnerfn).

```ts
const SpawnerLive: Layer.Layer<Worker.Spawner> = Worker.layerSpawner(
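  // `Worker` is the Effect module here, so reach the native constructor via `globalThis`.
  (id: number) => new globalThis.Worker(new URL(`./w${id}.ts`, import.meta.url))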
)
// => Layer<Spawner>
```

### makePlatform

Curried constructor (`makePlatform<W>()(options)`) that turns runtime `setup`
and `listen` hooks into a [`WorkerPlatform`](#workerplatform) service value.
It buffers sent messages until the ready handshake completes and scopes port
cleanup to each `run`. `setup` returns the port (any object with
`postMessage`); `listen` wires native events to `emit` and fails the provided
`Deferred` with a [`WorkerError`](#workererror) on transport failure.

```ts
const platform = Worker.makePlatform<{ postMessage: (m: unknown) => void }>()({
  setup: ({ worker }) => Effect.succeed(worker),
  listen: () => Effect.void
})
// => WorkerPlatform["Service"]
```

## WorkerRunner (worker side)

From `effect/unstable/workers` (`WorkerRunner`). The server side that runs
**inside** the worker-like runtime.

### WorkerRunnerPlatform

Service that starts a platform-specific [`WorkerRunner`](#workerrunner) for the
current runtime. `start()` fails with [`WorkerError`](#workererror).

```ts
Effect.gen(function* () {
  const platform = yield* WorkerRunner.WorkerRunnerPlatform
  const runner = yield* platform.start<number, string>()
  // => WorkerRunner<number, string>
})
```

### WorkerRunner

The runner abstraction inside the worker. `run(handler)` attaches a handler
that is called with the originating `portId` and the request (returning an
Effect or `void`); `send` / `sendUnsafe` reply to a port; the optional
`disconnects` queue emits port ids on disconnect.

```ts
declare const runner: WorkerRunner.WorkerRunner<number, string>

const serve = Effect.gen(function* () {
  yield* runner.run((portId, input) =>
    // Parse the input and send the doubled number back to the originating port.
    runner.send(portId, Number(input) * 2)
  )
  // => Effect<void, WorkerError, R>
})
```

### WorkerRunner.PlatformMessage

The runner-side wire protocol: `[0, payload]` delivers a request, `[1]` closes
a port.

```ts
const request: WorkerRunner.PlatformMessage<string> = [0, "hello"] // a request
const close: WorkerRunner.PlatformMessage<string> = [1] // close the port
```

## Transferable

From `effect/unstable/workers` (`Transferable`). Schema-integrated transfer-list
collection.

### Collector

Service that accumulates `Transferable` objects during message encoding.
Exposes effectful and `Unsafe` variants of `addAll`, `read`, and `clear`.

```ts
Effect.gen(function* () {
  const collector = yield* Transferable.Collector
  yield* collector.addAll([new ArrayBuffer(8)])
  const items = yield* collector.read
  // => [ArrayBuffer(8)]
})
```

### makeCollectorUnsafe

Creates a fresh mutable [`Collector`](#collector) service synchronously. Use
it when you are outside an Effect.

```ts
const collector = Transferable.makeCollectorUnsafe()
collector.addAllUnsafe([new ArrayBuffer(4)])
collector.readUnsafe() // => [ArrayBuffer(4)]
collector.clearUnsafe() // => [ArrayBuffer(4)], collector now empty
```

### makeCollector

Effect that creates a fresh [`Collector`](#collector) service. Provide one
around each message encode.

```ts
const program = Effect.gen(function* () {
  const collector = yield* Transferable.makeCollector
  // => Collector service
})
```

### addAll

Adds transferables to the [`Collector`](#collector) **if one is present** in
context; a no-op otherwise. Useful inside custom schema getters.

```ts
const eff = Transferable.addAll([new ArrayBuffer(16)])
// => Effect<void> — records into the ambient Collector, or does nothing
```

### getterAddAll

Creates a `SchemaGetter` that records transferables derived from a value into
the ambient collector while passing the value through unchanged. The primitive
behind [`schema`](#schema).

```ts
const getter = Transferable.getterAddAll((buf: ArrayBuffer) => [buf])
// => Getter<ArrayBuffer, ArrayBuffer> that also collects `buf`
```

### Transferable

The schema interface returned by [`schema`](#schema): a `decodeTo` wrapper whose
encode path records transferables while preserving the wrapped schema's decoded
type.

```ts
// Transferable<S> is the type of a schema wrapped with `Transferable.schema`.
declare const t: Transferable.Transferable<Schema.Uint8Array>
```

### schema

Wraps a schema so encoding selects transferables from the **encoded** value and
records them in the ambient [`Collector`](#collector). Dual: pipeable or
data-first.

```ts
// Transfer the backing ArrayBuffer of any encoded Uint8Array field.
const Bytes = Transferable.schema(
  Schema.Uint8Array,
  (u8) => [u8.buffer]
)

Effect.gen(function* () {
  const collector = yield* Transferable.makeCollector
  yield* Schema.encode(Bytes)(new Uint8Array([1, 2])).pipe(
    Effect.provideService(Transferable.Collector, collector)
  )
  collector.readUnsafe() // => [ArrayBuffer] (the u8.buffer)
})
```

### ImageData

Built-in [`Transferable`](#transferable) schema for `ImageData`, transferring
its `data.buffer` (the pixel buffer).

```ts
const Img = Transferable.ImageData
// encoding an ImageData collects [imageData.data.buffer]
```

### MessagePort

Built-in [`Transferable`](#transferable) schema for `MessagePort`, transferring
the port itself.

```ts
const Port = Transferable.MessagePort
// encoding a MessagePort collects [port]
```

### Uint8Array

Built-in [`Transferable`](#transferable) schema for `Uint8Array`, transferring
its backing `ArrayBuffer`.

```ts
const Bytes = Transferable.Uint8Array
// encoding a Uint8Array collects [u8.buffer]
```

## WorkerError

From `effect/unstable/workers` (`WorkerError`). The typed error model shared by
both sides. The outer [`WorkerError`](#workererror) wraps one tagged
[`WorkerErrorReason`](#workererrorreason) and delegates `message` and `cause`
to it.

### WorkerError

The error exposed in worker effect error channels. Construct from a `reason`;
`message` and `cause` are forwarded from that reason. Built with
`Schema.ErrorClass`, so it is also a schema.

```ts
const err = new WorkerError.WorkerError({
  reason: new WorkerError.WorkerSendError({
    message: "Failed to send message to worker",
    cause: new Error("DataCloneError")
  })
})
err.message // => "Failed to send message to worker"
err._tag // => "WorkerError"
```

### WorkerErrorReason

Union of the four tagged reasons, and a `Schema.Union` of their schemas for
encode/decode of structured diagnostics.

```ts
// The schema union, e.g. for decoding a reason from another thread.
const ReasonSchema = WorkerError.WorkerErrorReason
// => Schema.Union<[WorkerSpawnError, WorkerSendError, WorkerReceiveError, WorkerUnknownError]>
```
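
Since each reason carries a distinct `_tag`, callers can branch on it with an exhaustive `switch`. The sketch below uses a local `ReasonLike` type that mirrors only the four tags and the `message` field; it is not the library's class hierarchy, and the suggested actions are purely illustrative.

```ts
// Local stand-in for the four reason tags; not imported from the library.
type ReasonTag =
  | "WorkerSpawnError"
  | "WorkerSendError"
  | "WorkerReceiveError"
  | "WorkerUnknownError"

interface ReasonLike {
  readonly _tag: ReasonTag
  readonly message: string
}

// Hypothetical policy: what a caller might do for each kind of failure.
function suggestAction(reason: ReasonLike): string {
  switch (reason._tag) {
    case "WorkerSpawnError":
      return "check the worker entry point"
    case "WorkerSendError":
      return "check the payload and transfer list"
    case "WorkerReceiveError":
      return "respawn the worker"
    case "WorkerUnknownError":
      return "log and escalate"
  }
}

const sendAction = suggestAction({
  _tag: "WorkerSendError",
  message: "Failed to send message to worker"
})
// sendAction: "check the payload and transfer list"
```

Because the `switch` covers every member of `ReasonTag`, the compiler flags this function if a new tag is added to the union without a matching case.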

### WorkerSpawnError

Reason for failures while spawning or setting up a worker — often means the
runner code is not executing in the expected worker context.

```ts
const reason = new WorkerError.WorkerSpawnError({
  message: "Worker failed to start"
})
reason._tag // => "WorkerSpawnError"
```

### WorkerSendError

Reason for failures while sending a message — frequently a payload that is
invalid for the structured-clone or transfer-list rules.

```ts
const reason = new WorkerError.WorkerSendError({
  message: "Failed to send message to worker"
})
reason._tag // => "WorkerSendError"
```

### WorkerReceiveError

Reason for failures while receiving or handling an inbound message — inbound
event errors, `messageerror`, or worker exit.

```ts
const reason = new WorkerError.WorkerReceiveError({
  message: "messageerror event"
})
reason._tag // => "WorkerReceiveError"
```

### WorkerUnknownError

Reason for an unclassified worker failure that does not fit another case.

```ts
const reason = new WorkerError.WorkerUnknownError({
  message: "Unexpected worker failure"
})
reason._tag // => "WorkerUnknownError"
```

### isWorkerError

Refinement returning `true` when a value is a [`WorkerError`](#workererror).

```ts
const e = new WorkerError.WorkerError({
  reason: new WorkerError.WorkerUnknownError({ message: "boom" })
})
WorkerError.isWorkerError(e) // => true
WorkerError.isWorkerError(new Error("x")) // => false
```

### TypeId

The branding identifier (`"~effect/workers/WorkerError"`) used by
[`isWorkerError`](#isworkererror).

```ts
// WorkerError.TypeId is the literal type used to brand WorkerError values.
type Id = WorkerError.TypeId
```