Skip to content

Workflows & Activities

A workflow is a typed, durable description of a long-running business process. It bundles a stable name, a struct payload schema, success/error schemas, and an idempotencyKey function that derives a deterministic execution ID from the payload. The actual side effects live inside activities — named, schema-backed effects that the engine can persist and replay instead of rerunning.

Because a WorkflowEngine persists, suspends, and resumes execution state, the golden rule is: keep external side effects at activity boundaries, and keep names and schemas stable across deployments so persisted results can be replayed.

The modules on this page are unstable and imported from effect/unstable/workflow. To run a workflow you also need an engine — see Engine & Execution — and the durable building blocks (DurableClock, DurableDeferred, DurableQueue) are covered in Durable Primitives.

Workflow.make takes a stable name, the payload as struct fields (or a struct schema), and an idempotencyKey that turns the payload into a stable string. success defaults to Schema.Void and error defaults to Schema.Never.

import { Schema } from "effect";
import { Workflow } from "effect/unstable/workflow";
const SendEmailWorkflow = Workflow.make({
name: "SendEmailWorkflow",
// Payload fields — encoded/decoded with these schemas across the durable boundary.
payload: {
userId: Schema.String,
template: Schema.String
},
// The success value the workflow resolves to (defaults to Schema.Void).
success: Schema.String,
// The typed failure (defaults to Schema.Never).
error: Schema.String,
// Derive a STABLE id for this logical request. Same payload => same execution.
idempotencyKey: ({ userId, template }) => `${userId}:${template}`
});

The handler is registered with MyWorkflow.toLayer. Write the handler with Effect.fnUntraced (not a plain function returning Effect.gen); it receives the decoded payload and the executionId. Inside the handler you yield activities, durable clocks, and durable deferreds.

import { Effect, Schema } from "effect";
import { Activity } from "effect/unstable/workflow";
const SendEmailWorkflowLayer = SendEmailWorkflow.toLayer(
Effect.fnUntraced(function* (payload, executionId) {
// Side effects belong in activities so they can be persisted and replayed.
const address = yield* Activity.make({
name: "LookupAddress",
success: Schema.String,
execute: Effect.succeed(`${payload.userId}@example.com`)
});
const messageId = yield* Activity.make({
name: "SendEmail",
success: Schema.String,
execute: Effect.sync(() => `sent ${payload.template} to ${address}`)
});
return messageId;
})
);

MyWorkflow.execute(payload) runs the workflow and returns the success value. Passing { discard: true } returns the executionId string instead (fire and forget). Execution requires the WorkflowEngine plus any schema encoding/decoding services your payload and result schemas need.

import { Effect, Layer } from "effect";
import { WorkflowEngine } from "effect/unstable/workflow";
const program = Effect.gen(function* () {
// Returns Success["Type"] — here, a string.
const messageId = yield* SendEmailWorkflow.execute({
userId: "u_123",
template: "welcome"
});
// => "sent welcome to u_123@example.com"
// discard: true returns the executionId instead of the result.
const executionId = yield* SendEmailWorkflow.execute(
{ userId: "u_123", template: "welcome" },
{ discard: true }
);
// => a deterministic hash string
return { messageId, executionId };
}).pipe(
Effect.provide(
// layerMemory is great for tests/local dev; use a durable engine in prod.
SendEmailWorkflowLayer.pipe(Layer.provideMerge(WorkflowEngine.layerMemory))
)
);

An Activity value is an Effect — you yield* it directly. The engine runs the inner execute effect once, persists its encoded result, and on replay returns the stored result instead of running the effect again. Make the underlying side effect idempotent (see Activity.idempotencyKey), since a workflow resume may re-run an activity that had not yet been persisted.

import { Effect, Schema } from "effect";
import { Activity } from "effect/unstable/workflow";
const ChargeCard = Activity.make({
name: "ChargeCard",
success: Schema.String, // a charge id
error: Schema.String,
execute: Effect.succeed("ch_abc")
});
// yield* ChargeCard inside a workflow gen => "ch_abc" (replayed on resume)

Creates a durable workflow definition. Options: name, payload (struct fields or a struct schema), idempotencyKey, optional success (default Schema.Void), optional error (default Schema.Never), optional suspendedRetrySchedule (a Schedule controlling retries while suspended), and optional annotations.

import { Schema, Schedule } from "effect";
import { Workflow } from "effect/unstable/workflow";
const Order = Workflow.make({
name: "ProcessOrder",
payload: { orderId: Schema.String },
success: Schema.Struct({ shipped: Schema.Boolean }),
error: Schema.String,
idempotencyKey: ({ orderId }) => orderId,
// Retry the workflow while it is in a suspended state.
suspendedRetrySchedule: Schedule.exponential("1 second")
});
// => Workflow<"ProcessOrder", ...>

