# PersistedQueue

`PersistedQueue` is a **named, durable queue of schema-encoded values**. Workers
`offer` values onto the queue, and `take` processes **one** value at a time inside
a scoped window: when the handler succeeds the element is acknowledged, and when
it fails, is interrupted, or loses its backing-store lock the element is
re-delivered until `maxAttempts` is reached (default `10`).

Use it for durable handoffs, background jobs, outbox-style integrations, and any
workload where unfinished work must survive fiber failures or process restarts,
or must be shared across multiple workers via Redis or SQL.

```ts
import { PersistedQueue } from "effect/unstable/persistence"
```

Two facts shape every example on this page:

- **Delivery is at-least-once.** A handler may legitimately see the same element
  more than once, so handlers must be idempotent.
- **Ordering is not guaranteed.** Retries, lock expiration, polling, and multiple
  workers can move entries behind newer work. Do not rely on strict FIFO.
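
To make the ordering caveat concrete, here is a toy model in plain TypeScript. It does not use `PersistedQueue`; `simulate` and the `Entry` shape are hypothetical, and it assumes a failed entry is requeued at the tail, which is only one of the ways a retry can end up behind newer work.

```ts
// Toy model only: a failed entry is put back at the tail of the queue.
type Entry = { id: string; attempts: number }

function simulate(
  ids: ReadonlyArray<string>,
  failsOnce: ReadonlySet<string>,
  maxAttempts: number
): Array<string> {
  const pending: Array<Entry> = ids.map((id) => ({ id, attempts: 0 }))
  const alreadyFailed = new Set<string>()
  const completed: Array<string> = []

  while (pending.length > 0) {
    const entry = pending.shift()!
    const shouldFail = failsOnce.has(entry.id) && !alreadyFailed.has(entry.id)
    if (!shouldFail) {
      completed.push(entry.id) // handler succeeded: acknowledge
      continue
    }
    alreadyFailed.add(entry.id)
    entry.attempts += 1
    // Redeliver until the attempt budget is used up, then drop the entry.
    if (entry.attempts < maxAttempts) pending.push(entry)
  }
  return completed
}

// "a" fails on its first delivery, so it completes after "b" and "c".
const completionOrder = simulate(["a", "b", "c"], new Set(["a"]), 10)
console.log(completionOrder.join(",")) // b,c,a
```

Even with a single worker, the offer order `a, b, c` is not the completion order, which is why handlers should not depend on FIFO.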

The architecture has three layers:

- **`PersistedQueue<A, R>`** — the typed, schema-aware queue you call `offer` and
  `take` on.
- **`PersistedQueueFactory`** — a service that builds named queues from schemas.
  Provided by `PersistedQueue.layer`.
- **`PersistedQueueStore`** — the low-level backing store (memory, SQL, or Redis)
  that actually persists elements, leases them to workers, and acks/retries.

## Defining and using a queue

Provide a store layer (`layerStoreMemory` here) and `PersistedQueue.layer` (which
turns the store into a `PersistedQueueFactory`), then `make` a queue from a
`Schema` describing the payload.

```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// The payload schema. Values are JSON-encoded with this schema on offer and
// decoded with it on take, so its encode/decode services flow into the queue's R.
const EmailJob = Schema.Struct({
  to: Schema.String,
  subject: Schema.String
})

const program = Effect.gen(function* () {
  // Build a named queue. Needs PersistedQueueFactory in context.
  const queue = yield* PersistedQueue.make({
    name: "emails",
    schema: EmailJob
  })

  // offer returns the generated id of the enqueued element.
  const id = yield* queue.offer({ to: "ada@example.com", subject: "Welcome" })
  console.log(id) // => "f47ac10b-58cc-4372-a567-0e02b2c3d479" (a UUID)

  // take processes ONE value inside a scoped window. The handler runs with the
  // decoded value plus metadata { id, attempts }. Success acks the element;
  // failure/interruption re-delivers it (up to maxAttempts).
  yield* queue.take((job, { id, attempts }) =>
    Effect.log(`sending ${job.subject} to ${job.to} (id=${id}, attempt ${attempts})`)
  )
})

program.pipe(
  // PersistedQueueFactory, built from whichever PersistedQueueStore is provided
  Effect.provide(PersistedQueue.layer),
  // process-local, volatile store; satisfies the factory layer's requirement
  Effect.provide(PersistedQueue.layerStoreMemory),
  Effect.runFork
)
```

A real worker loops on `take` so it keeps draining the queue. Run the loop in a
forked fiber while the rest of your app produces work:

```ts
import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const runWorker = Effect.fn("runWorker")(function* (
  queue: PersistedQueue.PersistedQueue<{ to: string; subject: string }>
) {
  // take blocks until an element is available, then processes exactly one.
  // Wrap in forever to keep draining; each iteration is its own scoped window.
  yield* queue
    .take((job) => Effect.log(`sending to ${job.to}: ${job.subject}`))
    .pipe(Effect.forever)
})
```

**One element per take:** Each `take` call acquires a single element, runs your
handler, then acks or retries based on the handler's exit. To process work
continuously, run `take` in a loop (`Effect.forever`); to process concurrently,
fork several such loops.

## Idempotency with custom ids

By default `offer` assigns a random UUID. Pass a stable `id` to **deduplicate**:
offering an element whose id already exists in the queue is a no-op, so re-running
the same logical work does not enqueue a duplicate.

```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Payment = Schema.Struct({ orderId: Schema.String, amount: Schema.Number })

const enqueuePayment = Effect.fn("enqueuePayment")(function* (
  order: { orderId: string; amount: number }
) {
  const queue = yield* PersistedQueue.make({ name: "payments", schema: Payment })

  // Use a domain-stable id so retried producers don't double-charge.
  const id = yield* queue.offer(order, { id: `payment:${order.orderId}` })
  // Offering the same id again is ignored — the element is not added twice.
  yield* queue.offer(order, { id: `payment:${order.orderId}` })
  return id // => "payment:order-123"
})
```

The `attempts` value in the `take` metadata lets a handler detect that it is
seeing a retry, e.g. to log, alert, or take a different code path after the first
failure:

```ts
queue.take((order, { id, attempts }) =>
  attempts === 0
    ? chargeCard(order)
    : Effect.logWarning(`retrying ${id}, attempt ${attempts}`).pipe(
        Effect.andThen(chargeCard(order))
      )
)
```

**Choose collision-free ids:** Stores enforce uniqueness at the queue, prefix, or
table level. Pick ids that cannot collide across different logical work items,
and remember that changing a queue `name`, schema, Redis `prefix`, SQL table, or
id format is a persistence migration: old entries may decode differently, stop
being visible, or collide.
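
One way to keep ids collision-free is to build them from a work kind plus a domain key. The sketch below is plain TypeScript; `jobId` is a hypothetical helper, not part of `PersistedQueue`, and the `"<kind>:<key>"` format is an assumption that happens to match the `payment:${order.orderId}` ids used earlier.

```ts
// Hypothetical helper, not part of PersistedQueue: builds ids of the form
// "<kind>:<key>" so different kinds of work cannot share an id.
function jobId(kind: string, key: string): string {
  if (kind.length === 0 || kind.includes(":")) {
    throw new Error(`invalid kind: "${kind}"`)
  }
  if (key.length === 0) {
    throw new Error("key must not be empty")
  }
  // encodeURIComponent escapes whitespace and ":" inside the key.
  return `${kind}:${encodeURIComponent(key)}`
}

const paymentId = jobId("payment", "order-123")
const refundId = jobId("refund", "order-123")
console.log(paymentId) // payment:order-123
console.log(refundId) // refund:order-123
```

A producer could then pass the result as the custom id, for example `queue.offer(order, { id: jobId("payment", order.orderId) })`.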

## At-least-once and retries

**Design handlers for re-delivery:** A `take` handler that **fails**, is
**interrupted**, or whose worker **loses its backing-store lock** (e.g. the lease
expired because processing took too long) will see the element delivered again.
Re-delivery continues until `attempts` reaches `maxAttempts` (default `10`), after
which the element is dropped (SQL/Redis stores move it to a failed list/marker).
Because of this, and because ordering is not guaranteed, handlers must be
**idempotent** and must not assume strict FIFO.
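
As a minimal sketch of what idempotent can mean in practice, the wrapper below runs a side effect at most once per element id. It is plain TypeScript and `makeIdempotent` is hypothetical; the in-memory `Set` only stands in for a durable record (for instance a unique row keyed by the element id), which is what a real deployment would need.

```ts
// Hypothetical wrapper: runs `effectful` at most once per element id.
// The Set stands in for a durable record such as a unique database row.
function makeIdempotent<A>(
  effectful: (value: A) => void
): (value: A, meta: { id: string }) => "applied" | "skipped" {
  const applied = new Set<string>()
  return (value, { id }) => {
    if (applied.has(id)) return "skipped" // redelivery: nothing to do
    effectful(value)
    applied.add(id) // record only after the side effect succeeded
    return "applied"
  }
}

let emailsSent = 0
const sendOnce = makeIdempotent((_job: { to: string }) => {
  emailsSent += 1
})

const firstDelivery = sendOnce({ to: "ada@example.com" }, { id: "email:1" })
const redelivery = sendOnce({ to: "ada@example.com" }, { id: "email:1" })
console.log(firstDelivery, redelivery, emailsSent) // applied skipped 1
```

Note that recording the id after the side effect still leaves a window where a crash causes one repeat, so the side effect and the record should be committed together where the backing system allows it.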

Tune the attempt budget per `take`:

```ts
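import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// Hypothetical queue and handler, declared only so the snippet type-checks.
declare const queue: PersistedQueue.PersistedQueue<unknown>
declare const handleJob: (job: unknown) => Effect.Effect<void>

// Give this job up to 3 attempts instead of the default 10.
queue.take((job) => handleJob(job), { maxAttempts: 3 })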
```

Interruption is handled specially: a take that is only interrupted is re-delivered
**without** incrementing `attempts`, so graceful shutdown does not burn a retry.
Only genuine failures count against `maxAttempts`.
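
The accounting rule can be summarized as a small decision function. This is a model of the behavior described on this page in plain TypeScript, not the library's implementation; `settle`, `Outcome`, and `Decision` are hypothetical names, and dropping once `attempts` reaches `maxAttempts` is taken from the description above.

```ts
// Model of the rule described above, not the library's implementation.
type Outcome = "success" | "failure" | "interrupted"
type Decision =
  | { action: "ack" }
  | { action: "requeue"; attempts: number }
  | { action: "drop"; attempts: number }

function settle(outcome: Outcome, attempts: number, maxAttempts = 10): Decision {
  if (outcome === "success") return { action: "ack" }
  // Interruption keeps the counter; only failures consume the budget.
  const next = outcome === "failure" ? attempts + 1 : attempts
  return next >= maxAttempts
    ? { action: "drop", attempts: next }
    : { action: "requeue", attempts: next }
}

console.log(settle("interrupted", 2)) // requeued, attempts stays at 2
console.log(settle("failure", 2)) // requeued, attempts becomes 3
console.log(settle("failure", 9)) // dropped, the budget of 10 is exhausted
```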

## Choosing a backing store and tuning leases

The queue is store-agnostic; pick a `PersistedQueueStore` layer to control
durability and sharing:

| Store | Layer | Durability | Sharing |
| --- | --- | --- | --- |
| Memory | `layerStoreMemory` | Volatile, lost on restart | Process-local only |
| SQL | `layerStoreSql(options?)` | Durable rows in a table | Multiple workers via row locks |
| Redis | `layerStoreRedis(options?)` | Durable lists/hashes | Multiple workers via Lua-script leases |

SQL and Redis stores hand each element to one worker under a **lease** and refresh
that lease while the handler runs. If the worker dies or the handler runs longer
than `lockExpiration`, the lease expires and another worker may pick the element
up. Tune the three durations to your expected processing time:

- `pollInterval` — how often a worker checks for new work (default `1 second`).
- `lockRefreshInterval` — how often a held lease is renewed (default `30 seconds`).
- `lockExpiration` — how long a lease is valid before it is reclaimable
  (default `90 seconds` for Redis, `2 minutes` for SQL).

`lockExpiration` must comfortably exceed your worst-case handler duration, or
slow-but-healthy work will be re-delivered. `lockRefreshInterval` should be well
below `lockExpiration` so a live worker keeps its lease.
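
The two rules above can be turned into a quick sanity check. The sketch is plain TypeScript; `leaseWarnings` and `LeaseConfig` are hypothetical, durations are plain milliseconds rather than the library's duration inputs, and the "at least two refreshes per lease" threshold is an assumed rule of thumb, not something the stores enforce.

```ts
// Hypothetical sanity check for the lease settings described above.
// All durations are in milliseconds.
type LeaseConfig = {
  lockRefreshIntervalMs: number
  lockExpirationMs: number
}

function leaseWarnings(config: LeaseConfig, worstCaseHandlerMs: number): Array<string> {
  const warnings: Array<string> = []
  if (config.lockExpirationMs <= worstCaseHandlerMs) {
    warnings.push("lockExpiration does not exceed the worst-case handler duration")
  }
  // Assumed rule of thumb: leave room for at least two refreshes per lease.
  if (config.lockRefreshIntervalMs * 2 > config.lockExpirationMs) {
    warnings.push("lockRefreshInterval is too close to lockExpiration")
  }
  return warnings
}

// SQL defaults from this page: refresh every 30 s, expire after 2 minutes.
const sqlDefaults: LeaseConfig = { lockRefreshIntervalMs: 30_000, lockExpirationMs: 120_000 }
console.log(leaseWarnings(sqlDefaults, 60_000).length) // 0
console.log(leaseWarnings(sqlDefaults, 180_000).length) // 1
```

With the SQL defaults, a handler that can take three minutes triggers the first warning, which is the situation where raising `lockExpiration` is worth considering.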

```ts
import { Layer } from "effect"
import { SqlClient } from "effect/unstable/sql"
import { PersistedQueue } from "effect/unstable/persistence"

// layerStoreSql needs a SqlClient (from a driver layer such as
// PgClient.layer / SqliteClient.layer). It creates the table + indexes itself.
const QueueLive = PersistedQueue.layer.pipe(
  Layer.provide(
    PersistedQueue.layerStoreSql({
      tableName: "effect_queue", // default
      lockExpiration: "5 minutes" // raise for long-running jobs
    })
  )
)

declare const SqlLive: Layer.Layer<SqlClient.SqlClient>

const Live = QueueLive.pipe(Layer.provide(SqlLive))
```

```ts
import { Layer } from "effect"
import { PersistedQueue, Redis } from "effect/unstable/persistence"

// layerStoreRedis needs a Redis service (build one with Redis.make over your
// client's send function). Keys are namespaced under `prefix`.
const QueueLive = PersistedQueue.layer.pipe(
  Layer.provide(
    PersistedQueue.layerStoreRedis({
      prefix: "effectq:", // default
      pollInterval: "500 millis"
    })
  )
)

declare const RedisLive: Layer.Layer<Redis.Redis>

const Live = QueueLive.pipe(Layer.provide(RedisLive))
```

---

## API reference

### PersistedQueue&lt;A, R&gt;

The typed queue interface. `A` is the decoded payload type; `R` is the
schema's encode/decode service requirement. Two methods:

- `offer(value, { id? })` returns `Effect<string, PersistedQueueError | SchemaError, R>`
  — enqueues `value` and resolves to its id (existing ids are not re-added).
- `take(f, { maxAttempts? })` where `f(value, { id, attempts })` runs your handler;
  returns `Effect<XA, XE | PersistedQueueError | SchemaError, R | XR>`.

```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

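// yield* is only valid inside a generator, so wrap the calls in Effect.gen.
const example = Effect.gen(function* () {
  const queue = yield* PersistedQueue.make({
    name: "jobs",
    schema: Schema.Struct({ n: Schema.Number })
  })

  const id = yield* queue.offer({ n: 1 }) // => "<uuid>"
  yield* queue.take((job, meta) => Effect.log(`${job.n} @ attempt ${meta.attempts}`))
})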
```

### PersistedQueueFactory

`Context.Service` whose `make({ name, schema })` builds a `PersistedQueue`. The
schema's `EncodingServices | DecodingServices` flow into the resulting queue's `R`.
Provided by [`layer`](#layer).

```ts
import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const make = Effect.gen(function* () {
  const factory = yield* PersistedQueue.PersistedQueueFactory
  return yield* factory.make({ name: "audit", schema: Schema.String })
})
// requires PersistedQueueFactory
```

### make

Accessor that calls `PersistedQueueFactory.make`. The idiomatic way to construct a
queue; requires `PersistedQueueFactory` in context.

```ts
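import { Effect, Schema } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

// yield* is only valid inside a generator, so wrap the call in Effect.gen.
const program = Effect.gen(function* () {
  const queue = yield* PersistedQueue.make({
    name: "emails",
    schema: Schema.Struct({ to: Schema.String })
  })
  // => PersistedQueue<{ to: string }, never>
  return queue
})
// program requires PersistedQueueFactory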
```

### makeFactory

An `Effect` that builds a `PersistedQueueFactory` from the current
`PersistedQueueStore`. Use it when you want to construct the factory manually
rather than through [`layer`](#layer).

```ts
import { Effect, Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const FactoryLive = Layer.effect(
  PersistedQueue.PersistedQueueFactory,
  PersistedQueue.makeFactory
) // requires PersistedQueueStore — this is exactly what `layer` does
```

### layer

`Layer<PersistedQueueFactory, never, PersistedQueueStore>`. Provides the factory
from whichever store layer you supply.

```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Live = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory)
)
// => Layer<PersistedQueueFactory>
```

### PersistedQueueError

`Schema.ErrorClass` with `_tag: "PersistedQueueError"`, a `message: string`, and an
optional `cause`. Raised by store operations (failed offer/take against the
backend). Catch it with `Effect.catchTag`.

```ts
import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

queue
  .offer({ to: "x" })
  .pipe(
    Effect.catchTag("PersistedQueueError", (err) =>
      Effect.logError(err.message)
    )
  )
// => recovers from backend offer failures
```

It also carries the brand `[ErrorTypeId]` for runtime guards (see below).

### TypeId / ErrorTypeId

String brands used to tag values at runtime. `TypeId`
(`"~effect/persistence/PersistedQueue"`) brands `PersistedQueue` values;
`ErrorTypeId` brands `PersistedQueueError`. You rarely reference these directly.

```ts
import { PersistedQueue } from "effect/unstable/persistence"

PersistedQueue.TypeId // => "~effect/persistence/PersistedQueue"
PersistedQueue.ErrorTypeId // => "~@effect/experimental/PersistedQueue/PersistedQueueError"
```

### PersistedQueueStore

`Context.Service` defining the low-level backend. Most users provide one of the
`layerStore*` layers rather than implementing it. Two operations:

- `offer({ name, id, element, isCustomId })` — persists an already-encoded element.
- `take({ name, maxAttempts })` — returns `{ id, attempts, element }` inside a
  `Scope`; the scope's finalizer **acks on success and retries on failure** based
  on the processing exit.

```ts
import { Effect } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const drain = Effect.gen(function* () {
  const store = yield* PersistedQueue.PersistedQueueStore
  // take is scoped: closing the scope acks (success) or requeues (failure).
  const item = yield* store.take({ name: "jobs", maxAttempts: 10 })
  console.log(item) // => { id: "...", attempts: 0, element: <encoded> }
}).pipe(Effect.scoped)
```

### layerStoreMemory

`Layer<PersistedQueueStore>`. An in-memory, **process-local and volatile** store.
Failed takes are requeued until `maxAttempts`. Ideal for tests and single-process
apps that do not need durability.

```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const Live = PersistedQueue.layer.pipe(
  Layer.provide(PersistedQueue.layerStoreMemory)
)
// => no external dependencies; data lives only for the process lifetime
```

### layerStoreSql

`(options?) => Layer<PersistedQueueStore, SqlError, SqlClient>`. A durable
SQL-backed store. It creates the queue table and supporting indexes, acquires rows
with per-worker locks (`FOR UPDATE SKIP LOCKED` where supported), refreshes active
locks while takes run, and completes or retries rows by exit. Supports pg, mysql,
mssql, and sqlite dialects.

Options: `tableName` (default `"effect_queue"`), `pollInterval` (default `1s`),
`lockRefreshInterval` (default `30s`), `lockExpiration` (default `2 minutes`).

```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = PersistedQueue.layerStoreSql({
  tableName: "jobs_queue",
  lockExpiration: "10 minutes"
})
// => Layer<PersistedQueueStore, SqlError, SqlClient>
```

### makeStoreSql

Scoped `Effect` form of the SQL store:
`(options?) => Effect<PersistedQueueStore["Service"], SqlError, SqlClient | Scope>`.
Use it when you need the store value directly (e.g. composing your own layer or
running background fibers in a custom scope). `layerStoreSql` is `makeStoreSql`
wrapped in `Layer.effect`.

```ts
import { Effect, Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = Layer.effect(
  PersistedQueue.PersistedQueueStore,
  PersistedQueue.makeStoreSql({ pollInterval: "250 millis" })
)
```

### layerStoreRedis

`(options?) => Layer<PersistedQueueStore, never, Redis>`. A durable Redis-backed
store using lists and hashes with per-element worker locks driven by cached Lua
scripts. It periodically refreshes locks for in-flight items and moves exhausted
items (past `maxAttempts`) to a `:failed` list. Requires a `Redis` service.

Options: `prefix` (default `"effectq:"`), `pollInterval` (default `1s`),
`lockRefreshInterval` (default `30s`), `lockExpiration` (default `90s`).

```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = PersistedQueue.layerStoreRedis({
  prefix: "myapp:queue:",
  lockExpiration: "2 minutes"
})
// => Layer<PersistedQueueStore, never, Redis>
```

### makeStoreRedis

Scoped `Effect` form of the Redis store:
`(options?) => Effect<PersistedQueueStore["Service"], never, Redis | Scope>`.
`layerStoreRedis` is `makeStoreRedis` wrapped in `Layer.effect`.

```ts
import { Layer } from "effect"
import { PersistedQueue } from "effect/unstable/persistence"

const StoreLive = Layer.effect(
  PersistedQueue.PersistedQueueStore,
  PersistedQueue.makeStoreRedis({ prefix: "effectq:" })
)
```

**Building the Redis service:** The Redis store depends on the `Redis` service
from `effect/unstable/persistence`, which you construct with
`Redis.make({ send })` over your Redis client's command function. Provide that
`Redis` layer beneath `layerStoreRedis`.