Durable Clocks, Deferreds & Queues

A workflow that runs for hours, days, or weeks rarely does all of its work in a single uninterrupted turn. It pauses for a timeout, waits for a human to approve something, fans work out to a background worker, or races a deadline against an external event. These pauses must survive process restarts and replays, so they cannot be ordinary Effect.sleep calls or in-memory Deferreds.

The workflow package provides three coordination primitives whose state is persisted by the WorkflowEngine:

  • DurableClock — a replayable, suspendable sleep.
  • DurableDeferred — a named wait point completed by an activity, worker, timer, or an external callback (webhook, human approval).
  • DurableQueue — a producer/worker queue where the producing workflow suspends until a background worker finishes the item.

All three are imported from effect/unstable/workflow and require a workflow context (WorkflowEngine + WorkflowInstance), which you get by running them inside workflow or activity code.

import { Effect, Schema } from "effect"
import {
DurableClock,
DurableDeferred,
DurableQueue
} from "effect/unstable/workflow"

A DurableClock is a workflow-safe sleep. Unlike Effect.sleep, it can be replayed and resumed by the engine after a restart. The common entry point is DurableClock.sleep.

import { Effect } from "effect"
import { DurableClock } from "effect/unstable/workflow"
const remindLater = Effect.gen(function* () {
yield* Effect.log("sending the order")
// Short waits run in-process as an activity (<= 60s by default).
yield* DurableClock.sleep({ name: "settle", duration: "5 seconds" })
// Long waits schedule a durable clock and suspend the workflow until the
// engine wakes it — this survives process restarts.
yield* DurableClock.sleep({ name: "follow-up", duration: "3 days" })
yield* Effect.log("sending the follow-up reminder")
})

How sleep chooses its strategy:

  • Zero duration completes immediately (no-op).
  • Duration <= inMemoryThreshold (default 60 seconds) runs with Effect.sleep inside an activity named DurableClock/<name>. Cheap, but the delay only survives in the current process.
  • Longer durations schedule a durable clock via the engine and await the clock’s deferred, suspending the workflow until the timer fires.
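The three rules above can be summarised as a small decision function. This is a plain TypeScript model of the selection logic only, not the library's implementation: `chooseSleepStrategy`, `SleepStrategy`, and the millisecond arguments are hypothetical names introduced for illustration.

```typescript
// Hypothetical model of how a durable sleep picks its strategy.
// Durations are plain milliseconds here; the real API accepts duration inputs
// such as "5 seconds".
type SleepStrategy = "no-op" | "in-memory" | "durable"

// 60 seconds, matching the documented default for inMemoryThreshold.
const DEFAULT_IN_MEMORY_THRESHOLD_MS = 60000

function chooseSleepStrategy(
  durationMs: number,
  inMemoryThresholdMs: number = DEFAULT_IN_MEMORY_THRESHOLD_MS
): SleepStrategy {
  // Zero duration: nothing to wait for.
  if (durationMs === 0) return "no-op"
  // Short waits: sleep in-process inside an activity.
  if (durationMs <= inMemoryThresholdMs) return "in-memory"
  // Everything else: schedule a durable clock and suspend.
  return "durable"
}

const threeDaysMs = 3 * 24 * 60 * 60 * 1000

const strategies: Array<SleepStrategy> = [
  chooseSleepStrategy(0),
  chooseSleepStrategy(5000),
  chooseSleepStrategy(threeDaysMs),
  // A zero threshold forces even a 30 second wait onto the durable path.
  chooseSleepStrategy(30000, 0)
]
```

In this model a threshold of zero sends every non-zero wait down the durable path.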

Lower inMemoryThreshold when even a short delay must survive a restart:

// Force a durable clock even for a 30s wait, so a crash mid-wait resumes
// correctly instead of restarting the timer from zero.
yield* DurableClock.sleep({
name: "grace-period",
duration: "30 seconds",
inMemoryThreshold: "0 millis"
})

A DurableDeferred is a named wait point whose result is persisted by the engine as an encoded Exit. A workflow can await one and suspend until something else completes it — an activity, a worker, a timer, or an external callback holding a token.

The most powerful use is coordinating with the outside world: a workflow awaits a deferred, an external system (a webhook handler, an approval UI) holds a token for it, and calls succeed/fail whenever the real-world event happens — possibly hours later, from a different process.

import { Effect, Schema } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
// A wait point that resolves to an approval decision.
const Approval = DurableDeferred.make("approval", {
success: Schema.Struct({ approvedBy: Schema.String }),
error: Schema.String
})
const requestApproval = Effect.gen(function* () {
// Hand a token to the outside world so it can complete this deferred later.
const token = yield* DurableDeferred.token(Approval)
yield* Effect.log(`waiting for approval, token = ${token}`)
// Suspend until someone completes the deferred. Resumes (even across a
// restart) with the decoded success value, or fails with the error value.
const decision = yield* DurableDeferred.await(Approval)
yield* Effect.log(`approved by ${decision.approvedBy}`)
})

Elsewhere — e.g. inside an HTTP handler for an approval webhook — the external system completes the deferred using the token it was given:

import { Effect } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
// `token` came from the workflow above (stored in a DB, passed in a URL, ...).
const handleApprovalWebhook = (token: DurableDeferred.Token, user: string) =>
DurableDeferred.succeed(Approval, {
token,
value: { approvedBy: user }
})
// requires WorkflowEngine to route the completion back to the execution

DurableDeferred.into — record an effect’s result

into runs an effect and records its Exit into the deferred, resuming any workflows waiting on it. It is the bridge between “do the work” and “report the result to waiters”, and is what DurableQueue and raceAll build on. It is dual, so both data-first and pipe forms work.

import { Effect, Schema } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
const Charge = DurableDeferred.make("charge", { success: Schema.Number })
// Data-first: run the effect, persist its Exit into `Charge`.
const dataFirst = DurableDeferred.into(
Effect.succeed(4200), // the work
Charge // the deferred to complete
)
// Pipe form (data-last):
const piped = Effect.succeed(4200).pipe(DurableDeferred.into(Charge))
// both => Effect<number> that also records `Exit.succeed(4200)` into Charge

DurableDeferred.raceAll runs several effects as a durable race. On the first run it completes a named deferred with the first result; on replay it returns the persisted result instead of re-running anything.

import { Effect, Schema } from "effect"
import { DurableClock, DurableDeferred } from "effect/unstable/workflow"
// Race real work against a timeout — whichever resolves first wins, durably.
const withDeadline = DurableDeferred.raceAll({
name: "fetch-or-timeout",
success: Schema.String,
error: Schema.Never,
effects: [
Effect.succeed("data"),
DurableClock.sleep({ name: "deadline", duration: "1 hour" }).pipe(
Effect.as("timed-out")
)
]
})
// => Effect<string> — "data" or "timed-out", persisted on first completion
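The replay behaviour is what makes the race durable: once a result has been persisted, later runs read it back instead of executing the effects again. Below is a minimal sketch of that persist-then-replay idea, using an in-memory Map as a stand-in for the engine's store. `ResultStore` and `runOnce` are hypothetical names, and the model only memoises a synchronous computation; the real raceAll races Effects and persists an encoded Exit.

```typescript
// Hypothetical stand-in for the engine's persisted deferred results.
type ResultStore = Map<string, string>

function runOnce(
  store: ResultStore,
  name: string,
  compute: () => string
): string {
  // The result is stored under a key derived from the race's name.
  const key = `raceAll/${name}`
  const persisted = store.get(key)
  // Replay: a persisted result exists, so the work is not executed again.
  if (persisted !== undefined) return persisted
  // First run: do the work and persist whatever it produced.
  const result = compute()
  store.set(key, result)
  return result
}

const store: ResultStore = new Map()
let runs = 0
const work = (): string => {
  runs += 1
  return "data"
}

const first = runOnce(store, "fetch-or-timeout", work)
const replayed = runOnce(store, "fetch-or-timeout", work)
// `work` ran once; the second call was served from the store.
```

Keying the stored result by name is why each race in a workflow needs its own distinct `name`.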

DurableQueue — background workers (new in v4)

DurableQueue (added in v4) delegates work to persisted background workers and resumes the waiting workflow with the worker’s result. The producer calls process, which encodes the payload, offers it to a backing PersistedQueue, attaches a DurableDeferred token, and suspends. A worker created with worker/makeWorker takes the item, runs the handler, and records the handler’s Exit through that token — at which point the producing workflow resumes with the typed success or error.

This is the authoritative end-to-end example from the module: define the queue once, process from workflow code, and run a worker layer in the service responsible for the work.

import { Effect, Schema } from "effect"
import { DurableQueue, Workflow } from "effect/unstable/workflow"
// Define a DurableQueue: shared payload/success/error schemas + idempotency key.
const ApiQueue = DurableQueue.make({
name: "ApiQueue",
payload: {
id: Schema.String
},
success: Schema.Void,
error: Schema.Never,
idempotencyKey(payload) {
return payload.id
}
})
const MyWorkflow = Workflow.make({
name: "MyWorkflow",
payload: {
id: Schema.String
},
idempotencyKey: ({ id }) => id
})
const MyWorkflowLayer = MyWorkflow.toLayer(
Effect.fnUntraced(function* () {
// Add an item to the DurableQueue. When the worker finishes processing it,
// the workflow resumes here.
yield* DurableQueue.process(ApiQueue, { id: "api-call-1" })
yield* Effect.log("Workflow succeeded!")
})
)
// A worker layer that processes items from the DurableQueue.
const ApiWorker = DurableQueue.worker(
ApiQueue,
Effect.fnUntraced(function* ({ id }) {
yield* Effect.log(`Worker processing API call with id: ${id}`)
}),
{ concurrency: 5 } // process up to 5 items concurrently
)
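To make the hand-off between producer and worker concrete, here is a small in-memory model of the flow described above: the producer enqueues an item with a token and suspends, the worker runs the handler and completes the token, and the producer resumes with the outcome. Everything in it (`ModelQueue`, `Outcome`, the `token:` prefix, the callback standing in for workflow resumption) is a hypothetical simplification, not the library's API, and nothing is actually persisted.

```typescript
// Hypothetical model of an Exit: either a success value or an error message.
type Outcome<A> = { ok: true; value: A } | { ok: false; error: string }

interface QueueItem<P> {
  id: string // derived from the idempotency key
  token: string // identifies the waiting producer
  payload: P
}

// Run the handler and capture success or failure as an Outcome.
function runHandler<P, A>(handler: (payload: P) => A, payload: P): Outcome<A> {
  try {
    return { ok: true, value: handler(payload) }
  } catch (e) {
    return { ok: false, error: String(e) }
  }
}

class ModelQueue<P, A> {
  private items: Array<QueueItem<P>> = []
  private waiters = new Map<string, (outcome: Outcome<A>) => void>()
  private idempotencyKey: (payload: P) => string

  constructor(idempotencyKey: (payload: P) => string) {
    this.idempotencyKey = idempotencyKey
  }

  // Producer side: enqueue the item with a token, then "suspend" by
  // registering a continuation instead of blocking.
  process(payload: P, resume: (outcome: Outcome<A>) => void): void {
    const id = this.idempotencyKey(payload)
    const token = `token:${id}`
    this.waiters.set(token, resume)
    this.items.push({ id, token, payload })
  }

  // Worker side: take each item, run the handler, and complete the token,
  // which resumes the waiting producer.
  drain(handler: (payload: P) => A): void {
    let item = this.items.shift()
    while (item !== undefined) {
      const outcome = runHandler(handler, item.payload)
      const resume = this.waiters.get(item.token)
      this.waiters.delete(item.token)
      if (resume !== undefined) resume(outcome)
      item = this.items.shift()
    }
  }
}

const queue = new ModelQueue<{ id: string }, string>((p) => p.id)
const log: Array<string> = []

queue.process({ id: "api-call-1" }, (outcome) => {
  log.push(outcome.ok ? `resumed with ${outcome.value}` : `failed: ${outcome.error}`)
})
log.push("producer suspended")
queue.drain(({ id }) => `done:${id}`)
```

The producer never calls the handler; it only learns the outcome through its token. That indirection is what lets the worker run in a different service.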

This section lists every public API of DurableDeferred. All completion APIs (done, succeed, fail, failCause) and the token helpers tokenFromExecutionId / tokenFromPayload are dual.

DurableDeferred.make creates a named durable deferred with optional success and error schemas. The defaults are Schema.Void (success) and Schema.Never (error).

import { Schema } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
const Signal = DurableDeferred.make("signal") // success: void, error: never
const Result = DurableDeferred.make("result", {
success: Schema.Number,
error: Schema.String
})
// => DurableDeferred<Schema.Number, Schema.String>

DurableDeferred.await suspends the current workflow until the deferred has a persisted completion, then resumes with the decoded success value (or fails with the error).

import { Effect } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () {
const value = yield* DurableDeferred.await(Result)
// => suspends; on resume, value: number
})

DurableDeferred.into runs an effect and records its Exit into the deferred, resuming waiters. Dual. See the common-case section above for both forms.

import { Effect } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
Effect.fail("boom").pipe(DurableDeferred.into(Result))
// => records Exit.fail("boom") into Result, then fails with "boom"

DurableDeferred.raceAll is a durable race over a non-empty array of effects; it persists the first result under raceAll/<name>. See above for a runnable example.

DurableDeferred.done completes the deferred identified by a token with an arbitrary Exit. It is the low-level primitive behind succeed/fail/failCause. Dual.

import { Exit } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.done(Result, { token, exit: Exit.succeed(200) })
// => Effect<void>, requires WorkflowEngine

DurableDeferred.succeed completes the deferred with a successful value (shorthand for done(..., { exit: Exit.succeed(value) })). Dual.

import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.succeed(Result, { token, value: 200 })
// => Effect<void>

DurableDeferred.fail completes the deferred with a typed failure (Exit.fail(error)). Dual.

import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.fail(Result, { token, error: "declined" })
// => Effect<void>

DurableDeferred.failCause completes the deferred with a full Cause (defects, multiple failures, interruption). Dual.

import { Cause } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
DurableDeferred.failCause(Result, { token, cause: Cause.fail("declined") })
// => Effect<void>

DurableDeferred.token builds a Token for the deferred using the current workflow instance’s name and execution id. Hand this to an external system so it can complete the deferred later.

import { Effect } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () {
const t = yield* DurableDeferred.token(Result)
// => Token (branded string), requires WorkflowInstance
})

DurableDeferred.tokenFromExecutionId builds a token from an explicit workflow + executionId (no running instance needed). Dual; returns a Token synchronously.

import { DurableDeferred } from "effect/unstable/workflow"
const t = DurableDeferred.tokenFromExecutionId(Result, {
workflow: MyWorkflow,
executionId: "order-123"
})
// => Token

DurableDeferred.tokenFromPayload builds a token by deriving the execution id from a workflow payload (using the workflow’s own idempotencyKey). Dual; returns an Effect<Token>.

import { Effect } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () {
const t = yield* DurableDeferred.tokenFromPayload(Result, {
workflow: MyWorkflow,
payload: { id: "order-123" }
})
// => Token
})

withActivityAttempt is a member of each deferred: an Effect that yields an attempt-scoped copy named <name>/<attempt>, so a retried activity completes a fresh wait point instead of clobbering a previous attempt’s completion.

import { Effect } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
const program = Effect.gen(function* () {
const scoped = yield* Result.withActivityAttempt
// => DurableDeferred named "result/<currentAttempt>"
})
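The naming rule is simple enough to illustrate with plain strings. The sketch below only models why attempt-scoped names keep completions apart; `attemptScopedName` and the `completions` map are hypothetical, and the attempt numbers are arbitrary values chosen for the example.

```typescript
// Hypothetical helper mirroring the documented "<name>/<attempt>" shape.
const attemptScopedName = (name: string, attempt: number): string =>
  `${name}/${attempt}`

// Stand-in for persisted completions, keyed by deferred name.
const completions = new Map<string, string>()

// One attempt records a failure...
completions.set(attemptScopedName("result", 1), "timed out")
// ...and a later attempt records its own completion under a different key,
// leaving the earlier record untouched.
completions.set(attemptScopedName("result", 2), "ok")
```

Had both attempts written to the bare name "result", the second write would have replaced the first.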

Token is a branded string (Brand.Branded<string, TokenTypeId>) plus a matching Schema for decoding/encoding in payloads. TokenTypeId is the runtime/type-level brand identifier.

import { Schema } from "effect"
import { DurableDeferred } from "effect/unstable/workflow"
// Embed a token in an API payload schema:
const CallbackBody = Schema.Struct({ token: DurableDeferred.Token })
// => Schema with field `token: Token`

TokenParsed is a Schema.Class holding a token’s three components (workflowName, executionId, deferredName), with helpers to convert to and from the encoded string form.

import { DurableDeferred } from "effect/unstable/workflow"
const parsed = DurableDeferred.TokenParsed.fromString(token)
// => TokenParsed { workflowName, executionId, deferredName }
parsed.asToken // => Token (re-encode)
DurableDeferred.TokenParsed.encode(parsed) // => string (via FromString schema)
// TokenParsed.FromString is the underlying Schema<string -> TokenParsed>
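To show what a round trip between the string form and the three components involves, here is a self-contained sketch. The encoding used (a JSON array) is an assumption made purely for illustration; this document does not specify the library's actual token format, and `ParsedToken`, `encodeToken`, and `decodeToken` are hypothetical names.

```typescript
// Hypothetical mirror of the three documented token components.
interface ParsedToken {
  workflowName: string
  executionId: string
  deferredName: string
}

// ASSUMPTION: a JSON array is used as the string form for this example only.
const encodeToken = (p: ParsedToken): string =>
  JSON.stringify([p.workflowName, p.executionId, p.deferredName])

function decodeToken(token: string): ParsedToken {
  const parts: unknown = JSON.parse(token)
  // Reject anything that is not exactly three strings.
  if (
    !Array.isArray(parts) ||
    parts.length !== 3 ||
    !parts.every((x) => typeof x === "string")
  ) {
    throw new Error("malformed token")
  }
  const [workflowName, executionId, deferredName] = parts as [string, string, string]
  return { workflowName, executionId, deferredName }
}

const encoded = encodeToken({
  workflowName: "MyWorkflow",
  executionId: "order-123",
  deferredName: "approval"
})
const decoded = decodeToken(encoded)
```

A token has to carry all three components because an external caller holds nothing else: the engine needs them to route a completion to the right workflow, execution, and wait point.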

Type-level: DurableDeferred, Any, AnyWithProps

DurableDeferred<Success, Error> is the full model. Any is the type-erased shape (just name + the type id) for APIs that only need identity; AnyWithProps additionally exposes successSchema / errorSchema / exitSchema.

import { DurableDeferred } from "effect/unstable/workflow"
const accept = (d: DurableDeferred.Any) => d.name
// => accepts any deferred regardless of its Success/Error types

DurableQueue.make defines a queue: a name, a payload schema (struct fields or a Schema), an idempotencyKey derived from the payload, and optional success / error schemas (default Schema.Void / Schema.Never). The idempotency key becomes the persisted queue item id.

import { Schema } from "effect"
import { DurableQueue } from "effect/unstable/workflow"
const EmailQueue = DurableQueue.make({
name: "EmailQueue",
payload: { to: Schema.String, subject: Schema.String },
success: Schema.Boolean,
idempotencyKey: ({ to, subject }) => `${to}:${subject}`
})
// => DurableQueue<Schema.Struct<...>, Schema.Boolean, Schema.Never>

DurableQueue.process enqueues an item and awaits the worker’s result, suspending the producing workflow in between. It optionally retries transient PersistedQueueErrors from the backing store via retrySchedule. Requires WorkflowEngine, WorkflowInstance, and PersistedQueue.PersistedQueueFactory.

import { Effect, Schedule } from "effect"
import { DurableQueue } from "effect/unstable/workflow"
const program = Effect.gen(function* () {
const sent = yield* DurableQueue.process(
EmailQueue,
{ to: "a@b.com", subject: "hi" },
{ retrySchedule: Schedule.exponential("500 millis") }
)
// => suspends; on resume, sent: boolean (the worker's result)
})

DurableQueue.makeWorker builds a worker as an Effect<never>: it loops, takes items, runs the handler f, and records each Exit back through the item’s token. The concurrency option (default 1) sets how many items are processed in parallel. Use this when you want to fork the worker yourself.

import { Effect } from "effect"
import { DurableQueue } from "effect/unstable/workflow"
const worker = DurableQueue.makeWorker(
EmailQueue,
Effect.fnUntraced(function* ({ to, subject }) {
yield* Effect.log(`sending ${subject} to ${to}`)
return true
}),
{ concurrency: 10 }
)
// => Effect<never> — run/fork it inside a scope

DurableQueue.worker is the layer form of makeWorker: a Layer.effectDiscard that forks the worker with forkScoped. This is the usual way to wire a worker into your application alongside the producing workflow’s layer.

import { Effect, Layer } from "effect"
import { DurableQueue } from "effect/unstable/workflow"
const EmailWorker = DurableQueue.worker(
EmailQueue,
(payload) => Effect.as(Effect.log(`sending to ${payload.to}`), true),
{ concurrency: 10 }
)
// => Layer<never, never, WorkflowEngine | PersistedQueueFactory | ...>
// Wire producer + worker together:
const AppLayer = Layer.mergeAll(MyWorkflowLayer, EmailWorker)

DurableQueue<Payload, Success, Error> is the queue model (carrying payloadSchema, idempotencyKey, and the internal deferred). TypeId is the runtime/type-level identifier branding queue values.

import { DurableQueue } from "effect/unstable/workflow"
const queueName = (q: DurableQueue.DurableQueue<any, any, any>) => q.name
// => reads the queue's name regardless of its schemas

DurableClock.sleep is the workflow-facing helper covered above: an in-process activity for short waits, a durable scheduled clock for long ones. inMemoryThreshold defaults to 60 seconds.

import { DurableClock } from "effect/unstable/workflow"
DurableClock.sleep({ name: "retry-backoff", duration: "10 minutes" })
// => Effect<void, never, WorkflowEngine | WorkflowInstance>

DurableClock.make constructs the DurableClock model directly: a name, a normalized Duration, and a DurableDeferred (named DurableClock/<name>) completed when the timer wakes. sleep uses this internally; you rarely call it yourself.

import { DurableClock } from "effect/unstable/workflow"
const clock = DurableClock.make({ name: "deadline", duration: "1 day" })
// => { name: "deadline", duration: Duration, deferred: DurableDeferred<Void> }

The DurableClock type is the model interface: { name, duration, deferred }. It is useful when wiring a custom engine that schedules clocks.

import { DurableClock } from "effect/unstable/workflow"
const describe = (c: DurableClock.DurableClock) => `${c.name}: ${c.duration}`
// => formats a clock's name and duration

  • These primitives only run with a workflow context. Learn how to define workflows and activities in Workflows & Activities.
  • The engine that persists clocks, deferreds, and queue state — and how to execute, poll, resume, and interrupt workflows — is covered in Engine & Execution.
  • DurableQueue is backed by a PersistedQueue; choose a memory, SQL, or Redis store there.