Executes the workflow. Without discard it resolves to Success["Type"] (and may fail with Error["Type"]); with { discard: true } it resolves to the executionId string and never fails. Requires WorkflowEngine plus payload encoding and result decoding services.

const result = yield* Order.execute({ orderId: "o_1" });
// => { shipped: true }
const id = yield* Order.execute({ orderId: "o_1" }, { discard: true });
// => "9f2c..." (executionId)

Returns Option<Result<A, E>> for a previously started execution: None if it is unknown, otherwise Some of a Complete or Suspended result.

import { Option } from "effect";
const polled = yield* Order.poll(id);
if (Option.isSome(polled) && polled.value._tag === "Complete") {
// polled.value.exit is the persisted Exit
}
// => Option.some(Complete { exit: ... }) | Option.some(Suspended {...}) | Option.none()

Interrupts a running or suspended execution. Resolves to void and requires only the WorkflowEngine.

yield* Order.interrupt(id);
// => void

Manually resumes a suspended execution (for example one suspended via SuspendOnFailure or a DurableDeferred). Resolves to void.

yield* Order.resume(id);
// => void

Computes the deterministic execution ID for a payload without running anything — the same value execute(..., { discard: true }) would return.

const id = yield* Order.executionId({ orderId: "o_1" });
// => "9f2c..." (stable hash of name + idempotencyKey)

Builds a Layer that registers the workflow’s handler with the engine. The execute function receives (payload, executionId) and returns the success effect. Use Effect.fnUntraced for the handler.

const layer = Order.toLayer(
Effect.fnUntraced(function* (payload, executionId) {
return { shipped: true };
})
);
// => Layer<never, never, WorkflowEngine | ...schema services>

.annotate(key, value) and .annotateMerge(context)

Section titled “.annotate(key, value) and .annotateMerge(context)”

Return a new workflow with extra annotations. annotate adds a single Context.Key/value; annotateMerge merges a whole Context. Annotations are how you toggle references like SuspendOnFailure and CaptureDefects per workflow.

import { Workflow } from "effect/unstable/workflow";
const Resumable = Order.annotate(Workflow.SuspendOnFailure, true);
// => Workflow that suspends (instead of failing) on error, awaiting .resume

.withCompensation(effect, (value, cause) => ...)

Section titled “.withCompensation(effect, (value, cause) => ...)”

Instance method form of Workflow.withCompensation (below): registers a compensation finalizer that runs if the entire workflow fails.

const compensated = Order.withCompensation(
Effect.succeed("reserved-inventory"),
(value, cause) => Effect.log(`compensating ${value}`)
);

Standalone dual that attaches saga-style compensation to a top-level effect inside a workflow. The finalizer (value, cause) => Effect<void> runs only if the whole workflow fails, and lets you undo a completed step.

import { Effect } from "effect";
import { Workflow } from "effect/unstable/workflow";
// Inside a workflow handler:
const reservationId = yield* Workflow.withCompensation(
Effect.succeed("rsv_1"),
(id, cause) => Effect.log(`releasing reservation ${id}`)
);
// => "rsv_1" — release runs only if the workflow ultimately fails

The polled/decoded outcome of an execution: a union of Complete<A, E> and Suspended.

import { Workflow } from "effect/unstable/workflow";
// type R = Workflow.Result<{ shipped: boolean }, string>
// => Complete<{ shipped: boolean }, string> | Suspended

Data.TaggedClass (_tag: "Complete") wrapping the persisted Exit. Use Complete.Schema({ success, error }) to build a schema for it.

import { Exit } from "effect";
import { Workflow } from "effect/unstable/workflow";
const c = new Workflow.Complete({ exit: Exit.succeed({ shipped: true }) });
c._tag; // => "Complete"
c.exit; // => Exit.succeed({ shipped: true })

Static constructor returning a schema that decodes/encodes a Complete from the given success and error schemas (the underlying Exit is encoded with Schema.Defect for unexpected failures).

import { Schema } from "effect";
import { Workflow } from "effect/unstable/workflow";
const CompleteSchema = Workflow.Complete.Schema({
success: Schema.Number,
error: Schema.String
});
// => schema for Complete<number, string>

Schema.Class (_tag: "Suspended") representing a paused execution, optionally carrying the cause that triggered suspension.

import { Workflow } from "effect/unstable/workflow";
const s = new Workflow.Suspended({});
s._tag; // => "Suspended"

Schema constructor: builds a Union of Complete.Schema and Suspended for the given success/error schemas.

import { Schema } from "effect";
import { Workflow } from "effect/unstable/workflow";
const ResultSchema = Workflow.Result({
success: Schema.Number,
error: Schema.String
});
// => Schema.Union([Complete.Schema(...), Suspended])

