Sharding, runners, and configuration
Most application code never touches the APIs on this page directly: you define an
entity with Entity.toLayer, register a
singleton with Singleton.make, and provide a
single transport layer such as NodeClusterSocket.layer(). That layer wires up
everything described here.
This page is for the other half of the job — operating a cluster: deciding
which transport to use, how runners find each other, how shard ownership moves on
failure, and exactly which ShardingConfig knob does what.
The mental model
A cluster is a set of runners (processes that can host entity instances) plus a shared storage backend they coordinate through. Routing a message works like this:

```
client.MyEntity("user-42")        // an entity id
        │
        ▼
getShardId("user-42", group)      // hash the id into a shard within its group
        │  => ShardId "default:137"
        ▼
shardAssignments[shard]           // which runner currently owns that shard?
        │  => RunnerAddress(10.0.0.3:34431)
        ▼
the owning runner                 // routes to (or spawns) the entity instance
```

The moving parts:
- Entity ids hash into shard ids inside a shard group. getShardId computes abs(hash(entityId) % shardsPerGroup) + 1 and pairs it with the group name. The number of shards is fixed per group (shardsPerGroup, default 300).
- Healthy runners are placed on a hash ring per shard group. Each runner advertises the groups it serves and a weight; the ring distributes the group’s shards across runners proportionally to weight.
- The local runner runs handlers only for shards it currently owns. Owning a shard is two steps: the sharding layer assigns the shard to a runner (from the hash ring), and the runner then acquires a storage lock for it. Assignment and acquisition are separate: a runner can be assigned a shard before it has acquired the lock, and will not process that shard’s messages until it has.
- Ownership moves on shutdown, runner failure, or health refresh. When a runner leaves or is reported unhealthy, its shards are reassigned and re-acquired by other runners. Because ownership can change mid-flight, sends can fail with EntityNotAssignedToRunner and are retried automatically (sendRetryInterval).
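
The first two bullets can be sketched in a few lines of TypeScript. This is a minimal illustration, not the library's implementation: hashString is a stand-in FNV-1a hash (the hash the library actually uses may differ), and getShardIdSketch and expectedShards are hypothetical names. The weight split shown is the idealized proportion; a real hash ring only approximates it.

```typescript
// Stand-in string hash (FNV-1a, signed 32-bit). Assumption: the real hash may differ.
const hashString = (s: string): number => {
  let h = 0x811c9dc5
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i)
    h = Math.imul(h, 0x01000193)
  }
  return h | 0
}

interface ShardIdSketch {
  readonly group: string
  readonly id: number
}

// abs(hash(entityId) % shardsPerGroup) + 1, paired with the group name.
const getShardIdSketch = (
  entityId: string,
  group: string,
  shardsPerGroup = 300
): ShardIdSketch => ({
  group,
  id: Math.abs(hashString(entityId) % shardsPerGroup) + 1
})

// Idealized share of a group's shards per runner, proportional to weight.
const expectedShards = (
  weights: Record<string, number>,
  shardsPerGroup = 300
): Record<string, number> => {
  const total = Object.values(weights).reduce((a, b) => a + b, 0)
  const shares: Record<string, number> = {}
  for (const [runner, weight] of Object.entries(weights)) {
    shares[runner] = (shardsPerGroup * weight) / total
  }
  return shares
}
```

The shard id always falls in 1..shardsPerGroup and is stable for a given id and shard count, which is why shardsPerGroup must not differ between runners. With weights { a: 1, b: 2 } and 300 shards, the idealized split is 100 and 200 shards.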
The services that implement this — Sharding, Runners, RunnerStorage,
RunnerHealth, MessageStorage — are all provided by a single transport layer.
You rarely assemble them by hand.
Common case: a Node socket cluster
A production runner over raw TCP, backed by SQL storage. NodeClusterSocket.layer
provides Sharding, Runners, and MessageStorage; the only thing it asks for is
a SqlClient (because the default storage is SQL).

```typescript
import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"
import { Layer } from "effect"
import { Entity } from "effect/unstable/cluster"

// Your entity layers (see the Entities page); each registers handlers.
declare const CounterLayer: Layer.Layer<never, never, Entity.Sharding>

// A SqlClient layer: SQLite, Postgres, MySQL, etc.
declare const SqlClientLayer: Layer.Layer<any>

// The cluster transport. Defaults: msgpack serialization, SQL storage,
// ping-based runner health. `runnerAddress` / `shardsPerGroup` come from the
// environment via ShardingConfig.layerFromEnv (override with `shardingConfig`).
const ClusterLayer = NodeClusterSocket.layer({
  serialization: "msgpack",
  storage: "sql",
  runnerHealth: "ping",
  shardingConfig: {
    // host + port other runners use to reach this process
    runnerAddress: undefined // falls back to env / default localhost:34431
  }
}).pipe(Layer.provide(SqlClientLayer))

const ProductionLayer = CounterLayer.pipe(Layer.provide(ClusterLayer))

// Layer.launch keeps the runner alive serving messages until interrupted.
Layer.launch(ProductionLayer).pipe(NodeRuntime.runMain)
```

A client-only process can send messages to entities but never owns shards
and never starts a server. Pass clientOnly: true; this clears runnerAddress
internally so the process stays off the hash ring.

```typescript
import { NodeClusterSocket } from "@effect/platform-node"

const ClientLayer = NodeClusterSocket.layer({
  clientOnly: true,
  storage: "sql"
})
// => Layer<Sharding | Runners | MessageStorage, ConfigError, SqlClient>
```

To use HTTP or WebSocket instead of raw TCP, swap in NodeClusterHttp.layer and
add a transport field:

```typescript
import { NodeClusterHttp } from "@effect/platform-node"

const HttpClusterLayer = NodeClusterHttp.layer({
  transport: "http", // or "websocket"
  serialization: "msgpack",
  storage: "sql"
})
```

For tests and single-process apps you do not need any of this: use
TestRunner.layer (in-memory everything) or SingleRunner.layer (SQL message
storage, single node). Both are covered under
Choosing a runner transport below.
The Sharding service
Sharding (from effect/unstable/cluster, tag effect/cluster/Sharding) is the
runtime that maps entity ids to shards, decides ownership, and delivers messages.
Most apps reach it indirectly through Entity.toLayer / Singleton.make. Access
it directly only for diagnostics, custom routing, or framework-level code.

```typescript
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"

const program = Effect.gen(function*() {
  const sharding = yield* Sharding
  // ... use sharding.getShardId, sharding.makeClient, etc.
})
```

getRegistrationEvents

A Stream<ShardingRegistrationEvent> emitting an event every time this runner
registers an entity type or a singleton. Useful in startup coordination and tests.

```typescript
import { Effect, Stream } from "effect"
import { Sharding } from "effect/unstable/cluster"

Effect.gen(function*() {
  const sharding = yield* Sharding
  yield* sharding.getRegistrationEvents.pipe(
    Stream.runForEach((event) => Effect.log(event._tag)),
    Effect.fork
  )
  // => logs "EntityRegistered" / "SingletonRegistered" as they happen
})
```

getShardId

(entityId, group) => ShardId. The pure hashing function that decides which shard
an entity id belongs to. Deterministic for a given shardsPerGroup.

```typescript
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

sharding.getShardId("user-42" as any, "default")
// => ShardId { group: "default", id: 137 } (a stable 1..shardsPerGroup value)
```

hasShardId

(shardId) => boolean. Returns true only if this runner has acquired the lock
for that shard (not merely been assigned it), and is not shutting down.
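
The difference between an assigned shard and an acquired one can be modelled with two sets. This is a minimal sketch with hypothetical names (OwnershipSketch, hasShardIdSketch) and shards keyed by their group:id string; it illustrates the rule stated above rather than the library's internals.

```typescript
// Hypothetical model of a runner's view of shard ownership.
interface OwnershipSketch {
  readonly assigned: ReadonlySet<string> // shards the hash ring gave this runner
  readonly acquired: ReadonlySet<string> // shards whose storage lock this runner holds
  readonly shuttingDown: boolean
}

// True only for shards whose lock is held while the runner is not shutting down.
const hasShardIdSketch = (state: OwnershipSketch, shard: string): boolean =>
  !state.shuttingDown && state.acquired.has(shard)

const state: OwnershipSketch = {
  assigned: new Set(["default:137", "default:138"]),
  acquired: new Set(["default:137"]),
  shuttingDown: false
}

const ownsAcquired = hasShardIdSketch(state, "default:137")
const ownsAssignedOnly = hasShardIdSketch(state, "default:138")
```

Here default:138 is assigned but its lock has not been acquired yet, so the check is false for it and the runner would not process its messages.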

```typescript
import { ShardId, Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

sharding.hasShardId(ShardId.make("default", 137))
// => true if this runner currently owns shard default:137
```

getSnowflake

An Effect<Snowflake> that mints a cluster-unique, time-ordered id local to this
runner. Used internally for request and reply ids; handy for your own correlation
ids.
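
For readers unfamiliar with snowflake ids, the sketch below shows the general idea: a timestamp, a per-process machine id, and a sequence counter packed into one integer so that ids sort by creation time. The bit layout (22 low bits split as 10 machine bits and 12 sequence bits), the epoch, and the name makeSnowflakeGenerator are all assumptions chosen for illustration; this is not the library's Snowflake implementation.

```typescript
// Arbitrary custom epoch for the sketch (2025-01-01T00:00:00Z in milliseconds).
const EPOCH_MS = 1735689600000n

// Returns a function that mints increasing ids for one machine id.
const makeSnowflakeGenerator = (
  machineId: number,
  now: () => number = Date.now
): (() => bigint) => {
  let lastMs = -1n
  let sequence = 0n
  return () => {
    let ms = BigInt(now())
    // If the clock moved backwards, keep using the last timestamp.
    if (ms < lastMs) ms = lastMs
    if (ms === lastMs) {
      sequence = (sequence + 1n) & 0xfffn
      // Sequence exhausted within this millisecond: borrow the next one.
      if (sequence === 0n) ms = lastMs + 1n
    } else {
      sequence = 0n
    }
    lastMs = ms
    return ((ms - EPOCH_MS) << 22n) | (BigInt(machineId & 0x3ff) << 12n) | sequence
  }
}
```

Because the timestamp occupies the high bits, ids minted later compare greater than earlier ones from the same generator, and the machine bits keep ids from different processes distinct as long as machine ids are unique.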

```typescript
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

Effect.gen(function*() {
  const id = yield* sharding.getSnowflake
  // => Snowflake(7421… ) unique to this runner
})
```

isShutdown

An Effect<boolean> that reports whether sharding has begun shutting down. After
this is true, the runner stops accepting new requests.

```typescript
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

sharding.isShutdown
// => Effect<boolean>
```

makeClient

(entity) => Effect<(entityId: string) => RpcClient>. Builds a typed client for an
entity; calling the returned function with an entity id routes encoded RPCs to the
current owner. This is what Entity.makeClient is built on.

```typescript
import { Effect } from "effect"
import { Entity, Sharding } from "effect/unstable/cluster"

declare const Counter: Entity.Entity<"Counter", any>
declare const sharding: Sharding["Service"]

Effect.gen(function*() {
  const makeClient = yield* sharding.makeClient(Counter)
  const client = makeClient("user-42")
  // client.Increment({ amount: 1 }) => routed to the owner of user-42's shard
})
```

registerEntity

(entity, handlers, options?) => Effect<void, …, Scope | …>. Registers entity
handlers with the runner. options covers per-entity tuning: maxIdleTime,
concurrency (number | "unbounded"), mailboxCapacity, disableFatalDefects,
defectRetryPolicy, and spanAttributes. Prefer Entity.toLayer, which calls
this for you.

```typescript
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]
declare const Counter: any
declare const handlers: Effect.Effect<any>

sharding.registerEntity(Counter, handlers, {
  maxIdleTime: "5 minutes",
  concurrency: "unbounded",
  mailboxCapacity: 4096
})
// => Effect<void> (scoped; unregisters when the scope closes)
```

registerSingleton

(name, run, options?) => Effect<void>. Runs run exactly once across the cluster,
on whichever runner owns the singleton’s shard within its group. options.shardGroup
chooses the group (default "default"). Prefer Singleton.make.

```typescript
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

sharding.registerSingleton(
  "report-generator",
  Effect.log("running on exactly one runner"),
  { shardGroup: "default" }
)
// => Effect<void>
```

send

(message) => Effect<void, EntityNotAssignedToRunner | MailboxFull | AlreadyProcessingMessage>.
Delivers an already-encoded incoming message to a local entity. Low-level — clients
built with makeClient call this path for you.
sendOutgoing
(message, discard) => Effect<void, MailboxFull | AlreadyProcessingMessage | PersistenceError>.
Sends an outgoing message, routing it to the owning runner (local or remote) and,
for persisted messages, writing to storage first. discard: true means “fire and
forget — don’t wait for a reply”.
notify
(message, { waitUntilRead? }) => Effect<void, EntityNotAssignedToRunner | AlreadyProcessingMessage>.
Tells sharding that a persisted message is available in storage so it polls and
delivers it. waitUntilRead: true resolves only once the message has been picked
up by its entity.

reset

(requestId) => Effect<boolean>. Clears the stored replies for a request id so it
can be re-processed; returns true on success. Used to recover a stuck durable
request.

```typescript
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]
declare const requestId: any

sharding.reset(requestId)
// => Effect<boolean>
```

pollStorage

An Effect<void> that triggers an immediate read of all unprocessed messages from
storage, instead of waiting for the next entityMessagePollInterval tick.

```typescript
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

sharding.pollStorage
// => Effect<void>; forces a storage read now
```

activeEntityCount

An Effect<number> reporting how many entity instances are currently alive on this
runner. Useful for metrics and graceful-shutdown checks.

```typescript
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"

declare const sharding: Sharding["Service"]

Effect.gen(function*() {
  const count = yield* sharding.activeEntityCount
  // => 17
})
```

Sharding.layer

Layer<Sharding, never, ShardingConfig | Runners | MessageStorage | RunnerStorage | RunnerHealth>.
The raw layer that builds Sharding from its five dependencies. Transport layers
(SocketRunner, HttpRunner, …) provide it for you with the dependencies wired —
you only reach for Sharding.layer when assembling a fully custom runtime.

```typescript
import { Layer } from "effect"
import { Sharding } from "effect/unstable/cluster"

Sharding.layer
// => Layer<Sharding, never, ShardingConfig | Runners | MessageStorage | RunnerStorage | RunnerHealth>
```

Configuring sharding

ShardingConfig (tag effect/cluster/ShardingConfig) is the single service that
every cluster component reads its settings from: runner identity, shard groups,
counts, weights, lock timing, and the many polling intervals. The table below lists
every field with its default.

| Field | Type | Default | What it controls |
|---|---|---|---|
| runnerAddress | Option<RunnerAddress> | Some(localhost:34431) | Externally reachable address advertised to other runners. None ⇒ client-only mode. |
| runnerListenAddress | Option<RunnerAddress> | None (uses runnerAddress) | Local bind address, when it differs from the advertised address. |
| runnerShardWeight | number | 1 | Relative share of shards this runner takes. 2 ⇒ twice as many as a weight-1 runner. |
| availableShardGroups | ReadonlyArray<string> | ["default"] | All shard groups across the cluster. |
| assignedShardGroups | ReadonlyArray<string> | ["default"] | The subset of groups this runner serves. |
| shardsPerGroup | number | 300 | Shards per group. Must be identical across all runners. |
| shardLockRefreshInterval | Duration.Input | 10 seconds | How often a runner re-asserts its shard locks. |
| shardLockExpiration | Duration.Input | 35 seconds | How long a lock lives without a refresh before it can be stolen. |
| shardLockDisableAdvisory | boolean | false | Disable advisory (DB-level) locks for shard locking. |
| preemptiveShutdown | boolean | true | When one entity begins shutting down, start shutting down the whole runner. |
| entityMailboxCapacity | number \| "unbounded" | 4096 | Default per-entity mailbox size. |
| entityMaxIdleTime | Duration.Input | 1 minute | Idle time before an entity is passivated (interrupted). |
| entityRegistrationTimeout | Duration.Input | 1 minute | If an entity type hasn’t registered this long after a message arrives, the message fails. |
| entityTerminationTimeout | Duration.Input | 15 seconds | Max time to wait for an entity to terminate (tuned to k8s defaults). |
| entityMessagePollInterval | Duration.Input | 10 seconds | How often to poll storage for unprocessed messages. |
| entityReplyPollInterval | Duration.Input | 200 millis | How often to poll storage for client replies. |
| refreshAssignmentsInterval | Duration.Input | 3 seconds | How often to re-read runners and recompute shard assignments. |
| sendRetryInterval | Duration.Input | 100 millis | Retry delay when a send hits EntityNotAssignedToRunner. |
| runnerHealthCheckInterval | Duration.Input | 1 minute | How often the health-check singleton probes other runners. |
| simulateRemoteSerialization | boolean | true | Round-trip local messages through serialization, exercising the remote path. |
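
As a worked example of how shardLockRefreshInterval and shardLockExpiration relate, the sketch below counts how many refresh attempts fit inside one expiration window. lockRefreshBudget is a hypothetical helper, durations are plain milliseconds, and it assumes refreshes fire exactly on the interval, which a real scheduler only approximates.

```typescript
// Counts refresh attempts that land strictly before the lock would expire,
// measured from the last successful refresh at t = 0.
const lockRefreshBudget = (refreshIntervalMs: number, expirationMs: number) => {
  if (refreshIntervalMs <= 0 || expirationMs <= 0) {
    throw new RangeError("durations must be positive")
  }
  const attemptsBeforeExpiry = Math.ceil(expirationMs / refreshIntervalMs) - 1
  return {
    attemptsBeforeExpiry,
    // At least one attempt must succeed, so one fewer failure is tolerated.
    toleratedConsecutiveFailures: Math.max(attemptsBeforeExpiry - 1, 0)
  }
}

// Defaults from the table: refresh every 10 seconds, expire after 35 seconds.
const defaultBudget = lockRefreshBudget(10000, 35000)
```

With the defaults, attempts at 10, 20, and 30 seconds all land before the 35-second expiry, so under these assumptions a runner can miss two refreshes in a row and still keep its locks. Shrinking the gap between the two settings speeds up takeover after a crash but leaves less room for a slow storage backend.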
ShardingConfig.layer
(options?) => Layer<ShardingConfig>. Shallow-merges your partial options over
defaults. The common way to set config in code.
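
What a shallow merge implies can be shown without the library. In this sketch ConfigSketch, defaultsSketch, and mergeConfigSketch are hypothetical stand-ins covering only three fields; the point is that a supplied field replaces the default wholesale, including arrays.

```typescript
// Hypothetical three-field subset of the config, for illustration only.
interface ConfigSketch {
  readonly runnerShardWeight: number
  readonly shardsPerGroup: number
  readonly assignedShardGroups: ReadonlyArray<string>
}

const defaultsSketch: ConfigSketch = {
  runnerShardWeight: 1,
  shardsPerGroup: 300,
  assignedShardGroups: ["default"]
}

// Shallow merge: top-level keys from options win; nothing is merged recursively.
const mergeConfigSketch = (options: Partial<ConfigSketch> = {}): ConfigSketch => ({
  ...defaultsSketch,
  ...options
})

const merged = mergeConfigSketch({ assignedShardGroups: ["heavy"] })
```

In the result, assignedShardGroups is ["heavy"] rather than ["default", "heavy"], while the untouched fields keep their defaults.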

```typescript
import { ShardingConfig } from "effect/unstable/cluster"

ShardingConfig.layer({
  runnerShardWeight: 2,
  shardsPerGroup: 512,
  entityMaxIdleTime: "5 minutes"
})
// => Layer<ShardingConfig>; every other field uses its default
```

ShardingConfig.layerDefaults

Layer<ShardingConfig>. The defaults with no overrides, equivalent to
ShardingConfig.layer().

```typescript
import { ShardingConfig } from "effect/unstable/cluster"

ShardingConfig.layerDefaults
// => Layer<ShardingConfig> with localhost:34431, 300 shards, group "default"
```

ShardingConfig.defaults

The plain ShardingConfig["Service"] object of default values. Read or spread it
when you need the baseline outside a layer.

```typescript
import { ShardingConfig } from "effect/unstable/cluster"

ShardingConfig.defaults.shardsPerGroup // => 300
ShardingConfig.defaults.entityTerminationTimeout // => Duration "15 seconds"
```

ShardingConfig.config

A Config<ShardingConfig["Service"]> describing how to load every field from the
Effect Config system (keys like host, port, shardsPerGroup,
shardGroups, runnerShardWeight, …).

```typescript
import { ShardingConfig } from "effect/unstable/cluster"

ShardingConfig.config
// => Config<ShardingConfig["Service"]>
```

ShardingConfig.configFromEnv

The same config, pre-wired to a constant-case environment provider, so
shardsPerGroup reads SHARDS_PER_GROUP, runnerShardWeight reads
RUNNER_SHARD_WEIGHT, etc.
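
The key mapping can be approximated with a small helper. toConstantCase below is a hypothetical function written for this page, not the provider's own conversion, and it may treat digits or acronyms differently from the real one; it only reproduces the two examples given above.

```typescript
// Insert an underscore at each lower-to-upper boundary, then upper-case.
const toConstantCase = (key: string): string =>
  key.replace(/([a-z0-9])([A-Z])/g, "$1_$2").toUpperCase()

const envNames = ["host", "shardsPerGroup", "runnerShardWeight"].map(toConstantCase)
```

Under this sketch the three keys map to HOST, SHARDS_PER_GROUP, and RUNNER_SHARD_WEIGHT.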

```typescript
import { Effect } from "effect"
import { ShardingConfig } from "effect/unstable/cluster"

Effect.gen(function*() {
  const config = yield* ShardingConfig.configFromEnv
  // => fully-populated ShardingConfig service read from process.env
})
```

ShardingConfig.layerFromEnv

(options?) => Layer<ShardingConfig, ConfigError>. Loads config from the
environment, then overlays any explicit options on top. This is what
NodeClusterSocket.layer({ shardingConfig }) uses under the hood.

```typescript
import { ShardingConfig } from "effect/unstable/cluster"

ShardingConfig.layerFromEnv({
  // env supplies host/port/shard counts; we force a heavier weight here
  runnerShardWeight: 3
})
// => Layer<ShardingConfig, ConfigError>
```

ShardingConfig.shardGroupConfig

(config) => { available: ReadonlySet<string>, assigned: ReadonlySet<string> }.
Normalizes the configured group arrays into the (sorted) available set and the
assigned subset actually served by this runner.

```typescript
import { ShardingConfig } from "effect/unstable/cluster"

ShardingConfig.shardGroupConfig({
  ...ShardingConfig.defaults,
  availableShardGroups: ["default", "heavy"],
  assignedShardGroups: ["heavy"]
})
// => { available: Set{"default","heavy"}, assigned: Set{"heavy"} }
```

Choosing a runner transport

A transport layer provides Sharding + Runners (and usually storage) on top of
a wire protocol. Pick one based on your deployment. The platform-node helpers
(NodeClusterSocket, NodeClusterHttp) are the ergonomic entry points; the
lower-level SocketRunner / HttpRunner modules are what they’re built from and
are useful for custom platforms.
SocketRunner.layer / layerClientOnly
Raw TCP/Unix-socket runner over the RPC protocol. layer serves runner RPCs from a
provided SocketServer and installs Sharding + Runners clients; layerClientOnly
provides only the clients (no server, no shard assignments).

```typescript
import { SocketRunner } from "effect/unstable/cluster"

SocketRunner.layer
// => Layer<Sharding | Runners, never,
//      RpcClientProtocol | ShardingConfig | RpcSerialization | SocketServer
//      | MessageStorage | RunnerStorage | RunnerHealth>

SocketRunner.layerClientOnly
// => Layer<Sharding | Runners, never,
//      RpcClientProtocol | ShardingConfig | MessageStorage | RunnerStorage>
```

HttpRunner family

HTTP and WebSocket transports for runner RPCs. The complete layers serve runner
routes at / and configure matching clients:

- HttpRunner.layerHttp / layerHttpClientOnly: serve/connect over HTTP.
- HttpRunner.layerWebsocket / layerWebsocketClientOnly: serve/connect over WS.
- HttpRunner.layerHttpOptions({ path }) / layerWebsocketOptions({ path }): add runner routes to an existing HttpRouter under a chosen path (paths on both sides must match).
- HttpRunner.layerClientProtocolHttp({ path, https? }) / layerClientProtocolWebsocket(...) and their *Default variants (path /): just the outgoing client protocol.
- HttpRunner.layerClient: Sharding + Runners from a supplied RpcClientProtocol and storage, serving nothing.
- HttpRunner.toHttpEffect / toHttpEffectWebsocket: raw HTTP effects for adapters that don’t use HttpRouter.serve.

```typescript
import { HttpRunner } from "effect/unstable/cluster"

HttpRunner.layerHttp
// => Layer<Sharding | Runners, never,
//      RpcSerialization | ShardingConfig | HttpClient | HttpServer
//      | MessageStorage | RunnerStorage | RunnerHealth>

HttpRunner.layerClientProtocolHttp({ path: "/cluster", https: true })
// => Layer<RpcClientProtocol, never, RpcSerialization | HttpClient>
```

SingleRunner.layer

(options?) => Layer<Sharding | Runners | MessageStorage, ConfigError, SqlClient>.
A single-process cluster: real SQL message storage, but no-op runner transport and
health. For development, embedding, and small single-node deployments where you
still want durable entities/workflows. runnerStorage: "memory" swaps SQL runner
storage for in-memory (message storage stays SQL, so a SqlClient is still
required).

```typescript
import { SingleRunner } from "effect/unstable/cluster"

SingleRunner.layer({
  shardingConfig: { entityMaxIdleTime: "30 seconds" },
  runnerStorage: "sql"
})
// => Layer<Sharding | Runners | MessageStorage, ConfigError, SqlClient>
```

TestRunner.layer

Layer<Sharding | Runners | MessageStorage | MemoryDriver>. The smallest useful
cluster: in-memory message storage, in-memory runner storage, no-op transport,
always-healthy checks, default config. Ideal for unit-testing sharded entities and
singletons. See Entities for a full test example.

```typescript
import { TestRunner } from "effect/unstable/cluster"

TestRunner.layer
// => Layer<Sharding | Runners | MessageStorage | MemoryDriver> (no requirements)
```

NodeClusterSocket.layer / NodeClusterHttp.layer

The platform-node entry points that assemble a complete sharding runtime. Both take
an options object (the only difference: NodeClusterHttp requires a transport
field):

| Option | Values | Default | Meaning |
|---|---|---|---|
| transport (Http only) | "http" \| "websocket" | (required) | Wire protocol for runner RPCs. |
| serialization | "msgpack" \| "ndjson" | "msgpack" | RPC wire codec. |
| clientOnly | boolean | false | Send-only; no server, no shard ownership. |
| storage | "local" \| "sql" \| "byo" | "sql" | local = in-memory/noop; sql = durable; byo = you provide MessageStorage + RunnerStorage. |
| runnerHealth | "ping" \| "k8s" | "ping" | Failure detection strategy. |
| runnerHealthK8s | { namespace?, labelSelector? } | (none) | Scope for the k8s health check. |
| shardingConfig | Partial<ShardingConfig["Service"]> | (none) | Overrides applied over env-loaded config. |

```typescript
import { NodeClusterSocket } from "@effect/platform-node"

NodeClusterSocket.layer({ serialization: "msgpack", storage: "sql", runnerHealth: "ping" })
// => Layer<Sharding | Runners | MessageStorage, ConfigError | SocketServerError, SqlClient>

NodeClusterSocket.layer({ clientOnly: true, storage: "byo" })
// => Layer<Sharding | Runners, ConfigError, MessageStorage | RunnerStorage>
```

NodeClusterSocket also re-exports layerClientProtocol (the shared socket client
protocol) and layerSocketServer (the socket server) for hand-wiring, plus
layerDispatcherK8s and layerK8sHttpClient for Kubernetes health checks.
NodeClusterHttp likewise re-exports layerK8sHttpClient and provides
layerHttpServer (an HttpServer bound to runnerListenAddress/runnerAddress).
Runner membership and health
These services back the routing decisions. Transport layers provide them; you touch them directly only for custom storage or health strategies.

Runner

A Schema.Class holding cluster membership data, not a process handle:
{ address, groups, weight }. It is persisted and exchanged so runners agree on
who can host what. Build one with Runner.make.

```typescript
import { Runner, RunnerAddress } from "effect/unstable/cluster"

const runner = Runner.make({
  address: RunnerAddress.make("10.0.0.3", 34431),
  groups: ["default"],
  weight: 1
})
// => Runner({ address: RunnerAddress(10.0.0.3:34431), groups: ["default"], weight: 1 })

Runner.encodeSync(runner) // => JSON string, for storage/transport
```

RunnerAddress

The stable { host, port } identity of a runner; equality, hashing, and the
host:port primary key all use both fields. RunnerAddress.make(host, port) is the
module constructor.

```typescript
import { RunnerAddress } from "effect/unstable/cluster"

const addr = RunnerAddress.make("10.0.0.3", 34431)
addr.toString() // => "RunnerAddress(10.0.0.3:34431)"
```

RunnerHealth

Service answering isAlive(address) => Effect<boolean>: whether a runner is
healthy enough to keep its shards. Implementations:

- RunnerHealth.layerNoop: everything is always healthy (tests, single node).
- RunnerHealth.layerPing / makePing: ping over the Runners protocol, with timeout and retries.
- RunnerHealth.layerK8s({ namespace?, labelSelector? }) / makeK8s: derive health from Kubernetes pod readiness. API failures count as healthy to avoid reassigning shards during a transient control-plane outage.

```typescript
import { RunnerHealth } from "effect/unstable/cluster"

RunnerHealth.layerNoop // => Layer<RunnerHealth>
RunnerHealth.layerPing // => Layer<RunnerHealth, never, Runners>
RunnerHealth.layerK8s({ namespace: "prod", labelSelector: "app=worker" })
// => Layer<RunnerHealth, never, K8sHttpClient>
```

The k8s strategy needs a K8sHttpClient: K8sHttpClient.layer (targets
https://kubernetes.default.svc/api with the mounted service-account token),
K8sHttpClient.makeGetPods({ namespace?, labelSelector? }) (cached list of running
pods keyed by pod IP), K8sHttpClient.makeCreatePod (scoped pod creator), and the
Pod / PodStatus schemas with isReady / isReadyOrInitializing helpers.
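
The "API failures count as healthy" rule is a fail-open check. The sketch below shows the shape of that decision in plain TypeScript with Promises instead of Effect. PodLookupSketch and makeIsAliveSketch are hypothetical names, pods are keyed by IP as described above, and treating a pod missing from the list as unhealthy is an assumption of this sketch rather than documented behavior.

```typescript
// Hypothetical lookup: resolves to pods keyed by pod IP, or rejects on API failure.
type PodLookupSketch = () => Promise<ReadonlyMap<string, { readonly ready: boolean }>>

const makeIsAliveSketch =
  (getPods: PodLookupSketch) =>
  async (host: string): Promise<boolean> => {
    try {
      const pods = await getPods()
      const pod = pods.get(host)
      // Assumption: a pod missing from the list is treated as not alive.
      return pod !== undefined && pod.ready
    } catch {
      // Fail open: an unreachable API must not trigger shard reassignment.
      return true
    }
  }
```

Failing open trades slower detection during a control-plane outage for stability: shards stay where they are until the API can give a definite answer.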
RunnerStorage
The persistence boundary for runner membership and shard locks:
register, unregister, getRunners, setRunnerHealth, acquire, refresh,
release, releaseAll. Implementations:

- RunnerStorage.makeMemory / layerMemory: process-local; registrations and locks do not survive a restart.
- RunnerStorage.makeEncoded(encoded): adapt a string/number-encoded backend (the shape production adapters implement) into the typed service.
- For durable, cross-process membership use SqlRunnerStorage.layer (default table prefix) or SqlRunnerStorage.layerWith({ prefix }); see Message storage for the SQL side.

```typescript
import { RunnerStorage } from "effect/unstable/cluster"

RunnerStorage.layerMemory // => Layer<RunnerStorage> (in-memory, non-durable)
```

Runners

The transport service used to talk to other runners: ping, send, sendLocal,
notify, notifyLocal, onRunnerUnavailable. Implementations:

- Runners.make(callbacks): wrap your own remote ping/send/notify callbacks with the standard local-persistence, duplicate-handling, and reply-polling logic.
- Runners.makeNoop / layerNoop: reject remote sends; for single-node use.
- Runners.makeRpc / layerRpc: RPC-backed, caching one client per runner address. Needs a RpcClientProtocol.
- Supporting exports: Runners.Rpcs (the runner-to-runner RPC group), Runners.RpcClient (its client interface), Runners.makeRpcClient (build a client from the current RpcClient.Protocol), and Runners.RpcClientProtocol (the service that turns a RunnerAddress into a connection).

```typescript
import { Runners } from "effect/unstable/cluster"

Runners.layerNoop
// => Layer<Runners, never, ShardingConfig | MessageStorage>

Runners.layerRpc
// => Layer<Runners, never, MessageStorage | RpcClientProtocol | ShardingConfig>
```

RunnerServer.layer / layerWithClients / layerClientOnly / layerHandlers

The transport-agnostic server side of the runner protocol: it forwards incoming
runner RPCs into Sharding. Transports supply the RpcServer.Protocol; this module
supplies the handlers.

- RunnerServer.layerHandlers: the raw handler set (Ping, Notify, Effect, Stream, Envelope).
- RunnerServer.layer: serve those handlers on a provided RpcServer.Protocol.
- RunnerServer.layerWithClients: layer plus Sharding + Runners clients (what SocketRunner.layer builds on).
- RunnerServer.layerClientOnly: Sharding + Runners with runnerAddress cleared, so the process sends but never owns shards.

```typescript
import { RunnerServer } from "effect/unstable/cluster"

RunnerServer.layerWithClients
// => Layer<Sharding | Runners, never,
//      RpcServer.Protocol | ShardingConfig | RpcClientProtocol
//      | MessageStorage | RunnerStorage | RunnerHealth>
```

Shard ids and registration events

ShardId

A { group, id } pair identifying a shard after a routing decision; it does not
choose a shard for an arbitrary key. Equality, hashing, and the primary key use the
group:id string form.

```typescript
import { ShardId } from "effect/unstable/cluster"

const shard = ShardId.make("default", 137)
shard.toString() // => "default:137"

ShardId.fromString("default:137") // => ShardId { group: "default", id: 137 }
ShardId.fromStringEncoded("default:137") // => { group: "default", id: 137 }
ShardId.isShardId(shard) // => true
```

ShardingRegistrationEvent

The tagged union emitted by Sharding.getRegistrationEvents:
EntityRegistered { entity } and SingletonRegistered { address }. A registration
means this runner installed the capability locally — it does not mean the runner
currently owns the relevant shard. Match with the generated match helper.

```typescript
import { Effect, Stream } from "effect"
import { Sharding, ShardingRegistrationEvent } from "effect/unstable/cluster"

Effect.gen(function*() {
  const sharding = yield* Sharding
  yield* sharding.getRegistrationEvents.pipe(
    Stream.runForEach(
      ShardingRegistrationEvent.match({
        EntityRegistered: ({ entity }) => Effect.log(`entity ${entity.type}`),
        SingletonRegistered: ({ address }) => Effect.log(`singleton ${address.name}`)
      })
    )
  )
  // => logs "entity Counter", "singleton report-generator", …
})
```

See also

- Entities: define and run sharded entities (the layer you actually provide).
- Singletons & cron: run an effect on exactly one runner per shard group.
- Message storage: volatile vs. persisted messages and the SQL backends used by storage: "sql".