Skip to content

The Workflow Engine

A Workflow value is just a description. To actually run it you need a WorkflowEngine — the runtime boundary that registers workflow handlers, starts or observes executions by a stable execution ID, drives activities, durable deferreds, and durable clocks, and persists everything in between.

Effect ships two engines:

  • WorkflowEngine.layerMemory — an in-memory engine for tests and local development. State lives in process memory and is lost when the process exits.
  • ClusterWorkflowEngine.layer — a durable, production engine that persists every step as cluster entity messages (see Production: the cluster engine).

Both expose the exact same typed WorkflowEngine service, so the workflow code you write does not change between development and production — you only swap the layer.

The typical flow has three parts:

  1. Define a workflow with Workflow.make.

  2. Register its handler by building a layer with workflow.toLayer(handler). Provide that layer together with a WorkflowEngine layer.

  3. Run it by calling workflow.execute(payload) inside an Effect.gen, then Effect.runPromise the whole thing.

import { Effect, Layer, Schema } from "effect"
import { Activity, Workflow, WorkflowEngine } from "effect/unstable/workflow"
// 1. Define the workflow. `idempotencyKey` turns the payload into the stable
// execution ID: executing twice with the same key observes the SAME run.
const SendEmail = Workflow.make({
name: "SendEmail",
payload: { to: Schema.String, subject: Schema.String },
success: Schema.String,
idempotencyKey: (payload) => `${payload.to}-${payload.subject}`
})
// 2. Register the handler. Each step that should survive a crash/replay is an
// Activity, identified by name so its result is memoized across retries.
const SendEmailLayer = SendEmail.toLayer(
Effect.fn("SendEmail")(function* (payload) {
const messageId = yield* Activity.make({
name: "deliver",
success: Schema.String,
execute: Effect.gen(function* () {
yield* Effect.log(`Delivering to ${payload.to}`)
return `msg_${payload.to}`
})
})
return messageId
})
)
// Compose the engine + the registered workflow into one layer.
const AppLayer = SendEmailLayer.pipe(
Layer.provideMerge(WorkflowEngine.layerMemory)
)
// 3. Run it. `execute` derives the executionId, starts (or observes) the run,
// and resolves with the decoded success value.
const program = Effect.gen(function* () {
const result = yield* SendEmail.execute({
to: "ada@example.com",
subject: "Welcome"
})
yield* Effect.log(`Workflow finished: ${result}`)
// => Workflow finished: msg_ada@example.com
})
Effect.runPromise(program.pipe(Effect.provide(AppLayer)))

Pass { discard: true } to start a workflow without waiting for its result. The effect resolves immediately with the execution ID (a string) instead of the success value, which you can later use with poll, interrupt, or resume.

const program = Effect.gen(function* () {
const executionId = yield* SendEmail.execute(
{ to: "ada@example.com", subject: "Welcome" },
{ discard: true }
)
// => "a1b2c3..." (the stable execution id)
return executionId
})

Understanding the lifecycle explains why workflows are durable rather than just retried.

  • Registration. workflow.toLayer(handler) calls engine.register when the layer is built. This wires the handler into the engine so the engine can start or resume the workflow later, even after a restart.

  • Execution by stable ID. workflow.execute(payload) computes a deterministic executionId from idempotencyKey, then calls engine.execute. Reusing an execution ID observes the existing execution instead of starting duplicate work — this is the core idempotency guarantee. Activities are likewise keyed by name and attempt, so a replay reuses already-completed activity results.

  • Suspension and retry. When a workflow blocks on a durable deferred, a child workflow, or a durable clock, it suspends (the Suspended result) rather than holding a fiber open. The engine retries suspended executions on the suspendedRetrySchedule (default: exponential backoff starting at 200ms, capped at a 30s spaced retry) until they can make progress.

  • Resume. External completions — a deferred being fulfilled, a clock firing, or a manual workflow.resume(executionId) — re-run the handler from the start; memoized activity and deferred results let it fast-forward to where it left off.

  • Interruption. workflow.interrupt(executionId) stops a run while honoring compensation finalizers (Workflow.withCompensation) and cleaning up child workflows. The engine’s lower-level interruptUnsafe stops work more directly but may bypass compensation and orphan child workflows — prefer interrupt whenever cleanup matters.