A pre-built Schema.Codec for the JSON-encoded form of a result with generic (Any | Void) payloads — used by proxy/engine serialization.

import { Workflow } from "effect/unstable/workflow";
// Workflow.ResultEncoded : Schema.Codec<ResultEncoded<any, any>>

Runtime guard that returns true for both Complete and Suspended values.

import { Workflow } from "effect/unstable/workflow";
Workflow.isResult(new Workflow.Suspended({})); // => true
Workflow.isResult({ _tag: "Complete" }); // => false (no result brand)

Result types: CompleteEncoded, ResultEncoded

Section titled “Result types: CompleteEncoded, ResultEncoded”

Type-level shapes for the encoded forms. CompleteEncoded<A, E> is { _tag: "Complete"; exit: ExitEncoded<A, E> }; ResultEncoded<A, E> is the union with the encoded Suspended.

import type { Workflow } from "effect/unstable/workflow";
// type CE = Workflow.CompleteEncoded<number, string>
// type RE = Workflow.ResultEncoded<number, string>

An Effect yielding the workflow’s Scope. Unlike a normal scope, it is closed only when the whole workflow completes — so resources you acquire on it survive suspension and resume.

import { Workflow } from "effect/unstable/workflow";
// Inside a handler:
const scope = yield* Workflow.scope;
// => Scope that lives for the entire (possibly suspended) workflow

Provides the workflow scope to a scoped effect, so its finalizers run only at full workflow completion rather than at the end of the current step.

import { Effect } from "effect";
import { Workflow } from "effect/unstable/workflow";
const withResource = Workflow.provideScope(
Effect.acquireRelease(
Effect.succeed("conn"),
() => Effect.log("closing connection")
)
);
// => the release runs when the workflow fully completes

Adds an exit finalizer to the workflow scope, capturing the services available at registration time. Runs once the workflow completes.

import { Effect, Exit } from "effect";
import { Workflow } from "effect/unstable/workflow";
yield* Workflow.addFinalizer((exit) =>
Effect.log(Exit.isSuccess(exit) ? "workflow ok" : "workflow failed")
);
// => void

Context.Reference<boolean> (default true). When enabled, defects are captured into the workflow/activity result instead of escaping as failures. Toggle per workflow with .annotate, or per-effect with Effect.provideService.

import { Workflow } from "effect/unstable/workflow";
const noCapture = Order.annotate(Workflow.CaptureDefects, false);
// => defects propagate as failures instead of being captured into the Result

Context.Reference<boolean> (default false). When enabled, any error suspends the execution instead of failing it; resume later with MyWorkflow.resume(id).

import { Effect } from "effect";
import { Workflow } from "effect/unstable/workflow";
// Per workflow:
const resumable = Order.annotate(Workflow.SuspendOnFailure, true);
// Or per-effect inside a handler:
const guarded = Effect.provideService(
someStep,
Workflow.SuspendOnFailure,
true
);

Engine helpers: intoResult, wrapActivityResult, suspend

Section titled “Engine helpers: intoResult, wrapActivityResult, suspend”

Lower-level building blocks used by WorkflowEngine implementations — you rarely call these directly. intoResult runs an effect and converts its outcome into a Result (handling suspension, defect capture, and scope finalization); wrapActivityResult delays workflow suspension until running activities finish or suspend; suspend marks an instance suspended and interrupts the current fiber.

import { Workflow } from "effect/unstable/workflow";
// Workflow.intoResult(effect) => Effect<Result<A, E>, never, ...>
// Workflow.wrapActivityResult(eff, fn) => Effect<A, E, R | WorkflowInstance>
// Workflow.suspend(instance) => Effect<never>

These types support generic workflow APIs (proxies, engines). Use them when writing functions that operate over arbitrary workflows.

TypeMeaning
Workflow.Workflow<Name, Payload, Success, Error>The full workflow definition interface.
Workflow.AnyType-erased workflow (name + schemas, no executable ops).
Workflow.AnyWithPropsType-erased workflow that also exposes execute/resume.
Workflow.AnyStructSchemaConstraint for payload schemas that expose .fields.
Workflow.Execution<Name>Marker for services tied to a specific workflow run.
Workflow.PayloadSchema<W>Extracts the payload schema from a workflow.
Workflow.RequirementsClient<W>Schema services a client (execute/poll) needs.
Workflow.RequirementsHandler<W>Schema services a handler (decode/encode) needs.
import type { Workflow } from "effect/unstable/workflow";
// type P = Workflow.PayloadSchema<typeof Order> // the { orderId } struct schema

An Activity is the side-effect boundary inside a workflow. The activity value extends Effect, so yielding it runs (or replays) the work and returns the decoded success value, failing with the decoded error.

