Entity proxies and external APIs

A clustered entity speaks the internal cluster protocol: messages are wrapped in envelopes, routed to the owning shard, and delivered to the single live instance for a given entity id. That protocol is great inside the fleet, but you usually do not want outside callers — a browser, a mobile app, a third-party service — to speak it directly.

EntityProxy closes that gap. It derives an ordinary surface from an entity:

  • EntityProxy.toRpcGroup(entity) → an RpcGroup you can serve with RpcServer and call with a normal RpcClient.
  • EntityProxy.toHttpApiGroup(name, entity) → an HttpApiGroup with one POST endpoint per message.

The proxy wraps each original payload with the target entityId (RPC puts it in the payload; HTTP puts it in the route path), and EntityProxyServer supplies the handlers that forward the call to the entity client. The cluster still does all the routing and delivery — the proxy is just a front door, ideal for a gateway service.

Start from an entity, derive its RPC group, and provide the forwarding handlers.

import { Layer, Schema } from "effect"
import { ClusterSchema, Entity, EntityProxy, EntityProxyServer } from "effect/unstable/cluster"
import { Rpc, RpcServer } from "effect/unstable/rpc"

// The entity we want to expose. Messages are persisted so they survive failover.
export const Counter = Entity.make("Counter", [
  Rpc.make("Increment", {
    payload: { id: Schema.String, amount: Schema.Number },
    primaryKey: ({ id }) => id,
    success: Schema.Number
  })
]).annotateRpcs(ClusterSchema.Persisted, true)

// Derive an ordinary RpcGroup from the entity. The class form gives the group a
// stable identity you can reference from clients.
export class MyRpcs extends EntityProxy.toRpcGroup(Counter) {}

// Serve it. layerRpcHandlers wires each generated RPC to the entity client so
// requests are forwarded into the cluster and routed to the owning shard.
const RpcServerLayer = RpcServer.layer(MyRpcs).pipe(
  Layer.provide(EntityProxyServer.layerRpcHandlers(Counter))
)

For every RPC Tag on the entity, the derived group contains two RPCs:

| Generated RPC | Payload | Success | Errors added |
| --- | --- | --- | --- |
| Counter.Increment | { entityId, payload } | original success | original error + MailboxFull / AlreadyProcessingMessage / PersistenceError |
| Counter.IncrementDiscard | { entityId, payload } | void | only the three cluster errors |

  • Names are prefixed with the entity type. Increment on the Counter entity becomes Counter.Increment. This keeps tags unique if you proxy several entities into one group.
  • The payload is wrapped. The original payload moves under a payload field and an entityId: string field is added alongside it. The proxy uses entityId to pick the target instance.
  • Each RPC gets a …Discard twin. The discard variant is fire-and-forget: it forwards the message with { discard: true }, returns void, and cannot surface the entity’s own errors — only delivery errors.
  • Cluster delivery errors are merged into the error channel of every proxy RPC: MailboxFull (the entity’s inbox is saturated), AlreadyProcessingMessage (a message with the same id is already in flight), and PersistenceError (the durable store failed). Handle them with Effect.catchTag on the client.
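
The naming and wrapping rules above can be modeled in a few lines of plain TypeScript. This is only an illustrative sketch, not the library's implementation: proxyRpcNames, wrapPayload, and describeDeliveryError are hypothetical helpers, and the cluster errors are reduced to their tag names.

```typescript
// Illustrative model only; none of these helpers exist in effect.

// The three cluster delivery errors, reduced to their tags.
type DeliveryErrorTag = "MailboxFull" | "AlreadyProcessingMessage" | "PersistenceError"

// For one entity RPC tag, the proxy group carries a request RPC and a discard twin.
function proxyRpcNames(entityType: string, tag: string): [string, string] {
  return [`${entityType}.${tag}`, `${entityType}.${tag}Discard`]
}

// The original payload moves under `payload`, next to the target entity id.
function wrapPayload<P>(entityId: string, payload: P): { entityId: string; payload: P } {
  return { entityId, payload }
}

// A caller-side switch over the delivery errors, in the spirit of catching by tag.
function describeDeliveryError(tag: DeliveryErrorTag): string {
  switch (tag) {
    case "MailboxFull":
      return "the entity's inbox is saturated"
    case "AlreadyProcessingMessage":
      return "a message with the same id is already in flight"
    case "PersistenceError":
      return "the durable store failed"
  }
}

const names = proxyRpcNames("Counter", "Increment")
const wrapped = wrapPayload("cart-42", { id: "cart-42", amount: 1 })
console.log(names, wrapped, describeDeliveryError("MailboxFull"))
```

In a real client the same three tags would show up in the error channel of the proxy RPC, where they can be matched one by one.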

EntityProxyServer.layerRpcHandlers requires Sharding plus the entity’s RPC server services in the environment, so the layer can route forwarded calls back to the owning shard.

EntityProxy.toHttpApiGroup produces an HttpApiGroup instead. Add it to an HttpApi, then provide handlers with EntityProxyServer.layerHttpApi.

import { Layer, Schema } from "effect"
import { ClusterSchema, Entity, EntityProxy, EntityProxyServer } from "effect/unstable/cluster"
import { HttpApi, HttpApiBuilder } from "effect/unstable/httpapi"
import { Rpc } from "effect/unstable/rpc"

export const Counter = Entity.make("Counter", [
  Rpc.make("Increment", {
    payload: { id: Schema.String, amount: Schema.Number },
    primaryKey: ({ id }) => id,
    success: Schema.Number
  })
]).annotateRpcs(ClusterSchema.Persisted, true)

// Derive an HttpApiGroup named "counter", then mount it under /counter.
export class MyApi extends HttpApi.make("api")
  .add(
    EntityProxy.toHttpApiGroup("counter", Counter)
      .prefix("/counter")
  )
{}

// layerHttpApi implements the group's handlers, forwarding to the entity client.
const ApiLayer = HttpApiBuilder.layer(MyApi).pipe(
  Layer.provide(EntityProxyServer.layerHttpApi(MyApi, "counter", Counter))
)

Each entity RPC becomes two POST endpoints; the entityId lives in the route path rather than the body, and the path segment is the lower-cased RPC tag.

| Endpoint id | Method | Path (under the /counter prefix) |
| --- | --- | --- |
| Increment | POST | /increment/:entityId |
| IncrementDiscard | POST | /increment/:entityId/discard |

So POST /counter/increment/cart-42 with body { "id": "cart-42", "amount": 1 } increments the Counter instance for entity id cart-42. The discard endpoint at /counter/increment/cart-42/discard sends the same message as fire-and-forget. As with RPC, the three cluster delivery errors are added to each endpoint’s error responses.
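
To make the route shape concrete, here is a small sketch of how a caller might assemble the request for either endpoint. buildProxyRequest and the ProxyRequest type are hypothetical helpers written for this illustration, and the host name is a placeholder; the function only builds the URL and body and does not send anything.

```typescript
// Hypothetical helper: describes the HTTP request for a proxied entity RPC.
interface ProxyRequest {
  readonly method: "POST"
  readonly url: string
  readonly body: string
}

function buildProxyRequest(options: {
  readonly baseUrl: string // placeholder host, e.g. "https://gateway.example"
  readonly prefix: string // group prefix, e.g. "/counter"
  readonly tag: string // entity RPC tag, e.g. "Increment"
  readonly entityId: string
  readonly payload: unknown
  readonly discard?: boolean
}): ProxyRequest {
  // The path segment is the lower-cased RPC tag; the entity id lives in the path.
  const segment = options.tag.toLowerCase()
  const id = encodeURIComponent(options.entityId)
  const suffix = options.discard === true ? "/discard" : ""
  return {
    method: "POST",
    url: `${options.baseUrl}${options.prefix}/${segment}/${id}${suffix}`,
    // The body carries only the original RPC payload.
    body: JSON.stringify(options.payload)
  }
}

const request = buildProxyRequest({
  baseUrl: "https://gateway.example",
  prefix: "/counter",
  tag: "Increment",
  entityId: "cart-42",
  payload: { id: "cart-42", amount: 1 }
})

const discardRequest = buildProxyRequest({
  baseUrl: "https://gateway.example",
  prefix: "/counter",
  tag: "Increment",
  entityId: "cart-42",
  payload: { id: "cart-42", amount: 1 },
  discard: true
})
```

The resulting objects can be handed to whatever HTTP client the caller already uses.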

EntityProxyServer.layerHttpApi reads the entityId path parameter, forwards the request body to the matching entity client method, and (for …/discard) passes { discard: true }. It requires Sharding and the entity RPC server services.


EntityResource — pooled per-id resources

Sometimes an entity instance needs to own a heavyweight resource: a database connection, a child process, a Kubernetes Pod. You want one such resource per entity id, kept alive while the entity is active, and released only when the entity truly goes away — not on every routine shard movement.

EntityResource (from effect/unstable/cluster/EntityResource) gives you that. It wraps the resource in an RcRef so concurrent acquirers share one instance, calls Entity.keepAlive so the entity is not passivated while the resource is held, and only fully releases the resource after an idle timeout (infinite by default) or an explicit close.

import { Effect, Layer, Schema } from "effect"
import { Entity, EntityResource } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"

const Session = Entity.make("Session", [
  Rpc.make("Ping", { payload: { id: Schema.String }, primaryKey: ({ id }) => id })
])

const SessionLive = Session.toLayer(
  Effect.gen(function* () {
    // One pooled connection per entity id, kept alive while in use.
    const connection = yield* EntityResource.make({
      acquire: Effect.acquireRelease(
        Effect.log("opening connection").pipe(Effect.as({ send: (_: string) => Effect.void })),
        () => Effect.log("closing connection")
      )
    })

    return {
      Ping: Effect.fnUntraced(function* () {
        // `get` reuses the resource within the caller's scope.
        const conn = yield* Effect.scoped(connection.get)
        yield* conn.send("ping")
      })
    }
  })
)

EntityResource.make creates an EntityResource that can be acquired inside an entity. Options:

  • acquire: Effect<A, E, R> — how to build the resource. It runs with a CloseScope service in context (see below); resources you register on that scope outlive shard movement.
  • idleTimeToLive?: Duration.Input — release the resource this long after the last user releases it. Defaults to infinite, so the resource lives until close.
  • acquireEagerly?: boolean — when true, initialize the resource immediately rather than lazily on first get.

Returns Effect<EntityResource<A, E>, E, Scope | Exclude<R, CloseScope> | Sharding | Entity.CurrentAddress> — so it must be built inside an entity handler/layer where Sharding and CurrentAddress are available.

import { Effect } from "effect"
import { EntityResource } from "effect/unstable/cluster"

const resource = EntityResource.make({
  acquire: Effect.succeed("db-handle"),
  idleTimeToLive: "30 seconds",
  acquireEagerly: true
})
// => Effect<EntityResource<string>, never, Scope | Sharding | Entity.CurrentAddress>

EntityResource.makeK8sPod creates an EntityResource backed by a Kubernetes Pod: the pod is created and waited on via K8sHttpClient, and torn down when the resource closes or its idle timeout expires. This is useful for giving each entity id its own pod. It takes a v1.Pod spec and an optional { idleTimeToLive }, and requires K8sHttpClient.K8sHttpClient in addition to Sharding and CurrentAddress.

import { EntityResource } from "effect/unstable/cluster"

const pod = EntityResource.makeK8sPod(
  {
    metadata: { name: "worker" },
    spec: { containers: [{ name: "main", image: "ghcr.io/acme/worker:latest" }] }
  },
  { idleTimeToLive: "5 minutes" }
)
// => Effect<EntityResource<K8sHttpClient.PodStatus>, never,
//      Scope | Sharding | Entity.CurrentAddress | K8sHttpClient.K8sHttpClient>

EntityResource is the handle returned by make and makeK8sPod. It exposes:

  • get: Effect<A, E, Scope.Scope>, which acquires or reuses the resource in the caller’s scope. The shared instance is reference-counted across concurrent gets.
  • close: Effect<void>, which invalidates the resource so its close scope can be released. The next get rebuilds it.

import { Effect } from "effect"
import type { EntityResource } from "effect/unstable/cluster"

declare const resource: EntityResource.EntityResource<{ query: string }>

const program = Effect.gen(function* () {
  const r = yield* Effect.scoped(resource.get) // acquire/reuse
  // ... use r.query ...
  yield* resource.close // force release; next get rebuilds
})
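
The get/close behavior described above can be pictured with a small reference-counting model. This is a simplified, synchronous sketch written for illustration: SharedHandle is hypothetical, it models only the default infinite idle time (releasing every reference does not destroy the instance), it ignores keep-alive, and it is not how RcRef or EntityResource are implemented.

```typescript
// Hypothetical, synchronous model of a shared, reference-counted resource.
class SharedHandle<A> {
  private current: { readonly value: A } | undefined = undefined
  private refs = 0
  private generation = 0
  private readonly build: () => A
  builds = 0

  constructor(build: () => A) {
    this.build = build
  }

  // Acquire or reuse the shared instance; the caller releases it when done.
  get(): { readonly value: A; readonly release: () => void } {
    let current = this.current
    if (current === undefined) {
      current = { value: this.build() }
      this.current = current
      this.builds += 1
    }
    this.refs += 1
    const generation = this.generation
    let released = false
    return {
      value: current.value,
      release: () => {
        // Ignore double releases and releases of an instance that was closed.
        if (released || generation !== this.generation) return
        released = true
        this.refs -= 1
      }
    }
  }

  // Invalidate the instance so the next `get` rebuilds it.
  close(): void {
    this.current = undefined
    this.refs = 0
    this.generation += 1
  }

  get activeRefs(): number {
    return this.refs
  }
}

const handle = new SharedHandle(() => ({ query: "select 1" }))
const a = handle.get()
const b = handle.get()
const sharedOnce = a.value === b.value // both callers see the same instance
const buildsBeforeClose = handle.builds
a.release()
b.release()
handle.close()
const c = handle.get() // rebuilt after close
```

Two concurrent gets share one build, and only close forces the next get to build again, which mirrors the handle semantics listed above.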

CloseScope is a Context.Service (id "effect/cluster/EntityResource/CloseScope") holding a Scope that is closed only when the resource is explicitly closed or idles out, not on entity restarts, shard movement, or node shutdown. Inside acquire, register finalizers on this scope for cleanup that must survive routine churn.

import { Effect, Scope } from "effect"
import { EntityResource } from "effect/unstable/cluster"

const resource = EntityResource.make({
  acquire: Effect.gen(function* () {
    const closeScope = yield* EntityResource.CloseScope
    // Finalizer runs only when the resource is truly closed.
    yield* Scope.addFinalizer(closeScope, Effect.log("releasing for good"))
    return "handle"
  })
})
// => Effect<EntityResource<string>, never, Scope | Sharding | Entity.CurrentAddress>

TypeId is the branding type identifier "~effect/cluster/EntityResource", available as both a value and a type, used to tag EntityResource values.

import { EntityResource } from "effect/unstable/cluster"
EntityResource.TypeId
// => "~effect/cluster/EntityResource"

EntityProxy.toRpcGroup derives an RpcGroup from an entity. For each entity RPC it emits a prefixed request RPC and a …Discard fire-and-forget RPC, both with an entityId-wrapped payload and the cluster delivery errors merged in.

import { Schema } from "effect"
import { Entity, EntityProxy } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"

const Counter = Entity.make("Counter", [
  Rpc.make("Increment", {
    payload: { id: Schema.String, amount: Schema.Number },
    primaryKey: ({ id }) => id,
    success: Schema.Number
  })
])

const group = EntityProxy.toRpcGroup(Counter)
// => RpcGroup with RPCs "Counter.Increment" and "Counter.IncrementDiscard",
//    each payload shaped { entityId: string, payload: { id, amount } }

// Use the class form when you need a referenceable identity for clients/servers:
class MyRpcs extends EntityProxy.toRpcGroup(Counter) {}

EntityProxy.ConvertRpcs is the type-level conversion behind toRpcGroup. Given the entity’s RPCs and a Prefix, it produces, per RPC, the ${Prefix}.${Tag} request RPC (payload { entityId, payload }, errors widened with MailboxFull | AlreadyProcessingMessage | PersistenceError) and the ${Prefix}.${Tag}Discard RPC (success Void, errors limited to the three cluster errors).

import type { EntityProxy } from "effect/unstable/cluster"
import type { Rpc } from "effect/unstable/rpc"
// Given Rpcs and the entity type, this is the union the derived RpcGroup carries.
type Derived<Rpcs extends Rpc.Any> = EntityProxy.ConvertRpcs<Rpcs, "Counter">
// => Rpc<"Counter.Increment", ...> | Rpc<"Counter.IncrementDiscard", ...> | ...
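
The tag rewriting can be approximated with template literal types. The ProxyTags alias below is a hypothetical stand-in that covers only the names; the real ConvertRpcs also rewrites payloads and error channels. The "Reset" tag is an invented second RPC, used only to show that the alias distributes over a union of tags.

```typescript
// Hypothetical alias: models only the tag names described for ConvertRpcs.
type ProxyTags<Prefix extends string, Tag extends string> =
  | `${Prefix}.${Tag}`
  | `${Prefix}.${Tag}Discard`

// "Reset" is an invented tag for illustration.
type CounterTags = ProxyTags<"Counter", "Increment" | "Reset">

// Every member of the union is assignable; any other string is a compile-time error.
const allCounterTags: ReadonlyArray<CounterTags> = [
  "Counter.Increment",
  "Counter.IncrementDiscard",
  "Counter.Reset",
  "Counter.ResetDiscard"
]
```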

EntityProxy.toHttpApiGroup derives an HttpApiGroup (with the given name) from an entity. For each RPC it adds POST /<lowercased-tag>/:entityId and POST /<lowercased-tag>/:entityId/discard endpoints. Chain .prefix("/…") to mount the whole group under a base path.

import { Schema } from "effect"
import { Entity, EntityProxy } from "effect/unstable/cluster"
import { Rpc } from "effect/unstable/rpc"

const Counter = Entity.make("Counter", [
  Rpc.make("Increment", {
    payload: { id: Schema.String, amount: Schema.Number },
    primaryKey: ({ id }) => id,
    success: Schema.Number
  })
])

const apiGroup = EntityProxy.toHttpApiGroup("counter", Counter).prefix("/counter")
// => HttpApiGroup "counter" with endpoints:
//    POST /counter/increment/:entityId          (success: number)
//    POST /counter/increment/:entityId/discard  (success: void)

EntityProxy.ConvertHttpApi is the type-level conversion behind toHttpApiGroup. Per entity RPC it yields the POST /<Lowercase<Tag>>/:entityId endpoint (params carry entityId, success is the RPC’s success, errors widened with the three cluster errors) and the /discard endpoint (success Void).

import type { EntityProxy } from "effect/unstable/cluster"
import type { Rpc } from "effect/unstable/rpc"
type Endpoints<Rpcs extends Rpc.Any> = EntityProxy.ConvertHttpApi<Rpcs>
// => HttpApiEndpoint<"Increment", "POST", "/increment/:entityId", ...>
// | HttpApiEndpoint<"IncrementDiscard", "POST", "/increment/:entityId/discard", ...>
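
The path half of that conversion can be sketched with the built-in Lowercase utility type. RequestPath, DiscardPath, requestPath, and discardPath are hypothetical names used only here; they model the path strings and nothing else about the endpoint type.

```typescript
// Hypothetical aliases: model only the path strings, not the full endpoint type.
type RequestPath<Tag extends string> = `/${Lowercase<Tag>}/:entityId`
type DiscardPath<Tag extends string> = `${RequestPath<Tag>}/discard`

// Runtime counterparts whose return types track the type-level paths.
function requestPath<Tag extends string>(tag: Tag): RequestPath<Tag> {
  return `/${tag.toLowerCase()}/:entityId` as RequestPath<Tag>
}

function discardPath<Tag extends string>(tag: Tag): DiscardPath<Tag> {
  return `${requestPath(tag)}/discard` as DiscardPath<Tag>
}

// The annotations compile only if the computed types match these literals.
const incrementPath: "/increment/:entityId" = requestPath("Increment")
const incrementDiscardPath: "/increment/:entityId/discard" = discardPath("Increment")
```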

EntityProxyServer.layerRpcHandlers(entity)

Builds a Layer that implements the handlers for the group produced by toRpcGroup. Each handler resolves the entity client for the wrapped entityId and calls the matching method; the …Discard handler calls it with { discard: true }. Requires Sharding | Rpc.ServicesServer<Rpcs>.

import { Layer } from "effect"
import { EntityProxyServer } from "effect/unstable/cluster"
import { RpcServer } from "effect/unstable/rpc"

declare const Counter: import("effect/unstable/cluster").Entity.Entity<"Counter", any>
declare const MyRpcs: any

const ServerLayer = RpcServer.layer(MyRpcs).pipe(
  Layer.provide(EntityProxyServer.layerRpcHandlers(Counter))
)
// => Layer providing RpcHandlers<Rpcs, "Counter">, needs Sharding + entity services

EntityProxyServer.RpcHandlers is the union of RPC handler services required to serve an entity’s proxy RPCs: Rpc.Handler<`${Prefix}.${Tag}`> and its …Discard variant for every entity RPC. This is exactly the output (the provided services, not the requirements) of the layer built by layerRpcHandlers.

import type { EntityProxyServer } from "effect/unstable/cluster"
import type { Rpc } from "effect/unstable/rpc"
type Provided<Rpcs extends Rpc.Any> = EntityProxyServer.RpcHandlers<Rpcs, "Counter">
// => Rpc.Handler<"Counter.Increment"> | Rpc.Handler<"Counter.IncrementDiscard"> | ...

EntityProxyServer.layerHttpApi(api, name, entity)

Builds a Layer implementing the handlers for the HTTP group name within api. Each endpoint reads the entityId path parameter and forwards the request payload to the corresponding entity client method (discard endpoints pass { discard: true }). Defects are logged with the entity, id, and method annotated. Requires Sharding | Rpc.ServicesServer<Rpcs>.

import { Layer } from "effect"
import { EntityProxyServer } from "effect/unstable/cluster"
import { HttpApiBuilder } from "effect/unstable/httpapi"

declare const Counter: import("effect/unstable/cluster").Entity.Entity<"Counter", any>
declare const MyApi: any

const ApiLayer = HttpApiBuilder.layer(MyApi).pipe(
  Layer.provide(EntityProxyServer.layerHttpApi(MyApi, "counter", Counter))
)
// => Layer implementing the "counter" group, needs Sharding + entity services

  • Entities: define the entity protocols you proxy here.
  • HTTP API: the HttpApiGroup/HttpApi building blocks that toHttpApiGroup plugs into.
  • RPC: the RpcGroup/RpcServer building blocks behind toRpcGroup.