Durable Clocks, Deferreds & Queues
A workflow that runs for hours, days, or weeks rarely does all of its work in a
single uninterrupted turn. It pauses for a timeout, waits for a human to approve
something, fans work out to a background worker, or races a deadline against an
external event. These pauses must survive process restarts and replays, so they
cannot be ordinary Effect.sleep calls or in-memory Deferreds.
The workflow package provides three coordination primitives whose state is
persisted by the WorkflowEngine:
DurableClock— a replayable, suspendable sleep.DurableDeferred— a named wait point completed by an activity, worker, timer, or an external callback (webhook, human approval).DurableQueue— a producer/worker queue where the producing workflow suspends until a background worker finishes the item.
All three are imported from effect/unstable/workflow and require a workflow
context (WorkflowEngine + WorkflowInstance), which you get by running them
inside workflow or activity code.
import { Effect, Schema } from "effect"import { DurableClock, DurableDeferred, DurableQueue} from "effect/unstable/workflow"DurableClock — replayable sleeps
Section titled “DurableClock — replayable sleeps”A DurableClock is a workflow-safe sleep. Unlike Effect.sleep, it can be
replayed and resumed by the engine after a restart. The common entry point is
DurableClock.sleep.
import { Effect } from "effect"import { DurableClock } from "effect/unstable/workflow"
const remindLater = Effect.gen(function* () { yield* Effect.log("sending the order")
// Short waits run in-process as an activity (<= 60s by default). yield* DurableClock.sleep({ name: "settle", duration: "5 seconds" })
// Long waits schedule a durable clock and suspend the workflow until the // engine wakes it — this survives process restarts. yield* DurableClock.sleep({ name: "follow-up", duration: "3 days" })
yield* Effect.log("sending the follow-up reminder")})How sleep chooses its strategy:
- Zero duration completes immediately (no-op).
- Duration
<= inMemoryThreshold(default 60 seconds) runs withEffect.sleepinside an activity namedDurableClock/<name>. Cheap, but the delay only survives in the current process. - Longer durations schedule a durable clock via the engine and
awaitthe clock’s deferred, suspending the workflow until the timer fires.
Lower inMemoryThreshold when even a short delay must survive a restart:
// Force a durable clock even for a 30s wait, so a crash mid-wait resumes// correctly instead of restarting the timer from zero.yield * DurableClock.sleep({ name: "grace-period", duration: "30 seconds", inMemoryThreshold: "0 millis"})DurableDeferred — named wait points
Section titled “DurableDeferred — named wait points”A DurableDeferred is a named wait point whose result is persisted by the
engine as an encoded Exit. A workflow can await one and suspend until
something else completes it — an activity, a worker, a timer, or an external
callback holding a token.
The most powerful use is coordinating with the outside world: a workflow
awaits a deferred, an external system (a webhook handler, an approval UI) holds
a token for it, and calls succeed/fail whenever the real-world event
happens — possibly hours later, from a different process.
import { Effect, Schema } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
// A wait point that resolves to an approval decision.const Approval = DurableDeferred.make("approval", { success: Schema.Struct({ approvedBy: Schema.String }), error: Schema.String})
const requestApproval = Effect.gen(function* () { // Hand a token to the outside world so it can complete this deferred later. const token = yield* DurableDeferred.token(Approval) yield* Effect.log(`waiting for approval, token = ${token}`)
// Suspend until someone completes the deferred. Resumes (even across a // restart) with the decoded success value, or fails with the error value. const decision = yield* DurableDeferred.await(Approval) yield* Effect.log(`approved by ${decision.approvedBy}`)})Elsewhere — e.g. inside an HTTP handler for an approval webhook — the external system completes the deferred using the token it was given:
import { Effect } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
// `token` came from the workflow above (stored in a DB, passed in a URL, ...).const handleApprovalWebhook = (token: DurableDeferred.Token, user: string) => DurableDeferred.succeed(Approval, { token, value: { approvedBy: user } })// requires WorkflowEngine to route the completion back to the executionDurableDeferred.into — record an effect’s result
Section titled “DurableDeferred.into — record an effect’s result”into runs an effect and records its Exit into the deferred, resuming any
workflows waiting on it. It is the bridge between “do the work” and “report the
result to waiters”, and is what DurableQueue and raceAll build on. It is
dual, so both data-first and pipe forms work.
import { Effect, Schema } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
const Charge = DurableDeferred.make("charge", { success: Schema.Number })
// Data-first: run the effect, persist its Exit into `Charge`.const dataFirst = DurableDeferred.into( Effect.succeed(4200), // the work Charge // the deferred to complete)
// Pipe form (data-last):const piped = Effect.succeed(4200).pipe(DurableDeferred.into(Charge))// both => Effect<number> that also records `Exit.succeed(4200)` into ChargeDurableDeferred.raceAll — durable race
Section titled “DurableDeferred.raceAll — durable race”Runs several effects as a durable race. On first run it completes a named deferred with the first result; on replay it returns the persisted result instead of re-running anything.
import { Effect, Schema } from "effect"import { DurableClock, DurableDeferred } from "effect/unstable/workflow"
// Race real work against a timeout — whichever resolves first wins, durably.const withDeadline = DurableDeferred.raceAll({ name: "fetch-or-timeout", success: Schema.String, error: Schema.Never, effects: [ Effect.succeed("data"), DurableClock.sleep({ name: "deadline", duration: "1 hour" }).pipe( Effect.as("timed-out") ) ]})// => Effect<string> — "data" or "timed-out", persisted on first completionDurableQueue — background workers (new in v4)
Section titled “DurableQueue — background workers (new in v4)”DurableQueue (added in v4) delegates work to persisted background workers and
resumes the waiting workflow with the worker’s result. The producer calls
process, which encodes the payload, offers it to a backing
PersistedQueue, attaches a DurableDeferred token, and
suspends. A worker created with worker/makeWorker takes the item, runs the
handler, and records the handler’s Exit through that token — at which point the
producing workflow resumes with the typed success or error.
This is the authoritative end-to-end example from the module: define the queue
once, process from workflow code, and run a worker layer in the service
responsible for the work.
import { Effect, Schema } from "effect"import { DurableQueue, Workflow } from "effect/unstable/workflow"
// Define a DurableQueue: shared payload/success/error schemas + idempotency key.const ApiQueue = DurableQueue.make({ name: "ApiQueue", payload: { id: Schema.String }, success: Schema.Void, error: Schema.Never, idempotencyKey(payload) { return payload.id }})
const MyWorkflow = Workflow.make({ name: "MyWorkflow", payload: { id: Schema.String }, idempotencyKey: ({ id }) => id})
const MyWorkflowLayer = MyWorkflow.toLayer( Effect.fnUntraced(function* () { // Add an item to the DurableQueue. When the worker finishes processing it, // the workflow resumes here. yield* DurableQueue.process(ApiQueue, { id: "api-call-1" })
yield* Effect.log("Workflow succeeded!") }))
// A worker layer that processes items from the DurableQueue.const ApiWorker = DurableQueue.worker( ApiQueue, Effect.fnUntraced(function* ({ id }) { yield* Effect.log(`Worker processing API call with id: ${id}`) }), { concurrency: 5 } // process up to 5 items concurrently)DurableDeferred reference
Section titled “DurableDeferred reference”Every public API of DurableDeferred. All completion APIs (done, succeed,
fail, failCause) and the token helpers tokenFromExecutionId /
tokenFromPayload are dual.
Creates a named durable deferred with optional success and error schemas.
Defaults are Schema.Void (success) and Schema.Never (error).
import { Schema } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
const Signal = DurableDeferred.make("signal") // success: void, error: neverconst Result = DurableDeferred.make("result", { success: Schema.Number, error: Schema.String})// => DurableDeferred<Schema.Number, Schema.String>Suspends the current workflow until the deferred has a persisted completion, then resumes with the decoded success value (or fails with the error).
import { Effect } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () { const value = yield* DurableDeferred.await(Result) // => suspends; on resume, value: number})Runs an effect and records its Exit into the deferred, resuming waiters. Dual.
See the common-case section above for both forms.
import { Effect } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
Effect.fail("boom").pipe(DurableDeferred.into(Result))// => records Exit.fail("boom") into Result, then fails with "boom"raceAll
Section titled “raceAll”Durable race over a non-empty array of effects; persists the first result under
raceAll/<name>. See above for a runnable example.
Completes the deferred identified by a token with an arbitrary Exit. The
low-level primitive behind succeed/fail/failCause. Dual.
import { Exit } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.done(Result, { token, exit: Exit.succeed(200) })// => Effect<void>, requires WorkflowEnginesucceed
Section titled “succeed”Completes the deferred with a successful value (shorthand for
done(..., { exit: Exit.succeed(value) })). Dual.
import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.succeed(Result, { token, value: 200 })// => Effect<void>Completes the deferred with a typed failure (Exit.fail(error)). Dual.
import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.fail(Result, { token, error: "declined" })// => Effect<void>failCause
Section titled “failCause”Completes the deferred with a full Cause (defects, multiple failures,
interruption). Dual.
import { Cause } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.failCause(Result, { token, cause: Cause.fail("declined") })// => Effect<void>Builds a Token for the deferred using the current workflow instance’s name
and execution id. Hand this to an external system so it can complete the
deferred later.
import { Effect } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () { const t = yield* DurableDeferred.token(Result) // => Token (branded string), requires WorkflowInstance})tokenFromExecutionId
Section titled “tokenFromExecutionId”Builds a token from an explicit workflow + executionId (no running instance
needed). Dual; returns a Token synchronously.
import { DurableDeferred } from "effect/unstable/workflow"
const t = DurableDeferred.tokenFromExecutionId(Result, { workflow: MyWorkflow, executionId: "order-123"})// => TokentokenFromPayload
Section titled “tokenFromPayload”Builds a token by deriving the execution id from a workflow payload (using
the workflow’s own idempotencyKey). Dual; returns an Effect<Token>.
import { Effect } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () { const t = yield* DurableDeferred.tokenFromPayload(Result, { workflow: MyWorkflow, payload: { id: "order-123" } }) // => Token})withActivityAttempt
Section titled “withActivityAttempt”A member of each deferred: an Effect that yields an attempt-scoped copy named
<name>/<attempt>, so a retried activity completes a fresh wait point instead of
clobbering a previous attempt’s completion.
import { Effect } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () { const scoped = yield* Result.withActivityAttempt // => DurableDeferred named "result/<currentAttempt>"})Token / TokenTypeId
Section titled “Token / TokenTypeId”Token is a branded string (Brand.Branded<string, TokenTypeId>) plus a
matching Schema for decoding/encoding in payloads. TokenTypeId is the
runtime/type-level brand identifier.
import { Schema } from "effect"import { DurableDeferred } from "effect/unstable/workflow"
// Embed a token in an API payload schema:const CallbackBody = Schema.Struct({ token: DurableDeferred.Token })// => Schema with field `token: Token`TokenParsed
Section titled “TokenParsed”A Schema.Class holding a token’s three components (workflowName,
executionId, deferredName) with helpers to convert to/from the encoded
string form.
import { DurableDeferred } from "effect/unstable/workflow"
const parsed = DurableDeferred.TokenParsed.fromString(token)// => TokenParsed { workflowName, executionId, deferredName }
parsed.asToken // => Token (re-encode)DurableDeferred.TokenParsed.encode(parsed) // => string (via FromString schema)// TokenParsed.FromString is the underlying Schema<string -> TokenParsed>Type-level: DurableDeferred, Any, AnyWithProps
Section titled “Type-level: DurableDeferred, Any, AnyWithProps”DurableDeferred<Success, Error> is the full model. Any is the type-erased
shape (just name + the type id) for APIs that only need identity;
AnyWithProps additionally exposes successSchema / errorSchema /
exitSchema.
import { DurableDeferred } from "effect/unstable/workflow"
const accept = (d: DurableDeferred.Any) => d.name// => accepts any deferred regardless of its Success/Error typesDurableQueue reference
Section titled “DurableQueue reference”Defines a queue: a name, a payload schema (struct fields or a Schema), an
idempotencyKey derived from the payload, and optional success / error
schemas (default Schema.Void / Schema.Never). The idempotency key becomes the
persisted queue item id.
import { Schema } from "effect"import { DurableQueue } from "effect/unstable/workflow"
const EmailQueue = DurableQueue.make({ name: "EmailQueue", payload: { to: Schema.String, subject: Schema.String }, success: Schema.Boolean, idempotencyKey: ({ to, subject }) => `${to}:${subject}`})// => DurableQueue<Schema.Struct<...>, Schema.Boolean, Schema.Never>process
Section titled “process”Enqueues an item and awaits the worker’s result, suspending the producing
workflow in between. Optionally retries transient PersistedQueueErrors from the
backing store via retrySchedule. Requires WorkflowEngine, WorkflowInstance,
and PersistedQueue.PersistedQueueFactory.
import { Effect, Schedule } from "effect"import { DurableQueue } from "effect/unstable/workflow"
const program = Effect.gen(function* () { const sent = yield* DurableQueue.process( EmailQueue, { to: "a@b.com", subject: "hi" }, { retrySchedule: Schedule.exponential("500 millis") } ) // => suspends; on resume, sent: boolean (the worker's result)})makeWorker
Section titled “makeWorker”Builds a worker as an Effect<never>: it loops, takes items, runs the handler
f, and records each Exit back through the item’s token. concurrency
(default 1) processes that many items in parallel. Use this when you want to
fork the worker yourself.
import { Effect } from "effect"import { DurableQueue } from "effect/unstable/workflow"
const worker = DurableQueue.makeWorker( EmailQueue, Effect.fnUntraced(function* ({ to, subject }) { yield* Effect.log(`sending ${subject} to ${to}`) return true }), { concurrency: 10 })// => Effect<never> — run/fork it inside a scopeworker
Section titled “worker”The layer form of makeWorker: Layer.effectDiscard that forkScopeds the
worker. This is the usual way to wire a worker into your application alongside
the producing workflow’s layer.
import { Layer } from "effect"import { DurableQueue } from "effect/unstable/workflow"
const EmailWorker = DurableQueue.worker( EmailQueue, (payload) => Effect.as(Effect.log(`sending to ${payload.to}`), true), { concurrency: 10 })// => Layer<never, never, WorkflowEngine | PersistedQueueFactory | ...>
// Wire producer + worker together:const AppLayer = Layer.mergeAll(MyWorkflowLayer, EmailWorker)Type-level: DurableQueue, TypeId
Section titled “Type-level: DurableQueue, TypeId”DurableQueue<Payload, Success, Error> is the queue model (carrying
payloadSchema, idempotencyKey, and the internal deferred). TypeId is the
runtime/type-level identifier branding queue values.
import { DurableQueue } from "effect/unstable/workflow"
const queueName = (q: DurableQueue.DurableQueue<any, any, any>) => q.name// => reads the queue's name regardless of its schemasDurableClock reference
Section titled “DurableClock reference”The workflow-facing helper covered above: in-process activity for short waits,
durable scheduled clock for long ones. inMemoryThreshold defaults to 60s.
import { DurableClock } from "effect/unstable/workflow"
DurableClock.sleep({ name: "retry-backoff", duration: "10 minutes" })// => Effect<void, never, WorkflowEngine | WorkflowInstance>Constructs the DurableClock model directly: a name, a normalized Duration,
and a DurableDeferred (named DurableClock/<name>) completed when the timer
wakes. sleep uses this internally; you rarely call it yourself.
import { DurableClock } from "effect/unstable/workflow"
const clock = DurableClock.make({ name: "deadline", duration: "1 day" })// => { name: "deadline", duration: Duration, deferred: DurableDeferred<Void> }Type-level: DurableClock
Section titled “Type-level: DurableClock”The model interface: { name, duration, deferred }. Useful when wiring a custom
engine that schedules clocks.
import { DurableClock } from "effect/unstable/workflow"
const describe = (c: DurableClock.DurableClock) => `${c.name}: ${c.duration}`// => formats a clock's name and durationWhere these fit
Section titled “Where these fit”- These primitives only run with a workflow context. Learn how to define workflows and activities in Workflows & Activities.
- The engine that persists clocks, deferreds, and queue state — and how to execute, poll, resume, and interrupt workflows — is covered in Engine & Execution.
DurableQueueis backed by aPersistedQueue; choose a memory, SQL, or Redis store there.