Creates an activity from an effect. Options: name (stable!), optional success (default Schema.Void) and error (default Schema.Never) schemas, the execute effect, an optional interruptRetryPolicy (a Schedule over Cause, controlling retries when the activity is interrupted), and optional annotations.

import { Effect, Schema, Schedule } from "effect";
import { Activity } from "effect/unstable/workflow";
const FetchUser = Activity.make({
name: "FetchUser",
success: Schema.Struct({ id: Schema.String, name: Schema.String }),
error: Schema.String,
execute: Effect.succeed({ id: "u_1", name: "Ada" }),
// Retry policy applied when the activity is interrupted.
interruptRetryPolicy: Schedule.recurs(5)
});
// yield* FetchUser => { id: "u_1", name: "Ada" } (persisted, replayed on resume)

Retries an effect with Effect.retry while incrementing Activity.CurrentAttempt on each attempt. Accepts the usual Effect.Retry.Options except schedule (the attempt counter drives the loop).

import { Effect } from "effect";
import { Activity } from "effect/unstable/workflow";
const robust = Activity.retry(FetchUser, { times: 3 });
// => retries up to 3 times, with CurrentAttempt = 1, 2, 3

Context.Reference<number> (default 1) holding the current retry attempt. Read it inside execute to vary behavior or to build attempt-specific idempotency keys.

import { Effect } from "effect";
import { Activity } from "effect/unstable/workflow";
const attemptAware = Activity.make({
name: "AttemptAware",
execute: Effect.gen(function* () {
const attempt = yield* Activity.CurrentAttempt;
yield* Effect.log(`attempt ${attempt}`);
})
});
// => logs "attempt 1" on first run

Activity.idempotencyKey(name, { includeAttempt? })

Section titled “Activity.idempotencyKey(name, { includeAttempt? })”

Computes a deterministic hash from the current workflow executionId, the supplied name, and optionally the CurrentAttempt. Use it as a stable request key when calling external systems so retries don’t duplicate work.

import { Activity } from "effect/unstable/workflow";
// Inside an activity execute (has WorkflowInstance in context):
const key = yield* Activity.idempotencyKey("ChargeCard");
// => stable hash of executionId + "ChargeCard"
const perAttempt = yield* Activity.idempotencyKey("ChargeCard", {
includeAttempt: true
});
// => hash also varies by attempt, for retries that must hit distinct operations

Runs a non-empty array of activities as a durable race, returning the first completed success or failure. Success and error types are the unions of the racers’ schemas.

import { Effect, Schema } from "effect";
import { Activity } from "effect/unstable/workflow";
const fromCache = Activity.make({
name: "FromCache",
success: Schema.String,
execute: Effect.succeed("cached")
});
const fromDb = Activity.make({
name: "FromDb",
success: Schema.String,
execute: Effect.succeed("db")
});
// yield* this inside a workflow:
const fastest = Activity.raceAll("ReadValue", [fromCache, fromDb]);
// => "cached" | "db" (first to durably complete)

activity.execute and activity.executeEncoded

Section titled “activity.execute and activity.executeEncoded”

.execute is the underlying effect (scoped, needing the engine + instance); .executeEncoded runs it and encodes the success/error through the JSON codecs of the result schemas — the form the engine persists.

// activity.execute : Effect<Success["Type"], Error["Type"], ...>
// activity.executeEncoded : Effect<unknown, unknown, ...> (encoded result)

activity.annotate / activity.annotateMerge

Section titled “activity.annotate / activity.annotateMerge”

Return a new activity with extra annotations (a single key/value, or a merged Context).

import { Workflow } from "effect/unstable/workflow";
const noCapture = FetchUser.annotate(Workflow.CaptureDefects, false);
// => activity whose defects propagate instead of being captured

activity.successSchema / errorSchema / exitSchema

Section titled “activity.successSchema / errorSchema / exitSchema”

The schemas attached to the activity: the success schema, the error schema, and the combined Schema.Exit<Success, Error, Defect> used to encode persisted results.

FetchUser.successSchema; // => Schema.Struct({ id, name })
FetchUser.errorSchema; // => Schema.String
FetchUser.exitSchema; // => Schema.Exit<...> for the persisted result
TypeMeaning
Activity.Activity<Success, Error, R>The activity interface (extends Effect).
Activity.AnyType-erased activity (identity + encoded execution).
Activity.AnyWithPropsType-erased activity that also exposes the result schemas.
import type { Activity } from "effect/unstable/workflow";
// type A = Activity.Activity<typeof Schema.String, typeof Schema.Never>

  • Durable PrimitivesDurableClock, DurableDeferred, and DurableQueue for durable sleeps, signals, and queues inside workflows.
  • Engine & Execution — wiring a WorkflowEngine, the in-memory engine for tests, and exposing workflows over the wire with WorkflowProxy.