Building an Agent Loop
The Chat page shows the core agentic loop in a few lines: call the
model with a toolkit, and while the response contains tool calls,
loop again — the session merges tool results into history for you. That loop is
an agent. This page takes it the rest of the way to production by layering on the
concerns a real agent needs:
- a turn budget so a misbehaving model can’t loop forever,
- per-turn tracing so you can see token usage and finish reasons in observability,
- streaming the reply to the user as it’s generated,
- context compaction to stay within the model’s context window on long runs,
- steering and interruption of an agent while it’s working, and
- typed errors that preserve their cause.
Everything here is plain Effect — Queue, Ref, Fiber, Stream, spans — so an
agent composes with the rest of your application exactly like any other effect.
Shared setup
Every example below assumes this provider client and a one-tool toolkit. The model
is captured into a Layer once and provided per generation, so you can swap
providers — or even switch models mid-conversation — without touching the loop.
```ts
import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai"
import { Config, DateTime, Effect, Layer, Schema } from "effect"
import { Tool, Toolkit } from "effect/unstable/ai"
import { FetchHttpClient } from "effect/unstable/http"

const OpenAiClientLayer = OpenAiClient.layerConfig({
  apiKey: Config.redacted("OPENAI_API_KEY")
}).pipe(Layer.provide(FetchHttpClient.layer))

// One tool the agent can call. The handler is an Effect, so it gets the Clock,
// services, and typed errors: no `Date.now()`, no raw promises.
const Tools = Toolkit.make(
  Tool.make("getCurrentTime", {
    description: "Get the current time in ISO format",
    parameters: Schema.Struct({ timezone: Schema.String }),
    success: Schema.String
  })
)

const ToolsLayer = Tools.toLayer(
  Effect.gen(function*() {
    return Tools.of({
      // `timezone` is accepted but not used by this minimal handler.
      getCurrentTime: Effect.fn("Tools.getCurrentTime")(function*(_) {
        const now = yield* DateTime.now
        return DateTime.formatIso(now)
      })
    })
  })
)
```

The loop
Wrap the loop in a service so the rest of the app depends
only on Agent, never on the model or toolkit. Two things make this more than the
minimal version: each turn runs inside its own span that records
token usage and the finish reason, and a turn budget bounds the loop so a model
that keeps calling tools eventually fails with a domain error instead of hanging.
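The AgentError class in the service keeps the underlying failure as cause rather than flattening it into a string. A dependency-free TypeScript sketch makes the difference concrete. It does not use Effect's Schema.TaggedErrorClass; ProviderError, DomainError, wrapLossy, and wrapWithCause are hypothetical names chosen for illustration.

```typescript
// Hypothetical tagged error classes in plain TypeScript (not Effect's API).
class ProviderError {
  readonly _tag = "ProviderError"
  constructor(readonly status: number, readonly detail: string) {}
}

class DomainError {
  readonly _tag = "DomainError"
  constructor(readonly message: string, readonly cause?: unknown) {}
}

// Lossy: only text survives, so callers can no longer match on the tag or status.
const wrapLossy = (error: ProviderError): DomainError =>
  new DomainError(`model call failed: ${error.status} ${error.detail}`)

// Cause-preserving: the original object stays reachable for logging and matching.
const wrapWithCause = (error: ProviderError): DomainError =>
  new DomainError(`model call failed: ${error.detail}`, error)

const original = new ProviderError(429, "rate limited")
const lossy = wrapLossy(original)
const kept = wrapWithCause(original)

// Only the cause-preserving wrapper still exposes the provider's status code.
const status = kept.cause instanceof ProviderError ? kept.cause.status : undefined
console.log(lossy.message, lossy.cause, status)
```

With the lossy wrapper, a rate-limit failure is indistinguishable from any other failure except by parsing the message; with the cause kept, a caller can still branch on the original tag.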
```ts
import { Context, Effect, Layer, Schema } from "effect"
import { AiError, Chat, Toolkit } from "effect/unstable/ai"
// ...OpenAiClientLayer, OpenAiLanguageModel, Tools, ToolsLayer from "Shared setup" above

// A tagged error that keeps the underlying failure as `cause` instead of
// stringifying it, so the original tag and context survive for debugging.
class AgentError extends Schema.TaggedErrorClass<AgentError>()("AgentError", {
  message: Schema.String,
  cause: Schema.optionalKey(Schema.Defect)
}) {
  static fromAiError(error: AiError.AiError) {
    return new AgentError({ message: `model call failed: ${error.reason}`, cause: error })
  }
}

const MAX_TURNS = 10

class Agent extends Context.Service<Agent, {
  run(question: string): Effect.Effect<string, AgentError>
}>()("app/Agent") {
  static readonly layer = Layer.effect(
    Agent,
    Effect.gen(function*() {
      const modelLayer = yield* OpenAiLanguageModel.model("gpt-5.2").captureRequirements
      const toolkit = yield* Tools

      // One turn: generate, then record what happened on the active span.
      const runTurn = Effect.fn("Agent.turn")(function*(session: Chat.Service) {
        const response = yield* session.generateText({ prompt: [], toolkit })
        yield* Effect.annotateCurrentSpan({
          "agent.finish_reason": response.finishReason,
          "agent.tool_calls": response.toolCalls.length,
          "agent.input_tokens": response.usage.inputTokens.total ?? 0,
          "agent.output_tokens": response.usage.outputTokens.total ?? 0
        })
        return response
      })

      const run = Effect.fn("Agent.run")(
        function*(question: string) {
          const session = yield* Chat.fromPrompt([
            { role: "system", content: "You can use tools to answer questions." },
            { role: "user", content: question }
          ])

          for (let turn = 1; turn <= MAX_TURNS; turn++) {
            const response = yield* runTurn(session).pipe(Effect.provide(modelLayer))
            // No tool calls: the model produced its final answer.
            if (response.toolCalls.length === 0) return response.text
            // Otherwise the session has already appended the tool results to
            // history; loop to let the model continue.
          }

          // Budget exhausted: surface it as a domain failure, never a hang.
          return yield* new AgentError({
            message: `agent did not finish within ${MAX_TURNS} turns`
          })
        },
        // Narrow the error channel to AgentError: map AiError to the domain
        // error (keeping it as `cause`), and turn any other typed failure into
        // a defect with Effect.die, since nothing else is expected here.
        Effect.catchTag(
          "AiError",
          (error) => Effect.fail(AgentError.fromAiError(error)),
          (other) => Effect.die(other)
        )
      )

      return Agent.of({ run })
    })
  ).pipe(Layer.provide([OpenAiClientLayer, ToolsLayer]))
}
```

Each iteration is one model turn; the loop ends either when the model answers in
plain text or when the budget runs out. Because runTurn is an
Effect.fn with a name, every turn becomes a span
nested under Agent.run, annotated with its token usage and finish reason — pull a
trace and you can see exactly how many turns and tokens a question cost.
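To see the control flow without a model provider, here is a dependency-free sketch of the same loop shape in plain TypeScript. It assumes a turn is a synchronous function returning text, a tool-call count, and token usage; TurnResult, runWithBudget, and the outcome tags are hypothetical names, not the Effect AI API. It returns a tagged outcome instead of throwing, mirroring how Agent.run fails with a domain error, and sums usage across turns the way a trace would let you.

```typescript
// Dependency-free model of the agent loop (not the Effect AI API).
interface TurnResult {
  readonly text: string
  readonly toolCalls: number // tool calls the model requested this turn
  readonly inputTokens: number
  readonly outputTokens: number
}

type RunOutcome =
  | {
      readonly _tag: "Answered"
      readonly answer: string
      readonly turns: number
      readonly inputTokens: number
      readonly outputTokens: number
    }
  | { readonly _tag: "BudgetExhausted"; readonly maxTurns: number }

// Run turns until one requests no tools, or stop once maxTurns is spent.
// Usage is summed per turn, like the per-turn span annotations.
function runWithBudget(nextTurn: (turn: number) => TurnResult, maxTurns: number): RunOutcome {
  let inputTokens = 0
  let outputTokens = 0
  for (let turn = 1; turn <= maxTurns; turn++) {
    const result = nextTurn(turn)
    inputTokens += result.inputTokens
    outputTokens += result.outputTokens
    if (result.toolCalls === 0) {
      return { _tag: "Answered", answer: result.text, turns: turn, inputTokens, outputTokens }
    }
  }
  return { _tag: "BudgetExhausted", maxTurns }
}

// A scripted stand-in for the model: calls a tool on turns 1 and 2, answers on turn 3.
const scripted = (turn: number): TurnResult =>
  turn < 3
    ? { text: "", toolCalls: 1, inputTokens: 100 * turn, outputTokens: 10 }
    : { text: "It is 12:00 UTC.", toolCalls: 0, inputTokens: 300, outputTokens: 20 }

// A misbehaving stand-in that never stops calling tools.
const looping = (): TurnResult => ({ text: "", toolCalls: 1, inputTokens: 1, outputTokens: 1 })

const answered = runWithBudget(scripted, 10)
const exhausted = runWithBudget(looping, 10)
console.log(answered, exhausted)
```

The scripted run answers on turn 3 with 600 input and 40 output tokens in total; the looping one stops at the budget instead of spinning forever.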
Streaming the agent’s output
For a responsive UI you want tokens on screen as the model produces them, not after
the whole turn completes. Use session.streamText and consume the
stream of response parts. The catch: in streaming mode there’s no
response.toolCalls to inspect — the loop’s continue/stop decision comes from the
finish part’s reason, which is "tool-calls" when the model paused to call
tools.
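The decision rule can be modeled without Effect as a fold over one turn's parts. This is a sketch with simplified, hypothetical part shapes that only mirror the "text-delta" and "finish" cases the streaming example handles; Part, foldTurn, and shouldContinue are illustrative names, not Response's actual types.

```typescript
// Simplified, hypothetical part shapes (not Effect's Response types).
type Part =
  | { readonly type: "text-delta"; readonly delta: string }
  | { readonly type: "tool-call"; readonly name: string }
  | { readonly type: "finish"; readonly reason: string }

interface TurnOutcome {
  readonly text: string
  readonly finishReason: string
}

// Fold one turn's parts: forward each text delta as it arrives, remember the
// finish reason. It defaults to "unknown" if no finish part is seen.
function foldTurn(parts: ReadonlyArray<Part>, onDelta: (delta: string) => void): TurnOutcome {
  let text = ""
  let finishReason = "unknown"
  for (const part of parts) {
    switch (part.type) {
      case "text-delta":
        text += part.delta
        onDelta(part.delta)
        break
      case "finish":
        finishReason = part.reason
        break
      default:
        // Other parts (tool calls here) do not affect rendering in this sketch.
        break
    }
  }
  return { text, finishReason }
}

// Continue only when the model stopped in order to call tools.
const shouldContinue = (outcome: TurnOutcome): boolean => outcome.finishReason === "tool-calls"

const screen: string[] = []
const toolTurn = foldTurn(
  [{ type: "tool-call", name: "getCurrentTime" }, { type: "finish", reason: "tool-calls" }],
  (delta) => screen.push(delta)
)
const finalTurn = foldTurn(
  [
    { type: "text-delta", delta: "It is " },
    { type: "text-delta", delta: "noon." },
    { type: "finish", reason: "stop" }
  ],
  (delta) => screen.push(delta)
)
console.log(shouldContinue(toolTurn), shouldContinue(finalTurn), screen.join(""))
```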
```ts
import { Effect, Ref, Stream } from "effect"
import { Chat, Response } from "effect/unstable/ai"
// ...modelLayer + toolkit in scope

// Stream one turn: print text deltas as they arrive, capture the finish reason.
const streamTurn = (session: Chat.Service) =>
  Effect.gen(function*() {
    const finishReason = yield* Ref.make<Response.FinishReason>("unknown")

    yield* Stream.runForEach(
      session.streamText({ prompt: [], toolkit }),
      (part) => {
        switch (part.type) {
          case "text-delta":
            return Effect.sync(() => process.stdout.write(part.delta))
          case "finish":
            return Ref.set(finishReason, part.reason)
          default:
            return Effect.void
        }
      }
    )

    return yield* Ref.get(finishReason)
  })

const streamingAgent = (question: string) =>
  Effect.gen(function*() {
    const session = yield* Chat.fromPrompt([
      { role: "system", content: "You can use tools to answer questions." },
      { role: "user", content: question }
    ])

    // Keep streaming turns while the model is still calling tools. The session
    // appends each turn (and its tool results) to history as the stream finalizes.
    // Unlike Agent.run, this loop has no turn budget; add the same MAX_TURNS bound in production.
    let reason = yield* streamTurn(session).pipe(Effect.provide(modelLayer))
    while (reason === "tool-calls") {
      reason = yield* streamTurn(session).pipe(Effect.provide(modelLayer))
    }
  })
```

Keeping the conversation within the context window
A long-running agent accumulates history until it overflows the model’s context
window. The fix is compaction: once the prompt grows past a threshold, replace
the older turns with a model-written summary and keep only the most recent ones.
The per-turn span already records inputTokens, so you have the signal to act on.
```ts
import { Effect, Ref } from "effect"
import { Chat, LanguageModel, Prompt } from "effect/unstable/ai"

const COMPACT_AT_TOKENS = 100_000 // act well before the hard context limit
const KEEP_RECENT = 8 // messages to preserve verbatim after the summary

const compactIfNeeded = (session: Chat.Service, lastInputTokens: number) =>
  Effect.gen(function*() {
    if (lastInputTokens < COMPACT_AT_TOKENS) return

    const history = yield* Ref.get(session.history)
    if (history.content.length <= KEEP_RECENT + 1) return

    const recent = history.content.slice(-KEEP_RECENT)
    const older = Prompt.fromMessages(history.content.slice(0, -KEEP_RECENT))

    // Summarize the older turns with the same model.
    const { text: summary } = yield* LanguageModel.generateText({
      prompt: older.pipe(Prompt.appendSystem(
        "Summarize the conversation so far into a concise brief that preserves " +
          "key facts, decisions, identifiers, and open tasks. Reply with the summary only."
      ))
    })

    // Rewrite history to [summary-as-system, ...recent turns]. Writing to
    // `session.history` directly is exactly what this kind of maintenance is for.
    yield* Ref.set(
      session.history,
      Prompt.fromMessages(recent).pipe(
        Prompt.prependSystem(`Summary of earlier conversation:\n${summary}\n\n`)
      )
    )
  })
```

Call compactIfNeeded(session, response.usage.inputTokens.total ?? 0) at the end of
each turn in the loop. Because it calls LanguageModel.generateText, it needs the
model too, so pipe it through Effect.provide(modelLayer) just like the turn itself.
Compaction is the one place where writing to session.history directly is the right
move; for normal turns, prefer the generation methods, which keep the
encode/decode/save helpers in sync. One caveat: a fixed-size cut can separate a tool
result from the assistant message that requested it, and OpenAI's API rejects a tool
result whose call is missing, so move the cut to a boundary that keeps each call and
its result together.
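The bookkeeping inside compactIfNeeded (when to compact and where to cut) is easy to get wrong, so here it is in isolation as a dependency-free sketch. Message, planCompaction, and applySummary are hypothetical names, the message shape is simplified, and a fixed string stands in for the model-written summary.

```typescript
// Hypothetical message shape (not Effect's Prompt types).
interface Message {
  readonly role: "system" | "user" | "assistant" | "tool"
  readonly content: string
}

interface CompactionPlan {
  readonly older: ReadonlyArray<Message> // to be summarized
  readonly recent: ReadonlyArray<Message> // kept verbatim
}

// Mirrors the two early returns in compactIfNeeded. keepRecent must be >= 1:
// slice(-0) is slice(0), which would "keep" the entire history.
function planCompaction(
  history: ReadonlyArray<Message>,
  lastInputTokens: number,
  compactAt: number,
  keepRecent: number
): CompactionPlan | undefined {
  if (keepRecent < 1) throw new RangeError("keepRecent must be at least 1")
  if (lastInputTokens < compactAt) return undefined
  if (history.length <= keepRecent + 1) return undefined
  return { older: history.slice(0, -keepRecent), recent: history.slice(-keepRecent) }
}

// Rebuild history as [summary-as-system, ...recent].
function applySummary(plan: CompactionPlan, summary: string): Message[] {
  return [
    { role: "system", content: `Summary of earlier conversation:\n${summary}\n\n` },
    ...plan.recent
  ]
}

// A system prompt followed by 11 alternating user/assistant messages.
const history: Message[] = [{ role: "system", content: "You can use tools to answer questions." }]
for (let i = 1; i <= 11; i++) {
  history.push({ role: i % 2 === 1 ? "user" : "assistant", content: `message ${i}` })
}

const skipped = planCompaction(history, 50_000, 100_000, 8)
const plan = planCompaction(history, 120_000, 100_000, 8)
const compacted = plan ? applySummary(plan, "User is researching locking strategies.") : history
console.log(skipped, plan?.older.length, compacted.length)
```

In this sketch the original system message lands in older, so it survives only through the summary. If your system prompt must stay verbatim, exclude it from the summarized slice and put it back in front of the rebuilt history.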
Steering and interrupting a running agent
A real agent runs while the user keeps interacting with it: adding a clarification,
redirecting it, or stopping it outright. Run the loop on its own fiber,
feed it user input through a Queue, and use
Fiber.interrupt to stop it. Effect’s structured concurrency tears down the loop —
including any in-flight model call — cleanly.
- Drain helper: pull every message currently queued without blocking, so a turn can fold in everything the user typed while the previous turn was running:

```ts
import { Effect, Option, Queue } from "effect"

const drain = <A>(queue: Queue.Queue<A>) =>
  Effect.gen(function*() {
    const items: Array<A> = []
    let next = yield* Queue.poll(queue)
    while (Option.isSome(next)) {
      items.push(next.value)
      next = yield* Queue.poll(queue)
    }
    return items
  })
```

- The loop: block until the user says something, drain any extra messages, then run the tool loop for that input and log the answer:

```ts
import { Effect, Queue } from "effect"
import { Chat } from "effect/unstable/ai"
// ...modelLayer + toolkit in scope, drain from the previous step

const conversationLoop = (session: Chat.Service, inbox: Queue.Queue<string>) =>
  Effect.gen(function*() {
    while (true) {
      const first = yield* Queue.take(inbox) // waits for input
      const rest = yield* drain(inbox) // anything else queued meanwhile
      const turnInput = [first, ...rest].map((text) => ({
        role: "user" as const,
        content: text
      }))

      let response = yield* session.generateText({ prompt: turnInput, toolkit }).pipe(
        Effect.provide(modelLayer)
      )
      // Keep going while the model calls tools (add a turn budget in production).
      while (response.toolCalls.length > 0) {
        response = yield* session.generateText({ prompt: [], toolkit }).pipe(
          Effect.provide(modelLayer)
        )
      }
      yield* Effect.log(response.text)
    }
  })
```

- Drive it: fork the loop, push messages to steer it, and interrupt to stop:

```ts
import { Effect, Fiber, Queue } from "effect"
import { Chat } from "effect/unstable/ai"
// ...conversationLoop from the previous step

const program = Effect.gen(function*() {
  const inbox = yield* Queue.unbounded<string>()
  const session = yield* Chat.fromPrompt([
    { role: "system", content: "You are a helpful research assistant." }
  ])
  const fiber = yield* Effect.fork(conversationLoop(session, inbox))

  yield* Queue.offer(inbox, "Research the trade-offs of optimistic locking.")

  // ...later, steer it: picked up once the agent finishes its current answer.
  yield* Queue.offer(inbox, "Focus on the write-contention failure modes.")

  // ...later still, stop it: the in-flight turn is interrupted and the loop torn down.
  yield* Effect.sleep("2 minutes")
  yield* Fiber.interrupt(fiber)
})
```
Because the loop blocks on Queue.take, an idle agent consumes no CPU: its fiber
simply stays suspended until a message arrives. Steering is just another offer.
Messages queued while the agent is working are folded into its next model call, so
a redirect is picked up as soon as the current answer (including its tool calls)
finishes, rather than being lost. And interruption needs no extra code: you never
wrote cancellation logic, because structured concurrency handles it.
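The batching behavior can be modeled without fibers. This dependency-free sketch uses a hypothetical array-backed Inbox class in place of Effect's Queue (its poll returns undefined when empty, standing in for Option.none, so it assumes items are never undefined). It shows that messages sent while the agent is busy arrive together as a single turn's input.

```typescript
// Hypothetical array-backed inbox standing in for Effect's Queue.
class Inbox<A> {
  private readonly items: A[] = []
  offer(item: A): void {
    this.items.push(item)
  }
  // Non-blocking: remove and return the oldest item, or undefined if empty.
  // Assumes items themselves are never undefined.
  poll(): A | undefined {
    return this.items.shift()
  }
  get size(): number {
    return this.items.length
  }
}

// Take everything queued right now without waiting, like the drain helper.
function drain<A>(inbox: Inbox<A>): A[] {
  const out: A[] = []
  for (let next = inbox.poll(); next !== undefined; next = inbox.poll()) {
    out.push(next)
  }
  return out
}

// One turn's input: the message that woke the loop plus anything queued since.
function nextTurnInput(first: string, inbox: Inbox<string>) {
  return [first, ...drain(inbox)].map((content) => ({ role: "user" as const, content }))
}

const inbox = new Inbox<string>()
inbox.offer("Research the trade-offs of optimistic locking.")
const firstTurn = nextTurnInput(inbox.poll()!, inbox)

// While the agent is busy with the first answer, two follow-ups arrive.
inbox.offer("Focus on the write-contention failure modes.")
inbox.offer("Keep it short.")
const secondTurn = nextTurnInput(inbox.poll()!, inbox)
console.log(firstTurn.length, secondTurn.length, inbox.size)
```

Folding the backlog into one call, rather than answering each message separately, means the model sees a redirect and any follow-up together and replies once.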
Persisting the conversation
For agents that outlive a single request or process, don’t serialize by hand; use
a persisted chat session, which saves its history to a backing store after every
turn. The loop above is unchanged; only the session constructor differs. See
Persisted chats for the full setup with
Chat.layerPersisted and a BackingPersistence layer.
Related
- Chat: the stateful session and the minimal agentic loop this builds on.
- Tools and Toolkits: define what your agent can do, with approval gating and failure modes.
- Language Model: the generation API underneath, including streamText and response metadata.
- Concurrency: Fiber, Queue, and the structured concurrency that makes steering and interruption free.
- Observability: the spans every turn emits.