Skip to content

Latest commit

 

History

History
201 lines (153 loc) · 10.4 KB

File metadata and controls

201 lines (153 loc) · 10.4 KB

Observability

Agents emit structured events for every significant operation — RPC calls, state changes, schedule execution, workflow transitions, MCP connections, and more. These events are published to diagnostics channels and are silent by default (zero overhead when nobody is listening).

Event structure

Every event has these fields:

{
  type: "rpc",                        // what happened
  agent: "MyAgent",                   // which agent class emitted it
  name: "user-123",                   // which agent instance (Durable Object name)
  payload: { method: "getWeather" },  // details
  timestamp: 1758005142787            // when (ms since epoch)
}

agent and name identify the source agent — agent is the class name and name is the Durable Object instance name.

Channels

Events are routed to eight named channels based on their type:

Channel Event types Description
agents:state state:update State sync events
agents:rpc rpc, rpc:error RPC method calls and failures
agents:message message:request, message:response, message:clear, message:cancel, message:error, tool:result, tool:approval Chat message and tool lifecycle
agents:schedule schedule:create, schedule:execute, schedule:cancel, schedule:retry, schedule:error, queue:create, queue:retry, queue:error Scheduled and queued task lifecycle
agents:lifecycle connect, disconnect, destroy Agent connection and teardown
agents:workflow workflow:start, workflow:event, workflow:approved, workflow:rejected, workflow:terminated, workflow:paused, workflow:resumed, workflow:restarted Workflow state transitions
agents:mcp mcp:client:preconnect, mcp:client:connect, mcp:client:authorize, mcp:client:discover MCP client operations
agents:email email:receive, email:reply Email processing

Subscribing to events

Typed subscribe helper

The subscribe() function from agents/observability provides type-safe access to events on a specific channel:

import { subscribe } from "agents/observability";

const unsub = subscribe("rpc", (event) => {
  if (event.type === "rpc") {
    console.log(`RPC call: ${event.payload.method}`);
  }
  if (event.type === "rpc:error") {
    console.error(
      `RPC failed: ${event.payload.method}${event.payload.error}`
    );
  }
});

// Clean up when done
unsub();

The callback is fully typed — event is narrowed to only the event types that flow through that channel.

Raw diagnostics_channel

You can also subscribe directly using the Node.js API:

import { subscribe } from "node:diagnostics_channel";

subscribe("agents:schedule", (event) => {
  console.log(event);
});

Tail Workers (production)

In production, all diagnostics channel messages are automatically forwarded to Tail Workers. No subscription code is needed in the agent itself — attach a Tail Worker and access events via event.diagnosticsChannelEvents:

export default {
  async tail(events) {
    for (const event of events) {
      for (const msg of event.diagnosticsChannelEvents) {
        // msg.channel is "agents:rpc", "agents:workflow", etc.
        // msg.message is the typed event payload
        console.log(msg.timestamp, msg.channel, msg.message);
      }
    }
  }
};

This gives you structured, filterable observability in production with zero overhead in the agent hot path.

Custom observability

You can override the default implementation by providing your own Observability interface:

import { Agent } from "agents";
import type { Observability } from "agents/observability";

const myObservability: Observability = {
  emit(event) {
    // Send to your logging service, filter events, etc.
    if (event.type === "rpc:error") {
      myLogger.error(event.payload.method, event.payload.error);
    }
  }
};

class MyAgent extends Agent {
  override observability = myObservability;
}

Set observability to undefined to disable all event emission:

class MyAgent extends Agent {
  override observability = undefined;
}

Event reference

RPC events

Type Payload When
rpc { method, streaming? } A @callable method is invoked
rpc:error { method, error } A @callable method throws

State events

Type Payload When
state:update {} setState() is called

Message and tool events (AIChatAgent)

These events are emitted by AIChatAgent from @cloudflare/ai-chat. They track the chat message lifecycle, including client-side tool interactions.

Type Payload When
message:request {} A chat message is received
message:response {} A chat response stream completes
message:clear {} Chat history is cleared
message:cancel { requestId } A streaming request is cancelled
message:error { error } A chat stream fails
tool:result { toolCallId, toolName } A client tool result is received
tool:approval { toolCallId, approved } A tool call is approved or rejected

Schedule and queue events

Type Payload When
schedule:create { callback, id } A schedule is created
schedule:execute { callback, id } A scheduled callback starts
schedule:cancel { callback, id } A schedule is cancelled
schedule:retry { callback, id, attempt, maxAttempts } A scheduled callback is retried
schedule:error { callback, id, error, attempts } A scheduled callback fails after all retries
queue:create { callback, id } A task is enqueued
queue:retry { callback, id, attempt, maxAttempts } A queued callback is retried
queue:error { callback, id, error, attempts } A queued callback fails after all retries

Lifecycle events

Type Payload When
connect { connectionId } A WebSocket connection is established
disconnect { connectionId, code, reason } A WebSocket connection is closed
destroy {} The agent is destroyed

Workflow events

Type Payload When
workflow:start { workflowId, workflowName? } A workflow instance is started
workflow:event { workflowId, eventType? } An event is sent to a workflow
workflow:approved { workflowId, reason? } A workflow is approved
workflow:rejected { workflowId, reason? } A workflow is rejected
workflow:terminated { workflowId, workflowName? } A workflow is terminated
workflow:paused { workflowId, workflowName? } A workflow is paused
workflow:resumed { workflowId, workflowName? } A workflow is resumed
workflow:restarted { workflowId, workflowName? } A workflow is restarted

MCP events

Type Payload When
mcp:client:preconnect { serverId } Before connecting to an MCP server
mcp:client:connect { url, transport, state, error? } An MCP connection attempt completes or fails
mcp:client:authorize { serverId, authUrl, clientId? } An MCP OAuth flow begins
mcp:client:discover { url?, state?, error?, capability? } MCP capability discovery succeeds or fails

Email events

Type Payload When
email:receive { from, to, subject? } An email is received
email:reply { from, to, subject? } A reply email is sent