Entity proxies and external APIs
A clustered entity speaks the internal cluster protocol: messages are wrapped in envelopes, routed to the owning shard, and delivered to the single live instance for a given entity id. That protocol is great inside the fleet, but you usually do not want outside callers — a browser, a mobile app, a third-party service — to speak it directly.
EntityProxy closes that gap. It derives an ordinary surface from an entity:
EntityProxy.toRpcGroup(entity)→ anRpcGroupyou can serve withRpcServerand call with a normalRpcClient.EntityProxy.toHttpApiGroup(name, entity)→ anHttpApiGroupwith one POST endpoint per message.
The proxy wraps each original payload with the target entityId (RPC puts it in the
payload; HTTP puts it in the route path), and EntityProxyServer supplies the
handlers that forward the call to the entity client. The cluster still does all the
routing and delivery — the proxy is just a front door, ideal for a gateway service.
Exposing an entity over RPC
Section titled “Exposing an entity over RPC”Start from an entity, derive its RPC group, and provide the forwarding handlers.
import { Layer, Schema } from "effect"import { ClusterSchema, Entity, EntityProxy, EntityProxyServer } from "effect/unstable/cluster"import { Rpc, RpcServer } from "effect/unstable/rpc"
// The entity we want to expose. Messages are persisted so they survive failover.export const Counter = Entity.make("Counter", [ Rpc.make("Increment", { payload: { id: Schema.String, amount: Schema.Number }, primaryKey: ({ id }) => id, success: Schema.Number })]).annotateRpcs(ClusterSchema.Persisted, true)
// Derive an ordinary RpcGroup from the entity. The class form gives the group a// stable identity you can reference from clients.export class MyRpcs extends EntityProxy.toRpcGroup(Counter) {}
// Serve it. layerRpcHandlers wires each generated RPC to the entity client so// requests are forwarded into the cluster and routed to the owning shard.const RpcServerLayer = RpcServer.layer(MyRpcs).pipe( Layer.provide(EntityProxyServer.layerRpcHandlers(Counter)))What gets generated
Section titled “What gets generated”For every RPC Tag on the entity, the derived group contains two RPCs:
| Generated RPC | Payload | Success | Errors added |
|---|---|---|---|
Counter.Increment | { entityId, payload } | original success | original error + MailboxFull / AlreadyProcessingMessage / PersistenceError |
Counter.IncrementDiscard | { entityId, payload } | void | only the three cluster errors |
- Names are prefixed with the entity type.
Incrementon theCounterentity becomesCounter.Increment. This keeps tags unique if you proxy several entities into one group. - The payload is wrapped. The original payload moves under a
payloadfield and anentityId: stringfield is added alongside it. The proxy usesentityIdto pick the target instance. - Each RPC gets a
…Discardtwin. The discard variant is fire-and-forget: it forwards the message with{ discard: true }, returnsvoid, and cannot surface the entity’s own errors — only delivery errors. - Cluster delivery errors are merged into the error channel of every proxy RPC:
MailboxFull(the entity’s inbox is saturated),AlreadyProcessingMessage(a message with the same id is already in flight), andPersistenceError(the durable store failed). Handle them withEffect.catchTagon the client.
EntityProxyServer.layerRpcHandlers requires Sharding plus the entity’s RPC server
services in the environment, so the layer can route forwarded calls back to the
owning shard.
Exposing an entity over HTTP
Section titled “Exposing an entity over HTTP”EntityProxy.toHttpApiGroup produces an HttpApiGroup
instead. Add it to an HttpApi, then provide handlers with
EntityProxyServer.layerHttpApi.
import { Layer, Schema } from "effect"import { ClusterSchema, Entity, EntityProxy, EntityProxyServer } from "effect/unstable/cluster"import { HttpApi, HttpApiBuilder } from "effect/unstable/httpapi"import { Rpc } from "effect/unstable/rpc"
export const Counter = Entity.make("Counter", [ Rpc.make("Increment", { payload: { id: Schema.String, amount: Schema.Number }, primaryKey: ({ id }) => id, success: Schema.Number })]).annotateRpcs(ClusterSchema.Persisted, true)
// Derive an HttpApiGroup named "counter", then mount it under /counter.export class MyApi extends HttpApi.make("api") .add( EntityProxy.toHttpApiGroup("counter", Counter) .prefix("/counter") ){}
// layerHttpApi implements the group's handlers, forwarding to the entity client.const ApiLayer = HttpApiBuilder.layer(MyApi).pipe( Layer.provide(EntityProxyServer.layerHttpApi(MyApi, "counter", Counter)))Generated endpoints
Section titled “Generated endpoints”Each entity RPC becomes two POST endpoints; the entityId lives in the route path
rather than the body, and the path segment is the lower-cased RPC tag.
| Endpoint id | Method | Path (under the /counter prefix) |
|---|---|---|
Increment | POST | /increment/:entityId |
IncrementDiscard | POST | /increment/:entityId/discard |
So POST /counter/increment/cart-42 with body { "id": "cart-42", "amount": 1 }
increments the Counter instance for entity id cart-42. The discard endpoint at
/counter/increment/cart-42/discard does the same as fire-and-forget. As with RPC,
the three cluster delivery errors are added to each endpoint’s error responses.
EntityProxyServer.layerHttpApi reads the entityId path parameter, forwards the
request body to the matching entity client method, and (for …/discard) passes
{ discard: true }. It requires Sharding and the entity RPC server services.
EntityResource — pooled per-id resources
Section titled “EntityResource — pooled per-id resources”Sometimes an entity instance needs to own a heavyweight resource: a database connection, a child process, a Kubernetes Pod. You want one such resource per entity id, kept alive while the entity is active, and released only when the entity truly goes away — not on every routine shard movement.
EntityResource (from effect/unstable/cluster/EntityResource) gives you that. It
wraps the resource in an RcRef so concurrent acquirers
share one instance, calls Entity.keepAlive so the entity is
not passivated while the resource is held, and only fully releases the resource after
an idle timeout (infinite by default) or an explicit close.
import { Effect, Layer, Schema } from "effect"import { Entity, EntityResource } from "effect/unstable/cluster"import { Rpc } from "effect/unstable/rpc"
const Session = Entity.make("Session", [ Rpc.make("Ping", { payload: { id: Schema.String }, primaryKey: ({ id }) => id })])
const SessionLive = Session.toLayer( Effect.gen(function* () { // One pooled connection per entity id, kept alive while in use. const connection = yield* EntityResource.make({ acquire: Effect.acquireRelease( Effect.log("opening connection").pipe(Effect.as({ send: (_: string) => Effect.void })), () => Effect.log("closing connection") ) })
return { Ping: Effect.fnUntraced(function* () { // `get` reuses the resource within the caller's scope. const conn = yield* Effect.scoped(connection.get) yield* conn.send("ping") }) } }))EntityResource.make(options)
Section titled “EntityResource.make(options)”Creates an EntityResource that can be acquired inside an entity. Options:
acquire: Effect<A, E, R>— how to build the resource. It runs with aCloseScopeservice in context (see below); resources you register on that scope outlive shard movement.idleTimeToLive?: Duration.Input— release the resource this long after the last user releases it. Defaults to infinite, so the resource lives untilclose.acquireEagerly?: boolean— whentrue, initialize the resource immediately rather than lazily on firstget.
Returns Effect<EntityResource<A, E>, E, Scope | Exclude<R, CloseScope> | Sharding | Entity.CurrentAddress> — so it must be built inside an entity handler/layer where
Sharding and CurrentAddress are available.
import { Effect } from "effect"import { EntityResource } from "effect/unstable/cluster"
const resource = EntityResource.make({ acquire: Effect.succeed("db-handle"), idleTimeToLive: "30 seconds", acquireEagerly: true})// => Effect<EntityResource<string>, never, Scope | Sharding | Entity.CurrentAddress>EntityResource.makeK8sPod(spec, options?)
Section titled “EntityResource.makeK8sPod(spec, options?)”Creates an EntityResource backed by a Kubernetes Pod: the pod is created and waited
on via K8sHttpClient, and torn down when the resource closes or its idle timeout
expires. Useful for giving each entity id its own pod. Takes a v1.Pod spec and an
optional { idleTimeToLive }; requires K8sHttpClient.K8sHttpClient in addition to
Sharding and CurrentAddress.
import { EntityResource } from "effect/unstable/cluster"
const pod = EntityResource.makeK8sPod( { metadata: { name: "worker" }, spec: { containers: [{ name: "main", image: "ghcr.io/acme/worker:latest" }] } }, { idleTimeToLive: "5 minutes" })// => Effect<EntityResource<K8sHttpClient.PodStatus>, never,// Scope | Sharding | Entity.CurrentAddress | K8sHttpClient.K8sHttpClient>EntityResource (interface)
Section titled “EntityResource (interface)”The handle returned by make/makeK8sPod. It exposes:
get: Effect<A, E, Scope.Scope>— acquire or reuse the resource in the caller’s scope. The shared instance is reference-counted across concurrentgets.close: Effect<void>— invalidate the resource so its close scope can be released. The nextgetrebuilds it.
import { Effect } from "effect"import type { EntityResource } from "effect/unstable/cluster"
declare const resource: EntityResource.EntityResource<{ query: string }>
const program = Effect.gen(function* () { const r = yield* Effect.scoped(resource.get) // acquire/reuse // ... use r.query ... yield* resource.close // force release; next get rebuilds})EntityResource.CloseScope
Section titled “EntityResource.CloseScope”A Context.Service (id "effect/cluster/EntityResource/CloseScope") holding a
Scope that is only closed when the resource is explicitly closed or idles out —
not on entity restarts, shard movement, or node shutdown. Inside acquire, register
finalizers on this scope for cleanup that must survive routine churn.
import { Effect, Scope } from "effect"import { EntityResource } from "effect/unstable/cluster"
const resource = EntityResource.make({ acquire: Effect.gen(function* () { const closeScope = yield* EntityResource.CloseScope // Finalizer runs only when the resource is truly closed. yield* Scope.addFinalizer(closeScope, Effect.log("releasing for good")) return "handle" })})// => Effect<EntityResource<string>, never, Scope | Sharding | Entity.CurrentAddress>EntityResource.TypeId
Section titled “EntityResource.TypeId”The branding type identifier "~effect/cluster/EntityResource", available as both a
value and a type, used to tag EntityResource values.
import { EntityResource } from "effect/unstable/cluster"
EntityResource.TypeId// => "~effect/cluster/EntityResource"Reference
Section titled “Reference”EntityProxy.toRpcGroup(entity)
Section titled “EntityProxy.toRpcGroup(entity)”Derives an RpcGroup from an entity. For each entity RPC it emits a prefixed
request RPC and a …Discard fire-and-forget RPC, both with an entityId-wrapped
payload and the cluster delivery errors merged in.
import { Schema } from "effect"import { Entity, EntityProxy } from "effect/unstable/cluster"import { Rpc } from "effect/unstable/rpc"
const Counter = Entity.make("Counter", [ Rpc.make("Increment", { payload: { id: Schema.String, amount: Schema.Number }, primaryKey: ({ id }) => id, success: Schema.Number })])
const group = EntityProxy.toRpcGroup(Counter)// => RpcGroup with RPCs "Counter.Increment" and "Counter.IncrementDiscard",// each payload shaped { entityId: string, payload: { id, amount } }
// Use the class form when you need a referenceable identity for clients/servers:class MyRpcs extends EntityProxy.toRpcGroup(Counter) {}EntityProxy.ConvertRpcs (type)
Section titled “EntityProxy.ConvertRpcs (type)”Type-level conversion behind toRpcGroup. Given the entity’s RPCs and a Prefix,
it produces, per RPC, the ${Prefix}.${Tag} request RPC (payload { entityId, payload }, errors widened with MailboxFull | AlreadyProcessingMessage | PersistenceError) and the ${Prefix}.${Tag}Discard RPC (success Void, errors =
just the three cluster errors).
import type { EntityProxy } from "effect/unstable/cluster"import type { Rpc } from "effect/unstable/rpc"
// Given Rpcs and the entity type, this is the union the derived RpcGroup carries.type Derived<Rpcs extends Rpc.Any> = EntityProxy.ConvertRpcs<Rpcs, "Counter">// => Rpc<"Counter.Increment", ...> | Rpc<"Counter.IncrementDiscard", ...> | ...EntityProxy.toHttpApiGroup(name, entity)
Section titled “EntityProxy.toHttpApiGroup(name, entity)”Derives an HttpApiGroup (with the given name) from an entity. For each RPC it
adds POST /<lowercased-tag>/:entityId and POST /<lowercased-tag>/:entityId/discard
endpoints. Chain .prefix("/…") to mount the whole group under a base path.
import { Schema } from "effect"import { Entity, EntityProxy } from "effect/unstable/cluster"import { Rpc } from "effect/unstable/rpc"
const Counter = Entity.make("Counter", [ Rpc.make("Increment", { payload: { id: Schema.String, amount: Schema.Number }, primaryKey: ({ id }) => id, success: Schema.Number })])
const apiGroup = EntityProxy.toHttpApiGroup("counter", Counter).prefix("/counter")// => HttpApiGroup "counter" with endpoints:// POST /counter/increment/:entityId (success: number)// POST /counter/increment/:entityId/discard (success: void)EntityProxy.ConvertHttpApi (type)
Section titled “EntityProxy.ConvertHttpApi (type)”Type-level conversion behind toHttpApiGroup. Per entity RPC it yields the
POST /<Lowercase<Tag>>/:entityId endpoint (params carry entityId, success is
the RPC’s success, errors widened with the three cluster errors) and the
/discard endpoint (success Void).
import type { EntityProxy } from "effect/unstable/cluster"import type { Rpc } from "effect/unstable/rpc"
type Endpoints<Rpcs extends Rpc.Any> = EntityProxy.ConvertHttpApi<Rpcs>// => HttpApiEndpoint<"Increment", "POST", "/increment/:entityId", ...>// | HttpApiEndpoint<"IncrementDiscard", "POST", "/increment/:entityId/discard", ...>EntityProxyServer.layerRpcHandlers(entity)
Section titled “EntityProxyServer.layerRpcHandlers(entity)”Builds a Layer that implements the handlers for the group produced by
toRpcGroup. Each handler resolves the entity client for the wrapped entityId and
calls the matching method; the …Discard handler calls it with { discard: true }.
Requires Sharding | Rpc.ServicesServer<Rpcs>.
import { Layer } from "effect"import { EntityProxyServer } from "effect/unstable/cluster"import { RpcServer } from "effect/unstable/rpc"
declare const Counter: import("effect/unstable/cluster").Entity.Entity<"Counter", any>declare const MyRpcs: any
const ServerLayer = RpcServer.layer(MyRpcs).pipe( Layer.provide(EntityProxyServer.layerRpcHandlers(Counter)))// => Layer providing RpcHandlers<Rpcs, "Counter">, needs Sharding + entity servicesEntityProxyServer.RpcHandlers (type)
Section titled “EntityProxyServer.RpcHandlers (type)”The union of RPC handler services required to serve an entity’s proxy RPCs —
Rpc.Handler<`${Prefix}.${Tag}`> and its …Discard variant for every entity RPC.
This is exactly the RIn that layerRpcHandlers provides.
import type { EntityProxyServer } from "effect/unstable/cluster"import type { Rpc } from "effect/unstable/rpc"
type Provided<Rpcs extends Rpc.Any> = EntityProxyServer.RpcHandlers<Rpcs, "Counter">// => Rpc.Handler<"Counter.Increment"> | Rpc.Handler<"Counter.IncrementDiscard"> | ...EntityProxyServer.layerHttpApi(api, name, entity)
Section titled “EntityProxyServer.layerHttpApi(api, name, entity)”Builds a Layer implementing the handlers for the HTTP group name within api.
Each endpoint reads the entityId path parameter and forwards the request payload
to the corresponding entity client method (discard endpoints pass { discard: true }).
Defects are logged with the entity, id, and method annotated. Requires
Sharding | Rpc.ServicesServer<Rpcs>.
import { Layer } from "effect"import { EntityProxyServer } from "effect/unstable/cluster"import { HttpApiBuilder } from "effect/unstable/httpapi"
declare const Counter: import("effect/unstable/cluster").Entity.Entity<"Counter", any>declare const MyApi: any
const ApiLayer = HttpApiBuilder.layer(MyApi).pipe( Layer.provide(EntityProxyServer.layerHttpApi(MyApi, "counter", Counter)))// => Layer implementing the "counter" group, needs Sharding + entity services