Stateful agent with tool execution and event streaming. Built on @mariozechner/pi-ai.
npm install @mariozechner/pi-agent-core
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
const agent = new Agent({
initialState: {
systemPrompt: "You are a helpful assistant.",
model: getModel("anthropic", "claude-sonnet-4-20250514"),
},
});
agent.subscribe((event) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
// Stream just the new text chunk
process.stdout.write(event.assistantMessageEvent.delta);
}
});
await agent.prompt("Hello!");The agent works with AgentMessage, a flexible type that can include:
- Standard LLM messages (user, assistant, toolResult)
- Custom app-specific message types via declaration merging
LLMs only understand user, assistant, and toolResult. The convertToLlm function bridges this gap by filtering and transforming messages before each LLM call.
AgentMessage[] → transformContext() (optional) → AgentMessage[] → convertToLlm() (required) → Message[] → LLM
- transformContext: Prune old messages, inject external context
- convertToLlm: Filter out UI-only messages, convert custom types to LLM format
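As a sketch, assuming an arbitrary 50-message window and a custom notification role (defined via declaration merging, see below), the two functions might look like this:
const agent = new Agent({
  initialState: {
    systemPrompt: "You are a helpful assistant.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
  },
  // Prune: keep only the most recent 50 messages (arbitrary window for illustration)
  transformContext: async (messages, signal) => messages.slice(-50),
  // Filter: drop UI-only messages so the LLM sees only user/assistant/toolResult
  convertToLlm: (messages) => messages.filter((m) => m.role !== "notification"),
});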
The agent emits events for UI updates. Understanding the event sequence helps build responsive interfaces.
When you call prompt("Hello"):
prompt("Hello")
├─ agent_start
├─ turn_start
├─ message_start { message: userMessage } // Your prompt
├─ message_end { message: userMessage }
├─ message_start { message: assistantMessage } // LLM starts responding
├─ message_update { message: partial... } // Streaming chunks
├─ message_update { message: partial... }
├─ message_end { message: assistantMessage } // Complete response
├─ turn_end { message, toolResults: [] }
└─ agent_end { messages: [...] }
If the assistant calls tools, the loop continues:
prompt("Read config.json")
├─ agent_start
├─ turn_start
├─ message_start/end { userMessage }
├─ message_start { assistantMessage with toolCall }
├─ message_update...
├─ message_end { assistantMessage }
├─ tool_execution_start { toolCallId, toolName, args }
├─ tool_execution_update { partialResult } // If tool streams
├─ tool_execution_end { toolCallId, result }
├─ message_start/end { toolResultMessage }
├─ turn_end { message, toolResults: [toolResult] }
│
├─ turn_start // Next turn
├─ message_start { assistantMessage } // LLM responds to tool result
├─ message_update...
├─ message_end
├─ turn_end
└─ agent_end
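A subscriber can map these events straight onto UI updates. A minimal sketch, with console.log standing in for actual rendering:
agent.subscribe((event) => {
  switch (event.type) {
    case "tool_execution_start":
      console.log(`running ${event.toolName}`, event.args);
      break;
    case "tool_execution_end":
      console.log(`tool call ${event.toolCallId} finished`);
      break;
    case "turn_end":
      console.log(`turn complete, ${event.toolResults.length} tool result(s)`);
      break;
  }
});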
Tool execution mode is configurable:
- parallel (default): preflight tool calls sequentially, execute allowed tools concurrently, emit the final tool_execution_end and toolResult messages in assistant source order
- sequential: execute tool calls one by one, matching the historical behavior
The beforeToolCall hook runs after tool_execution_start is emitted and the call's arguments have been validated; it can block execution. The afterToolCall hook runs after a tool finishes executing and before the tool_execution_end and final tool result message events are emitted.
When you use the Agent class, assistant message_end processing is treated as a barrier before tool preflight begins. That means beforeToolCall sees agent state that already includes the assistant message that requested the tool call.
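For example, a sketch using the setters documented below (the audited flag is a hypothetical detail added for illustration):
agent.setToolExecution("sequential");
agent.setBeforeToolCall(async ({ toolCall, args }) => {
  // Returning { block: true } prevents this tool call from executing
  if (toolCall.name === "bash") return { block: true, reason: "bash is disabled" };
});
agent.setAfterToolCall(async ({ toolCall, result, isError }) => {
  // Amend the result's details before tool_execution_end is emitted
  if (!isError) return { details: { ...result.details, audited: true } };
});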
continue() resumes from existing context without adding a new message. Use it for retries after errors.
// After an error, retry from current state
await agent.continue();
The last message in context must be user or toolResult (not assistant).
| Event | Description |
|---|---|
| agent_start | Agent begins processing |
| agent_end | Agent completes with all new messages |
| turn_start | New turn begins (one LLM call + tool executions) |
| turn_end | Turn completes with assistant message and tool results |
| message_start | Any message begins (user, assistant, toolResult) |
| message_update | Assistant only. Includes assistantMessageEvent with delta |
| message_end | Message completes |
| tool_execution_start | Tool begins |
| tool_execution_update | Tool streams progress |
| tool_execution_end | Tool completes |
const agent = new Agent({
// Initial state
initialState: {
systemPrompt: string,
model: Model<any>,
thinkingLevel: "off" | "minimal" | "low" | "medium" | "high" | "xhigh",
tools: AgentTool<any>[],
messages: AgentMessage[],
},
// Convert AgentMessage[] to LLM Message[] (required for custom message types)
convertToLlm: (messages) => messages.filter(...),
// Transform context before convertToLlm (for pruning, compaction)
transformContext: async (messages, signal) => pruneOldMessages(messages),
// Steering mode: "one-at-a-time" (default) or "all"
steeringMode: "one-at-a-time",
// Follow-up mode: "one-at-a-time" (default) or "all"
followUpMode: "one-at-a-time",
// Custom stream function (for proxy backends)
streamFn: streamProxy,
// Session ID for provider caching
sessionId: "session-123",
// Dynamic API key resolution (for expiring OAuth tokens)
getApiKey: async (provider) => refreshToken(),
// Tool execution mode: "parallel" (default) or "sequential"
toolExecution: "parallel",
// Preflight each tool call after args are validated. Can block execution.
beforeToolCall: async ({ toolCall, args, context }) => {
if (toolCall.name === "bash") {
return { block: true, reason: "bash is disabled" };
}
},
// Postprocess each tool result before final tool events are emitted.
afterToolCall: async ({ toolCall, result, isError, context }) => {
if (!isError) {
return { details: { ...result.details, audited: true } };
}
},
// Custom thinking budgets for token-based providers
thinkingBudgets: {
minimal: 128,
low: 512,
medium: 1024,
high: 2048,
},
});
interface AgentState {
systemPrompt: string;
model: Model<any>;
thinkingLevel: ThinkingLevel;
tools: AgentTool<any>[];
messages: AgentMessage[];
isStreaming: boolean;
streamMessage: AgentMessage | null; // Current partial during streaming
pendingToolCalls: Set<string>;
error?: string;
}
Access via agent.state. During streaming, streamMessage contains the partial assistant message.
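A sketch of a render pass that reads the state on every event, with console.log standing in for UI code:
agent.subscribe(() => {
  const { isStreaming, streamMessage, pendingToolCalls, error } = agent.state;
  if (isStreaming && streamMessage) {
    console.log("partial assistant message:", streamMessage);
  }
  console.log("pending tool calls:", pendingToolCalls.size, "error:", error);
});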
// Text prompt
await agent.prompt("Hello");
// With images
await agent.prompt("What's in this image?", [
{ type: "image", data: base64Data, mimeType: "image/jpeg" }
]);
// AgentMessage directly
await agent.prompt({ role: "user", content: "Hello", timestamp: Date.now() });
// Continue from current context (last message must be user or toolResult)
await agent.continue();
agent.setSystemPrompt("New prompt");
agent.setModel(getModel("openai", "gpt-4o"));
agent.setThinkingLevel("medium");
agent.setTools([myTool]);
agent.setToolExecution("sequential");
agent.setBeforeToolCall(async ({ toolCall }) => undefined);
agent.setAfterToolCall(async ({ toolCall, result }) => undefined);
agent.replaceMessages(newMessages);
agent.appendMessage(message);
agent.clearMessages();
agent.reset(); // Clear everything
agent.sessionId = "session-123";
agent.thinkingBudgets = {
minimal: 128,
low: 512,
medium: 1024,
high: 2048,
};
agent.abort(); // Cancel current operation
await agent.waitForIdle(); // Wait for completion
const unsubscribe = agent.subscribe((event) => {
console.log(event.type);
});
unsubscribe();
Steering messages let you interrupt the agent while tools are running. Follow-up messages let you queue work after the agent would otherwise stop.
agent.setSteeringMode("one-at-a-time");
agent.setFollowUpMode("one-at-a-time");
// While agent is running tools
agent.steer({
role: "user",
content: "Stop! Do this instead.",
timestamp: Date.now(),
});
// After the agent finishes its current work
agent.followUp({
role: "user",
content: "Also summarize the result.",
timestamp: Date.now(),
});
const steeringMode = agent.getSteeringMode();
const followUpMode = agent.getFollowUpMode();
agent.clearSteeringQueue();
agent.clearFollowUpQueue();
agent.clearAllQueues();
Use clearSteeringQueue, clearFollowUpQueue, or clearAllQueues to drop queued messages.
When steering messages are detected after a turn completes:
- All tool calls from the current assistant message have already finished
- Steering messages are injected
- The LLM responds on the next turn
Follow-up messages are checked only when there are no more tool calls and no steering messages. If any are queued, they are injected and another turn runs.
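Putting it together, a sketch that steers and queues a follow-up while a prompt that is expected to run long tool calls is in flight:
// Start the prompt without awaiting so messages can be queued while it runs
const run = agent.prompt("Analyze every file in the repo");
// Injected once the current assistant message's tool calls have finished
agent.steer({ role: "user", content: "Only look at src/, skip the tests.", timestamp: Date.now() });
// Injected when no tool calls and no steering messages remain, triggering another turn
agent.followUp({ role: "user", content: "Then write a short summary.", timestamp: Date.now() });
await run;
await agent.waitForIdle(); // Wait until all queued turns have completed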
Extend AgentMessage via declaration merging:
declare module "@mariozechner/pi-agent-core" {
interface CustomAgentMessages {
notification: { role: "notification"; text: string; timestamp: number };
}
}
// Now valid
const msg: AgentMessage = { role: "notification", text: "Info", timestamp: Date.now() };Handle custom types in convertToLlm:
const agent = new Agent({
convertToLlm: (messages) => messages.flatMap(m => {
if (m.role === "notification") return []; // Filter out
return [m];
}),
});
Define tools using AgentTool:
import { Type } from "@sinclair/typebox";
const readFileTool: AgentTool = {
name: "read_file",
label: "Read File", // For UI display
description: "Read a file's contents",
parameters: Type.Object({
path: Type.String({ description: "File path" }),
}),
execute: async (toolCallId, params, signal, onUpdate) => {
const content = await fs.readFile(params.path, "utf-8");
// Optional: stream progress
onUpdate?.({ content: [{ type: "text", text: "Reading..." }], details: {} });
return {
content: [{ type: "text", text: content }],
details: { path: params.path, size: content.length },
};
},
};
agent.setTools([readFileTool]);
Throw an error when a tool fails. Do not return error messages as content.
execute: async (toolCallId, params, signal, onUpdate) => {
if (!fs.existsSync(params.path)) {
throw new Error(`File not found: ${params.path}`);
}
// Return content only on success
return { content: [{ type: "text", text: "..." }] };
}
Thrown errors are caught by the agent and reported to the LLM as tool errors with isError: true.
For browser apps that proxy through a backend:
import { Agent, streamProxy } from "@mariozechner/pi-agent-core";
const agent = new Agent({
streamFn: (model, context, options) =>
streamProxy(model, context, {
...options,
authToken: "...",
proxyUrl: "https://your-server.com",
}),
});
For direct control without the Agent class:
import { agentLoop, agentLoopContinue } from "@mariozechner/pi-agent-core";
const context: AgentContext = {
systemPrompt: "You are helpful.",
messages: [],
tools: [],
};
const config: AgentLoopConfig = {
model: getModel("openai", "gpt-4o"),
convertToLlm: (msgs) => msgs.filter(m => ["user", "assistant", "toolResult"].includes(m.role)),
toolExecution: "parallel",
beforeToolCall: async ({ toolCall, args, context }) => undefined,
afterToolCall: async ({ toolCall, result, isError, context }) => undefined,
};
const userMessage = { role: "user", content: "Hello", timestamp: Date.now() };
for await (const event of agentLoop([userMessage], context, config)) {
console.log(event.type);
}
// Continue from existing context
for await (const event of agentLoopContinue(context, config)) {
console.log(event.type);
}
These low-level streams are observational. They preserve event order, but they do not wait for your async event handling to settle before later producer phases continue. If you need message processing to act as a barrier before tool preflight, use the Agent class instead of raw agentLoop() or agentLoopContinue().
MIT