Workflows & Activities
A workflow is a typed, durable description of a long-running business process.
It bundles a stable name, a struct payload schema, success/error schemas,
and an idempotencyKey function that derives a deterministic execution ID from
the payload. The actual side effects live inside activities — named,
schema-backed effects that the engine can persist and replay instead of rerunning.
Because a WorkflowEngine persists, suspends, and resumes execution state, the
golden rule is: keep external side effects at activity boundaries, and keep
names and schemas stable across deployments so persisted results can be replayed.
The modules on this page are unstable and imported from
effect/unstable/workflow. To run a workflow you also need an engine — see
Engine & Execution — and the durable
building blocks (DurableClock, DurableDeferred, DurableQueue) are covered in
Durable Primitives.
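To make the golden rule above concrete, here is a minimal plain-TypeScript model of replay. It is a sketch of the idea only, not how a WorkflowEngine is implemented: `Journal`, `runActivity`, and `sendWelcome` are hypothetical names, and the journal is an in-memory map rather than durable storage.

```typescript
// A toy journal: activity name -> persisted result.
// This models the idea only; a real engine persists encoded results durably.
type Journal = Map<string, unknown>;

let sideEffectRuns = 0;

// Run an activity once; on later runs, return the stored result instead.
function runActivity<A>(journal: Journal, name: string, execute: () => A): A {
  if (journal.has(name)) {
    return journal.get(name) as A; // replay: skip the side effect
  }
  const result = execute();
  journal.set(name, result); // persist before continuing
  return result;
}

// A "workflow" whose only side effect sits behind an activity boundary.
function sendWelcome(journal: Journal, userId: string): string {
  return runActivity(journal, "SendEmail", () => {
    sideEffectRuns += 1;
    return `sent welcome to ${userId}`;
  });
}

const journal: Journal = new Map();
const firstRun = sendWelcome(journal, "u_123");
const replayed = sendWelcome(journal, "u_123"); // e.g. after a resume
```

In this model, renaming "SendEmail" between deployments would miss the journal entry and rerun the side effect, which is why names need to stay stable.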
Common case
1. Define a workflow
Workflow.make takes a stable name, the payload as struct fields (or a struct
schema), and an idempotencyKey that turns the payload into a stable string.
success defaults to Schema.Void and error defaults to Schema.Never.

```ts
import { Schema } from "effect";
import { Workflow } from "effect/unstable/workflow";

const SendEmailWorkflow = Workflow.make({
  name: "SendEmailWorkflow",
  // Payload fields, encoded/decoded with these schemas across the durable boundary.
  payload: { userId: Schema.String, template: Schema.String },
  // The success value the workflow resolves to (defaults to Schema.Void).
  success: Schema.String,
  // The typed failure (defaults to Schema.Never).
  error: Schema.String,
  // Derive a STABLE id for this logical request. Same payload => same execution.
  idempotencyKey: ({ userId, template }) => `${userId}:${template}`
});
```

2. Implement it with toLayer
The handler is registered with MyWorkflow.toLayer. Write the handler with
Effect.fnUntraced (not a plain function returning Effect.gen); it receives the
decoded payload and the executionId. Inside the handler you yield activities,
durable clocks, and durable deferreds.

```ts
import { Effect, Schema } from "effect";
import { Activity } from "effect/unstable/workflow";

const SendEmailWorkflowLayer = SendEmailWorkflow.toLayer(
  Effect.fnUntraced(function* (payload, executionId) {
    // Side effects belong in activities so they can be persisted and replayed.
    const address = yield* Activity.make({
      name: "LookupAddress",
      success: Schema.String,
      execute: Effect.succeed(`${payload.userId}@example.com`)
    });

    const messageId = yield* Activity.make({
      name: "SendEmail",
      success: Schema.String,
      execute: Effect.sync(() => `sent ${payload.template} to ${address}`)
    });

    return messageId;
  })
);
```

3. Execute it
MyWorkflow.execute(payload) runs the workflow and returns the success value.
Passing { discard: true } returns the executionId string instead (fire and
forget). Execution requires the WorkflowEngine plus any schema encoding/decoding
services your payload and result schemas need.

```ts
import { Effect, Layer } from "effect";
import { WorkflowEngine } from "effect/unstable/workflow";

const program = Effect.gen(function* () {
  // Returns Success["Type"] (here, a string).
  const messageId = yield* SendEmailWorkflow.execute({ userId: "u_123", template: "welcome" });
  // => "sent welcome to u_123@example.com"

  // discard: true returns the executionId instead of the result.
  const executionId = yield* SendEmailWorkflow.execute(
    { userId: "u_123", template: "welcome" },
    { discard: true }
  );
  // => a deterministic hash string

  return { messageId, executionId };
}).pipe(
  Effect.provide(
    // layerMemory is great for tests/local dev; use a durable engine in prod.
    SendEmailWorkflowLayer.pipe(Layer.provideMerge(WorkflowEngine.layerMemory))
  )
);
```

Activities, in one line
An Activity value is an Effect: you yield* it directly. The engine runs
the inner execute effect once, persists its encoded result, and on replay
returns the stored result instead of running the effect again. Make the underlying
side effect idempotent (see Activity.idempotencyKey), since a workflow resume may
re-run an activity whose result had not yet been persisted.
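The re-run case described above can be modelled in plain TypeScript. This is a hedged sketch, not library code: `PaymentProvider`, `chargeCard`, and the `crashBeforePersist` flag are hypothetical, and the `persisted` map merely stands in for the engine's store.

```typescript
// Hypothetical external provider that deduplicates requests by idempotency key.
class PaymentProvider {
  private readonly charges = new Map<string, string>();
  private nextId = 1;

  charge(idempotencyKey: string): string {
    const existing = this.charges.get(idempotencyKey);
    if (existing !== undefined) return existing; // duplicate request: same charge
    const chargeId = `ch_${this.nextId++}`;
    this.charges.set(idempotencyKey, chargeId);
    return chargeId;
  }

  get chargeCount(): number {
    return this.charges.size;
  }
}

const provider = new PaymentProvider();
const persisted = new Map<string, string>(); // stands in for the engine's store

// Runs the activity; `crashBeforePersist` simulates a process dying after the
// external call succeeded but before the result was stored.
function chargeCard(key: string, crashBeforePersist: boolean): string | undefined {
  const stored = persisted.get("ChargeCard");
  if (stored !== undefined) return stored;
  const chargeId = provider.charge(key);
  if (crashBeforePersist) return undefined; // result lost
  persisted.set("ChargeCard", chargeId);
  return chargeId;
}

const lost = chargeCard("exec-1:ChargeCard", true); // crash: nothing persisted
const resumed = chargeCard("exec-1:ChargeCard", false); // resume re-runs the activity
```

Because both calls carry the same key, the provider in this model creates a single charge even though the activity body ran twice.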
```ts
import { Effect, Schema } from "effect";
import { Activity } from "effect/unstable/workflow";

const ChargeCard = Activity.make({
  name: "ChargeCard",
  success: Schema.String, // a charge id
  error: Schema.String,
  execute: Effect.succeed("ch_abc")
});
// yield* ChargeCard inside a workflow gen => "ch_abc" (replayed on resume)
```

Workflow module reference
Workflow.make
Creates a durable workflow definition. Options: name, payload (struct fields or
a struct schema), idempotencyKey, optional success (default Schema.Void),
optional error (default Schema.Never), optional suspendedRetrySchedule
(a Schedule controlling retries while suspended), and optional annotations.

```ts
import { Schema, Schedule } from "effect";
import { Workflow } from "effect/unstable/workflow";

const Order = Workflow.make({
  name: "ProcessOrder",
  payload: { orderId: Schema.String },
  success: Schema.Struct({ shipped: Schema.Boolean }),
  error: Schema.String,
  idempotencyKey: ({ orderId }) => orderId,
  // Retry the workflow while it is in a suspended state.
  suspendedRetrySchedule: Schedule.exponential("1 second")
});
// => Workflow<"ProcessOrder", ...>
```

.execute(payload, { discard? })
Executes the workflow. Without discard it resolves to Success["Type"] (and may
fail with Error["Type"]); with { discard: true } it resolves to the
executionId string and never fails. Requires WorkflowEngine plus payload
encoding and result decoding services.

```ts
// Inside an Effect.gen:
const result = yield* Order.execute({ orderId: "o_1" });
// => { shipped: true }

const id = yield* Order.execute({ orderId: "o_1" }, { discard: true });
// => "9f2c..." (executionId)
```

.poll(executionId)
Returns Option<Result<A, E>> for a previously started execution: None if it is
unknown, otherwise Some of a Complete or Suspended result.

```ts
import { Option } from "effect";

const polled = yield* Order.poll(id);
if (Option.isSome(polled) && polled.value._tag === "Complete") {
  // polled.value.exit is the persisted Exit
}
// => Option.some(Complete { exit: ... }) | Option.some(Suspended {...}) | Option.none()
```

.interrupt(executionId)
Interrupts a running or suspended execution. Resolves to void and requires only
the WorkflowEngine.

```ts
yield* Order.interrupt(id);
// => void
```

.resume(executionId)
Manually resumes a suspended execution (for example one suspended via
SuspendOnFailure or a DurableDeferred). Resolves to void.

```ts
yield* Order.resume(id);
// => void
```

.executionId(payload)
Computes the deterministic execution ID for a payload without running anything:
the same value execute(..., { discard: true }) would return.
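As a rough illustration of what "deterministic" means here, the sketch below derives an ID from a workflow name and an idempotency key in plain TypeScript. The helper `executionIdFor` is hypothetical, and FNV-1a is only a stand-in: the hash function and output format the library actually uses are assumptions, not something this page specifies.

```typescript
// FNV-1a (32-bit) used purely as a stand-in hash; the library's real hash
// function and output format are not specified on this page.
function fnv1a(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16).padStart(8, "0");
}

// Hypothetical helper: derive an execution ID from workflow name + idempotency key.
function executionIdFor<P>(
  workflowName: string,
  idempotencyKey: (payload: P) => string,
  payload: P
): string {
  return fnv1a(`${workflowName}-${idempotencyKey(payload)}`);
}

type OrderPayload = { orderId: string; note?: string };
const keyOf = (p: OrderPayload) => p.orderId;

// Same key => same ID, even if fields outside the key differ.
const idA = executionIdFor("ProcessOrder", keyOf, { orderId: "o_1" });
const idB = executionIdFor("ProcessOrder", keyOf, { orderId: "o_1", note: "retry" });
// Different key => different ID.
const idC = executionIdFor("ProcessOrder", keyOf, { orderId: "o_2" });
```

The point of the model is that only the fields used by the idempotency key influence the ID, so choosing that key decides which requests count as the same execution.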
```ts
const id = yield* Order.executionId({ orderId: "o_1" });
// => "9f2c..." (stable hash of name + idempotencyKey)
```

.toLayer(execute)
Builds a Layer that registers the workflow’s handler with the engine. The
execute function receives (payload, executionId) and returns the success effect.
Use Effect.fnUntraced for the handler.

```ts
const layer = Order.toLayer(
  Effect.fnUntraced(function* (payload, executionId) {
    return { shipped: true };
  })
);
// => Layer<never, never, WorkflowEngine | ...schema services>
```

.annotate(key, value) and .annotateMerge(context)
Return a new workflow with extra annotations. annotate adds a single
Context.Key/value; annotateMerge merges a whole Context. Annotations are how
you toggle references like SuspendOnFailure and CaptureDefects per workflow.

```ts
import { Workflow } from "effect/unstable/workflow";

const Resumable = Order.annotate(Workflow.SuspendOnFailure, true);
// => Workflow that suspends (instead of failing) on error, awaiting .resume
```

.withCompensation(effect, (value, cause) => ...)
Instance method form of Workflow.withCompensation (below): registers a
compensation finalizer that runs if the entire workflow fails.

```ts
const compensated = Order.withCompensation(
  Effect.succeed("reserved-inventory"),
  (value, cause) => Effect.log(`compensating ${value}`)
);
```

Workflow.withCompensation
Standalone dual that attaches saga-style compensation to a top-level effect inside
a workflow. The finalizer (value, cause) => Effect<void> runs only if the whole
workflow fails, and lets you undo a completed step.
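For readers new to sagas, the following plain-TypeScript sketch shows the general pattern. `runSaga` is a hypothetical helper, not part of the library, and it runs compensations in reverse registration order, which is a common saga convention rather than a documented guarantee of the engine.

```typescript
type Compensation = () => void;

const log: string[] = [];

// Hypothetical mini-saga runner: each step may register an undo action.
// If any later step throws, registered undo actions run in reverse order.
function runSaga(
  steps: Array<(register: (undo: Compensation) => void) => void>
): boolean {
  const compensations: Compensation[] = [];
  try {
    for (const step of steps) {
      step((undo) => compensations.push(undo));
    }
    return true; // success: compensations are never run
  } catch {
    for (const undo of compensations.reverse()) undo();
    return false;
  }
}

const ok = runSaga([
  (register) => {
    log.push("reserve inventory");
    register(() => log.push("release inventory"));
  },
  (register) => {
    log.push("charge card");
    register(() => log.push("refund card"));
  },
  () => {
    throw new Error("shipping unavailable"); // the workflow as a whole fails
  }
]);
```

Each completed step leaves behind an undo action, and nothing is undone unless the overall run fails.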
```ts
import { Effect } from "effect";
import { Workflow } from "effect/unstable/workflow";

// Inside a workflow handler:
const reservationId = yield* Workflow.withCompensation(
  Effect.succeed("rsv_1"),
  (id, cause) => Effect.log(`releasing reservation ${id}`)
);
// => "rsv_1"; release runs only if the workflow ultimately fails
```

Results: Workflow.Result<A, E>
The polled/decoded outcome of an execution: a union of Complete<A, E> and
Suspended.

```ts
import { Workflow } from "effect/unstable/workflow";
// type R = Workflow.Result<{ shipped: boolean }, string>
// => Complete<{ shipped: boolean }, string> | Suspended
```

Workflow.Complete
Data.TaggedClass (_tag: "Complete") wrapping the persisted Exit. Use
Complete.Schema({ success, error }) to build a schema for it.

```ts
import { Exit } from "effect";
import { Workflow } from "effect/unstable/workflow";

const c = new Workflow.Complete({ exit: Exit.succeed({ shipped: true }) });
c._tag; // => "Complete"
c.exit; // => Exit.succeed({ shipped: true })
```

Workflow.Complete.Schema
Static constructor returning a schema that decodes/encodes a Complete from the
given success and error schemas (the underlying Exit is encoded with
Schema.Defect for unexpected failures).

```ts
import { Schema } from "effect";
import { Workflow } from "effect/unstable/workflow";

const CompleteSchema = Workflow.Complete.Schema({
  success: Schema.Number,
  error: Schema.String
});
// => schema for Complete<number, string>
```

Workflow.Suspended
Schema.Class (_tag: "Suspended") representing a paused execution, optionally
carrying the cause that triggered suspension.

```ts
import { Workflow } from "effect/unstable/workflow";

const s = new Workflow.Suspended({});
s._tag; // => "Suspended"
```

Workflow.Result(options)
Schema constructor: builds a Union of Complete.Schema and Suspended for the
given success/error schemas.

```ts
import { Schema } from "effect";
import { Workflow } from "effect/unstable/workflow";

const ResultSchema = Workflow.Result({
  success: Schema.Number,
  error: Schema.String
});
// => Schema.Union([Complete.Schema(...), Suspended])
```

Workflow.ResultEncoded
A pre-built Schema.Codec for the JSON-encoded form of a result with generic
(Any | Void) payloads, used by proxy/engine serialization.

```ts
import { Workflow } from "effect/unstable/workflow";
// Workflow.ResultEncoded : Schema.Codec<ResultEncoded<any, any>>
```

Workflow.isResult
Runtime guard that returns true for both Complete and Suspended values.

```ts
import { Workflow } from "effect/unstable/workflow";

Workflow.isResult(new Workflow.Suspended({})); // => true
Workflow.isResult({ _tag: "Complete" }); // => false (no result brand)
```

Result types: CompleteEncoded, ResultEncoded
Type-level shapes for the encoded forms. CompleteEncoded<A, E> is
{ _tag: "Complete"; exit: ExitEncoded<A, E> }; ResultEncoded<A, E> is the union
with the encoded Suspended.

```ts
import type { Workflow } from "effect/unstable/workflow";
// type CE = Workflow.CompleteEncoded<number, string>
// type RE = Workflow.ResultEncoded<number, string>
```

Workflow.scope
An Effect yielding the workflow’s Scope. Unlike a normal scope, it is closed
only when the whole workflow completes, so resources you acquire on it survive
suspension and resume.
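The lifetime difference can be pictured with a small plain-TypeScript model. `WorkflowScopeModel` is a hypothetical class written for this illustration; it only mimics the idea that finalizers wait for full completion and says nothing about how the real Scope is implemented.

```typescript
// Hypothetical model of a scope whose finalizers run only when the whole
// workflow completes, not when an individual run of the handler ends.
class WorkflowScopeModel {
  private readonly finalizers: Array<() => void> = [];
  closed = false;

  addFinalizer(finalizer: () => void): void {
    this.finalizers.push(finalizer);
  }

  // Called by the (imaginary) engine once the workflow has fully completed.
  close(): void {
    if (this.closed) return;
    this.closed = true;
    while (this.finalizers.length > 0) this.finalizers.pop()!();
  }
}

const events: string[] = [];
const workflowScope = new WorkflowScopeModel();

// First run: acquire a resource, then suspend (for example on a durable sleep).
events.push("acquire connection");
workflowScope.addFinalizer(() => events.push("close connection"));
events.push("suspend");

// Later: the workflow resumes and finishes; only now is the scope closed.
events.push("resume");
events.push("complete");
workflowScope.close();
```

In this model the finalizer fires after "complete", never at the suspension point.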
```ts
import { Workflow } from "effect/unstable/workflow";

// Inside a handler:
const scope = yield* Workflow.scope;
// => Scope that lives for the entire (possibly suspended) workflow
```

Workflow.provideScope
Provides the workflow scope to a scoped effect, so its finalizers run only at full workflow completion rather than at the end of the current step.

```ts
import { Effect } from "effect";
import { Workflow } from "effect/unstable/workflow";

const withResource = Workflow.provideScope(
  Effect.acquireRelease(
    Effect.succeed("conn"),
    () => Effect.log("closing connection")
  )
);
// => the release runs when the workflow fully completes
```

Workflow.addFinalizer
Adds an exit finalizer to the workflow scope, capturing the services available at registration time. Runs once the workflow completes.

```ts
import { Effect, Exit } from "effect";
import { Workflow } from "effect/unstable/workflow";

// Inside a handler:
yield* Workflow.addFinalizer((exit) =>
  Effect.log(Exit.isSuccess(exit) ? "workflow ok" : "workflow failed")
);
// => void
```

Workflow.CaptureDefects
Context.Reference<boolean> (default true). When enabled, defects are captured
into the workflow/activity result instead of escaping as failures. Toggle per
workflow with .annotate, or per-effect with Effect.provideService.

```ts
import { Workflow } from "effect/unstable/workflow";

const noCapture = Order.annotate(Workflow.CaptureDefects, false);
// => defects propagate as failures instead of being captured into the Result
```

Workflow.SuspendOnFailure
Context.Reference<boolean> (default false). When enabled, any error suspends
the execution instead of failing it; resume later with MyWorkflow.resume(id).
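The behavioural difference can be sketched without the library. In the plain-TypeScript model below, `Outcome`, `runOnce`, and `gatewayUp` are hypothetical names, and "resume" is modelled simply as running the handler again; the real engine's resume mechanics are not shown here.

```typescript
type Outcome<A> =
  | { _tag: "Complete"; success: true; value: A }
  | { _tag: "Complete"; success: false; error: string }
  | { _tag: "Suspended"; error: string };

// Hypothetical runner: with suspendOnFailure, an error parks the execution
// instead of completing it as a failure.
function runOnce<A>(handler: () => A, suspendOnFailure: boolean): Outcome<A> {
  try {
    return { _tag: "Complete", success: true, value: handler() };
  } catch (e) {
    const error = e instanceof Error ? e.message : String(e);
    return suspendOnFailure
      ? { _tag: "Suspended", error }
      : { _tag: "Complete", success: false, error };
  }
}

let gatewayUp = false;
const shipOrder = (): string => {
  if (!gatewayUp) throw new Error("gateway down");
  return "shipped";
};

const failed = runOnce(shipOrder, false); // default: the failure is final
const parked = runOnce(shipOrder, true); // suspended, waiting for a resume

gatewayUp = true; // an operator fixes the problem...
const afterResume = runOnce(shipOrder, true); // ...then resumes the execution
```

Suspending trades an immediate typed failure for the chance to fix the underlying problem and continue the same execution.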
```ts
import { Effect } from "effect";
import { Workflow } from "effect/unstable/workflow";

// Per workflow:
const resumable = Order.annotate(Workflow.SuspendOnFailure, true);

// Or per-effect inside a handler (someStep is a placeholder for any effect):
const guarded = Effect.provideService(
  someStep,
  Workflow.SuspendOnFailure,
  true
);
```

Engine helpers: intoResult, wrapActivityResult, suspend
Lower-level building blocks used by WorkflowEngine implementations; you rarely
call these directly. intoResult runs an effect and converts its outcome into a
Result (handling suspension, defect capture, and scope finalization);
wrapActivityResult delays workflow suspension until running activities finish or
suspend; suspend marks an instance suspended and interrupts the current fiber.

```ts
import { Workflow } from "effect/unstable/workflow";
// Workflow.intoResult(effect) => Effect<Result<A, E>, never, ...>
// Workflow.wrapActivityResult(eff, fn) => Effect<A, E, R | WorkflowInstance>
// Workflow.suspend(instance) => Effect<never>
```

Type-level exports
These types support generic workflow APIs (proxies, engines). Use them when writing functions that operate over arbitrary workflows.

| Type | Meaning |
|---|---|
| Workflow.Workflow<Name, Payload, Success, Error> | The full workflow definition interface. |
| Workflow.Any | Type-erased workflow (name + schemas, no executable ops). |
| Workflow.AnyWithProps | Type-erased workflow that also exposes execute/resume. |
| Workflow.AnyStructSchema | Constraint for payload schemas that expose .fields. |
| Workflow.Execution<Name> | Marker for services tied to a specific workflow run. |
| Workflow.PayloadSchema<W> | Extracts the payload schema from a workflow. |
| Workflow.RequirementsClient<W> | Schema services a client (execute/poll) needs. |
| Workflow.RequirementsHandler<W> | Schema services a handler (decode/encode) needs. |

```ts
import type { Workflow } from "effect/unstable/workflow";
// type P = Workflow.PayloadSchema<typeof Order> // the { orderId } struct schema
```

Activity module reference
An Activity is the side-effect boundary inside a workflow. The activity value
extends Effect, so yielding it runs (or replays) the work and returns the decoded
success value, failing with the decoded error.

Activity.make
Creates an activity from an effect. Options: name (stable!), optional success
(default Schema.Void) and error (default Schema.Never) schemas, the execute
effect, an optional interruptRetryPolicy (a Schedule over Cause, controlling
retries when the activity is interrupted), and optional annotations.

```ts
import { Effect, Schema, Schedule } from "effect";
import { Activity } from "effect/unstable/workflow";

const FetchUser = Activity.make({
  name: "FetchUser",
  success: Schema.Struct({ id: Schema.String, name: Schema.String }),
  error: Schema.String,
  execute: Effect.succeed({ id: "u_1", name: "Ada" }),
  // Retry policy applied when the activity is interrupted.
  interruptRetryPolicy: Schedule.recurs(5)
});
// yield* FetchUser => { id: "u_1", name: "Ada" } (persisted, replayed on resume)
```

Activity.retry(self, options)
Retries an effect with Effect.retry while incrementing
Activity.CurrentAttempt on each attempt. Accepts the usual Effect.Retry.Options
except schedule (the attempt counter drives the loop).
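To show how an attempt counter and a retry limit interact, here is a synchronous plain-TypeScript model. `retryWithAttempt` is a hypothetical helper, not the library function: the real Activity.retry works on effects and exposes the counter through Activity.CurrentAttempt. The sketch assumes `times` counts retries after the first attempt.

```typescript
// Hypothetical retry loop: `times` is the number of retries after the first
// attempt, and the attempt number (starting at 1) is passed to the action.
function retryWithAttempt<A>(action: (attempt: number) => A, times: number): A {
  let attempt = 1;
  for (;;) {
    try {
      return action(attempt);
    } catch (error) {
      if (attempt > times) throw error; // retries exhausted
      attempt += 1;
    }
  }
}

const seenAttempts: number[] = [];

// Fails twice, then succeeds on the third attempt.
const retried = retryWithAttempt((attempt) => {
  seenAttempts.push(attempt);
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return `ok on attempt ${attempt}`;
}, 3);
```

Under that assumption, a limit of 3 allows up to four executions in total (attempts 1 through 4); this run stops early because the third attempt succeeds.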
```ts
import { Effect } from "effect";
import { Activity } from "effect/unstable/workflow";

const robust = Activity.retry(FetchUser, { times: 3 });
// => up to 3 retries after the first attempt; CurrentAttempt starts at 1
//    and increments on each attempt
```

Activity.CurrentAttempt
Context.Reference<number> (default 1) holding the current retry attempt. Read it
inside execute to vary behavior or to build attempt-specific idempotency keys.

```ts
import { Effect } from "effect";
import { Activity } from "effect/unstable/workflow";

const attemptAware = Activity.make({
  name: "AttemptAware",
  execute: Effect.gen(function* () {
    const attempt = yield* Activity.CurrentAttempt;
    yield* Effect.log(`attempt ${attempt}`);
  })
});
// => logs "attempt 1" on first run
```

Activity.idempotencyKey(name, { includeAttempt? })
Computes a deterministic hash from the current workflow executionId, the supplied
name, and optionally the CurrentAttempt. Use it as a stable request key when
calling external systems so retries don’t duplicate work.
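The effect of includeAttempt is easiest to see when the key parts stay readable. The sketch below is a plain-TypeScript model with a hypothetical `idempotencyKeyFor` helper; the real function returns a hash and reads the execution ID and attempt from context rather than taking them as arguments.

```typescript
// Hypothetical composition of an idempotency key. The real function returns a
// hash; the parts are kept readable here so the effect of each input is visible.
function idempotencyKeyFor(
  executionId: string,
  name: string,
  options?: { includeAttempt?: boolean; attempt?: number }
): string {
  const base = `${executionId}/${name}`;
  return options?.includeAttempt ? `${base}/${options.attempt ?? 1}` : base;
}

const currentExecutionId = "9f2c"; // placeholder execution ID

// Default: the key is identical on every retry, so the external system can dedupe.
const firstTry = idempotencyKeyFor(currentExecutionId, "ChargeCard", { attempt: 1 });
const secondTry = idempotencyKeyFor(currentExecutionId, "ChargeCard", { attempt: 2 });

// includeAttempt: each attempt gets its own key.
const perAttempt1 = idempotencyKeyFor(currentExecutionId, "ChargeCard", {
  includeAttempt: true,
  attempt: 1
});
const perAttempt2 = idempotencyKeyFor(currentExecutionId, "ChargeCard", {
  includeAttempt: true,
  attempt: 2
});
```

Use the default when a retry should be recognised as the same request, and the per-attempt form when each retry must be treated as a distinct operation.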
```ts
import { Activity } from "effect/unstable/workflow";

// Inside an activity execute (has WorkflowInstance in context):
const key = yield* Activity.idempotencyKey("ChargeCard");
// => stable hash of executionId + "ChargeCard"

const perAttempt = yield* Activity.idempotencyKey("ChargeCard", {
  includeAttempt: true
});
// => hash also varies by attempt, for retries that must hit distinct operations
```

Activity.raceAll(name, activities)
Runs a non-empty array of activities as a durable race, returning the first completed success or failure. Success and error types are the unions of the racers’ schemas.

```ts
import { Effect, Schema } from "effect";
import { Activity } from "effect/unstable/workflow";

const fromCache = Activity.make({
  name: "FromCache",
  success: Schema.String,
  execute: Effect.succeed("cached")
});
const fromDb = Activity.make({
  name: "FromDb",
  success: Schema.String,
  execute: Effect.succeed("db")
});

// yield* this inside a workflow:
const fastest = Activity.raceAll("ReadValue", [fromCache, fromDb]);
// => "cached" | "db" (first to durably complete)
```

activity.execute and activity.executeEncoded
.execute is the underlying effect (scoped, needing the engine + instance);
.executeEncoded runs it and encodes the success/error through the JSON codecs of
the result schemas, which is the form the engine persists.

```ts
// activity.execute : Effect<Success["Type"], Error["Type"], ...>
// activity.executeEncoded : Effect<unknown, unknown, ...> (encoded result)
```

activity.annotate / activity.annotateMerge
Return a new activity with extra annotations (a single key/value, or a merged
Context).

```ts
import { Workflow } from "effect/unstable/workflow";

const noCapture = FetchUser.annotate(Workflow.CaptureDefects, false);
// => activity whose defects propagate instead of being captured
```

activity.successSchema / errorSchema / exitSchema
The schemas attached to the activity: the success schema, the error schema, and the
combined Schema.Exit<Success, Error, Defect> used to encode persisted results.

```ts
FetchUser.successSchema; // => Schema.Struct({ id, name })
FetchUser.errorSchema; // => Schema.String
FetchUser.exitSchema; // => Schema.Exit<...> for the persisted result
```

Type-level exports
| Type | Meaning |
|---|---|
| Activity.Activity<Success, Error, R> | The activity interface (extends Effect). |
| Activity.Any | Type-erased activity (identity + encoded execution). |
| Activity.AnyWithProps | Type-erased activity that also exposes the result schemas. |

```ts
import type { Activity } from "effect/unstable/workflow";
// type A = Activity.Activity<typeof Schema.String, typeof Schema.Never>
```

Where to go next
- Durable Primitives: DurableClock, DurableDeferred, and DurableQueue for durable sleeps, signals, and queues inside workflows.
- Engine & Execution: wiring a WorkflowEngine, the in-memory engine for tests, and exposing workflows over the wire with WorkflowProxy.