PersistedQueue

PersistedQueue is a named, durable queue of schema-encoded values. Producers offer values onto the queue, and take processes one value at a time inside a scoped window. When the handler succeeds, the element is acknowledged. When it fails, is interrupted, or loses its backing-store lock, the element is re-delivered until maxAttempts is reached (default 10).

Use it for durable handoffs, background jobs, outbox-style integrations, and any workload where unfinished work should survive fiber failures and process restarts, or be shared across multiple workers via Redis or SQL.

import { PersistedQueue } from "effect/unstable/persistence"

Two facts shape every example on this page:

  • Delivery is at-least-once. A handler may legitimately see the same element more than once, so handlers must be idempotent.
  • Ordering is not guaranteed. Retries, lock expiration, polling, and multiple workers can move entries behind newer work. Do not rely on strict FIFO.
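
Since an element can be delivered more than once, one common way to make a handler idempotent is to record the ids it has already completed and skip repeats. The sketch below is plain TypeScript, not PersistedQueue code: `sendEmail`, `processedIds`, and `handleOnce` are hypothetical names, and it assumes the processed set would live in durable storage in a real system rather than in memory.

```typescript
// Hypothetical job shape and side effect, for illustration only.
type EmailJob = { to: string; subject: string }

const sent: Array<string> = []
const sendEmail = (job: EmailJob): void => {
  sent.push(`${job.to}:${job.subject}`)
}

// Ids of elements whose side effect has already been applied.
// Assumption: in production this would be a durable table, not a Set.
const processedIds = new Set<string>()

// Applies the side effect at most once per element id, so a re-delivery
// of the same element is harmless. Returns true when the work actually ran.
const handleOnce = (id: string, job: EmailJob): boolean => {
  if (processedIds.has(id)) return false // duplicate delivery: skip
  sendEmail(job)
  processedIds.add(id)
  return true
}

// Simulate the same element being delivered twice.
const job: EmailJob = { to: "ada@example.com", subject: "Welcome" }
handleOnce("job-1", job)
handleOnce("job-1", job)
console.log(sent.length) // => 1
```

Keying on the element id works well together with a stable, domain-derived id on the producer side, because the same logical job then always maps to the same key.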

The architecture has three layers:

  • PersistedQueue<A, R> — the typed, schema-aware queue you call offer and take on.
  • PersistedQueueFactory — a service that builds named queues from schemas. Provided by PersistedQueue.layer.
  • PersistedQueueStore — the low-level backing store (memory, SQL, or Redis) that actually persists elements, leases them to workers, and acks/retries.

Provide a store layer (layerStoreMemory here) and PersistedQueue.layer (which turns the store into a PersistedQueueFactory), then make a queue from a Schema describing the payload.

import { Effect, Layer, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// The payload schema. Values are JSON-encoded with this schema on offer and
// decoded with it on take, so its encode/decode services flow into the queue's R.
const EmailJob = Schema.Struct({
  to: Schema.String,
  subject: Schema.String
})

const program = Effect.gen(function* () {
  // Build a named queue. Needs PersistedQueueFactory in context.
  const queue = yield* PersistedQueue.make({
    name: "emails",
    schema: EmailJob
  })

  // offer returns the id of the enqueued element (a generated UUID by default).
  const id = yield* queue.offer({ to: "ada@example.com", subject: "Welcome" })
  console.log(id) // => e.g. "f47ac10b-58cc-4372-a567-0e02b2c3d479"

  // take processes ONE value inside a scoped window. The handler runs with the
  // decoded value plus metadata { id, attempts }. Success acks the element;
  // failure/interruption re-delivers it (up to maxAttempts).
  yield* queue.take((job, { id, attempts }) =>
    Effect.log(`sending ${job.subject} to ${job.to} (id=${id}, attempt ${attempts})`)
  )
})

// The store layer is provided to PersistedQueue.layer, which builds the
// PersistedQueueFactory from it.
const QueueLive = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory) // process-local, volatile store
)

program.pipe(Effect.provide(QueueLive), Effect.runFork)

A real worker loops on take so it keeps draining the queue. Run the loop in a forked fiber while the rest of your app produces work:

import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const runWorker = Effect.fn("runWorker")(function* (
  queue: PersistedQueue.PersistedQueue<{ to: string; subject: string }>
) {
  // take blocks until an element is available, then processes exactly one.
  // Wrap in forever to keep draining; each iteration is its own scoped window.
  yield* queue
    .take((job) => Effect.log(`sending to ${job.to}: ${job.subject}`))
    .pipe(Effect.forever)
})

By default offer assigns a random UUID. Pass a stable id to deduplicate: offering an element whose id already exists in the queue is a no-op, so re-running the same logical work does not enqueue a duplicate.

import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Payment = Schema.Struct({ orderId: Schema.String, amount: Schema.Number })

const enqueuePayment = Effect.fn("enqueuePayment")(function* (
  order: { orderId: string; amount: number }
) {
  const queue = yield* PersistedQueue.make({ name: "payments", schema: Payment })
  // Use a domain-stable id so retried producers don't double-charge.
  const id = yield* queue.offer(order, { id: `payment:${order.orderId}` })
  // Offering the same id again is ignored; the element is not added twice.
  yield* queue.offer(order, { id: `payment:${order.orderId}` })
  return id // => "payment:order-123"
})
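
The no-op behavior for an existing id can be pictured with a small in-memory model. This is a sketch in plain TypeScript, not the library's implementation: `pending`, `nextGeneratedId`, and `offerModel` are hypothetical names, and the counter-based id only stands in for the random UUID the real queue generates.

```typescript
// Pending elements keyed by id. A Map holds at most one entry per id.
const pending = new Map<string, string>()
let counter = 0

// Stand-in for the random UUID the real queue would generate.
const nextGeneratedId = (): string => `generated-${++counter}`

// Enqueues an already-encoded element and returns its id.
// If the id is already pending, the call changes nothing.
const offerModel = (element: string, id: string = nextGeneratedId()): string => {
  if (!pending.has(id)) pending.set(id, element)
  return id
}

offerModel('{"orderId":"order-123","amount":10}', "payment:order-123")
offerModel('{"orderId":"order-123","amount":99}', "payment:order-123") // ignored
console.log(pending.size) // => 1
console.log(pending.get("payment:order-123")) // => {"orderId":"order-123","amount":10}
```

In this model the first offer wins, which is why a retried producer can safely call offer again with the same id.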

The attempts value in the take metadata lets a handler detect that it is seeing a retry, e.g. to log, alert, or take a different code path after the first failure:

queue.take((order, { id, attempts }) =>
  attempts === 0
    ? chargeCard(order)
    : Effect.logWarning(`retrying ${id}, attempt ${attempts}`).pipe(
        Effect.andThen(chargeCard(order))
      )
)

Tune the attempt budget per take:

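import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// Placeholders so the snippet type-checks on its own.
declare const queue: PersistedQueue.PersistedQueue<unknown>
declare const processJob: (job: unknown) => Effect.Effect<void>

// Give this job up to 3 attempts instead of the default 10.
queue.take((job) => processJob(job), { maxAttempts: 3 })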

Note that interruption is handled specially: a purely-interrupted take is re-delivered without incrementing attempts, so graceful shutdown does not burn a retry. Only genuine failures count against maxAttempts.
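
The accounting described above can be written down as a small decision function. This is a plain TypeScript model, not the store's code: `Outcome`, `Decision`, and `settle` are hypothetical names, and it assumes that attempts counts failed runs starting at 0 and that an element stops being delivered once that count reaches maxAttempts.

```typescript
// The three ways a handler run can end.
type Outcome = "success" | "failure" | "interrupt"

type Decision =
  | { action: "ack" }
  | { action: "redeliver"; attempts: number }
  | { action: "giveUp"; attempts: number }

// Decides what happens to an element after one handler run.
// Assumption: the exact boundary (>= maxAttempts) is illustrative.
const settle = (outcome: Outcome, attempts: number, maxAttempts = 10): Decision => {
  switch (outcome) {
    case "success":
      return { action: "ack" }
    case "interrupt":
      // Interruption does not consume an attempt.
      return { action: "redeliver", attempts }
    case "failure": {
      const next = attempts + 1
      return next >= maxAttempts
        ? { action: "giveUp", attempts: next }
        : { action: "redeliver", attempts: next }
    }
  }
}

const afterFailure = settle("failure", 0, 3) // => { action: "redeliver", attempts: 1 }
const afterInterrupt = settle("interrupt", 1, 3) // => { action: "redeliver", attempts: 1 }
const afterLastFailure = settle("failure", 2, 3) // => { action: "giveUp", attempts: 3 }
```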

Choosing a backing store and tuning leases

The queue is store-agnostic; pick a PersistedQueueStore layer to control durability and sharing:

| Store | Layer | Durability | Sharing |
| --- | --- | --- | --- |
| Memory | layerStoreMemory | Volatile, lost on restart | Process-local only |
| SQL | layerStoreSql(options?) | Durable rows in a table | Multiple workers via row locks |
| Redis | layerStoreRedis(options?) | Durable lists/hashes | Multiple workers via Lua-script leases |

SQL and Redis stores hand each element to one worker under a lease and refresh that lease while the handler runs. If the worker dies or the handler runs longer than lockExpiration, the lease expires and another worker may pick the element up. Tune the three durations to your expected processing time:

  • pollInterval — how often a worker checks for new work (default 1 second).
  • lockRefreshInterval — how often a held lease is renewed (default 30 seconds).
  • lockExpiration — how long a lease is valid before it is reclaimable (default 90 seconds for Redis, 2 minutes for SQL).

lockExpiration must comfortably exceed your worst-case handler duration, or slow-but-healthy work will be re-delivered. lockRefreshInterval should be well below lockExpiration so a live worker keeps its lease.
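
To reason about how these durations interact, it can help to model a lease as "last refreshed at" plus lockExpiration. The sketch below is plain TypeScript with hypothetical names (`LeaseTiming`, `isReclaimable`, `refreshesBeforeExpiry`); the default values come from the list above, while the exact comparison the stores use at the expiry boundary is an assumption.

```typescript
// The three durations, in milliseconds.
type LeaseTiming = {
  pollInterval: number
  lockRefreshInterval: number
  lockExpiration: number
}

const sqlDefaults: LeaseTiming = { pollInterval: 1000, lockRefreshInterval: 30000, lockExpiration: 120000 }
const redisDefaults: LeaseTiming = { pollInterval: 1000, lockRefreshInterval: 30000, lockExpiration: 90000 }

// A lease becomes reclaimable once it has gone lockExpiration without a refresh.
const isReclaimable = (lastRefreshedAt: number, now: number, timing: LeaseTiming): boolean =>
  now - lastRefreshedAt >= timing.lockExpiration

// Number of refresh ticks that fall strictly inside one lease window.
// More ticks means a live worker can miss a refresh and still keep its lease.
const refreshesBeforeExpiry = (timing: LeaseTiming): number =>
  Math.floor((timing.lockExpiration - 1) / timing.lockRefreshInterval)

// A worker refreshed its lease at t = 0 and then crashed.
console.log(isReclaimable(0, 60000, sqlDefaults)) // => false
console.log(isReclaimable(0, 120000, sqlDefaults)) // => true
console.log(refreshesBeforeExpiry(sqlDefaults)) // => 3
console.log(refreshesBeforeExpiry(redisDefaults)) // => 2
```

A configuration where `refreshesBeforeExpiry` returns 0 would leave a healthy worker no chance to renew before the lease lapses.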

import { Effect, Layer } from "effect"
import { SqlClient } from "effect/unstable/sql"
import { PersistedQueue } from "effect/unstable/persistence"

// layerStoreSql needs a SqlClient (from a driver layer such as
// PgClient.layer / SqliteClient.layer). It creates the table + indexes itself.
const QueueLive = PersistedQueue.layer.pipe(
  Layer.provide(
    PersistedQueue.layerStoreSql({
      tableName: "effect_queue", // default
      lockExpiration: "5 minutes" // raise for long-running jobs
    })
  )
)

declare const SqlLive: Layer.Layer<SqlClient.SqlClient>
const Live = QueueLive.pipe(Layer.provide(SqlLive))

PersistedQueue<A, R>: the typed queue interface. A is the decoded payload type; R is the schema’s encode/decode service requirement. Two methods:

  • offer(value, { id? }) returns Effect<string, PersistedQueueError | SchemaError, R>. It enqueues value and resolves to its id (existing ids are not re-added).
  • take(f, { maxAttempts? }) runs the handler f(value, { id, attempts }) on one element and returns Effect<XA, XE | PersistedQueueError | SchemaError, R | XR>.

import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const program = Effect.gen(function* () {
  const queue = yield* PersistedQueue.make({
    name: "jobs",
    schema: Schema.Struct({ n: Schema.Number })
  })
  const id = yield* queue.offer({ n: 1 }) // => "<uuid>"
  yield* queue.take((job, meta) => Effect.log(`${job.n} @ attempt ${meta.attempts}`))
})
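
Both methods can fail with a SchemaError because values cross an encode/decode boundary on the way into and out of the store. The following plain TypeScript sketch illustrates that round trip with a hand-written codec. `Codec`, `ReminderCodec`, `toStored`, and `fromStored` are hypothetical stand-ins for what a Schema provides, not part of the PersistedQueue API.

```typescript
// Hypothetical stand-in for a schema: how to turn A into JSON-safe data and back.
type Codec<A> = {
  encode: (value: A) => unknown
  decode: (raw: unknown) => A
}

type Reminder = { at: Date; text: string }

const ReminderCodec: Codec<Reminder> = {
  // A Date is not JSON-safe, so it is stored as an ISO string.
  encode: (r) => ({ at: r.at.toISOString(), text: r.text }),
  decode: (raw) => {
    if (typeof raw !== "object" || raw === null) {
      throw new Error("decode failed: not an object")
    }
    const { at, text } = raw as { at?: unknown; text?: unknown }
    if (typeof at !== "string" || typeof text !== "string") {
      throw new Error("decode failed: unexpected payload shape")
    }
    return { at: new Date(at), text }
  }
}

// What an offer would persist, and what a take would hand to the handler.
const toStored = <A>(codec: Codec<A>, value: A): string => JSON.stringify(codec.encode(value))
const fromStored = <A>(codec: Codec<A>, stored: string): A => codec.decode(JSON.parse(stored))

const stored = toStored(ReminderCodec, { at: new Date("2024-01-01T00:00:00.000Z"), text: "renew" })
console.log(stored) // => {"at":"2024-01-01T00:00:00.000Z","text":"renew"}

const decoded = fromStored(ReminderCodec, stored)
console.log(decoded.at instanceof Date) // => true
```

The handler sees the decoded type (a real Date here), while the store only ever holds the encoded form.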

PersistedQueueFactory: Context.Service whose make({ name, schema }) builds a PersistedQueue. The schema’s EncodingServices | DecodingServices flow into the resulting queue’s R. Provided by layer.

import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const make = Effect.gen(function* () {
  const factory = yield* PersistedQueue.PersistedQueueFactory
  return yield* factory.make({ name: "audit", schema: Schema.String })
})
// requires PersistedQueueFactory

PersistedQueue.make: accessor that calls PersistedQueueFactory.make. The idiomatic way to construct a queue; requires PersistedQueueFactory in context.

import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const program = Effect.gen(function* () {
  const queue = yield* PersistedQueue.make({
    name: "emails",
    schema: Schema.Struct({ to: Schema.String })
  })
  // => PersistedQueue<{ to: string }, never>
  return queue
})
// program requires PersistedQueueFactory

makeFactory: an Effect that builds a PersistedQueueFactory from the current PersistedQueueStore. Use it when you want to construct the factory manually rather than through layer.

import { Effect, Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const FactoryLive = Layer.effect(
  PersistedQueue.PersistedQueueFactory,
  PersistedQueue.makeFactory
) // requires PersistedQueueStore; this is exactly what `layer` does

layer: Layer<PersistedQueueFactory, never, PersistedQueueStore>. Provides the factory from whichever store layer you supply.

import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Live = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory)
)
// => Layer<PersistedQueueFactory>

PersistedQueueError: Schema.ErrorClass with _tag: "PersistedQueueError", a message: string, and an optional cause. Raised by store operations (failed offer/take against the backend). Catch it with Effect.catchTag.

import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// Placeholder so the snippet type-checks on its own.
declare const queue: PersistedQueue.PersistedQueue<{ to: string }>

queue.offer({ to: "x" }).pipe(
  Effect.catchTag("PersistedQueueError", (err) => Effect.logError(err.message))
)
// => recovers from backend offer failures

It also carries the brand [ErrorTypeId] for runtime guards (see below).

TypeId and ErrorTypeId: string brands used to tag values at runtime. TypeId ("~effect/persistence/PersistedQueue") brands PersistedQueue values; ErrorTypeId brands PersistedQueueError. You rarely reference these directly.

import { PersistedQueue } from "effect/unstable/persistence"
PersistedQueue.TypeId // => "~effect/persistence/PersistedQueue"
PersistedQueue.ErrorTypeId // => "~@effect/experimental/PersistedQueue/PersistedQueueError"

PersistedQueueStore: Context.Service defining the low-level backend. Most users provide one of the layerStore* layers rather than implementing it. Two operations:

  • offer({ name, id, element, isCustomId }) persists an already-encoded element.
  • take({ name, maxAttempts }) returns { id, attempts, element } inside a Scope; the scope’s finalizer acks on success and retries on failure, based on the processing exit.

import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const drain = Effect.gen(function* () {
  const store = yield* PersistedQueue.PersistedQueueStore
  // take is scoped: closing the scope acks (success) or requeues (failure).
  const item = yield* store.take({ name: "jobs", maxAttempts: 10 })
  console.log(item) // => { id: "...", attempts: 0, element: <encoded> }
}).pipe(Effect.scoped)
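
The "settle on exit" contract of take can be illustrated with a synchronous in-memory model. This is a sketch under stated assumptions, not how any of the real stores are written: `Item`, `StoreModel`, and `withTake` are hypothetical names, a try/catch plays the role of the scope finalizer, and requeueing at the back of the list is an assumption chosen to show why retried entries can end up behind newer work.

```typescript
// Hypothetical in-memory model; names and shapes are for illustration only.
type Item = { id: string; attempts: number; element: string }

class StoreModel {
  readonly pending: Array<Item> = []

  offer(id: string, element: string): void {
    this.pending.push({ id, attempts: 0, element })
  }

  // Hands the next item to `use`. What happens to the item afterwards depends
  // on how `use` exits, which is the job the scope finalizer does in a real store.
  withTake<A>(use: (item: Item) => A): A | undefined {
    const item = this.pending.shift()
    if (item === undefined) return undefined // a real take would wait instead
    try {
      return use(item) // normal exit: the item is acked and never seen again
    } catch (error) {
      // failed exit: requeue at the back with one more attempt recorded
      this.pending.push({ ...item, attempts: item.attempts + 1 })
      throw error
    }
  }
}

const store = new StoreModel()
store.offer("a", "first")
store.offer("b", "second")

try {
  store.withTake(() => {
    throw new Error("handler failed")
  })
} catch {
  // the failure still reaches the caller; the item has been requeued
}

console.log(store.pending.map((item) => `${item.id}:${item.attempts}`)) // => ["b:0", "a:1"]
```

After the failed run, "a" sits behind "b" with attempts 1, which is one concrete way strict FIFO order is lost.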

layerStoreMemory: Layer<PersistedQueueStore>. An in-memory, process-local, and volatile store. Failed takes are requeued until maxAttempts is reached. Ideal for tests and single-process apps that do not need durability.

import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Live = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory)
)
// => no external dependencies; data lives only for the process lifetime

layerStoreSql: (options?) => Layer<PersistedQueueStore, SqlError, SqlClient>. A durable SQL-backed store. It creates the queue table and supporting indexes, acquires rows with per-worker locks (FOR UPDATE SKIP LOCKED where supported), refreshes active locks while takes run, and completes or retries rows according to the handler's exit. Supports pg, mysql, mssql, and sqlite dialects.

Options: tableName (default "effect_queue"), pollInterval (default 1s), lockRefreshInterval (default 30s), lockExpiration (default 2 minutes).

import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = PersistedQueue.layerStoreSql({
  tableName: "jobs_queue",
  lockExpiration: "10 minutes"
})
// => Layer<PersistedQueueStore, SqlError, SqlClient>

makeStoreSql: scoped Effect form of the SQL store, (options?) => Effect<PersistedQueueStore["Service"], SqlError, SqlClient | Scope>. Use it when you need the store value directly (e.g. composing your own layer or running background fibers in a custom scope). layerStoreSql is makeStoreSql wrapped in Layer.effect.

import { Effect, Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = Layer.effect(
  PersistedQueue.PersistedQueueStore,
  PersistedQueue.makeStoreSql({ pollInterval: "250 millis" })
)

layerStoreRedis: (options?) => Layer<PersistedQueueStore, never, Redis>. A durable Redis-backed store using lists and hashes with per-element worker locks driven by cached Lua scripts. It periodically refreshes locks for in-flight items and moves exhausted items (past maxAttempts) to a :failed list. Requires a Redis service.

Options: prefix (default "effectq:"), pollInterval (default 1s), lockRefreshInterval (default 30s), lockExpiration (default 90s).

import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = PersistedQueue.layerStoreRedis({
  prefix: "myapp:queue:",
  lockExpiration: "2 minutes"
})
// => Layer<PersistedQueueStore, never, Redis>

makeStoreRedis: scoped Effect form of the Redis store, (options?) => Effect<PersistedQueueStore["Service"], never, Redis | Scope>. layerStoreRedis is makeStoreRedis wrapped in Layer.effect.

import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = Layer.effect(
  PersistedQueue.PersistedQueueStore,
  PersistedQueue.makeStoreRedis({ prefix: "effectq:" })
)