PersistedQueue
PersistedQueue is a named, durable queue of schema-encoded values. Workers
offer values onto the queue, and take processes one value at a time inside
a scoped window: when the handler succeeds the element is acknowledged, and when
it fails, is interrupted, or loses its backing-store lock the element is
re-delivered until maxAttempts is reached (default 10).
Use it for durable handoffs, background jobs, outbox-style integrations, and any workload where work must survive fiber failures and process restarts, or be shared across multiple workers via Redis or SQL.
```ts
import { PersistedQueue } from "effect/unstable/persistence"
```

Two facts shape every example on this page:
- Delivery is at-least-once. A handler may legitimately see the same element more than once, so handlers must be idempotent.
- Ordering is not guaranteed. Retries, lock expiration, polling, and multiple workers can move entries behind newer work. Do not rely on strict FIFO.
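To make the idempotency requirement concrete, here is a minimal sketch in plain TypeScript. It does not use the PersistedQueue API: the Job type, the handleOnce function, and the in-memory Set are illustrative assumptions standing in for your own handler and a durable record of completed work.

```ts
// Toy model of an idempotent handler. Nothing here is part of PersistedQueue;
// all names are hypothetical.
type Job = { id: string; to: string }

const sent: string[] = [] // stands in for the external side effect
const processed = new Set<string>() // in production this would be a durable store

function handleOnce(job: Job): "sent" | "skipped" {
  // A redelivered element is recognised by its id and skipped.
  if (processed.has(job.id)) return "skipped"
  sent.push(job.to)
  processed.add(job.id)
  return "sent"
}

// At-least-once delivery means the same element can arrive twice.
const first = handleOnce({ id: "email:1", to: "ada@example.com" })
const second = handleOnce({ id: "email:1", to: "ada@example.com" })
```

In this sketch the second delivery is a no-op. A real handler would also need the side effect and the "processed" marker to be recorded atomically (or the side effect itself to be idempotent), since a crash between the two steps would otherwise still produce a duplicate.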
The architecture has three layers:
- PersistedQueue<A, R>: the typed, schema-aware queue you call offer and take on.
- PersistedQueueFactory: a service that builds named queues from schemas. Provided by PersistedQueue.layer.
- PersistedQueueStore: the low-level backing store (memory, SQL, or Redis) that actually persists elements, leases them to workers, and acks/retries.
Defining and using a queue
Provide a store layer (layerStoreMemory here) and PersistedQueue.layer (which
turns the store into a PersistedQueueFactory), then make a queue from a
Schema describing the payload.
```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// The payload schema. Values are JSON-encoded with this schema on offer and
// decoded with it on take, so its encode/decode services flow into the queue's R.
const EmailJob = Schema.Struct({
  to: Schema.String,
  subject: Schema.String
})

const program = Effect.gen(function* () {
  // Build a named queue. Needs PersistedQueueFactory in context.
  const queue = yield* PersistedQueue.make({ name: "emails", schema: EmailJob })

  // offer returns the generated id of the enqueued element.
  const id = yield* queue.offer({ to: "ada@example.com", subject: "Welcome" })
  console.log(id) // => "f47ac10b-58cc-4372-a567-0e02b2c3d479" (a UUID)

  // take processes ONE value inside a scoped window. The handler runs with the
  // decoded value plus metadata { id, attempts }. Success acks the element;
  // failure/interruption re-delivers it (up to maxAttempts).
  yield* queue.take((job, { id, attempts }) =>
    Effect.log(`sending ${job.subject} to ${job.to} (id=${id}, attempt ${attempts})`)
  )
})

program.pipe(
  Effect.provide([
    PersistedQueue.layer, // PersistedQueueFactory from PersistedQueueStore
    PersistedQueue.layerStoreMemory // process-local, volatile store
  ]),
  Effect.runFork
)
```

A real worker loops on take so it keeps draining the queue. Run the loop in a
forked fiber while the rest of your app produces work:
```ts
import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const runWorker = Effect.fn("runWorker")(function* (
  queue: PersistedQueue.PersistedQueue<{ to: string; subject: string }>
) {
  // take blocks until an element is available, then processes exactly one.
  // Wrap in forever to keep draining; each iteration is its own scoped window.
  yield* queue
    .take((job) => Effect.log(`sending to ${job.to}: ${job.subject}`))
    .pipe(Effect.forever)
})
```

Idempotency with custom ids
By default offer assigns a random UUID. Pass a stable id to deduplicate:
offering an element whose id already exists in the queue is a no-op, so re-running
the same logical work does not enqueue a duplicate.
```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Payment = Schema.Struct({ orderId: Schema.String, amount: Schema.Number })

const enqueuePayment = Effect.fn("enqueuePayment")(function* (
  order: { orderId: string; amount: number }
) {
  const queue = yield* PersistedQueue.make({ name: "payments", schema: Payment })

  // Use a domain-stable id so retried producers don't double-charge.
  const id = yield* queue.offer(order, { id: `payment:${order.orderId}` })
  // Offering the same id again is ignored: the element is not added twice.
  yield* queue.offer(order, { id: `payment:${order.orderId}` })
  return id // => "payment:order-123"
})
```

The attempts value in the take metadata lets a handler detect that it is
seeing a retry, e.g. to log, alert, or take a different code path after the first
failure:
```ts
queue.take((order, { id, attempts }) =>
  attempts === 0
    ? chargeCard(order)
    : Effect.logWarning(`retrying ${id}, attempt ${attempts}`).pipe(
        Effect.andThen(chargeCard(order))
      )
)
```

At-least-once and retries
Tune the attempt budget per take:
```ts
import { Effect } from "effect"

declare const process: (job: unknown) => Effect.Effect<void>

// Give this job up to 3 attempts instead of the default 10.
queue.take((job) => process(job), { maxAttempts: 3 })
```

Interruption is handled specially: a purely interrupted take is re-delivered
without incrementing attempts, so graceful shutdown does not burn a retry.
Only genuine failures count against maxAttempts.
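The attempt accounting described above can be sketched as a small decision function in plain TypeScript. This is a toy model, not the library's implementation: the settle function and its Outcome and Decision types are hypothetical, and the exact boundary at which an element is given up (here, once the failure count reaches maxAttempts) is an assumption.

```ts
// Toy model of attempt accounting. Names are hypothetical, and the give-up
// boundary (failures >= maxAttempts) is an assumption, not documented behavior.
type Outcome = "success" | "failure" | "interrupt"

type Decision =
  | { action: "ack" }
  | { action: "redeliver"; attempts: number }
  | { action: "giveUp"; attempts: number }

function settle(outcome: Outcome, attempts: number, maxAttempts = 10): Decision {
  switch (outcome) {
    case "success":
      // The handler finished: acknowledge and remove the element.
      return { action: "ack" }
    case "interrupt":
      // Graceful shutdown: re-deliver without consuming an attempt.
      return { action: "redeliver", attempts }
    case "failure": {
      // Only genuine failures count against the budget.
      const next = attempts + 1
      return next >= maxAttempts
        ? { action: "giveUp", attempts: next }
        : { action: "redeliver", attempts: next }
    }
  }
}
```

Under these assumptions, settle("interrupt", 2, 3) re-delivers with attempts still at 2, while settle("failure", 2, 3) gives up because the third failure exhausts a budget of 3.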
Choosing a backing store and tuning leases
The queue is store-agnostic; pick a PersistedQueueStore layer to control
durability and sharing:
| Store | Layer | Durability | Sharing |
|---|---|---|---|
| Memory | layerStoreMemory | Volatile, lost on restart | Process-local only |
| SQL | layerStoreSql(options?) | Durable rows in a table | Multiple workers via row locks |
| Redis | layerStoreRedis(options?) | Durable lists/hashes | Multiple workers via Lua-script leases |
SQL and Redis stores hand each element to one worker under a lease and refresh
that lease while the handler runs. If the worker dies or the handler runs longer
than lockExpiration, the lease expires and another worker may pick the element
up. Tune the three durations to your expected processing time:
- pollInterval: how often a worker checks for new work (default 1 second).
- lockRefreshInterval: how often a held lease is renewed (default 30 seconds).
- lockExpiration: how long a lease is valid before it is reclaimable (default 90 seconds for Redis, 2 minutes for SQL).
lockExpiration must comfortably exceed your worst-case handler duration, or
slow-but-healthy work will be re-delivered. lockRefreshInterval should be well
below lockExpiration so a live worker keeps its lease.
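As a rough illustration of that guidance, the sketch below checks a set of lease settings against an expected handler duration. It is plain TypeScript and not part of PersistedQueue: checkLeaseSettings and LeaseSettings are hypothetical names, durations are plain milliseconds, and reading "well below" as "at most half" is an assumption made for the example.

```ts
// Hypothetical helper, not part of PersistedQueue: sanity-checks lease settings.
type LeaseSettings = {
  lockRefreshIntervalMs: number
  lockExpirationMs: number
  worstCaseHandlerMs: number // your own measurement or estimate
}

function checkLeaseSettings(s: LeaseSettings): string[] {
  const warnings: string[] = []
  if (s.lockExpirationMs <= s.worstCaseHandlerMs) {
    warnings.push("lockExpiration should exceed the worst-case handler duration")
  }
  // Assumption: "well below" is read as "at most half", so one missed
  // refresh does not immediately cost the lease.
  if (s.lockRefreshIntervalMs * 2 > s.lockExpirationMs) {
    warnings.push("lockRefreshInterval should be at most half of lockExpiration")
  }
  return warnings
}

// SQL defaults from this page (30 s refresh, 2 min expiration) with a 20 s job.
const fastJob = checkLeaseSettings({
  lockRefreshIntervalMs: 30_000,
  lockExpirationMs: 120_000,
  worstCaseHandlerMs: 20_000
})

// Same defaults with a job that can take 10 minutes.
const slowJob = checkLeaseSettings({
  lockRefreshIntervalMs: 30_000,
  lockExpirationMs: 120_000,
  worstCaseHandlerMs: 600_000
})
```

With these inputs the fast job produces no warnings, while the slow job is flagged because its worst case exceeds the lease, which is the situation where raising lockExpiration is the usual remedy.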
```ts
import { Effect, Layer } from "effect"
import { SqlClient } from "effect/unstable/sql"
import { PersistedQueue } from "effect/unstable/persistence"

// layerStoreSql needs a SqlClient (from a driver layer such as
// PgClient.layer / SqliteClient.layer). It creates the table + indexes itself.
const QueueLive = PersistedQueue.layer.pipe(
  Layer.provide(
    PersistedQueue.layerStoreSql({
      tableName: "effect_queue", // default
      lockExpiration: "5 minutes" // raise for long-running jobs
    })
  )
)

declare const SqlLive: Layer.Layer<SqlClient.SqlClient>

const Live = QueueLive.pipe(Layer.provide(SqlLive))
```

```ts
import { Layer } from "effect"
import { Redis } from "effect/unstable/persistence"
import { PersistedQueue } from "effect/unstable/persistence"

// layerStoreRedis needs a Redis service (build one with Redis.make over your
// client's send function). Keys are namespaced under `prefix`.
const QueueLive = PersistedQueue.layer.pipe(
  Layer.provide(
    PersistedQueue.layerStoreRedis({
      prefix: "effectq:", // default
      pollInterval: "500 millis"
    })
  )
)

declare const RedisLive: Layer.Layer<Redis.Redis>

const Live = QueueLive.pipe(Layer.provide(RedisLive))
```

API reference
PersistedQueue<A, R>
The typed queue interface. A is the decoded payload type; R is the
schema’s encode/decode service requirement. Two methods:
- offer(value, { id? }) returns Effect<string, PersistedQueueError | SchemaError, R>: enqueues value and resolves to its id (existing ids are not re-added).
- take(f, { maxAttempts? }) where f(value, { id, attempts }) runs your handler; returns Effect<XA, XE | PersistedQueueError | SchemaError, R | XR>.
```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// yield* is only valid inside a generator, so the calls are wrapped in Effect.gen.
const program = Effect.gen(function* () {
  const queue = yield* PersistedQueue.make({
    name: "jobs",
    schema: Schema.Struct({ n: Schema.Number })
  })

  const id = yield* queue.offer({ n: 1 }) // => "<uuid>"
  yield* queue.take((job, meta) => Effect.log(`${job.n} @ attempt ${meta.attempts}`))
})
```

PersistedQueueFactory
Context.Service whose make({ name, schema }) builds a PersistedQueue. The
schema’s EncodingServices | DecodingServices flow into the resulting queue’s R.
Provided by layer.
```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const make = Effect.gen(function* () {
  const factory = yield* PersistedQueue.PersistedQueueFactory
  return yield* factory.make({ name: "audit", schema: Schema.String })
})
// requires PersistedQueueFactory
```

make
Accessor that calls PersistedQueueFactory.make. The idiomatic way to construct a
queue; requires PersistedQueueFactory in context.
```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// yield* is only valid inside a generator, so the call is wrapped in Effect.gen.
const program = Effect.gen(function* () {
  const queue = yield* PersistedQueue.make({
    name: "emails",
    schema: Schema.Struct({ to: Schema.String })
  })
  // => PersistedQueue<{ to: string }, never>, requires PersistedQueueFactory
  return queue
})
```

makeFactory
An Effect that builds a PersistedQueueFactory from the current
PersistedQueueStore. Use it when you want to construct the factory manually
rather than through layer.
```ts
import { Effect, Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const FactoryLive = Layer.effect(
  PersistedQueue.PersistedQueueFactory,
  PersistedQueue.makeFactory
) // requires PersistedQueueStore; this is exactly what `layer` does
```

layer
Layer<PersistedQueueFactory, never, PersistedQueueStore>. Provides the factory
from whichever store layer you supply.
```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Live = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory)
)
// => Layer<PersistedQueueFactory>
```

PersistedQueueError
Schema.ErrorClass with _tag: "PersistedQueueError", a message: string, and an
optional cause. Raised by store operations (failed offer/take against the
backend). Catch it with Effect.catchTag.
```ts
import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// Placeholder so the snippet type-checks on its own.
declare const queue: PersistedQueue.PersistedQueue<{ to: string }>

queue
  .offer({ to: "x" })
  .pipe(
    Effect.catchTag("PersistedQueueError", (err) =>
      Effect.logError(err.message)
    )
  )
// => recovers from backend offer failures
```

It also carries the brand [ErrorTypeId] for runtime guards (see below).
TypeId / ErrorTypeId
String brands used to tag values at runtime. TypeId
("~effect/persistence/PersistedQueue") brands PersistedQueue values;
ErrorTypeId brands PersistedQueueError. You rarely reference these directly.
```ts
import { PersistedQueue } from "effect/unstable/persistence"

PersistedQueue.TypeId // => "~effect/persistence/PersistedQueue"
PersistedQueue.ErrorTypeId // => "~@effect/experimental/PersistedQueue/PersistedQueueError"
```

PersistedQueueStore
Context.Service defining the low-level backend. Most users provide one of the
layerStore* layers rather than implementing it. Two operations:
- offer({ name, id, element, isCustomId }): persists an already-encoded element.
- take({ name, maxAttempts }): returns { id, attempts, element } inside a Scope; the scope’s finalizer acks on success and retries on failure based on the processing exit.
```ts
import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const drain = Effect.gen(function* () {
  const store = yield* PersistedQueue.PersistedQueueStore
  // take is scoped: closing the scope acks (success) or requeues (failure).
  const item = yield* store.take({ name: "jobs", maxAttempts: 10 })
  console.log(item) // => { id: "...", attempts: 0, element: <encoded> }
}).pipe(Effect.scoped)
```

layerStoreMemory
Layer<PersistedQueueStore>. An in-memory, process-local and volatile store.
Failed takes are requeued until maxAttempts. Ideal for tests and single-process
apps that do not need durability.
```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Live = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory)
)
// => no external dependencies; data lives only for the process lifetime
```

layerStoreSql
(options?) => Layer<PersistedQueueStore, SqlError, SqlClient>. A durable
SQL-backed store. It creates the queue table and supporting indexes, acquires rows
with per-worker locks (FOR UPDATE SKIP LOCKED where supported), refreshes active
locks while takes run, and completes or retries rows by exit. Supports pg, mysql,
mssql, and sqlite dialects.
Options: tableName (default "effect_queue"), pollInterval (default 1s),
lockRefreshInterval (default 30s), lockExpiration (default 2 minutes).
```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = PersistedQueue.layerStoreSql({
  tableName: "jobs_queue",
  lockExpiration: "10 minutes"
})
// => Layer<PersistedQueueStore, SqlError, SqlClient>
```

makeStoreSql
Scoped Effect form of the SQL store:
(options?) => Effect<PersistedQueueStore["Service"], SqlError, SqlClient | Scope>.
Use it when you need the store value directly (e.g. composing your own layer or
running background fibers in a custom scope). layerStoreSql is makeStoreSql
wrapped in Layer.effect.
```ts
import { Effect, Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = Layer.effect(
  PersistedQueue.PersistedQueueStore,
  PersistedQueue.makeStoreSql({ pollInterval: "250 millis" })
)
```

layerStoreRedis
(options?) => Layer<PersistedQueueStore, never, Redis>. A durable Redis-backed
store using lists and hashes with per-element worker locks driven by cached Lua
scripts. It periodically refreshes locks for in-flight items and moves exhausted
items (past maxAttempts) to a :failed list. Requires a Redis service.
Options: prefix (default "effectq:"), pollInterval (default 1s),
lockRefreshInterval (default 30s), lockExpiration (default 90s).
```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = PersistedQueue.layerStoreRedis({
  prefix: "myapp:queue:",
  lockExpiration: "2 minutes"
})
// => Layer<PersistedQueueStore, never, Redis>
```

makeStoreRedis
Scoped Effect form of the Redis store:
(options?) => Effect<PersistedQueueStore["Service"], never, Redis | Scope>.
layerStoreRedis is makeStoreRedis wrapped in Layer.effect.
```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = Layer.effect(
  PersistedQueue.PersistedQueueStore,
  PersistedQueue.makeStoreRedis({ prefix: "effectq:" })
)
```