For durable execution, use ClusterWorkflowEngine from effect/unstable/cluster/ClusterWorkflowEngine. It implements the same WorkflowEngine service on top of cluster sharding and message storage, so every execution, activity, deferred completion, resume, interrupt, and durable clock wake-up is a persisted cluster entity message keyed by workflow name and execution ID.

import { ClusterWorkflowEngine } from "effect/unstable/cluster/ClusterWorkflowEngine"
// Provides a durable WorkflowEngine; requires Sharding + MessageStorage.
const EngineLayer = ClusterWorkflowEngine.layer

It exports:

  • ClusterWorkflowEngine.make — an Effect that builds the engine, requiring Sharding and MessageStorage.
  • ClusterWorkflowEngine.layer — a Layer<WorkflowEngine, never, Sharding | MessageStorage> that provides the engine and registers the durable-clock entity used for wake-ups.

Durable clock wake-ups and deferred completions are routed through the workflow’s shard group (derived from the execution ID), so a workflow’s continuations land on the shard responsible for it. Because the entity address is derived from the workflow name and execution ID, those must stay stable across deploys — changing them changes where state is persisted.

WorkflowEngine is a Context.Service whose methods form the contract every engine implementation must satisfy. Most users never call these directly — they go through Workflow, Activity, DurableDeferred, and DurableClock, which talk to the engine for you. The methods are listed here so you understand the contract (and so backend authors know what to implement).

import { WorkflowEngine } from "effect/unstable/workflow"
// The service tag itself is `WorkflowEngine.WorkflowEngine`;
// `layerMemory`, `makeUnsafe`, and the `WorkflowInstance` service
// live on the same namespace.

Registers a workflow definition together with its execute handler, scoped to the provided Scope. Invoked for you by workflow.toLayer(...); you rarely call it directly.

// Used internally by `workflow.toLayer(handler)`:
const layer = SendEmail.toLayer(handler) // => Layer that calls engine.register

Starts a new execution or observes an existing one for the given executionId, returning the success value (or the execution ID when discard is true). Accepts an optional suspendedRetrySchedule. Called for you by workflow.execute(payload).

const program = Effect.gen(function* () {
const engine = yield* WorkflowEngine.WorkflowEngine
return yield* engine.execute(SendEmail, {
executionId: "send-ada-welcome",
payload: { to: "ada@example.com", subject: "Welcome" }
})
// => "msg_ada@example.com"
})

Returns the current Workflow.Result for an execution as an OptionNone if the execution is unknown or has not produced a result yet, otherwise Some(Complete | Suspended). Non-blocking.

import { Option } from "effect"
const program = Effect.gen(function* () {
const engine = yield* WorkflowEngine.WorkflowEngine
const result = yield* engine.poll(SendEmail, "send-ada-welcome")
if (Option.isSome(result) && result.value._tag === "Complete") {
// result.value.exit is the Exit<Success, Error>
}
// => Option.none() while still running / never started
})

Requests interruption of an execution, running compensation finalizers and cleaning up child workflows. The safe way to stop a workflow. Surfaced as workflow.interrupt(executionId).

const program = Effect.gen(function* () {
const engine = yield* WorkflowEngine.WorkflowEngine
yield* engine.interrupt(SendEmail, "send-ada-welcome")
// => void (compensation + child cleanup honored)
})

Interrupts an execution more directly, potentially skipping compensation finalizers and orphaning child workflows. Use only when you knowingly accept that tradeoff.

const program = Effect.gen(function* () {
const engine = yield* WorkflowEngine.WorkflowEngine
yield* engine.interruptUnsafe(SendEmail, "send-ada-welcome")
// => void (may bypass compensation/child cleanup)
})

Re-drives a suspended execution so it can make progress (e.g. after you know an external condition is now satisfied). Surfaced as workflow.resume(executionId).

const program = Effect.gen(function* () {
const engine = yield* WorkflowEngine.WorkflowEngine
yield* engine.resume(SendEmail, "send-ada-welcome")
// => void
})

