
Building an Agent Loop

The Chat page shows the core agentic loop in a few lines: call the model with a toolkit, and while the response contains tool calls, loop again — the session merges tool results into history for you. That loop is an agent. This page takes it the rest of the way to production by layering on the concerns a real agent needs:

  • a turn budget so a misbehaving model can’t loop forever,
  • per-turn tracing so you can see token usage and finish reasons in observability,
  • streaming the reply to the user as it’s generated,
  • context compaction to stay within the model’s context window on long runs,
  • steering and interruption of an agent while it’s working, and
  • typed errors that preserve their cause.

Everything here is plain Effect — Queue, Ref, Fiber, Stream, spans — so an agent composes with the rest of your application exactly like any other effect.

Shared setup

Every example below assumes this provider client and a one-tool toolkit. The model is captured into a Layer once and provided per generation, so you can swap providers, or even switch models mid-conversation, without touching the loop.

```typescript
import { OpenAiClient, OpenAiLanguageModel } from "@effect/ai-openai"
import { Config, DateTime, Effect, Layer, Schema } from "effect"
import { Tool, Toolkit } from "effect/unstable/ai"
import { FetchHttpClient } from "effect/unstable/http"

// The OpenAI client, configured from the environment and backed by fetch.
const OpenAiClientLayer = OpenAiClient.layerConfig({
  apiKey: Config.redacted("OPENAI_API_KEY")
}).pipe(Layer.provide(FetchHttpClient.layer))

// One tool the agent can call. The handler is an Effect, so it gets the Clock,
// services, and typed errors: no `Date.now()`, no raw promises.
const Tools = Toolkit.make(Tool.make("getCurrentTime", {
  description: "Get the current time in ISO format",
  parameters: Schema.Struct({ timezone: Schema.String }),
  success: Schema.String
}))

const ToolsLayer = Tools.toLayer(Effect.gen(function*() {
  return Tools.of({
    // For brevity this handler ignores `timezone`: DateTime.formatIso always
    // renders the instant in UTC.
    getCurrentTime: Effect.fn("Tools.getCurrentTime")(function*(_) {
      const now = yield* DateTime.now
      return DateTime.formatIso(now)
    })
  })
}))
```
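The handler above ignores its timezone argument and always returns UTC. If you want the tool to honor that argument, one dependency-free option is the standard Intl API. The sketch below uses it. formatInTimeZone is a hypothetical helper, not an Effect function. It returns local wall-clock time without an offset. An unknown zone name throws a RangeError, so inside the Effect handler you would wrap the call (for example with Effect.try) to turn that into a typed failure.

```typescript
// Hypothetical helper: format an instant as a local ISO-8601 date-time
// (YYYY-MM-DDTHH:mm:ss, no offset) in the given IANA time zone.
// Uses only the built-in Intl API; unknown zones throw a RangeError.
function formatInTimeZone(instant: Date, timeZone: string): string {
  const formatter = new Intl.DateTimeFormat("en-US", {
    timeZone,
    year: "numeric",
    month: "2-digit",
    day: "2-digit",
    hour: "2-digit",
    minute: "2-digit",
    second: "2-digit",
    hour12: false
  })
  // Index the numeric fields by type so the locale's field order doesn't matter.
  const fields: Record<string, string> = {}
  for (const part of formatter.formatToParts(instant)) {
    if (part.type !== "literal") fields[part.type] = part.value
  }
  // Some engines render midnight as "24" when hour12 is false; normalize it.
  const hour = fields.hour === "24" ? "00" : fields.hour
  return `${fields.year}-${fields.month}-${fields.day}T${hour}:${fields.minute}:${fields.second}`
}
```

Reading the fields out of formatToParts, rather than parsing a formatted string, keeps the result independent of how the locale orders or punctuates the date.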

Wrap the loop in a service so the rest of the app depends only on Agent, never on the model or toolkit. Two things make this more than the minimal version: each turn runs inside its own span that records token usage and the finish reason, and a turn budget bounds the loop so a model that keeps calling tools eventually fails with a domain error instead of hanging.

```typescript
import { Context, Effect, Layer, Schema } from "effect"
import { AiError, Chat } from "effect/unstable/ai"
// ...OpenAiClientLayer, OpenAiLanguageModel, Tools, ToolsLayer from "Shared setup" above

// A tagged error that keeps the underlying failure as `cause` instead of
// stringifying it, so the original tag and context survive for debugging.
class AgentError extends Schema.TaggedErrorClass<AgentError>()("AgentError", {
  message: Schema.String,
  cause: Schema.optionalKey(Schema.Defect)
}) {
  static fromAiError(error: AiError.AiError) {
    return new AgentError({ message: `model call failed: ${error.reason}`, cause: error })
  }
}

// Upper bound on model calls per question.
const MAX_TURNS = 10

class Agent extends Context.Service<Agent, {
  run(question: string): Effect.Effect<string, AgentError>
}>()("app/Agent") {
  static readonly layer = Layer.effect(
    Agent,
    Effect.gen(function*() {
      const modelLayer = yield* OpenAiLanguageModel.model("gpt-5.2").captureRequirements
      const toolkit = yield* Tools

      // One turn: generate, then record what happened on the active span.
      const runTurn = Effect.fn("Agent.turn")(function*(session: Chat.Service) {
        const response = yield* session.generateText({ prompt: [], toolkit })
        yield* Effect.annotateCurrentSpan({
          "agent.finish_reason": response.finishReason,
          "agent.tool_calls": response.toolCalls.length,
          "agent.input_tokens": response.usage.inputTokens.total ?? 0,
          "agent.output_tokens": response.usage.outputTokens.total ?? 0
        })
        return response
      })

      const run = Effect.fn("Agent.run")(
        function*(question: string) {
          const session = yield* Chat.fromPrompt([
            { role: "system", content: "You can use tools to answer questions." },
            { role: "user", content: question }
          ])
          for (let turn = 1; turn <= MAX_TURNS; turn++) {
            const response = yield* runTurn(session).pipe(Effect.provide(modelLayer))
            // No tool calls → the model produced its final answer.
            if (response.toolCalls.length === 0) return response.text
            // Otherwise the session has already appended the tool results to
            // history; loop to let the model continue.
          }
          // Budget exhausted: surface it as a domain failure, never a hang.
          return yield* new AgentError({
            message: `agent did not finish within ${MAX_TURNS} turns`
          })
        },
        // Narrow the error channel to AgentError by mapping AiError to our
        // domain error (keeping it as `cause`). The budget failure is already
        // an AgentError and passes through untouched; a catch-all handler here
        // would turn it into a defect. Defects, such as a bug in a tool
        // handler, are not typed errors and propagate unchanged.
        Effect.catchTag(
          "AiError",
          (error) => Effect.fail(AgentError.fromAiError(error))
        )
      )

      return Agent.of({ run })
    })
  ).pipe(Layer.provide([OpenAiClientLayer, ToolsLayer]))
}
```

Each iteration is one model turn; the loop ends either when the model answers in plain text or when the budget runs out. Because runTurn is an Effect.fn with a name, every turn becomes a span nested under Agent.run, annotated with its token usage and finish reason — pull a trace and you can see exactly how many turns and tokens a question cost.
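Without Effect, the control flow is small enough to test on its own. The sketch below is plain TypeScript, and TurnResult, runBounded, and TurnBudgetExceeded are hypothetical names. A scripted function stands in for session.generateText, and a thrown error stands in for the AgentError failure. The sketch checks the two properties the budget guarantees. The loop returns on the first turn without tool calls. It also makes at most maxTurns model calls before giving up.

```typescript
// Hypothetical outcome of one model turn: the model either asked for tools
// (so the loop must continue) or produced its final text answer.
type TurnResult =
  | { readonly kind: "tool-calls"; readonly count: number }
  | { readonly kind: "answer"; readonly text: string }

// Stand-in for AgentError's "did not finish" case.
class TurnBudgetExceeded extends Error {
  readonly maxTurns: number
  constructor(maxTurns: number) {
    super(`agent did not finish within ${maxTurns} turns`)
    this.name = "TurnBudgetExceeded"
    this.maxTurns = maxTurns
  }
}

// Call `runTurn` until it returns an answer, at most `maxTurns` times.
// Turns are numbered from 1, like the `for` loop in Agent.run.
function runBounded(
  runTurn: (turn: number) => TurnResult,
  maxTurns: number
): { text: string; turnsUsed: number } {
  for (let turn = 1; turn <= maxTurns; turn++) {
    const result = runTurn(turn)
    if (result.kind === "answer") return { text: result.text, turnsUsed: turn }
    // In the real loop the session has appended the tool results by now.
  }
  throw new TurnBudgetExceeded(maxTurns)
}
```

With a budget of N, the worst case is exactly N model calls. That makes the budget a direct cap on cost as well as on latency.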

For a responsive UI you want tokens on screen as the model produces them, not after the whole turn completes. Use session.streamText and consume the stream of response parts. The catch: in streaming mode there’s no response.toolCalls to inspect — the loop’s continue/stop decision comes from the finish part’s reason, which is "tool-calls" when the model paused to call tools.

```typescript
import { Effect, Ref, Stream } from "effect"
import { Chat, Response } from "effect/unstable/ai"
// ...modelLayer + toolkit in scope

// Stream one turn: print text deltas as they arrive, capture the finish reason.
const streamTurn = (session: Chat.Service) =>
  Effect.gen(function*() {
    const finishReason = yield* Ref.make<Response.FinishReason>("unknown")
    yield* Stream.runForEach(
      session.streamText({ prompt: [], toolkit }),
      (part) => {
        switch (part.type) {
          case "text-delta":
            // process.stdout is Node-specific; send deltas to your UI instead.
            return Effect.sync(() => process.stdout.write(part.delta))
          case "finish":
            return Ref.set(finishReason, part.reason)
          default:
            return Effect.void
        }
      }
    )
    return yield* Ref.get(finishReason)
  })

const streamingAgent = (question: string) =>
  Effect.gen(function*() {
    const session = yield* Chat.fromPrompt([
      { role: "system", content: "You can use tools to answer questions." },
      { role: "user", content: question }
    ])
    // Keep streaming turns while the model is still calling tools. The session
    // appends each turn (and its tool results) to history as the stream finalizes.
    // Unlike Agent.run, this loop has no turn budget; add one the same way.
    let reason = yield* streamTurn(session).pipe(Effect.provide(modelLayer))
    while (reason === "tool-calls") {
      reason = yield* streamTurn(session).pipe(Effect.provide(modelLayer))
    }
  })
```
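The streaming loop takes its stop decision from the finish part, not from a toolCalls array, and it runs without a budget. Here is a dependency-free TypeScript sketch that covers both points. consumeTurn forwards text deltas and records the finish reason. streamUntilDone keeps going while that reason is "tool-calls", up to a cap. StreamPart is a hypothetical subset of part shapes. The real Response part union has more members, and the default branch ignores them just as the Effect version does.

```typescript
// Hypothetical subset of streamed part shapes.
type StreamPart =
  | { readonly type: "text-delta"; readonly delta: string }
  | { readonly type: "tool-call"; readonly name: string }
  | { readonly type: "finish"; readonly reason: string }

// Consume one turn: forward each text delta as it arrives and remember the
// finish reason ("unknown" if the stream ended without a finish part).
function consumeTurn(parts: Iterable<StreamPart>, onText: (delta: string) => void): string {
  let reason = "unknown"
  for (const part of parts) {
    switch (part.type) {
      case "text-delta":
        onText(part.delta)
        break
      case "finish":
        reason = part.reason
        break
      default:
        break // tool calls and other parts don't affect the loop decision
    }
  }
  return reason
}

// Stream turns while the model stops to call tools, with at most maxTurns turns.
function streamUntilDone(
  nextTurn: (turn: number) => Iterable<StreamPart>,
  onText: (delta: string) => void,
  maxTurns: number
): { reason: string; turnsUsed: number } {
  let reason = "tool-calls"
  let turnsUsed = 0
  while (reason === "tool-calls") {
    if (turnsUsed === maxTurns) {
      throw new Error(`agent did not finish within ${maxTurns} turns`)
    }
    turnsUsed++
    reason = consumeTurn(nextTurn(turnsUsed), onText)
  }
  return { reason, turnsUsed }
}
```

Any reason other than "tool-calls" ends the loop, including "unknown". A stream that breaks off mid-turn therefore stops the agent instead of sending it around again.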

Keeping the conversation within the context window


A long-running agent accumulates history until it overflows the model’s context window. The fix is compaction: once the prompt grows past a threshold, replace the older turns with a model-written summary and keep only the most recent ones. The per-turn span already records inputTokens, so you have the signal to act on.

```typescript
import { Effect, Ref } from "effect"
import { Chat, LanguageModel, Prompt } from "effect/unstable/ai"

const COMPACT_AT_TOKENS = 100_000 // act well before the hard context limit
const KEEP_RECENT = 8 // messages to preserve verbatim after the summary

const compactIfNeeded = (session: Chat.Service, lastInputTokens: number) =>
  Effect.gen(function*() {
    if (lastInputTokens < COMPACT_AT_TOKENS) return
    const history = yield* Ref.get(session.history)
    // Too short to be worth summarizing.
    if (history.content.length <= KEEP_RECENT + 1) return
    const recent = history.content.slice(-KEEP_RECENT)
    // Everything before the recent window, including the original system
    // message, goes into the summary.
    const older = Prompt.fromMessages(history.content.slice(0, -KEEP_RECENT))
    // Summarize the older turns with the same model. This is a model call, so
    // the caller must provide a LanguageModel (e.g. modelLayer).
    const { text: summary } = yield* LanguageModel.generateText({
      prompt: older.pipe(Prompt.appendSystem(
        "Summarize the conversation so far into a concise brief that preserves " +
          "key facts, decisions, identifiers, and open tasks. Reply with the summary only."
      ))
    })
    // Rewrite history to [summary-as-system, ...recent turns]. Writing to
    // `session.history` directly is exactly what this kind of maintenance is for.
    yield* Ref.set(
      session.history,
      Prompt.fromMessages(recent).pipe(
        Prompt.prependSystem(`Summary of earlier conversation:\n${summary}\n\n`)
      )
    )
  })
```

Call compactIfNeeded(session, response.usage.inputTokens.total ?? 0) at the end of each turn in the loop. Provide modelLayer to it as you do for the turn itself, since the summary is also a model call. Compaction is the one place where writing to session.history directly is the right move. For normal turns, prefer the generation methods, which keep the encode/decode/save helpers in sync.
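compactIfNeeded cuts at a fixed offset, and that has two side effects worth knowing about. First, the original system message is folded into the summary. Second, the recent window can start with a tool result whose matching tool call was summarized away, and providers typically reject a prompt like that. Below is a dependency-free TypeScript sketch of a split that avoids both. Message, Role, and planCompaction are hypothetical, and real Prompt messages carry richer content (tool call ids, parts) than this shape. You would summarize plan.older as before, then rebuild history from the kept system messages, the summary, and plan.recent.

```typescript
type Role = "system" | "user" | "assistant" | "tool"
interface Message {
  readonly role: Role
  readonly text: string
}

interface CompactionPlan {
  readonly system: ReadonlyArray<Message> // leading instructions, kept verbatim
  readonly older: ReadonlyArray<Message> // to be replaced by a summary
  readonly recent: ReadonlyArray<Message> // kept verbatim after the summary
}

// Returns undefined when there is nothing old enough to summarize.
function planCompaction(
  history: ReadonlyArray<Message>,
  keepRecent: number
): CompactionPlan | undefined {
  // Leading system messages are the agent's instructions: never summarize them.
  let firstNonSystem = 0
  while (history[firstNonSystem]?.role === "system") firstNonSystem++

  // Start from the naive cut that keeps the last `keepRecent` messages...
  let split = Math.max(firstNonSystem, history.length - keepRecent)
  // ...then move it earlier while the window would open on a tool result,
  // so each tool result stays next to the assistant message that requested it.
  while (split > firstNonSystem && history[split]?.role === "tool") split--

  if (split <= firstNonSystem) return undefined
  return {
    system: history.slice(0, firstNonSystem),
    older: history.slice(firstNonSystem, split),
    recent: history.slice(split)
  }
}
```

The recent window can therefore grow slightly past keepRecent. That is the price of never separating a tool result from the call that produced it.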

A real agent runs while the user keeps interacting with it — adding a clarification, redirecting it, or stopping it outright. Run the loop on its own fiber, feed it user input through a Queue, and use Fiber.interrupt to stop it. Effect’s structured concurrency tears down the loop — including any in-flight model call — cleanly.

  1. Drain helper — pull every message currently queued without blocking, so a turn can fold in everything the user typed while the previous turn was running:

    ```typescript
    import { Effect, Option, Queue } from "effect"

    // Take everything currently in the queue without waiting.
    const drain = <A>(queue: Queue.Queue<A>) =>
      Effect.gen(function*() {
        const items: Array<A> = []
        let next = yield* Queue.poll(queue)
        while (Option.isSome(next)) {
          items.push(next.value)
          next = yield* Queue.poll(queue)
        }
        return items
      })
    ```

  2. The loop — block until the user says something, drain any extra messages, then run the tool loop for that input and print the answer:

    ```typescript
    import { Effect, Queue } from "effect"
    import { Chat } from "effect/unstable/ai"
    // ...modelLayer, toolkit, and drain in scope

    const conversationLoop = (session: Chat.Service, inbox: Queue.Queue<string>) =>
      Effect.gen(function*() {
        while (true) {
          const first = yield* Queue.take(inbox) // waits for input
          const rest = yield* drain(inbox) // anything else queued meanwhile
          const turnInput = [first, ...rest].map((text) => ({
            role: "user" as const,
            content: text
          }))
          let response = yield* session.generateText({ prompt: turnInput, toolkit })
            .pipe(Effect.provide(modelLayer))
          // Same tool loop as Agent.run; cap it with a turn budget in production.
          while (response.toolCalls.length > 0) {
            response = yield* session.generateText({ prompt: [], toolkit })
              .pipe(Effect.provide(modelLayer))
          }
          yield* Effect.log(response.text)
        }
      })
    ```

  3. Drive it — fork the loop, push messages to steer it, and interrupt to stop:

    ```typescript
    import { Effect, Fiber, Queue } from "effect"
    import { Chat } from "effect/unstable/ai"
    // ...conversationLoop in scope

    const program = Effect.gen(function*() {
      const inbox = yield* Queue.unbounded<string>()
      const session = yield* Chat.fromPrompt([
        { role: "system", content: "You are a helpful research assistant." }
      ])
      const fiber = yield* Effect.fork(conversationLoop(session, inbox))
      yield* Queue.offer(inbox, "Research the trade-offs of optimistic locking.")
      // ...later, steer it. The message waits in the inbox until the loop
      // returns to Queue.take, then is folded into the next turn.
      yield* Effect.sleep("30 seconds")
      yield* Queue.offer(inbox, "Focus on the write-contention failure modes.")
      // ...eventually, stop it: the in-flight turn is interrupted and the
      // loop torn down.
      yield* Effect.sleep("2 minutes")
      yield* Fiber.interrupt(fiber)
    })
    ```

Because the loop blocks on Queue.take, an idle agent is just a suspended fiber: it polls nothing and makes no model calls. Steering is just another offer. Messages wait in the inbox and are folded into the next turn, so a redirect is picked up as soon as the agent finishes answering its current input, rather than being lost. And interruption is free: you never wrote cancellation logic, because structured concurrency handles it.
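To show the batching behavior outside Effect, here is a dependency-free TypeScript sketch of the same take-then-drain pattern. Mailbox and nextTurnInput are hypothetical names. take waits like Queue.take, and drain empties the buffer without waiting, like the drain helper. Unlike Effect's Queue, this sketch has no notion of interruption. A pending take that never receives a message simply never resolves, and there is no way to cancel it. That is the bookkeeping structured concurrency saves you.

```typescript
// Hypothetical single-consumer mailbox mirroring Queue.take + drain.
class Mailbox<A> {
  private readonly buffered: A[] = []
  private readonly waiting: Array<(item: A) => void> = []

  // Deliver straight to a waiting taker if there is one, else buffer.
  offer(item: A): void {
    const waiter = this.waiting.shift()
    if (waiter) waiter(item)
    else this.buffered.push(item)
  }

  // Resolve with the oldest item, waiting if the mailbox is empty.
  take(): Promise<A> {
    if (this.buffered.length > 0) return Promise.resolve(this.buffered.shift() as A)
    return new Promise<A>((resolve) => this.waiting.push(resolve))
  }

  // Remove and return everything buffered right now, without waiting.
  drain(): A[] {
    return this.buffered.splice(0, this.buffered.length)
  }
}

// One turn's input: the message we waited for plus anything queued meanwhile.
async function nextTurnInput(
  inbox: Mailbox<string>
): Promise<Array<{ role: "user"; content: string }>> {
  const first = await inbox.take()
  const rest = inbox.drain()
  return [first, ...rest].map((content) => ({ role: "user" as const, content }))
}
```

Two messages offered before the agent gets to them become one turn with two user messages. A single message sent to an idle mailbox wakes the waiting take and becomes a one-message turn.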

For agents that outlive a single request or process, don’t serialize by hand — use a persisted chat session, which saves its history to a backing store after every turn. The loop above is unchanged; only the session constructor differs. See Persisted chats for the full setup with Chat.layerPersisted and a BackingPersistence layer.

See also

  • Chat: the stateful session and the minimal agentic loop this builds on.
  • Tools and Toolkits: define what your agent can do, with approval gating and failure modes.
  • Language Model: the generation API underneath, including streamText and response metadata.
  • Concurrency: Fiber, Queue, and the structured concurrency that makes steering and interruption free.
  • Observability: the spans every turn emits.