Runs a single activity attempt and returns its Workflow.Result, decoding the stored exit. Engine-internal: called by Activity when you yield* an activity inside a workflow handler — you do not call it yourself.

// Triggered for you by yielding an Activity in a handler:
const messageId = yield* myActivity // engine.activityExecute(myActivity, attempt)

Tries to read the persisted result of a DurableDeferred, returning Option<Exit>. Engine-internal, used by DurableDeferred await logic; requires a WorkflowInstance in context.

// Used by DurableDeferred.await — not called directly.

Persists the result (an Exit) of a DurableDeferred and resumes any workflows waiting on it. Backs DurableDeferred.done / succeed / fail.

import { Exit } from "effect"
const program = Effect.gen(function* () {
const engine = yield* WorkflowEngine.WorkflowEngine
yield* engine.deferredDone(myDeferred, {
workflowName: "SendEmail",
executionId: "send-ada-welcome",
deferredName: myDeferred.name,
exit: Exit.succeed("ok")
})
// => void (waiting workflow resumes)
})

Schedules a future wake-up for a DurableClock, completing the clock’s deferred when the duration elapses. Backs DurableClock sleeps; engine-internal.

// Used by DurableClock — the engine wakes the workflow after `clock.duration`.

WorkflowInstance is a Context.Service holding the per-execution runtime state for a single run. It is provided to handlers by the engine; you read it when writing low-level engine integrations or helpers like Workflow.suspend.

Its fields:

  • executionId — the stable ID of this run.
  • workflow — the Workflow definition being executed.
  • scope — a Scope.Closeable representing the workflow’s lifetime; closed only when the workflow completes.
  • suspended — whether the workflow has requested suspension.
  • interrupted — whether interruption has been requested.
  • cause — the stored failure cause when SuspendOnFailure triggered.
  • activityState{ count, latch }, used to coordinate in-flight activities.

Builds a fresh WorkflowInstance (new scope, flags cleared, zeroed activity state) for a workflow and execution ID. Used by engine implementations to seed each run.

import { WorkflowEngine } from "effect/unstable/workflow"
const instance = WorkflowEngine.WorkflowInstance.initial(
SendEmail,
"send-ada-welcome"
)
// => { executionId: "send-ada-welcome", suspended: false, interrupted: false, ... }

The two shipped engines are both built on a single lower-level constructor. If you are integrating a new persistence backend, this is your surface.

Wraps a low-level Encoded implementation into a fully typed WorkflowEngine service, adding schema decoding/encoding, parent-child interruption linking, and the suspended-retry loop. “Unsafe” because the Encoded implementation is trusted to correctly persist, resume, and encode state.

import { WorkflowEngine } from "effect/unstable/workflow"
const engine = WorkflowEngine.makeUnsafe({
register: (workflow, execute) => {
/* persist registration */
return Effect.void as any
},
execute: (workflow, options) => {
/* start or observe execution by options.executionId */
return Effect.void as any
}
// ...remaining Encoded methods
})
// => a typed WorkflowEngine["Service"]

The Encoded interface is the persistence-integration contract — the same operations as WorkflowEngine, but over encoded payloads/results (so the backend stays schema-agnostic):

MethodResponsibility
registerStore the workflow + encoded execute handler, scoped to a Scope.
executeStart or observe an execution by executionId; with discard, return void, otherwise the encoded Workflow.Result. Receives the optional parent instance.
pollReturn the current encoded Workflow.Result as Option.
interruptRequest interruption, honoring compensation and child cleanup.
interruptUnsafeStop work directly; may bypass compensation/child cleanup.
resumeRe-drive a suspended execution.
activityExecuteExecute one activity attempt, returning an encoded Workflow.Result.
deferredResultRead a DurableDeferred’s persisted encoded Exit, if any.
deferredDonePersist a deferred’s encoded Exit and resume waiters.
scheduleClockSchedule a DurableClock wake-up after clock.duration.

A Layer<WorkflowEngine> providing the in-memory engine described above. No dependencies, no durability — ideal for tests and local iteration.

import { WorkflowEngine } from "effect/unstable/workflow"
const layer = WorkflowEngine.layerMemory
// => Layer.Layer<WorkflowEngine> (state in process memory only)