From e06e6ebd2820d2ddd3b2a44f2ab329ed29a78158 Mon Sep 17 00:00:00 2001 From: Markus Ecker Date: Wed, 23 Jul 2025 14:43:00 +0200 Subject: [PATCH 1/4] allow overlapping events --- .../agent/__tests__/agent-concurrent.test.ts | 527 +++++++++++++ .../__tests__/default.concurrent.test.ts | 642 ++++++++++++++++ .../__tests__/verify.concurrent.test.ts | 691 ++++++++++++++++++ .../verify/__tests__/verify.events.test.ts | 16 +- .../__tests__/verify.text-messages.test.ts | 673 ++++++----------- .../__tests__/verify.tool-calls.test.ts | 577 +++++++++++---- .../packages/client/src/verify/verify.ts | 164 ++--- 7 files changed, 2560 insertions(+), 730 deletions(-) create mode 100644 typescript-sdk/packages/client/src/agent/__tests__/agent-concurrent.test.ts create mode 100644 typescript-sdk/packages/client/src/apply/__tests__/default.concurrent.test.ts create mode 100644 typescript-sdk/packages/client/src/verify/__tests__/verify.concurrent.test.ts diff --git a/typescript-sdk/packages/client/src/agent/__tests__/agent-concurrent.test.ts b/typescript-sdk/packages/client/src/agent/__tests__/agent-concurrent.test.ts new file mode 100644 index 000000000..6caf62175 --- /dev/null +++ b/typescript-sdk/packages/client/src/agent/__tests__/agent-concurrent.test.ts @@ -0,0 +1,527 @@ +import { Observable, Subject } from "rxjs"; +import { AbstractAgent } from "../agent"; +import { + BaseEvent, + EventType, + RunAgentInput, + RunStartedEvent, + RunFinishedEvent, + TextMessageStartEvent, + TextMessageContentEvent, + TextMessageEndEvent, + ToolCallStartEvent, + ToolCallArgsEvent, + ToolCallEndEvent, + StepStartedEvent, + StepFinishedEvent, + Message, + AssistantMessage, +} from "@ag-ui/core"; + +// Mock agent implementation for testing concurrent events +class ConcurrentTestAgent extends AbstractAgent { + public eventsToEmit: BaseEvent[] = []; + public currentEventIndex = 0; + + constructor() { + super(); + this.debug = false; + } + + // Set the events this agent should emit + 
setEventsToEmit(events: BaseEvent[]) { + this.eventsToEmit = events; + this.currentEventIndex = 0; + } + + protected run(input: RunAgentInput): Observable { + return new Observable((subscriber) => { + // Emit all the pre-configured events + for (const event of this.eventsToEmit) { + subscriber.next(event); + } + subscriber.complete(); + }); + } +} + +describe("Agent concurrent operations integration", () => { + let agent: ConcurrentTestAgent; + + beforeEach(() => { + agent = new ConcurrentTestAgent(); + }); + + // Test: Concurrent text messages through full agent pipeline + it("should handle concurrent text messages through full agent pipeline", async () => { + // Configure events for concurrent text messages + const events: BaseEvent[] = [ + { type: EventType.RUN_STARTED, threadId: "test", runId: "test" } as RunStartedEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "msg2", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "First message ", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg2", + delta: "Second message ", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "content", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg2", + delta: "content", + } as TextMessageContentEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "msg2" } as TextMessageEndEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "msg1" } as TextMessageEndEvent, + { type: EventType.RUN_FINISHED } as RunFinishedEvent, + ]; + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify messages were created correctly + expect(result.newMessages.length).toBe(2); + + const msg1 = 
result.newMessages.find((m) => m.id === "msg1"); + const msg2 = result.newMessages.find((m) => m.id === "msg2"); + + expect(msg1).toBeDefined(); + expect(msg2).toBeDefined(); + expect(msg1?.content).toBe("First message content"); + expect(msg2?.content).toBe("Second message content"); + expect(msg1?.role).toBe("assistant"); + expect(msg2?.role).toBe("assistant"); + }); + + // Test: Concurrent tool calls through full agent pipeline + it("should handle concurrent tool calls through full agent pipeline", async () => { + // Configure events for concurrent tool calls + const events: BaseEvent[] = [ + { type: EventType.RUN_STARTED, threadId: "test", runId: "test" } as RunStartedEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: "msg1", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "tool2", + toolCallName: "calculate", + parentMessageId: "msg2", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool2", + delta: '{"expr":', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '"test"}', + } as ToolCallArgsEvent, + { type: EventType.TOOL_CALL_ARGS, toolCallId: "tool2", delta: '"1+1"}' } as ToolCallArgsEvent, + { type: EventType.TOOL_CALL_END, toolCallId: "tool1" } as ToolCallEndEvent, + { type: EventType.TOOL_CALL_END, toolCallId: "tool2" } as ToolCallEndEvent, + { type: EventType.RUN_FINISHED } as RunFinishedEvent, + ]; + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify tool call messages were created correctly + expect(result.newMessages.length).toBe(2); + + const msg1 = result.newMessages.find((m) => m.id === "msg1") as AssistantMessage; + const msg2 = result.newMessages.find((m) => m.id === "msg2") as AssistantMessage; + + 
expect(msg1).toBeDefined(); + expect(msg2).toBeDefined(); + expect(msg1?.toolCalls?.length).toBe(1); + expect(msg2?.toolCalls?.length).toBe(1); + + expect(msg1.toolCalls?.[0]?.id).toBe("tool1"); + expect(msg1.toolCalls?.[0]?.function.name).toBe("search"); + expect(msg1.toolCalls?.[0]?.function.arguments).toBe('{"query":"test"}'); + + expect(msg2.toolCalls?.[0]?.id).toBe("tool2"); + expect(msg2.toolCalls?.[0]?.function.name).toBe("calculate"); + expect(msg2.toolCalls?.[0]?.function.arguments).toBe('{"expr":"1+1"}'); + }); + + // Test: Mixed concurrent text messages and tool calls + it("should handle mixed concurrent text messages and tool calls", async () => { + // Configure events for mixed concurrent operations + const events: BaseEvent[] = [ + { type: EventType.RUN_STARTED, threadId: "test", runId: "test" } as RunStartedEvent, + { type: EventType.STEP_STARTED, stepName: "thinking" } as StepStartedEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "thinking", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "search", + toolCallName: "web_search", + parentMessageId: "tool_msg", + } as ToolCallStartEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "thinking", + delta: "Let me search ", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "search", + delta: '{"query":"', + } as ToolCallArgsEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "thinking", + delta: "for that...", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "search", + delta: 'concurrent"}', + } as ToolCallArgsEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "status", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "status", + delta: "Processing...", + } as TextMessageContentEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "thinking" } as TextMessageEndEvent, + { 
type: EventType.TOOL_CALL_END, toolCallId: "search" } as ToolCallEndEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "status" } as TextMessageEndEvent, + { type: EventType.STEP_FINISHED, stepName: "thinking" } as StepFinishedEvent, + { type: EventType.RUN_FINISHED } as RunFinishedEvent, + ]; + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify all messages were created correctly + expect(result.newMessages.length).toBe(3); + + const thinkingMsg = result.newMessages.find((m) => m.id === "thinking"); + const statusMsg = result.newMessages.find((m) => m.id === "status"); + const toolMsg = result.newMessages.find((m) => m.id === "tool_msg") as AssistantMessage; + + expect(thinkingMsg).toBeDefined(); + expect(statusMsg).toBeDefined(); + expect(toolMsg).toBeDefined(); + + expect(thinkingMsg?.content).toBe("Let me search for that..."); + expect(statusMsg?.content).toBe("Processing..."); + expect(toolMsg?.toolCalls?.length).toBe(1); + expect(toolMsg.toolCalls?.[0]?.function.name).toBe("web_search"); + expect(toolMsg.toolCalls?.[0]?.function.arguments).toBe('{"query":"concurrent"}'); + }); + + // Test: Multiple tool calls on same message through full pipeline + it("should handle multiple tool calls on same message through full pipeline", async () => { + // Configure events for multiple tool calls on same message + const events: BaseEvent[] = [ + { type: EventType.RUN_STARTED, threadId: "test", runId: "test" } as RunStartedEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: "shared_msg", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "tool2", + toolCallName: "calculate", + parentMessageId: "shared_msg", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "tool3", + toolCallName: "format", + parentMessageId: "shared_msg", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_ARGS, + 
toolCallId: "tool1", + delta: '{"q":"a"}', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool2", + delta: '{"e":"b"}', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool3", + delta: '{"f":"c"}', + } as ToolCallArgsEvent, + { type: EventType.TOOL_CALL_END, toolCallId: "tool2" } as ToolCallEndEvent, + { type: EventType.TOOL_CALL_END, toolCallId: "tool1" } as ToolCallEndEvent, + { type: EventType.TOOL_CALL_END, toolCallId: "tool3" } as ToolCallEndEvent, + { type: EventType.RUN_FINISHED } as RunFinishedEvent, + ]; + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify one message with three tool calls + expect(result.newMessages.length).toBe(1); + + const sharedMsg = result.newMessages[0] as AssistantMessage; + expect(sharedMsg.id).toBe("shared_msg"); + expect(sharedMsg.toolCalls?.length).toBe(3); + + const toolCallIds = sharedMsg.toolCalls?.map((tc) => tc.id).sort(); + expect(toolCallIds).toEqual(["tool1", "tool2", "tool3"]); + + const tool1 = sharedMsg.toolCalls?.find((tc) => tc.id === "tool1"); + const tool2 = sharedMsg.toolCalls?.find((tc) => tc.id === "tool2"); + const tool3 = sharedMsg.toolCalls?.find((tc) => tc.id === "tool3"); + + expect(tool1?.function.name).toBe("search"); + expect(tool2?.function.name).toBe("calculate"); + expect(tool3?.function.name).toBe("format"); + + expect(tool1?.function.arguments).toBe('{"q":"a"}'); + expect(tool2?.function.arguments).toBe('{"e":"b"}'); + expect(tool3?.function.arguments).toBe('{"f":"c"}'); + }); + + // Test: Event ordering is preserved in message creation + it("should preserve event ordering in message creation", async () => { + // Configure events to test ordering + const events: BaseEvent[] = [ + { type: EventType.RUN_STARTED, threadId: "test", runId: "test" } as RunStartedEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + role: "assistant", + } as TextMessageStartEvent, + { 
+ type: EventType.TEXT_MESSAGE_START, + messageId: "msg2", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "msg3", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "First", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg2", + delta: "Second", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg3", + delta: "Third", + } as TextMessageContentEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "msg3" } as TextMessageEndEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "msg1" } as TextMessageEndEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "msg2" } as TextMessageEndEvent, + { type: EventType.RUN_FINISHED } as RunFinishedEvent, + ]; + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify all messages exist with correct content + expect(result.newMessages.length).toBe(3); + + // Messages should be in the order they were started + expect(result.newMessages[0].id).toBe("msg1"); + expect(result.newMessages[1].id).toBe("msg2"); + expect(result.newMessages[2].id).toBe("msg3"); + + expect(result.newMessages[0].content).toBe("First"); + expect(result.newMessages[1].content).toBe("Second"); + expect(result.newMessages[2].content).toBe("Third"); + }); + + // Test: High-frequency concurrent events through full pipeline + it("should handle high-frequency concurrent events through full pipeline", async () => { + const numMessages = 5; + const numToolCalls = 5; + const events: BaseEvent[] = []; + + // Build event sequence + events.push({ + type: EventType.RUN_STARTED, + threadId: "test", + runId: "test", + } as RunStartedEvent); + + // Start all messages + for (let i = 0; i < numMessages; i++) { + events.push({ + type: EventType.TEXT_MESSAGE_START, + messageId: `msg${i}`, + role: "assistant", 
+ } as TextMessageStartEvent); + } + + // Start all tool calls + for (let i = 0; i < numToolCalls; i++) { + events.push({ + type: EventType.TOOL_CALL_START, + toolCallId: `tool${i}`, + toolCallName: `tool_${i}`, + parentMessageId: `tool_msg${i}`, + } as ToolCallStartEvent); + } + + // Add content to all messages + for (let i = 0; i < numMessages; i++) { + events.push({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: `msg${i}`, + delta: `Content ${i}`, + } as TextMessageContentEvent); + } + + // Add args to all tool calls + for (let i = 0; i < numToolCalls; i++) { + events.push({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: `tool${i}`, + delta: `{"param":"value${i}"}`, + } as ToolCallArgsEvent); + } + + // End all messages + for (let i = numMessages - 1; i >= 0; i--) { + events.push({ + type: EventType.TEXT_MESSAGE_END, + messageId: `msg${i}`, + } as TextMessageEndEvent); + } + + // End all tool calls + for (let i = numToolCalls - 1; i >= 0; i--) { + events.push({ + type: EventType.TOOL_CALL_END, + toolCallId: `tool${i}`, + } as ToolCallEndEvent); + } + + events.push({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify all messages and tool calls were processed + expect(result.newMessages.length).toBe(numMessages + numToolCalls); + + // Verify text messages + for (let i = 0; i < numMessages; i++) { + const msg = result.newMessages.find((m) => m.id === `msg${i}`); + expect(msg).toBeDefined(); + expect(msg?.content).toBe(`Content ${i}`); + } + + // Verify tool call messages + for (let i = 0; i < numToolCalls; i++) { + const toolMsg = result.newMessages.find((m) => m.id === `tool_msg${i}`) as AssistantMessage; + expect(toolMsg).toBeDefined(); + expect(toolMsg?.toolCalls?.length).toBe(1); + expect(toolMsg.toolCalls?.[0]?.id).toBe(`tool${i}`); + expect(toolMsg.toolCalls?.[0]?.function.arguments).toBe(`{"param":"value${i}"}`); + } + }); + + // Test: 
Concurrent events with steps + it("should handle concurrent events with lifecycle steps", async () => { + const events: BaseEvent[] = [ + { type: EventType.RUN_STARTED, threadId: "test", runId: "test" } as RunStartedEvent, + { type: EventType.STEP_STARTED, stepName: "analysis" } as StepStartedEvent, + { + type: EventType.TEXT_MESSAGE_START, + messageId: "thinking", + role: "assistant", + } as TextMessageStartEvent, + { type: EventType.STEP_STARTED, stepName: "search" } as StepStartedEvent, + { + type: EventType.TOOL_CALL_START, + toolCallId: "search_tool", + toolCallName: "search", + parentMessageId: "tool_msg", + } as ToolCallStartEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "thinking", + delta: "Analyzing...", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + toolCallId: "search_tool", + delta: '{"query":"test"}', + } as ToolCallArgsEvent, + { type: EventType.STEP_FINISHED, stepName: "search" } as StepFinishedEvent, + { type: EventType.TEXT_MESSAGE_END, messageId: "thinking" } as TextMessageEndEvent, + { type: EventType.TOOL_CALL_END, toolCallId: "search_tool" } as ToolCallEndEvent, + { type: EventType.STEP_FINISHED, stepName: "analysis" } as StepFinishedEvent, + { type: EventType.RUN_FINISHED } as RunFinishedEvent, + ]; + + agent.setEventsToEmit(events); + + // Run the agent + const result = await agent.runAgent(); + + // Verify messages were created correctly even with concurrent steps + expect(result.newMessages.length).toBe(2); + + const thinkingMsg = result.newMessages.find((m) => m.id === "thinking"); + const toolMsg = result.newMessages.find((m) => m.id === "tool_msg") as AssistantMessage; + + expect(thinkingMsg?.content).toBe("Analyzing..."); + expect(toolMsg?.toolCalls?.length).toBe(1); + expect(toolMsg.toolCalls?.[0]?.function.name).toBe("search"); + }); +}); diff --git a/typescript-sdk/packages/client/src/apply/__tests__/default.concurrent.test.ts 
b/typescript-sdk/packages/client/src/apply/__tests__/default.concurrent.test.ts new file mode 100644 index 000000000..49cad4552 --- /dev/null +++ b/typescript-sdk/packages/client/src/apply/__tests__/default.concurrent.test.ts @@ -0,0 +1,642 @@ +import { Subject } from "rxjs"; +import { toArray } from "rxjs/operators"; +import { firstValueFrom } from "rxjs"; +import { defaultApplyEvents } from "../default"; +import { + BaseEvent, + EventType, + RunAgentInput, + RunStartedEvent, + RunFinishedEvent, + TextMessageStartEvent, + TextMessageContentEvent, + TextMessageEndEvent, + ToolCallStartEvent, + ToolCallArgsEvent, + ToolCallEndEvent, + Message, + AssistantMessage, +} from "@ag-ui/core"; +import { AbstractAgent } from "../../agent"; + +// Mock agent for testing +const FAKE_AGENT = { + messages: [], + state: {}, + agentId: "test-agent", +} as unknown as AbstractAgent; + +describe("defaultApplyEvents concurrent operations", () => { + // Test: Concurrent text messages should create separate messages + it("should handle concurrent text messages correctly", async () => { + // Create a subject and state for events + const events$ = new Subject(); + const initialState: RunAgentInput = { + messages: [], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + // Create the observable stream + const result$ = defaultApplyEvents(initialState, events$, FAKE_AGENT, []); + + // Collect all emitted state updates in an array + const stateUpdatesPromise = firstValueFrom(result$.pipe(toArray())); + + // Send events for concurrent text messages + events$.next({ type: EventType.RUN_STARTED } as RunStartedEvent); + + // Start two concurrent text messages + events$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + role: "assistant", + } as TextMessageStartEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg2", + role: "assistant", + } as TextMessageStartEvent); + + // Send content for both messages + 
events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "First message content", + } as TextMessageContentEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg2", + delta: "Second message content", + } as TextMessageContentEvent); + + // End messages in reverse order + events$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg2", + } as TextMessageEndEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg1", + } as TextMessageEndEvent); + + // Complete the events stream + events$.complete(); + + // Wait for all state updates + const stateUpdates = await stateUpdatesPromise; + + // Verify we have the expected number of state updates + expect(stateUpdates.length).toBeGreaterThan(0); + + // Check final state has both messages + const finalState = stateUpdates[stateUpdates.length - 1]; + expect(finalState.messages?.length).toBe(2); + + // Verify messages have correct IDs and content + const msg1 = finalState.messages?.find((m) => m.id === "msg1"); + const msg2 = finalState.messages?.find((m) => m.id === "msg2"); + + expect(msg1).toBeDefined(); + expect(msg2).toBeDefined(); + expect(msg1?.content).toBe("First message content"); + expect(msg2?.content).toBe("Second message content"); + expect(msg1?.role).toBe("assistant"); + expect(msg2?.role).toBe("assistant"); + }); + + // Test: Concurrent tool calls should create separate tool calls + it("should handle concurrent tool calls correctly", async () => { + // Create a subject and state for events + const events$ = new Subject(); + const initialState: RunAgentInput = { + messages: [], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + // Create the observable stream + const result$ = defaultApplyEvents(initialState, events$, FAKE_AGENT, []); + + // Collect all emitted state updates in an array + const stateUpdatesPromise = firstValueFrom(result$.pipe(toArray())); + + // Send events 
for concurrent tool calls + events$.next({ type: EventType.RUN_STARTED } as RunStartedEvent); + + // Start two concurrent tool calls + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: "msg1", + } as ToolCallStartEvent); + + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool2", + toolCallName: "calculate", + parentMessageId: "msg2", + } as ToolCallStartEvent); + + // Send args for both tool calls + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":"test search"}', + } as ToolCallArgsEvent); + + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool2", + delta: '{"expression":"1+1"}', + } as ToolCallArgsEvent); + + // End tool calls in reverse order + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool2", + } as ToolCallEndEvent); + + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + + // Complete the events stream + events$.complete(); + + // Wait for all state updates + const stateUpdates = await stateUpdatesPromise; + + // Verify we have the expected number of state updates + expect(stateUpdates.length).toBeGreaterThan(0); + + // Check final state has both messages with tool calls + const finalState = stateUpdates[stateUpdates.length - 1]; + expect(finalState.messages?.length).toBe(2); + + // Verify tool calls are properly attached to messages + const msg1 = finalState.messages?.find((m) => m.id === "msg1") as AssistantMessage; + const msg2 = finalState.messages?.find((m) => m.id === "msg2") as AssistantMessage; + + expect(msg1).toBeDefined(); + expect(msg2).toBeDefined(); + expect(msg1?.toolCalls?.length).toBe(1); + expect(msg2?.toolCalls?.length).toBe(1); + + // Verify tool call details + expect(msg1.toolCalls?.[0]?.id).toBe("tool1"); + expect(msg1.toolCalls?.[0]?.function.name).toBe("search"); + 
expect(msg1.toolCalls?.[0]?.function.arguments).toBe('{"query":"test search"}'); + + expect(msg2.toolCalls?.[0]?.id).toBe("tool2"); + expect(msg2.toolCalls?.[0]?.function.name).toBe("calculate"); + expect(msg2.toolCalls?.[0]?.function.arguments).toBe('{"expression":"1+1"}'); + }); + + // Test: Mixed concurrent messages and tool calls + it("should handle mixed concurrent text messages and tool calls", async () => { + // Create a subject and state for events + const events$ = new Subject(); + const initialState: RunAgentInput = { + messages: [], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + // Create the observable stream + const result$ = defaultApplyEvents(initialState, events$, FAKE_AGENT, []); + + // Collect all emitted state updates in an array + const stateUpdatesPromise = firstValueFrom(result$.pipe(toArray())); + + // Send mixed concurrent events + events$.next({ type: EventType.RUN_STARTED } as RunStartedEvent); + + // Start a text message + events$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "thinking_msg", + role: "assistant", + } as TextMessageStartEvent); + + // Start a tool call while message is active + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "search_tool", + toolCallName: "web_search", + parentMessageId: "tool_msg", + } as ToolCallStartEvent); + + // Add content to text message + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "thinking_msg", + delta: "Let me search for that information...", + } as TextMessageContentEvent); + + // Add args to tool call + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "search_tool", + delta: '{"query":"concurrent events"}', + } as ToolCallArgsEvent); + + // Start another text message + events$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "status_msg", + role: "assistant", + } as TextMessageStartEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: 
"status_msg", + delta: "Processing your request...", + } as TextMessageContentEvent); + + // End everything in mixed order + events$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "thinking_msg", + } as TextMessageEndEvent); + + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "search_tool", + } as ToolCallEndEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "status_msg", + } as TextMessageEndEvent); + + // Complete the events stream + events$.complete(); + + // Wait for all state updates + const stateUpdates = await stateUpdatesPromise; + + // Check final state + const finalState = stateUpdates[stateUpdates.length - 1]; + expect(finalState.messages?.length).toBe(3); + + // Verify all messages are present + const thinkingMsg = finalState.messages?.find((m) => m.id === "thinking_msg"); + const toolMsg = finalState.messages?.find((m) => m.id === "tool_msg") as AssistantMessage; + const statusMsg = finalState.messages?.find((m) => m.id === "status_msg"); + + expect(thinkingMsg).toBeDefined(); + expect(toolMsg).toBeDefined(); + expect(statusMsg).toBeDefined(); + + expect(thinkingMsg?.content).toBe("Let me search for that information..."); + expect(statusMsg?.content).toBe("Processing your request..."); + expect(toolMsg?.toolCalls?.length).toBe(1); + expect(toolMsg.toolCalls?.[0]?.function.name).toBe("web_search"); + }); + + // Test: Multiple tool calls on the same message + it("should handle multiple tool calls on the same parent message", async () => { + // Create a subject and state for events + const events$ = new Subject(); + + // Create initial state with an existing message + const parentMessageId = "parent_msg"; + const initialState: RunAgentInput = { + messages: [ + { + id: parentMessageId, + role: "assistant", + content: "I'll help you with multiple tools.", + toolCalls: [], + }, + ], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + // Create the observable stream 
+ const result$ = defaultApplyEvents(initialState, events$, FAKE_AGENT, []); + + // Collect all emitted state updates in an array + const stateUpdatesPromise = firstValueFrom(result$.pipe(toArray())); + + // Send events for multiple tool calls on the same message + events$.next({ type: EventType.RUN_STARTED } as RunStartedEvent); + + // Start multiple tool calls concurrently with the same parent message + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: parentMessageId, + } as ToolCallStartEvent); + + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool2", + toolCallName: "calculate", + parentMessageId: parentMessageId, + } as ToolCallStartEvent); + + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool3", + toolCallName: "format", + parentMessageId: parentMessageId, + } as ToolCallStartEvent); + + // Send args for all tool calls + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent); + + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool2", + delta: '{"expression":"2*3"}', + } as ToolCallArgsEvent); + + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool3", + delta: '{"format":"json"}', + } as ToolCallArgsEvent); + + // End all tool calls + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool2", + } as ToolCallEndEvent); + + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool3", + } as ToolCallEndEvent); + + // Complete the events stream + events$.complete(); + + // Wait for all state updates + const stateUpdates = await stateUpdatesPromise; + + // Check final state - should still have only one message with 3 tool calls + const finalState = stateUpdates[stateUpdates.length - 1]; + expect(finalState.messages?.length).toBe(1); + 
+ const parentMsg = finalState.messages?.[0] as AssistantMessage; + expect(parentMsg.id).toBe(parentMessageId); + expect(parentMsg.toolCalls?.length).toBe(3); + + // Verify all tool calls are present + const toolCallIds = parentMsg.toolCalls?.map((tc) => tc.id).sort(); + expect(toolCallIds).toEqual(["tool1", "tool2", "tool3"]); + + // Verify tool call details + const searchTool = parentMsg.toolCalls?.find((tc) => tc.id === "tool1"); + const calcTool = parentMsg.toolCalls?.find((tc) => tc.id === "tool2"); + const formatTool = parentMsg.toolCalls?.find((tc) => tc.id === "tool3"); + + expect(searchTool?.function.name).toBe("search"); + expect(calcTool?.function.name).toBe("calculate"); + expect(formatTool?.function.name).toBe("format"); + + expect(searchTool?.function.arguments).toBe('{"query":"test"}'); + expect(calcTool?.function.arguments).toBe('{"expression":"2*3"}'); + expect(formatTool?.function.arguments).toBe('{"format":"json"}'); + }); + + // Test: High-frequency concurrent events + it("should handle high-frequency concurrent events", async () => { + // Create a subject and state for events + const events$ = new Subject(); + const initialState: RunAgentInput = { + messages: [], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + // Create the observable stream + const result$ = defaultApplyEvents(initialState, events$, FAKE_AGENT, []); + + // Collect all emitted state updates in an array + const stateUpdatesPromise = firstValueFrom(result$.pipe(toArray())); + + events$.next({ type: EventType.RUN_STARTED } as RunStartedEvent); + + // Create many concurrent messages and tool calls + const numMessages = 10; + const numToolCalls = 10; + + // Start all messages + for (let i = 0; i < numMessages; i++) { + events$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: `msg${i}`, + role: "assistant", + } as TextMessageStartEvent); + } + + // Start all tool calls + for (let i = 0; i < numToolCalls; i++) { + 
events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: `tool${i}`, + toolCallName: `tool_${i}`, + parentMessageId: `tool_msg${i}`, + } as ToolCallStartEvent); + } + + // Send content for all messages + for (let i = 0; i < numMessages; i++) { + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: `msg${i}`, + delta: `Content for message ${i}`, + } as TextMessageContentEvent); + } + + // Send args for all tool calls + for (let i = 0; i < numToolCalls; i++) { + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: `tool${i}`, + delta: `{"param${i}":"value${i}"}`, + } as ToolCallArgsEvent); + } + + // End all in reverse order + for (let i = numMessages - 1; i >= 0; i--) { + events$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: `msg${i}`, + } as TextMessageEndEvent); + } + + for (let i = numToolCalls - 1; i >= 0; i--) { + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: `tool${i}`, + } as ToolCallEndEvent); + } + + // Complete the events stream + events$.complete(); + + // Wait for all state updates + const stateUpdates = await stateUpdatesPromise; + + // Check final state + const finalState = stateUpdates[stateUpdates.length - 1]; + + // Should have numMessages + numToolCalls messages total + expect(finalState.messages?.length).toBe(numMessages + numToolCalls); + + // Verify all text messages are present with correct content + for (let i = 0; i < numMessages; i++) { + const msg = finalState.messages?.find((m) => m.id === `msg${i}`); + expect(msg).toBeDefined(); + expect(msg?.content).toBe(`Content for message ${i}`); + expect(msg?.role).toBe("assistant"); + } + + // Verify all tool call messages are present with correct tool calls + for (let i = 0; i < numToolCalls; i++) { + const toolMsg = finalState.messages?.find((m) => m.id === `tool_msg${i}`) as AssistantMessage; + expect(toolMsg).toBeDefined(); + expect(toolMsg?.toolCalls?.length).toBe(1); + expect(toolMsg.toolCalls?.[0]?.id).toBe(`tool${i}`); + 
expect(toolMsg.toolCalls?.[0]?.function.name).toBe(`tool_${i}`); + expect(toolMsg.toolCalls?.[0]?.function.arguments).toBe(`{"param${i}":"value${i}"}`); + } + }); + + // Test: Interleaved content and args updates + it("should handle interleaved content and args updates correctly", async () => { + // Create a subject and state for events + const events$ = new Subject(); + const initialState: RunAgentInput = { + messages: [], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + // Create the observable stream + const result$ = defaultApplyEvents(initialState, events$, FAKE_AGENT, []); + + // Collect all emitted state updates in an array + const stateUpdatesPromise = firstValueFrom(result$.pipe(toArray())); + + events$.next({ type: EventType.RUN_STARTED } as RunStartedEvent); + + // Start concurrent message and tool call + events$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + role: "assistant", + } as TextMessageStartEvent); + + events$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: "tool_msg1", + } as ToolCallStartEvent); + + // Interleave content and args updates + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "Searching ", + } as TextMessageContentEvent); + + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"que', + } as ToolCallArgsEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "for ", + } as TextMessageContentEvent); + + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: 'ry":"', + } as ToolCallArgsEvent); + + events$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "information...", + } as TextMessageContentEvent); + + events$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: 'test"}', + } as ToolCallArgsEvent); + + // End both + 
events$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg1", + } as TextMessageEndEvent); + + events$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + + // Complete the events stream + events$.complete(); + + // Wait for all state updates + const stateUpdates = await stateUpdatesPromise; + + // Check final state + const finalState = stateUpdates[stateUpdates.length - 1]; + expect(finalState.messages?.length).toBe(2); + + // Verify text message content is assembled correctly + const textMsg = finalState.messages?.find((m) => m.id === "msg1"); + expect(textMsg?.content).toBe("Searching for information..."); + + // Verify tool call args are assembled correctly + const toolMsg = finalState.messages?.find((m) => m.id === "tool_msg1") as AssistantMessage; + expect(toolMsg?.toolCalls?.[0]?.function.arguments).toBe('{"query":"test"}'); + }); +}); diff --git a/typescript-sdk/packages/client/src/verify/__tests__/verify.concurrent.test.ts b/typescript-sdk/packages/client/src/verify/__tests__/verify.concurrent.test.ts new file mode 100644 index 000000000..f6c115732 --- /dev/null +++ b/typescript-sdk/packages/client/src/verify/__tests__/verify.concurrent.test.ts @@ -0,0 +1,691 @@ +import { Subject } from "rxjs"; +import { toArray, catchError } from "rxjs/operators"; +import { firstValueFrom } from "rxjs"; +import { verifyEvents } from "../verify"; +import { + BaseEvent, + EventType, + AGUIError, + RunStartedEvent, + RunFinishedEvent, + TextMessageStartEvent, + TextMessageContentEvent, + TextMessageEndEvent, + ToolCallStartEvent, + ToolCallArgsEvent, + ToolCallEndEvent, + StepStartedEvent, + StepFinishedEvent, +} from "@ag-ui/core"; + +describe("verifyEvents concurrent operations", () => { + // Test: Concurrent text messages with different IDs should be allowed + it("should allow concurrent text messages with different IDs", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const 
promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send concurrent text messages + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Start first message + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + + // Start second message before first one ends + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg2", + } as TextMessageStartEvent); + + // Content for both messages + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "Content for message 1", + } as TextMessageContentEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg2", + delta: "Content for message 2", + } as TextMessageContentEvent); + + // End messages in different order + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg2", + } as TextMessageEndEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg1", + } as TextMessageEndEvent); + + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(8); + expect(result[0].type).toBe(EventType.RUN_STARTED); + expect(result[1].type).toBe(EventType.TEXT_MESSAGE_START); + expect(result[2].type).toBe(EventType.TEXT_MESSAGE_START); + expect(result[7].type).toBe(EventType.RUN_FINISHED); + }); + + // Test: Concurrent tool calls with different IDs should be allowed + it("should allow concurrent tool calls with different IDs", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { 
+ throw err; + }), + ), + ); + + // Send concurrent tool calls + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Start first tool call + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + } as ToolCallStartEvent); + + // Start second tool call before first one ends + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool2", + toolCallName: "calculate", + } as ToolCallStartEvent); + + // Args for both tool calls + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent); + + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool2", + delta: '{"expression":"1+1"}', + } as ToolCallArgsEvent); + + // End tool calls in different order + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool2", + } as ToolCallEndEvent); + + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(8); + expect(result[0].type).toBe(EventType.RUN_STARTED); + expect(result[1].type).toBe(EventType.TOOL_CALL_START); + expect(result[2].type).toBe(EventType.TOOL_CALL_START); + expect(result[7].type).toBe(EventType.RUN_FINISHED); + }); + + // Test: Overlapping text messages and tool calls should be allowed + it("should allow overlapping text messages and tool calls", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send overlapping text messages and tool calls + 
source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Start a text message + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + + // Start a tool call while message is active + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + } as ToolCallStartEvent); + + // Send content for both + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "Thinking...", + } as TextMessageContentEvent); + + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent); + + // Start another message while tool call is active + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg2", + } as TextMessageStartEvent); + + // End in various orders + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg1", + } as TextMessageEndEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg2", + delta: "Based on the search...", + } as TextMessageContentEvent); + + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg2", + } as TextMessageEndEvent); + + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(11); + expect(result[0].type).toBe(EventType.RUN_STARTED); + expect(result[10].type).toBe(EventType.RUN_FINISHED); + }); + + // Test: Steps and other lifecycle events should be allowed during concurrent messages/tool calls + it("should allow lifecycle events during concurrent messages and tool calls", async () => { + const source$ = new Subject(); 
+ + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Start a step + source$.next({ + type: EventType.STEP_STARTED, + stepName: "search_step", + } as StepStartedEvent); + + // Start messages and tool calls within the step + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + } as ToolCallStartEvent); + + // Lifecycle events should be allowed + source$.next({ + type: EventType.STEP_STARTED, + stepName: "analysis_step", + } as StepStartedEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "Searching...", + } as TextMessageContentEvent); + + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent); + + // End everything + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg1", + } as TextMessageEndEvent); + + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + + source$.next({ + type: EventType.STEP_FINISHED, + stepName: "analysis_step", + } as StepFinishedEvent); + + source$.next({ + type: EventType.STEP_FINISHED, + stepName: "search_step", + } as StepFinishedEvent); + + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(12); + expect(result[0].type).toBe(EventType.RUN_STARTED); + expect(result[11].type).toBe(EventType.RUN_FINISHED); + }); + + // Test: Should reject 
duplicate message ID starts + it("should reject starting a text message with an ID already in progress", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain( + `Cannot send 'TEXT_MESSAGE_START' event: A text message with ID 'msg1' is already in progress`, + ); + subscription.unsubscribe(); + }, + }); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + + // Try to start the same message ID again + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + + // Complete the source and wait for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify only events before the error were processed + expect(events.length).toBe(2); + }); + + // Test: Should reject duplicate tool call ID starts + it("should reject starting a tool call with an ID already in progress", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain( + `Cannot send 'TOOL_CALL_START' event: A tool call with ID 'tool1' is already in progress`, + ); + subscription.unsubscribe(); + }, + }); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + source$.next({ + type: 
EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + } as ToolCallStartEvent); + + // Try to start the same tool call ID again + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "calculate", + } as ToolCallStartEvent); + + // Complete the source and wait for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify only events before the error were processed + expect(events.length).toBe(2); + }); + + // Test: Should reject content for non-existent message ID + it("should reject content for non-existent message ID", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain( + `Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found with ID 'nonexistent'`, + ); + subscription.unsubscribe(); + }, + }); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Try to send content for a message that was never started + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "nonexistent", + delta: "test content", + } as TextMessageContentEvent); + + // Complete the source and wait for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify only events before the error were processed + expect(events.length).toBe(1); + }); + + // Test: Should reject args for non-existent tool call ID + it("should reject args for non-existent tool call ID", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = 
verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain( + `Cannot send 'TOOL_CALL_ARGS' event: No active tool call found with ID 'nonexistent'`, + ); + subscription.unsubscribe(); + }, + }); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Try to send args for a tool call that was never started + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "nonexistent", + delta: '{"test":"value"}', + } as ToolCallArgsEvent); + + // Complete the source and wait for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify only events before the error were processed + expect(events.length).toBe(1); + }); + + // Test: Should reject RUN_FINISHED while messages are still active + it("should reject RUN_FINISHED while text messages are still active", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain( + `Cannot send 'RUN_FINISHED' while text messages are still active: msg1, msg2`, + ); + subscription.unsubscribe(); + }, + }); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg2", + } as TextMessageStartEvent); + + // Try to finish run while messages are still active + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source and wait for 
processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify only events before the error were processed + expect(events.length).toBe(3); + }); + + // Test: Should reject RUN_FINISHED while tool calls are still active + it("should reject RUN_FINISHED while tool calls are still active", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain( + `Cannot send 'RUN_FINISHED' while tool calls are still active: tool1, tool2`, + ); + subscription.unsubscribe(); + }, + }); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + } as ToolCallStartEvent); + + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool2", + toolCallName: "calculate", + } as ToolCallStartEvent); + + // Try to finish run while tool calls are still active + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source and wait for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify only events before the error were processed + expect(events.length).toBe(3); + }); + + // Test: Complex concurrent scenario with high frequency events + it("should handle complex concurrent scenario with many overlapping events", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + source$.next({ + type: EventType.RUN_STARTED, + threadId: 
"test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Start multiple concurrent messages and tool calls + const messageIds = ["msg1", "msg2", "msg3", "msg4", "msg5"]; + const toolCallIds = ["tool1", "tool2", "tool3", "tool4", "tool5"]; + + // Start all messages + for (const msgId of messageIds) { + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: msgId, + } as TextMessageStartEvent); + } + + // Start all tool calls + for (const toolId of toolCallIds) { + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: toolId, + toolCallName: "test_tool", + } as ToolCallStartEvent); + } + + // Send content/args in random order + for (let i = 0; i < 3; i++) { + for (const msgId of messageIds) { + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: msgId, + delta: `Content ${i} for ${msgId}`, + } as TextMessageContentEvent); + } + + for (const toolId of toolCallIds) { + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: toolId, + delta: `{"step":${i}}`, + } as ToolCallArgsEvent); + } + } + + // End all in reverse order + for (const msgId of [...messageIds].reverse()) { + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: msgId, + } as TextMessageEndEvent); + } + + for (const toolId of [...toolCallIds].reverse()) { + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: toolId, + } as ToolCallEndEvent); + } + + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify we have the expected number of events: + // 1 RUN_STARTED + 5 MSG_START + 5 TOOL_START + 15 MSG_CONTENT + 15 TOOL_ARGS + 5 MSG_END + 5 TOOL_END + 1 RUN_FINISHED = 52 + expect(result.length).toBe(52); + expect(result[0].type).toBe(EventType.RUN_STARTED); + expect(result[51].type).toBe(EventType.RUN_FINISHED); + }); +}); diff --git 
a/typescript-sdk/packages/client/src/verify/__tests__/verify.events.test.ts b/typescript-sdk/packages/client/src/verify/__tests__/verify.events.test.ts index 09307bf83..2303d9e09 100644 --- a/typescript-sdk/packages/client/src/verify/__tests__/verify.events.test.ts +++ b/typescript-sdk/packages/client/src/verify/__tests__/verify.events.test.ts @@ -36,7 +36,7 @@ describe("verifyEvents general validation", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TEXT_MESSAGE_CONTENT' event: Message ID mismatch. The ID 'different-id' doesn't match the active message ID 'msg1'.`, + `Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found with ID 'different-id'. Start a text message with 'TEXT_MESSAGE_START' first.`, ); subscription.unsubscribe(); }, @@ -80,7 +80,7 @@ describe("verifyEvents general validation", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TEXT_MESSAGE_END' event: No active text message found. A 'TEXT_MESSAGE_START' event must be sent first.`, + `Cannot send 'TEXT_MESSAGE_END' event: No active text message found with ID 'msg1'. A 'TEXT_MESSAGE_START' event must be sent first.`, ); subscription.unsubscribe(); }, @@ -119,7 +119,7 @@ describe("verifyEvents general validation", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TOOL_CALL_ARGS' event: Tool call ID mismatch. The ID 'different-id' doesn't match the active tool call ID 't1'.`, + `Cannot send 'TOOL_CALL_ARGS' event: No active tool call found with ID 'different-id'. Start a tool call with 'TOOL_CALL_START' first.`, ); subscription.unsubscribe(); }, @@ -164,7 +164,7 @@ describe("verifyEvents general validation", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TOOL_CALL_END' event: No active tool call found. 
A 'TOOL_CALL_START' event must be sent first.`, + `Cannot send 'TOOL_CALL_END' event: No active tool call found with ID 't1'. A 'TOOL_CALL_START' event must be sent first.`, ); subscription.unsubscribe(); }, @@ -399,7 +399,7 @@ describe("verifyEvents events", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TEXT_MESSAGE_CONTENT' event: Message ID mismatch. The ID 'different-id' doesn't match the active message ID 'msg1'.`, + `Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found with ID 'different-id'. Start a text message with 'TEXT_MESSAGE_START' first.`, ); subscription.unsubscribe(); }, @@ -443,7 +443,7 @@ describe("verifyEvents events", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TEXT_MESSAGE_END' event: No active text message found. A 'TEXT_MESSAGE_START' event must be sent first.`, + `Cannot send 'TEXT_MESSAGE_END' event: No active text message found with ID 'msg1'. A 'TEXT_MESSAGE_START' event must be sent first.`, ); subscription.unsubscribe(); }, @@ -482,7 +482,7 @@ describe("verifyEvents events", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TOOL_CALL_ARGS' event: Tool call ID mismatch. The ID 'different-id' doesn't match the active tool call ID 't1'.`, + `Cannot send 'TOOL_CALL_ARGS' event: No active tool call found with ID 'different-id'. Start a tool call with 'TOOL_CALL_START' first.`, ); subscription.unsubscribe(); }, @@ -527,7 +527,7 @@ describe("verifyEvents events", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TOOL_CALL_END' event: No active tool call found. A 'TOOL_CALL_START' event must be sent first.`, + `Cannot send 'TOOL_CALL_END' event: No active tool call found with ID 't1'. 
A 'TOOL_CALL_START' event must be sent first.`, ); subscription.unsubscribe(); }, diff --git a/typescript-sdk/packages/client/src/verify/__tests__/verify.text-messages.test.ts b/typescript-sdk/packages/client/src/verify/__tests__/verify.text-messages.test.ts index c73e880c7..e0eeab844 100644 --- a/typescript-sdk/packages/client/src/verify/__tests__/verify.text-messages.test.ts +++ b/typescript-sdk/packages/client/src/verify/__tests__/verify.text-messages.test.ts @@ -25,8 +25,8 @@ import { } from "@ag-ui/core"; describe("verifyEvents text messages", () => { - // Test: Cannot send lifecycle events inside a text message - it("should not allow lifecycle events inside a text message", async () => { + // Test: Cannot send TEXT_MESSAGE_CONTENT before TEXT_MESSAGE_START + it("should not allow TEXT_MESSAGE_CONTENT before TEXT_MESSAGE_START", async () => { const source$ = new Subject(); const events: BaseEvent[] = []; @@ -36,40 +36,37 @@ describe("verifyEvents text messages", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send event type 'STEP_STARTED' after 'TEXT_MESSAGE_START'`, + `Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found with ID '1'`, ); subscription.unsubscribe(); }, }); - // Start a valid run and open a text message + // Start a valid run source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", runId: "test-run-id", } as RunStartedEvent); - source$.next({ - type: EventType.TEXT_MESSAGE_START, - messageId: "1", - } as TextMessageStartEvent); - // Try to send a lifecycle event inside the text message + // Try to send content without starting a text message source$.next({ - type: EventType.STEP_STARTED, - stepName: "step1", - } as StepStartedEvent); + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "1", + delta: "content 1", + } as TextMessageContentEvent); // Complete the source and wait a bit for processing source$.complete(); await new Promise((resolve) => 
setTimeout(resolve, 100)); // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); + expect(events.length).toBe(1); + expect(events[0].type).toBe(EventType.RUN_STARTED); }); - // Test: Cannot send tool-related events inside a text message - it("should not allow tool-related events inside a text message", async () => { + // Test: Cannot send TEXT_MESSAGE_END before TEXT_MESSAGE_START + it("should not allow TEXT_MESSAGE_END before TEXT_MESSAGE_START", async () => { const source$ = new Subject(); const events: BaseEvent[] = []; @@ -79,37 +76,32 @@ describe("verifyEvents text messages", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send event type 'TOOL_CALL_START' after 'TEXT_MESSAGE_START'`, + `Cannot send 'TEXT_MESSAGE_END' event: No active text message found with ID '1'`, ); subscription.unsubscribe(); }, }); - // Start a valid run and open a text message + // Start a valid run source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", runId: "test-run-id", } as RunStartedEvent); - source$.next({ - type: EventType.TEXT_MESSAGE_START, - messageId: "1", - } as TextMessageStartEvent); - // Try to send a tool-related event inside the text message + // Try to end a text message without starting it source$.next({ - type: EventType.TOOL_CALL_START, - toolCallId: "t1", - toolCallName: "test-tool", - } as ToolCallStartEvent); + type: EventType.TEXT_MESSAGE_END, + messageId: "1", + } as TextMessageEndEvent); // Complete the source and wait a bit for processing source$.complete(); await new Promise((resolve) => setTimeout(resolve, 100)); // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); + expect(events.length).toBe(1); + expect(events[0].type).toBe(EventType.RUN_STARTED); }); // Test: Should allow TEXT_MESSAGE_CONTENT 
inside a text message @@ -219,22 +211,21 @@ describe("verifyEvents text messages", () => { expect(result[3].type).toBe(EventType.RAW); }); - // Test: Should not allow CUSTOM inside a text message - it("should not allow CUSTOM inside a text message", async () => { + // Test: Should allow CUSTOM inside a text message + it("should allow CUSTOM inside a text message", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain(`Cannot send event type 'CUSTOM' after 'TEXT_MESSAGE_START'`); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a text message + // Send a valid sequence with a custom event inside a text message source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -244,84 +235,48 @@ describe("verifyEvents text messages", () => { type: EventType.TEXT_MESSAGE_START, messageId: "1", } as TextMessageStartEvent); - - // Try to send a meta event inside the text message + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "1", + delta: "test content", + } as TextMessageContentEvent); source$.next({ type: EventType.CUSTOM, - name: "PredictState", - value: [{ state_key: "test", tool: "test-tool" }], + name: "test_event", + value: "test_value", } as CustomEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); - }); - 
- // Test: Should not allow STATE_SNAPSHOT inside a text message - it("should not allow STATE_SNAPSHOT inside a text message", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'STATE_SNAPSHOT' after 'TEXT_MESSAGE_START'`, - ); - subscription.unsubscribe(); - }, - }); - - // Start a valid run and open a text message source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); - source$.next({ - type: EventType.TEXT_MESSAGE_START, + type: EventType.TEXT_MESSAGE_END, messageId: "1", - } as TextMessageStartEvent); - - // Try to send a state snapshot inside the text message - source$.next({ - type: EventType.STATE_SNAPSHOT, - snapshot: { test: true }, - } as StateSnapshotEvent); + } as TextMessageEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.CUSTOM); }); - // Test: Should not allow STATE_DELTA inside a text message - it("should not allow STATE_DELTA inside a text message", async () => { + // Test: Should allow STATE_SNAPSHOT inside a text message + it("should allow STATE_SNAPSHOT inside a text message", async () => { const source$ = new Subject(); - const events: 
BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'STATE_DELTA' after 'TEXT_MESSAGE_START'`, - ); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a text message + // Send a valid sequence with a state snapshot inside a text message source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -331,123 +286,50 @@ describe("verifyEvents text messages", () => { type: EventType.TEXT_MESSAGE_START, messageId: "1", } as TextMessageStartEvent); - - // Try to send a state delta inside the text message - source$.next({ - type: EventType.STATE_DELTA, - delta: [{ op: "add", path: "/test", value: true }], - } as StateDeltaEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); - }); - - // Test: Should not allow MESSAGES_SNAPSHOT inside a text message - it("should not allow MESSAGES_SNAPSHOT inside a text message", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'MESSAGES_SNAPSHOT' after 'TEXT_MESSAGE_START'`, - ); - subscription.unsubscribe(); - }, - 
}); - - // Start a valid run and open a text message - source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); source$.next({ - type: EventType.TEXT_MESSAGE_START, + type: EventType.TEXT_MESSAGE_CONTENT, messageId: "1", - } as TextMessageStartEvent); - - // Try to send a messages snapshot inside the text message + delta: "test content", + } as TextMessageContentEvent); source$.next({ - type: EventType.MESSAGES_SNAPSHOT, - messages: [{ role: "user", content: "test" }], - } as MessagesSnapshotEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); - }); - - // Test: Cannot send RUN_FINISHED inside a text message - it("should not allow RUN_FINISHED inside a text message", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'RUN_FINISHED' after 'TEXT_MESSAGE_START'`, - ); - subscription.unsubscribe(); + type: EventType.STATE_SNAPSHOT, + snapshot: { + state: "test_state", + data: { foo: "bar" }, }, - }); - - // Start a valid run and open a text message - source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); + } as StateSnapshotEvent); source$.next({ - type: EventType.TEXT_MESSAGE_START, + type: EventType.TEXT_MESSAGE_END, messageId: "1", - } as TextMessageStartEvent); - - // Try to send RUN_FINISHED inside the text message + } as TextMessageEndEvent); source$.next({ type: 
EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.STATE_SNAPSHOT); }); - // NEW TEST: Missing TEXT_MESSAGE_END - it("should not allow RUN_FINISHED when a text message hasn't been closed", async () => { + // Test: Should allow STATE_DELTA inside a text message + it("should allow STATE_DELTA inside a text message", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - "Cannot send event type 'RUN_FINISHED' after 'TEXT_MESSAGE_START': Send 'TEXT_MESSAGE_END' first.", - ); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a text message + // Send a valid sequence with a state delta inside a text message source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -460,82 +342,44 @@ describe("verifyEvents text messages", () => { source$.next({ type: EventType.TEXT_MESSAGE_CONTENT, messageId: "1", - delta: "content 1", + delta: "test content", } as TextMessageContentEvent); - - // Try to end the run without closing the text message - source$.next({ type: EventType.RUN_FINISHED } as 
RunFinishedEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(3); - expect(events[2].type).toBe(EventType.TEXT_MESSAGE_CONTENT); - }); - - // NEW TEST: Nesting text messages - it("should not allow nested text messages", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - "Cannot send event type 'TEXT_MESSAGE_START' after 'TEXT_MESSAGE_START': Send 'TEXT_MESSAGE_END' first.", - ); - subscription.unsubscribe(); - }, - }); - - // Start a valid run and open a text message source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); + type: EventType.STATE_DELTA, + delta: [{ op: "add", path: "/result", value: "success" }], + } as StateDeltaEvent); source$.next({ - type: EventType.TEXT_MESSAGE_START, + type: EventType.TEXT_MESSAGE_END, messageId: "1", - } as TextMessageStartEvent); - - // Try to start a nested text message - source$.next({ - type: EventType.TEXT_MESSAGE_START, - messageId: "2", - } as TextMessageStartEvent); + } as TextMessageEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were 
processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.STATE_DELTA); }); - // NEW TEST: Mismatched message IDs - it("should not allow text message content with mismatched IDs", async () => { + // Test: Should allow MESSAGES_SNAPSHOT inside a text message + it("should allow MESSAGES_SNAPSHOT inside a text message", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - "Cannot send 'TEXT_MESSAGE_CONTENT' event: Message ID mismatch. The ID '2' doesn't match the active message ID '1'.", - ); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a text message with ID "1" + // Send a valid sequence with a messages snapshot inside a text message source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -545,147 +389,47 @@ describe("verifyEvents text messages", () => { type: EventType.TEXT_MESSAGE_START, messageId: "1", } as TextMessageStartEvent); - - // Try to send content with a different ID - source$.next({ - type: EventType.TEXT_MESSAGE_CONTENT, - messageId: "2", - delta: "content 2", - } as TextMessageContentEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); - }); - - // NEW TEST: TEXT_MESSAGE_CONTENT before START - it("should not allow text message content without a prior start event", async () => 
{ - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - "Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found. Start a text message with 'TEXT_MESSAGE_START' first.", - ); - subscription.unsubscribe(); - }, - }); - - // Start a valid run but skip starting a text message - source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); - - // Try to send content without starting a message source$.next({ type: EventType.TEXT_MESSAGE_CONTENT, messageId: "1", - delta: "content 1", + delta: "test content", } as TextMessageContentEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(1); - expect(events[0].type).toBe(EventType.RUN_STARTED); - }); - - // NEW TEST: TEXT_MESSAGE_END before START - it("should not allow ending a text message that was never started", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - "Cannot send 'TEXT_MESSAGE_END' event: No active text message found. 
A 'TEXT_MESSAGE_START' event must be sent first.", - ); - subscription.unsubscribe(); - }, - }); - - // Start a valid run but skip starting a text message source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); - - // Try to end a message that was never started + type: EventType.MESSAGES_SNAPSHOT, + messages: [{ role: "user", content: "test", id: "test-id" }], + } as MessagesSnapshotEvent); source$.next({ type: EventType.TEXT_MESSAGE_END, messageId: "1", } as TextMessageEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(1); - expect(events[0].type).toBe(EventType.RUN_STARTED); - }); - - // NEW TEST: Starting text message outside of a run - it("should not allow starting a text message before RUN_STARTED", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain("First event must be 'RUN_STARTED'"); - subscription.unsubscribe(); - }, - }); - - // Try to start a text message before RUN_STARTED - source$.next({ - type: EventType.TEXT_MESSAGE_START, - messageId: "1", - } as TextMessageStartEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); + // Await the promise and expect no errors + const result = await promise; - // Verify no events were processed - expect(events.length).toBe(0); + // Verify all events were processed + expect(result.length).toBe(6); + 
expect(result[3].type).toBe(EventType.MESSAGES_SNAPSHOT); }); - // NEW TEST: Mismatched IDs for TEXT_MESSAGE_END - it("should not allow text message end with mismatched ID", async () => { + // Test: Should allow lifecycle events (STEP_STARTED/STEP_FINISHED) during text messages + it("should allow lifecycle events during text messages", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain("Cannot send 'TEXT_MESSAGE_END' event: Message ID mismatch"); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a text message with ID "1" + // Send a valid sequence with lifecycle events inside a text message source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -695,29 +439,39 @@ describe("verifyEvents text messages", () => { type: EventType.TEXT_MESSAGE_START, messageId: "1", } as TextMessageStartEvent); + source$.next({ + type: EventType.STEP_STARTED, + stepName: "test-step", + } as StepStartedEvent); source$.next({ type: EventType.TEXT_MESSAGE_CONTENT, messageId: "1", - delta: "content 1", + delta: "test content", } as TextMessageContentEvent); - - // Try to end with a different ID + source$.next({ + type: EventType.STEP_FINISHED, + stepName: "test-step", + } as StepFinishedEvent); source$.next({ type: EventType.TEXT_MESSAGE_END, - messageId: "2", + messageId: "1", } as TextMessageEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new 
Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(3); - expect(events[2].type).toBe(EventType.TEXT_MESSAGE_CONTENT); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(7); + expect(result[2].type).toBe(EventType.STEP_STARTED); + expect(result[4].type).toBe(EventType.STEP_FINISHED); }); - // NEW TEST: Empty text messages (no content) - it("should allow empty text messages with no content", async () => { + // Test: Should allow tool calls to start during text messages + it("should allow tool calls to start during text messages", async () => { const source$ = new Subject(); // Set up subscription and collect events @@ -730,7 +484,7 @@ describe("verifyEvents text messages", () => { ), ); - // Send a valid sequence with an empty text message + // Send a valid sequence with tool calls inside a text message source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -740,6 +494,30 @@ describe("verifyEvents text messages", () => { type: EventType.TEXT_MESSAGE_START, messageId: "1", } as TextMessageStartEvent); + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "1", + delta: "Starting search...", + } as TextMessageContentEvent); + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "tool1", + toolCallName: "search", + } as ToolCallStartEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "tool1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "tool1", + } as ToolCallEndEvent); + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "1", + delta: "Search completed.", + } as TextMessageContentEvent); source$.next({ type: EventType.TEXT_MESSAGE_END, messageId: "1", @@ -753,51 +531,13 @@ describe("verifyEvents text messages", () => { const result = await 
promise; // Verify all events were processed - expect(result.length).toBe(4); - expect(result[1].type).toBe(EventType.TEXT_MESSAGE_START); - expect(result[2].type).toBe(EventType.TEXT_MESSAGE_END); - }); - - // NEW TEST: Missing/undefined IDs for TEXT_MESSAGE_START - it("should not allow text messages with undefined or null IDs", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain("requires a valid message ID"); - subscription.unsubscribe(); - }, - }); - - // Start a valid run - source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); - - // Try to start a text message with undefined ID - source$.next({ - type: EventType.TEXT_MESSAGE_START, - messageId: "undefined-id", - role: "assistant", - } as TextMessageStartEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify events processed before the error - expect(events.length).toBe(2); - expect(events[0].type).toBe(EventType.RUN_STARTED); - expect(events[1].type).toBe(EventType.TEXT_MESSAGE_START); + expect(result.length).toBe(9); + expect(result[3].type).toBe(EventType.TOOL_CALL_START); + expect(result[4].type).toBe(EventType.TOOL_CALL_ARGS); + expect(result[5].type).toBe(EventType.TOOL_CALL_END); }); - // NEW TEST: Sequential text messages + // Test: Sequential text messages it("should allow multiple sequential text messages", async () => { const source$ = new Subject(); @@ -866,7 +606,7 @@ describe("verifyEvents text messages", () => { expect(result[6].type).toBe(EventType.TEXT_MESSAGE_END); }); - // NEW TEST: Text message at run boundaries + // Test: Text 
message at run boundaries it("should allow text messages immediately after RUN_STARTED and before RUN_FINISHED", async () => { const source$ = new Subject(); @@ -914,4 +654,33 @@ describe("verifyEvents text messages", () => { expect(result[3].type).toBe(EventType.TEXT_MESSAGE_END); expect(result[4].type).toBe(EventType.RUN_FINISHED); }); + + // Test: Starting text message before RUN_STARTED + it("should not allow starting a text message before RUN_STARTED", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain("First event must be 'RUN_STARTED'"); + subscription.unsubscribe(); + }, + }); + + // Try to start a text message before RUN_STARTED + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "1", + } as TextMessageStartEvent); + + // Complete the source and wait a bit for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify no events were processed + expect(events.length).toBe(0); + }); }); diff --git a/typescript-sdk/packages/client/src/verify/__tests__/verify.tool-calls.test.ts b/typescript-sdk/packages/client/src/verify/__tests__/verify.tool-calls.test.ts index 61e2a2277..5e68a3431 100644 --- a/typescript-sdk/packages/client/src/verify/__tests__/verify.tool-calls.test.ts +++ b/typescript-sdk/packages/client/src/verify/__tests__/verify.tool-calls.test.ts @@ -25,8 +25,8 @@ import { } from "@ag-ui/core"; describe("verifyEvents tool calls", () => { - // Test: Cannot send lifecycle events inside a tool call - it("should not allow lifecycle events inside a tool call", async () => { + // Test: Cannot send TOOL_CALL_ARGS before TOOL_CALL_START + it("should not allow TOOL_CALL_ARGS before TOOL_CALL_START", async () 
=> { const source$ = new Subject(); const events: BaseEvent[] = []; @@ -36,85 +36,37 @@ describe("verifyEvents tool calls", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send event type 'STEP_STARTED' after 'TOOL_CALL_START'`, + `Cannot send 'TOOL_CALL_ARGS' event: No active tool call found with ID 't1'`, ); subscription.unsubscribe(); }, }); - // Start a valid run and open a tool call + // Start a valid run source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", runId: "test-run-id", } as RunStartedEvent); - source$.next({ - type: EventType.TOOL_CALL_START, - toolCallId: "t1", - toolCallName: "test-tool", - } as ToolCallStartEvent); - - // Try to send a lifecycle event inside the tool call - source$.next({ - type: EventType.STEP_STARTED, - stepName: "step1", - } as StepStartedEvent); - - // Complete the source and wait a bit for processing - source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); - }); - - // Test: Cannot send text message events inside a tool call - it("should not allow text message events inside a tool call", async () => { - const source$ = new Subject(); - const events: BaseEvent[] = []; - - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'TEXT_MESSAGE_START' after 'TOOL_CALL_START'`, - ); - subscription.unsubscribe(); - }, - }); - // Start a valid run and open a tool call - source$.next({ - type: EventType.RUN_STARTED, - threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); + // Try to send args without starting a tool call source$.next({ - 
type: EventType.TOOL_CALL_START, + type: EventType.TOOL_CALL_ARGS, toolCallId: "t1", - toolCallName: "test-tool", - } as ToolCallStartEvent); - - // Try to send a text message event inside the tool call - source$.next({ - type: EventType.TEXT_MESSAGE_START, - messageId: "1", - } as TextMessageStartEvent); + delta: "test args", + } as ToolCallArgsEvent); // Complete the source and wait a bit for processing source$.complete(); await new Promise((resolve) => setTimeout(resolve, 100)); // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); + expect(events.length).toBe(1); + expect(events[0].type).toBe(EventType.RUN_STARTED); }); - // Test: Cannot start a nested tool call - it("should not allow nested tool calls", async () => { + // Test: Cannot send TOOL_CALL_END before TOOL_CALL_START + it("should not allow TOOL_CALL_END before TOOL_CALL_START", async () => { const source$ = new Subject(); const events: BaseEvent[] = []; @@ -124,38 +76,32 @@ describe("verifyEvents tool calls", () => { error: (err) => { expect(err).toBeInstanceOf(AGUIError); expect(err.message).toContain( - `Cannot send 'TOOL_CALL_START' event: A tool call is already in progress`, + `Cannot send 'TOOL_CALL_END' event: No active tool call found with ID 't1'`, ); subscription.unsubscribe(); }, }); - // Start a valid run and open a tool call + // Start a valid run source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", runId: "test-run-id", } as RunStartedEvent); - source$.next({ - type: EventType.TOOL_CALL_START, - toolCallId: "t1", - toolCallName: "test-tool", - } as ToolCallStartEvent); - // Try to start another tool call inside the first one + // Try to end a tool call without starting it source$.next({ - type: EventType.TOOL_CALL_START, - toolCallId: "t2", - toolCallName: "test-tool-2", - } as ToolCallStartEvent); + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); // 
Complete the source and wait a bit for processing source$.complete(); await new Promise((resolve) => setTimeout(resolve, 100)); // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); + expect(events.length).toBe(1); + expect(events[0].type).toBe(EventType.RUN_STARTED); }); // Test: Should allow TOOL_CALL_ARGS and TOOL_CALL_END inside a tool call @@ -267,22 +213,21 @@ describe("verifyEvents tool calls", () => { expect(result[3].type).toBe(EventType.RAW); }); - // Test: Should not allow CUSTOM inside a tool call - it("should not allow CUSTOM inside a tool call", async () => { + // Test: Should allow CUSTOM inside a tool call + it("should allow CUSTOM inside a tool call", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain(`Cannot send event type 'CUSTOM' after 'TOOL_CALL_START'`); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a tool call + // Send a valid sequence with a custom event inside a tool call source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -293,41 +238,48 @@ describe("verifyEvents tool calls", () => { toolCallId: "t1", toolCallName: "test-tool", } as ToolCallStartEvent); - - // Try to send a meta event inside the tool call + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "test args", + } as ToolCallArgsEvent); source$.next({ type: EventType.CUSTOM, - name: "PredictState", - value: [{ state_key: "test", tool: "test-tool" }], 
+ name: "test_event", + value: "test_value", } as CustomEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.CUSTOM); }); - // Test: Should not allow STATE_SNAPSHOT inside a tool call - it("should not allow STATE_SNAPSHOT inside a tool call", async () => { + // Test: Should allow STATE_SNAPSHOT inside a tool call + it("should allow STATE_SNAPSHOT inside a tool call", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'STATE_SNAPSHOT' after 'TOOL_CALL_START'`, - ); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a tool call + // Send a valid sequence with a state snapshot inside a tool call source$.next({ type: EventType.RUN_STARTED, threadId: "test-thread-id", @@ -338,40 +290,50 @@ describe("verifyEvents tool calls", () => { toolCallId: "t1", toolCallName: "test-tool", } as ToolCallStartEvent); - - // Try to send a state snapshot inside the tool 
call + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "test args", + } as ToolCallArgsEvent); source$.next({ type: EventType.STATE_SNAPSHOT, - snapshot: { test: true }, + snapshot: { + state: "test_state", + data: { foo: "bar" }, + }, } as StateSnapshotEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.STATE_SNAPSHOT); }); - // Test: Should not allow STATE_DELTA inside a tool call - it("should not allow STATE_DELTA inside a tool call", async () => { + // Test: Should allow STATE_DELTA inside a tool call + it("should allow STATE_DELTA inside a tool call", async () => { const source$ = new Subject(); - const events: BaseEvent[] = []; - // Create a subscription that will complete only after an error - const subscription = verifyEvents(false)(source$).subscribe({ - next: (event) => events.push(event), - error: (err) => { - expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'STATE_DELTA' after 'TOOL_CALL_START'`, - ); - subscription.unsubscribe(); - }, - }); + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); - // Start a valid run and open a tool call + // Send a valid sequence with a state delta inside a tool call source$.next({ type: EventType.RUN_STARTED, 
threadId: "test-thread-id", @@ -382,24 +344,329 @@ describe("verifyEvents tool calls", () => { toolCallId: "t1", toolCallName: "test-tool", } as ToolCallStartEvent); - - // Try to send a state delta inside the tool call + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "test args", + } as ToolCallArgsEvent); source$.next({ type: EventType.STATE_DELTA, - delta: [{ op: "add", path: "/test", value: true }], + delta: [{ op: "add", path: "/result", value: "success" }], } as StateDeltaEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); - // Complete the source and wait a bit for processing + // Complete the source source$.complete(); - await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.STATE_DELTA); }); - // Test: Should not allow MESSAGES_SNAPSHOT inside a tool call - it("should not allow MESSAGES_SNAPSHOT inside a tool call", async () => { + // Test: Should allow MESSAGES_SNAPSHOT inside a tool call + it("should allow MESSAGES_SNAPSHOT inside a tool call", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send a valid sequence with a messages snapshot inside a tool call + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "t1", + toolCallName: "test-tool", + } as 
ToolCallStartEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "test args", + } as ToolCallArgsEvent); + source$.next({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [{ role: "user", content: "test", id: "test-id" }], + } as MessagesSnapshotEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(6); + expect(result[3].type).toBe(EventType.MESSAGES_SNAPSHOT); + }); + + // Test: Should allow lifecycle events (STEP_STARTED/STEP_FINISHED) during tool calls + it("should allow lifecycle events during tool calls", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send a valid sequence with lifecycle events inside a tool call + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "t1", + toolCallName: "test-tool", + } as ToolCallStartEvent); + source$.next({ + type: EventType.STEP_STARTED, + stepName: "test-step", + } as StepStartedEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "test args", + } as ToolCallArgsEvent); + source$.next({ + type: EventType.STEP_FINISHED, + stepName: "test-step", + } as StepFinishedEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no 
errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(7); + expect(result[2].type).toBe(EventType.STEP_STARTED); + expect(result[4].type).toBe(EventType.STEP_FINISHED); + }); + + // Test: Should allow text messages to start during tool calls + it("should allow text messages to start during tool calls", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send a valid sequence with text messages inside a tool call + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "t1", + toolCallName: "test-tool", + } as ToolCallStartEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "Preparing...", + } as ToolCallArgsEvent); + source$.next({ + type: EventType.TEXT_MESSAGE_START, + messageId: "msg1", + } as TextMessageStartEvent); + source$.next({ + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg1", + delta: "Tool is processing...", + } as TextMessageContentEvent); + source$.next({ + type: EventType.TEXT_MESSAGE_END, + messageId: "msg1", + } as TextMessageEndEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "Completed.", + } as ToolCallArgsEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(9); + expect(result[3].type).toBe(EventType.TEXT_MESSAGE_START); + expect(result[4].type).toBe(EventType.TEXT_MESSAGE_CONTENT); 
+ expect(result[5].type).toBe(EventType.TEXT_MESSAGE_END); + }); + + // Test: Sequential tool calls + it("should allow multiple sequential tool calls", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send a valid sequence with multiple tool calls + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // First tool call + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "t1", + toolCallName: "search", + } as ToolCallStartEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + + // Second tool call + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "t2", + toolCallName: "calculate", + } as ToolCallStartEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t2", + delta: '{"expression":"1+1"}', + } as ToolCallArgsEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t2", + } as ToolCallEndEvent); + + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(8); + expect(result[1].type).toBe(EventType.TOOL_CALL_START); + expect(result[2].type).toBe(EventType.TOOL_CALL_ARGS); + expect(result[3].type).toBe(EventType.TOOL_CALL_END); + expect(result[4].type).toBe(EventType.TOOL_CALL_START); + expect(result[5].type).toBe(EventType.TOOL_CALL_ARGS); + expect(result[6].type).toBe(EventType.TOOL_CALL_END); + }); + + // Test: Tool call at run boundaries + it("should allow tool 
calls immediately after RUN_STARTED and before RUN_FINISHED", async () => { + const source$ = new Subject(); + + // Set up subscription and collect events + const promise = firstValueFrom( + verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + throw err; + }), + ), + ); + + // Send tool call immediately after run start and before run end + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + source$.next({ + type: EventType.TOOL_CALL_START, + toolCallId: "t1", + toolCallName: "test-tool", + } as ToolCallStartEvent); + source$.next({ + type: EventType.TOOL_CALL_ARGS, + toolCallId: "t1", + delta: "test args", + } as ToolCallArgsEvent); + source$.next({ + type: EventType.TOOL_CALL_END, + toolCallId: "t1", + } as ToolCallEndEvent); + source$.next({ type: EventType.RUN_FINISHED } as RunFinishedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect no errors + const result = await promise; + + // Verify all events were processed + expect(result.length).toBe(5); + expect(result[0].type).toBe(EventType.RUN_STARTED); + expect(result[1].type).toBe(EventType.TOOL_CALL_START); + expect(result[3].type).toBe(EventType.TOOL_CALL_END); + expect(result[4].type).toBe(EventType.RUN_FINISHED); + }); + + // Test: Starting tool call before RUN_STARTED + it("should not allow starting a tool call before RUN_STARTED", async () => { const source$ = new Subject(); const events: BaseEvent[] = []; @@ -408,37 +675,23 @@ describe("verifyEvents tool calls", () => { next: (event) => events.push(event), error: (err) => { expect(err).toBeInstanceOf(AGUIError); - expect(err.message).toContain( - `Cannot send event type 'MESSAGES_SNAPSHOT' after 'TOOL_CALL_START'`, - ); + expect(err.message).toContain("First event must be 'RUN_STARTED'"); subscription.unsubscribe(); }, }); - // Start a valid run and open a tool call - source$.next({ - type: EventType.RUN_STARTED, - 
threadId: "test-thread-id", - runId: "test-run-id", - } as RunStartedEvent); + // Try to start a tool call before RUN_STARTED source$.next({ type: EventType.TOOL_CALL_START, toolCallId: "t1", toolCallName: "test-tool", } as ToolCallStartEvent); - // Try to send a messages snapshot inside the tool call - source$.next({ - type: EventType.MESSAGES_SNAPSHOT, - messages: [{ role: "user", content: "test" }], - } as MessagesSnapshotEvent); - // Complete the source and wait a bit for processing source$.complete(); await new Promise((resolve) => setTimeout(resolve, 100)); - // Verify only events before the error were processed - expect(events.length).toBe(2); - expect(events[1].type).toBe(EventType.TOOL_CALL_START); + // Verify no events were processed + expect(events.length).toBe(0); }); }); diff --git a/typescript-sdk/packages/client/src/verify/verify.ts b/typescript-sdk/packages/client/src/verify/verify.ts index 9196ecc6c..9088c047d 100644 --- a/typescript-sdk/packages/client/src/verify/verify.ts +++ b/typescript-sdk/packages/client/src/verify/verify.ts @@ -6,8 +6,8 @@ export const verifyEvents = (debug: boolean) => (source$: Observable): Observable => { // Declare variables in closure to maintain state across events - let activeMessageId: string | undefined; - let activeToolCallId: string | undefined; + let activeMessages = new Map(); // Map of message ID -> active status + let activeToolCalls = new Map(); // Map of tool call ID -> active status let runFinished = false; let runError = false; // New flag to track if RUN_ERROR has been sent // New flags to track first/last event requirements @@ -46,56 +46,6 @@ export const verifyEvents = ); } - // Forbid lifecycle events and tool events inside a text message - if (activeMessageId !== undefined) { - // Define allowed event types inside a text message - const allowedEventTypes = [ - EventType.TEXT_MESSAGE_CONTENT, - EventType.TEXT_MESSAGE_END, - EventType.RAW, - ]; - - // If the event type is not in the allowed list, throw 
an error - if (!allowedEventTypes.includes(eventType)) { - return throwError( - () => - new AGUIError( - `Cannot send event type '${eventType}' after 'TEXT_MESSAGE_START': Send 'TEXT_MESSAGE_END' first.`, - ), - ); - } - } - - // Forbid lifecycle events and text message events inside a tool call - if (activeToolCallId !== undefined) { - // Define allowed event types inside a tool call - const allowedEventTypes = [ - EventType.TOOL_CALL_ARGS, - EventType.TOOL_CALL_END, - EventType.RAW, - ]; - - // If the event type is not in the allowed list, throw an error - if (!allowedEventTypes.includes(eventType)) { - // Special handling for nested tool calls for better error message - if (eventType === EventType.TOOL_CALL_START) { - return throwError( - () => - new AGUIError( - `Cannot send 'TOOL_CALL_START' event: A tool call is already in progress. Complete it with 'TOOL_CALL_END' first.`, - ), - ); - } - - return throwError( - () => - new AGUIError( - `Cannot send event type '${eventType}' after 'TOOL_CALL_START': Send 'TOOL_CALL_END' first.`, - ), - ); - } - } - // Handle first event requirement and prevent multiple RUN_STARTED if (!firstEventReceived) { firstEventReceived = true; @@ -116,36 +66,31 @@ export const verifyEvents = switch (eventType) { // Text message flow case EventType.TEXT_MESSAGE_START: { - // Can't start a message if one is already in progress - if (activeMessageId !== undefined) { + const messageId = (event as any).messageId; + + // Check if this message is already in progress + if (activeMessages.has(messageId)) { return throwError( () => new AGUIError( - `Cannot send 'TEXT_MESSAGE_START' event: A text message is already in progress. Complete it with 'TEXT_MESSAGE_END' first.`, + `Cannot send 'TEXT_MESSAGE_START' event: A text message with ID '${messageId}' is already in progress. 
Complete it with 'TEXT_MESSAGE_END' first.`, ), ); } - activeMessageId = (event as any).messageId; + activeMessages.set(messageId, true); return of(event); } case EventType.TEXT_MESSAGE_CONTENT: { - // Must be in a message and IDs must match - if (activeMessageId === undefined) { - return throwError( - () => - new AGUIError( - `Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found. Start a text message with 'TEXT_MESSAGE_START' first.`, - ), - ); - } + const messageId = (event as any).messageId; - if ((event as any).messageId !== activeMessageId) { + // Must be in a message with this ID + if (!activeMessages.has(messageId)) { return throwError( () => new AGUIError( - `Cannot send 'TEXT_MESSAGE_CONTENT' event: Message ID mismatch. The ID '${(event as any).messageId}' doesn't match the active message ID '${activeMessageId}'.`, + `Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found with ID '${messageId}'. Start a text message with 'TEXT_MESSAGE_START' first.`, ), ); } @@ -154,62 +99,50 @@ export const verifyEvents = } case EventType.TEXT_MESSAGE_END: { - // Must be in a message and IDs must match - if (activeMessageId === undefined) { - return throwError( - () => - new AGUIError( - `Cannot send 'TEXT_MESSAGE_END' event: No active text message found. A 'TEXT_MESSAGE_START' event must be sent first.`, - ), - ); - } + const messageId = (event as any).messageId; - if ((event as any).messageId !== activeMessageId) { + // Must be in a message with this ID + if (!activeMessages.has(messageId)) { return throwError( () => new AGUIError( - `Cannot send 'TEXT_MESSAGE_END' event: Message ID mismatch. The ID '${(event as any).messageId}' doesn't match the active message ID '${activeMessageId}'.`, + `Cannot send 'TEXT_MESSAGE_END' event: No active text message found with ID '${messageId}'. 
A 'TEXT_MESSAGE_START' event must be sent first.`, ), ); } - // Reset message state - activeMessageId = undefined; + // Remove message from active set + activeMessages.delete(messageId); return of(event); } // Tool call flow case EventType.TOOL_CALL_START: { - // Can't start a tool call if one is already in progress - if (activeToolCallId !== undefined) { + const toolCallId = (event as any).toolCallId; + + // Check if this tool call is already in progress + if (activeToolCalls.has(toolCallId)) { return throwError( () => new AGUIError( - `Cannot send 'TOOL_CALL_START' event: A tool call is already in progress. Complete it with 'TOOL_CALL_END' first.`, + `Cannot send 'TOOL_CALL_START' event: A tool call with ID '${toolCallId}' is already in progress. Complete it with 'TOOL_CALL_END' first.`, ), ); } - activeToolCallId = (event as any).toolCallId; + activeToolCalls.set(toolCallId, true); return of(event); } case EventType.TOOL_CALL_ARGS: { - // Must be in a tool call and IDs must match - if (activeToolCallId === undefined) { - return throwError( - () => - new AGUIError( - `Cannot send 'TOOL_CALL_ARGS' event: No active tool call found. Start a tool call with 'TOOL_CALL_START' first.`, - ), - ); - } + const toolCallId = (event as any).toolCallId; - if ((event as any).toolCallId !== activeToolCallId) { + // Must be in a tool call with this ID + if (!activeToolCalls.has(toolCallId)) { return throwError( () => new AGUIError( - `Cannot send 'TOOL_CALL_ARGS' event: Tool call ID mismatch. The ID '${(event as any).toolCallId}' doesn't match the active tool call ID '${activeToolCallId}'.`, + `Cannot send 'TOOL_CALL_ARGS' event: No active tool call found with ID '${toolCallId}'. 
Start a tool call with 'TOOL_CALL_START' first.`, ), ); } @@ -218,27 +151,20 @@ export const verifyEvents = } case EventType.TOOL_CALL_END: { - // Must be in a tool call and IDs must match - if (activeToolCallId === undefined) { - return throwError( - () => - new AGUIError( - `Cannot send 'TOOL_CALL_END' event: No active tool call found. A 'TOOL_CALL_START' event must be sent first.`, - ), - ); - } + const toolCallId = (event as any).toolCallId; - if ((event as any).toolCallId !== activeToolCallId) { + // Must be in a tool call with this ID + if (!activeToolCalls.has(toolCallId)) { return throwError( () => new AGUIError( - `Cannot send 'TOOL_CALL_END' event: Tool call ID mismatch. The ID '${(event as any).toolCallId}' doesn't match the active tool call ID '${activeToolCallId}'.`, + `Cannot send 'TOOL_CALL_END' event: No active tool call found with ID '${toolCallId}'. A 'TOOL_CALL_START' event must be sent first.`, ), ); } - // Reset tool call state - activeToolCallId = undefined; + // Remove tool call from active set + activeToolCalls.delete(toolCallId); return of(event); } @@ -289,6 +215,28 @@ export const verifyEvents = ); } + // Check that all messages are finished before run ends + if (activeMessages.size > 0) { + const unfinishedMessages = Array.from(activeMessages.keys()).join(", "); + return throwError( + () => + new AGUIError( + `Cannot send 'RUN_FINISHED' while text messages are still active: ${unfinishedMessages}`, + ), + ); + } + + // Check that all tool calls are finished before run ends + if (activeToolCalls.size > 0) { + const unfinishedToolCalls = Array.from(activeToolCalls.keys()).join(", "); + return throwError( + () => + new AGUIError( + `Cannot send 'RUN_FINISHED' while tool calls are still active: ${unfinishedToolCalls}`, + ), + ); + } + runFinished = true; return of(event); } From ab65b99db6f00476c07d044367662caa614b7317 Mon Sep 17 00:00:00 2001 From: Markus Ecker Date: Wed, 23 Jul 2025 15:08:42 +0200 Subject: [PATCH 2/4] fix message content 
--- .../packages/client/src/apply/default.ts | 106 ++- .../__tests__/convert.concurrent.test.ts | 829 ++++++++++++++++++ .../packages/client/src/legacy/convert.ts | 8 +- 3 files changed, 908 insertions(+), 35 deletions(-) create mode 100644 typescript-sdk/packages/client/src/legacy/__tests__/convert.concurrent.test.ts diff --git a/typescript-sdk/packages/client/src/apply/default.ts b/typescript-sdk/packages/client/src/apply/default.ts index 0e9deb04c..795abe915 100644 --- a/typescript-sdk/packages/client/src/apply/default.ts +++ b/typescript-sdk/packages/client/src/apply/default.ts @@ -116,6 +116,15 @@ export const defaultApplyEvents = ( } case EventType.TEXT_MESSAGE_CONTENT: { + const { messageId, delta } = event as TextMessageContentEvent; + + // Find the target message by ID + const targetMessage = messages.find((m) => m.id === messageId); + if (!targetMessage) { + console.warn(`TEXT_MESSAGE_CONTENT: No message found with ID '${messageId}'`); + return emitUpdates(); + } + const mutation = await runSubscribersWithMutation( subscribers, messages, @@ -127,17 +136,14 @@ export const defaultApplyEvents = ( state, agent, input, - textMessageBuffer: messages[messages.length - 1].content ?? "", + textMessageBuffer: targetMessage.content ?? "", }), ); applyMutation(mutation); if (mutation.stopPropagation !== true) { - const { delta } = event as TextMessageContentEvent; - - // Get the last message and append the content - const lastMessage = messages[messages.length - 1]; - lastMessage.content = lastMessage.content! 
+ delta; + // Append content to the correct message by ID + targetMessage.content = (targetMessage.content || "") + delta; applyMutation({ messages }); } @@ -145,6 +151,15 @@ export const defaultApplyEvents = ( } case EventType.TEXT_MESSAGE_END: { + const { messageId } = event as TextMessageEndEvent; + + // Find the target message by ID + const targetMessage = messages.find((m) => m.id === messageId); + if (!targetMessage) { + console.warn(`TEXT_MESSAGE_END: No message found with ID '${messageId}'`); + return emitUpdates(); + } + const mutation = await runSubscribersWithMutation( subscribers, messages, @@ -156,7 +171,7 @@ export const defaultApplyEvents = ( state, agent, input, - textMessageBuffer: messages[messages.length - 1].content ?? "", + textMessageBuffer: targetMessage.content ?? "", }), ); applyMutation(mutation); @@ -164,7 +179,7 @@ export const defaultApplyEvents = ( await Promise.all( subscribers.map((subscriber) => { subscriber.onNewMessage?.({ - message: messages[messages.length - 1], + message: targetMessage, messages, state, agent, @@ -233,17 +248,34 @@ export const defaultApplyEvents = ( } case EventType.TOOL_CALL_ARGS: { + const { toolCallId, delta } = event as ToolCallArgsEvent; + + // Find the message containing this tool call + const targetMessage = messages.find((m) => + (m as AssistantMessage).toolCalls?.some((tc) => tc.id === toolCallId), + ) as AssistantMessage; + + if (!targetMessage) { + console.warn( + `TOOL_CALL_ARGS: No message found containing tool call with ID '${toolCallId}'`, + ); + return emitUpdates(); + } + + // Find the specific tool call + const targetToolCall = targetMessage.toolCalls!.find((tc) => tc.id === toolCallId); + if (!targetToolCall) { + console.warn(`TOOL_CALL_ARGS: No tool call found with ID '${toolCallId}'`); + return emitUpdates(); + } + const mutation = await runSubscribersWithMutation( subscribers, messages, state, (subscriber, messages, state) => { - const toolCalls = - (messages[messages.length - 1] as 
AssistantMessage)?.toolCalls ?? []; - const toolCallBuffer = - toolCalls.length > 0 ? toolCalls[toolCalls.length - 1].function.arguments : ""; - const toolCallName = - toolCalls.length > 0 ? toolCalls[toolCalls.length - 1].function.name : ""; + const toolCallBuffer = targetToolCall.function.arguments; + const toolCallName = targetToolCall.function.name; let partialToolCallArgs = {}; try { // Parse from toolCallBuffer only (before current delta is applied) @@ -265,17 +297,8 @@ export const defaultApplyEvents = ( applyMutation(mutation); if (mutation.stopPropagation !== true) { - const { delta } = event as ToolCallArgsEvent; - - // Get the last message - const lastMessage = messages[messages.length - 1] as AssistantMessage; - - // Get the last tool call - const lastToolCall = lastMessage.toolCalls![lastMessage.toolCalls!.length - 1]; - - // Append the arguments - lastToolCall.function.arguments += delta; - + // Append the arguments to the correct tool call by ID + targetToolCall.function.arguments += delta; applyMutation({ messages }); } @@ -283,17 +306,34 @@ export const defaultApplyEvents = ( } case EventType.TOOL_CALL_END: { + const { toolCallId } = event as ToolCallEndEvent; + + // Find the message containing this tool call + const targetMessage = messages.find((m) => + (m as AssistantMessage).toolCalls?.some((tc) => tc.id === toolCallId), + ) as AssistantMessage; + + if (!targetMessage) { + console.warn( + `TOOL_CALL_END: No message found containing tool call with ID '${toolCallId}'`, + ); + return emitUpdates(); + } + + // Find the specific tool call + const targetToolCall = targetMessage.toolCalls!.find((tc) => tc.id === toolCallId); + if (!targetToolCall) { + console.warn(`TOOL_CALL_END: No tool call found with ID '${toolCallId}'`); + return emitUpdates(); + } + const mutation = await runSubscribersWithMutation( subscribers, messages, state, (subscriber, messages, state) => { - const toolCalls = - (messages[messages.length - 1] as AssistantMessage)?.toolCalls 
?? []; - const toolCallArgsString = - toolCalls.length > 0 ? toolCalls[toolCalls.length - 1].function.arguments : ""; - const toolCallName = - toolCalls.length > 0 ? toolCalls[toolCalls.length - 1].function.name : ""; + const toolCallArgsString = targetToolCall.function.arguments; + const toolCallName = targetToolCall.function.name; let toolCallArgs = {}; try { toolCallArgs = JSON.parse(toolCallArgsString); @@ -314,9 +354,7 @@ export const defaultApplyEvents = ( await Promise.all( subscribers.map((subscriber) => { subscriber.onNewToolCall?.({ - toolCall: (messages[messages.length - 1] as AssistantMessage).toolCalls![ - (messages[messages.length - 1] as AssistantMessage).toolCalls!.length - 1 - ], + toolCall: targetToolCall, messages, state, agent, diff --git a/typescript-sdk/packages/client/src/legacy/__tests__/convert.concurrent.test.ts b/typescript-sdk/packages/client/src/legacy/__tests__/convert.concurrent.test.ts new file mode 100644 index 000000000..b8e1c8192 --- /dev/null +++ b/typescript-sdk/packages/client/src/legacy/__tests__/convert.concurrent.test.ts @@ -0,0 +1,829 @@ +import { convertToLegacyEvents } from "../convert"; +import { of } from "rxjs"; +import { toArray } from "rxjs/operators"; +import { + BaseEvent, + EventType, + TextMessageStartEvent, + TextMessageContentEvent, + TextMessageEndEvent, + ToolCallStartEvent, + ToolCallArgsEvent, + ToolCallEndEvent, + CustomEvent, + StepStartedEvent, + StepFinishedEvent, +} from "@ag-ui/core"; +import { LegacyRuntimeProtocolEvent } from "../types"; + +describe("convertToLegacyEvents - Concurrent Operations", () => { + const defaultParams = { + threadId: "test-thread", + runId: "test-run", + agentName: "test-agent", + }; + + it("should handle concurrent text messages correctly", async () => { + const mockEvents: BaseEvent[] = [ + // Start two concurrent text messages + { + type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now(), + messageId: "msg1", + role: "assistant", + } as TextMessageStartEvent, + { + 
type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now(), + messageId: "msg2", + role: "assistant", + } as TextMessageStartEvent, + + // Send content for both messages + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "msg1", + delta: "First message content", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "msg2", + delta: "Second message content", + } as TextMessageContentEvent, + + // End messages in reverse order + { + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now(), + messageId: "msg2", + } as TextMessageEndEvent, + { + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now(), + messageId: "msg1", + } as TextMessageEndEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + expect(events).toHaveLength(6); + + // Verify message starts + expect(events[0].type).toBe("TextMessageStart"); + expect(events[1].type).toBe("TextMessageStart"); + if (events[0].type === "TextMessageStart" && events[1].type === "TextMessageStart") { + expect(events[0].messageId).toBe("msg1"); + expect(events[1].messageId).toBe("msg2"); + } + + // Verify message content + expect(events[2].type).toBe("TextMessageContent"); + expect(events[3].type).toBe("TextMessageContent"); + if (events[2].type === "TextMessageContent" && events[3].type === "TextMessageContent") { + expect(events[2].messageId).toBe("msg1"); + expect(events[2].content).toBe("First message content"); + expect(events[3].messageId).toBe("msg2"); + expect(events[3].content).toBe("Second message content"); + } + + // Verify message ends (in reverse order) + expect(events[4].type).toBe("TextMessageEnd"); + expect(events[5].type).toBe("TextMessageEnd"); + if (events[4].type === "TextMessageEnd" && events[5].type === "TextMessageEnd") { + 
expect(events[4].messageId).toBe("msg2"); + expect(events[5].messageId).toBe("msg1"); + } + }); + + it("should handle concurrent tool calls correctly", async () => { + const mockEvents: BaseEvent[] = [ + // Start two concurrent tool calls + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: "msg1", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "tool2", + toolCallName: "calculate", + parentMessageId: "msg2", + } as ToolCallStartEvent, + + // Send args for both tool calls + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "tool1", + delta: '{"query":"test search"}', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "tool2", + delta: '{"expression":"2+2"}', + } as ToolCallArgsEvent, + + // End tool calls in reverse order + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "tool2", + } as ToolCallEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "tool1", + } as ToolCallEndEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + expect(events).toHaveLength(6); + + // Verify tool call starts + expect(events[0].type).toBe("ActionExecutionStart"); + expect(events[1].type).toBe("ActionExecutionStart"); + if (events[0].type === "ActionExecutionStart" && events[1].type === "ActionExecutionStart") { + expect(events[0].actionExecutionId).toBe("tool1"); + expect(events[0].actionName).toBe("search"); + expect(events[0].parentMessageId).toBe("msg1"); + expect(events[1].actionExecutionId).toBe("tool2"); + expect(events[1].actionName).toBe("calculate"); + expect(events[1].parentMessageId).toBe("msg2"); + } + + // Verify tool call args + 
expect(events[2].type).toBe("ActionExecutionArgs"); + expect(events[3].type).toBe("ActionExecutionArgs"); + if (events[2].type === "ActionExecutionArgs" && events[3].type === "ActionExecutionArgs") { + expect(events[2].actionExecutionId).toBe("tool1"); + expect(events[2].args).toBe('{"query":"test search"}'); + expect(events[3].actionExecutionId).toBe("tool2"); + expect(events[3].args).toBe('{"expression":"2+2"}'); + } + + // Verify tool call ends (in reverse order) + expect(events[4].type).toBe("ActionExecutionEnd"); + expect(events[5].type).toBe("ActionExecutionEnd"); + if (events[4].type === "ActionExecutionEnd" && events[5].type === "ActionExecutionEnd") { + expect(events[4].actionExecutionId).toBe("tool2"); + expect(events[5].actionExecutionId).toBe("tool1"); + } + }); + + it("should handle mixed concurrent text messages and tool calls", async () => { + const mockEvents: BaseEvent[] = [ + // Start a text message + { + type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now(), + messageId: "thinking_msg", + role: "assistant", + } as TextMessageStartEvent, + + // Start a tool call while message is active + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "search_tool", + toolCallName: "web_search", + parentMessageId: "tool_msg", + } as ToolCallStartEvent, + + // Add content to text message + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "thinking_msg", + delta: "Let me search for that...", + } as TextMessageContentEvent, + + // Add args to tool call + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "search_tool", + delta: '{"query":"concurrent events"}', + } as ToolCallArgsEvent, + + // Start another text message + { + type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now(), + messageId: "status_msg", + role: "assistant", + } as TextMessageStartEvent, + + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "status_msg", + delta: 
"Processing...", + } as TextMessageContentEvent, + + // End everything + { + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now(), + messageId: "thinking_msg", + } as TextMessageEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "search_tool", + } as ToolCallEndEvent, + { + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now(), + messageId: "status_msg", + } as TextMessageEndEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + expect(events).toHaveLength(9); + + // Check the sequence matches expected pattern + const expectedTypes = [ + "TextMessageStart", // thinking_msg start + "ActionExecutionStart", // search_tool start + "TextMessageContent", // thinking_msg content + "ActionExecutionArgs", // search_tool args + "TextMessageStart", // status_msg start + "TextMessageContent", // status_msg content + "TextMessageEnd", // thinking_msg end + "ActionExecutionEnd", // search_tool end + "TextMessageEnd", // status_msg end + ]; + + for (let i = 0; i < expectedTypes.length; i++) { + expect(events[i].type).toBe(expectedTypes[i]); + } + + // Verify specific content + const thinkingContent = events.find( + (e) => e.type === "TextMessageContent" && (e as any).messageId === "thinking_msg", + ); + expect(thinkingContent).toBeDefined(); + if (thinkingContent?.type === "TextMessageContent") { + expect(thinkingContent.content).toBe("Let me search for that..."); + } + + const toolArgs = events.find( + (e) => e.type === "ActionExecutionArgs" && (e as any).actionExecutionId === "search_tool", + ); + expect(toolArgs).toBeDefined(); + if (toolArgs?.type === "ActionExecutionArgs") { + expect(toolArgs.args).toBe('{"query":"concurrent events"}'); + } + }); + + it("should handle multiple tool calls on same parent message", async () => { + const mockEvents: BaseEvent[] 
= [ + // Start multiple tool calls with same parent + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "search1", + toolCallName: "search", + parentMessageId: "agent_msg", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "calc1", + toolCallName: "calculate", + parentMessageId: "agent_msg", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "format1", + toolCallName: "format", + parentMessageId: "agent_msg", + } as ToolCallStartEvent, + + // Send args for all tool calls + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "search1", + delta: '{"query":"test"}', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "calc1", + delta: '{"expression":"2*3"}', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "format1", + delta: '{"format":"json"}', + } as ToolCallArgsEvent, + + // End all tool calls + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "search1", + } as ToolCallEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "calc1", + } as ToolCallEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "format1", + } as ToolCallEndEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + expect(events).toHaveLength(9); + + // Verify all start events have same parent + const startEvents = events.filter((e) => e.type === "ActionExecutionStart"); + expect(startEvents).toHaveLength(3); + for (const event of startEvents) { + if (event.type === "ActionExecutionStart") { + expect(event.parentMessageId).toBe("agent_msg"); + } + } + + // Verify args events 
match correct tool calls + const argsEvents = events.filter((e) => e.type === "ActionExecutionArgs"); + expect(argsEvents).toHaveLength(3); + + const searchArgs = argsEvents.find((e) => (e as any).actionExecutionId === "search1"); + expect(searchArgs).toBeDefined(); + if (searchArgs?.type === "ActionExecutionArgs") { + expect(searchArgs.args).toBe('{"query":"test"}'); + } + + const calcArgs = argsEvents.find((e) => (e as any).actionExecutionId === "calc1"); + expect(calcArgs).toBeDefined(); + if (calcArgs?.type === "ActionExecutionArgs") { + expect(calcArgs.args).toBe('{"expression":"2*3"}'); + } + + const formatArgs = argsEvents.find((e) => (e as any).actionExecutionId === "format1"); + expect(formatArgs).toBeDefined(); + if (formatArgs?.type === "ActionExecutionArgs") { + expect(formatArgs.args).toBe('{"format":"json"}'); + } + }); + + it("should handle high-frequency concurrent events", async () => { + const mockEvents: BaseEvent[] = []; + + // Create many concurrent messages and tool calls + const numMessages = 5; + const numToolCalls = 5; + + // Start all messages + for (let i = 0; i < numMessages; i++) { + mockEvents.push({ + type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now() + i, + messageId: `msg${i}`, + role: "assistant", + } as TextMessageStartEvent); + } + + // Start all tool calls + for (let i = 0; i < numToolCalls; i++) { + mockEvents.push({ + type: EventType.TOOL_CALL_START, + timestamp: Date.now() + numMessages + i, + toolCallId: `tool${i}`, + toolCallName: `tool_${i}`, + parentMessageId: `tool_msg${i}`, + } as ToolCallStartEvent); + } + + // Send content for all messages + for (let i = 0; i < numMessages; i++) { + mockEvents.push({ + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now() + numMessages + numToolCalls + i, + messageId: `msg${i}`, + delta: `Content for message ${i}`, + } as TextMessageContentEvent); + } + + // Send args for all tool calls + for (let i = 0; i < numToolCalls; i++) { + mockEvents.push({ + type: 
EventType.TOOL_CALL_ARGS, + timestamp: Date.now() + numMessages * 2 + numToolCalls + i, + toolCallId: `tool${i}`, + delta: `{"param${i}":"value${i}"}`, + } as ToolCallArgsEvent); + } + + // End all in reverse order + for (let i = numMessages - 1; i >= 0; i--) { + mockEvents.push({ + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now() + numMessages * 2 + numToolCalls * 2 + (numMessages - 1 - i), + messageId: `msg${i}`, + } as TextMessageEndEvent); + } + + for (let i = numToolCalls - 1; i >= 0; i--) { + mockEvents.push({ + type: EventType.TOOL_CALL_END, + timestamp: Date.now() + numMessages * 3 + numToolCalls * 2 + (numToolCalls - 1 - i), + toolCallId: `tool${i}`, + } as ToolCallEndEvent); + } + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + // Should have: numMessages starts + numToolCalls starts + numMessages content + numToolCalls args + numMessages ends + numToolCalls ends + const expectedLength = numMessages * 3 + numToolCalls * 3; + expect(events).toHaveLength(expectedLength); + + // Verify all message starts are present + const messageStarts = events.filter((e) => e.type === "TextMessageStart"); + expect(messageStarts).toHaveLength(numMessages); + for (let i = 0; i < numMessages; i++) { + const start = messageStarts.find((e) => (e as any).messageId === `msg${i}`); + expect(start).toBeDefined(); + } + + // Verify all tool call starts are present + const toolStarts = events.filter((e) => e.type === "ActionExecutionStart"); + expect(toolStarts).toHaveLength(numToolCalls); + for (let i = 0; i < numToolCalls; i++) { + const start = toolStarts.find((e) => (e as any).actionExecutionId === `tool${i}`); + expect(start).toBeDefined(); + if (start?.type === "ActionExecutionStart") { + expect(start.actionName).toBe(`tool_${i}`); + } + } + + // Verify all message content is present + const 
messageContent = events.filter((e) => e.type === "TextMessageContent"); + expect(messageContent).toHaveLength(numMessages); + for (let i = 0; i < numMessages; i++) { + const content = messageContent.find((e) => (e as any).messageId === `msg${i}`); + expect(content).toBeDefined(); + if (content?.type === "TextMessageContent") { + expect(content.content).toBe(`Content for message ${i}`); + } + } + + // Verify all tool call args are present + const toolArgs = events.filter((e) => e.type === "ActionExecutionArgs"); + expect(toolArgs).toHaveLength(numToolCalls); + for (let i = 0; i < numToolCalls; i++) { + const args = toolArgs.find((e) => (e as any).actionExecutionId === `tool${i}`); + expect(args).toBeDefined(); + if (args?.type === "ActionExecutionArgs") { + expect(args.args).toBe(`{"param${i}":"value${i}"}`); + } + } + }); + + it("should handle interleaved content and args updates correctly", async () => { + const mockEvents: BaseEvent[] = [ + // Start concurrent message and tool call + { + type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now(), + messageId: "msg1", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "tool1", + toolCallName: "search", + parentMessageId: "tool_msg1", + } as ToolCallStartEvent, + + // Interleave content and args updates + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "msg1", + delta: "Searching ", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "tool1", + delta: '{"que', + } as ToolCallArgsEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "msg1", + delta: "for ", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "tool1", + delta: 'ry":"', + } as ToolCallArgsEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "msg1", + delta: 
"information...", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "tool1", + delta: 'test"}', + } as ToolCallArgsEvent, + + // End both + { + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now(), + messageId: "msg1", + } as TextMessageEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "tool1", + } as ToolCallEndEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + expect(events).toHaveLength(10); + + // Verify the interleaved pattern + expect(events[0].type).toBe("TextMessageStart"); + expect(events[1].type).toBe("ActionExecutionStart"); + expect(events[2].type).toBe("TextMessageContent"); + expect(events[3].type).toBe("ActionExecutionArgs"); + expect(events[4].type).toBe("TextMessageContent"); + expect(events[5].type).toBe("ActionExecutionArgs"); + expect(events[6].type).toBe("TextMessageContent"); + expect(events[7].type).toBe("ActionExecutionArgs"); + expect(events[8].type).toBe("TextMessageEnd"); + expect(events[9].type).toBe("ActionExecutionEnd"); + + // Verify content chunks + const contentEvents = events.filter((e) => e.type === "TextMessageContent"); + expect(contentEvents).toHaveLength(3); + if (contentEvents[0]?.type === "TextMessageContent") { + expect(contentEvents[0].content).toBe("Searching "); + } + if (contentEvents[1]?.type === "TextMessageContent") { + expect(contentEvents[1].content).toBe("for "); + } + if (contentEvents[2]?.type === "TextMessageContent") { + expect(contentEvents[2].content).toBe("information..."); + } + + // Verify args chunks + const argsEvents = events.filter((e) => e.type === "ActionExecutionArgs"); + expect(argsEvents).toHaveLength(3); + if (argsEvents[0]?.type === "ActionExecutionArgs") { + expect(argsEvents[0].args).toBe('{"que'); + } + if 
(argsEvents[1]?.type === "ActionExecutionArgs") { + expect(argsEvents[1].args).toBe('ry":"'); + } + if (argsEvents[2]?.type === "ActionExecutionArgs") { + expect(argsEvents[2].args).toBe('test"}'); + } + }); + + it("should handle concurrent operations with predictive state updates", async () => { + const mockEvents: BaseEvent[] = [ + // Set up predictive state + { + type: EventType.CUSTOM, + timestamp: Date.now(), + name: "PredictState", + value: [ + { + state_key: "search_results", + tool: "search", + tool_argument: "query", + }, + { + state_key: "calculation", + tool: "calculate", + tool_argument: "expression", + }, + ], + } as CustomEvent, + + // Start concurrent tool calls + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "search1", + toolCallName: "search", + parentMessageId: "msg1", + } as ToolCallStartEvent, + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "calc1", + toolCallName: "calculate", + parentMessageId: "msg2", + } as ToolCallStartEvent, + + // Send args that should trigger state updates + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "search1", + delta: '{"query":"concurrent test"}', + } as ToolCallArgsEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "calc1", + delta: '{"expression":"5*5"}', + } as ToolCallArgsEvent, + + // End tool calls + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "search1", + } as ToolCallEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "calc1", + } as ToolCallEndEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + // Should have: PredictState + 2 starts + 2 args + 2 state updates + 2 ends = 9 events + expect(events).toHaveLength(9); + + // First event should be the 
meta event + expect(events[0].type).toBe("MetaEvent"); + + // Should have state update events triggered by the tool call args + const stateEvents = events.filter((e) => e.type === "AgentStateMessage"); + expect(stateEvents).toHaveLength(2); + + // Verify first state update (from search) + if (stateEvents[0]?.type === "AgentStateMessage") { + const state = JSON.parse(stateEvents[0].state); + expect(state.search_results).toBe("concurrent test"); + } + + // Verify second state update (from calculation) + if (stateEvents[1]?.type === "AgentStateMessage") { + const state = JSON.parse(stateEvents[1].state); + expect(state.calculation).toBe("5*5"); + } + }); + + it("should handle concurrent operations with lifecycle steps", async () => { + const mockEvents: BaseEvent[] = [ + // Start a step + { + type: EventType.STEP_STARTED, + timestamp: Date.now(), + stepName: "processing", + } as StepStartedEvent, + + // Start concurrent operations during the step + { + type: EventType.TEXT_MESSAGE_START, + timestamp: Date.now(), + messageId: "thinking_msg", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TOOL_CALL_START, + timestamp: Date.now(), + toolCallId: "search_tool", + toolCallName: "search", + parentMessageId: "tool_msg", + } as ToolCallStartEvent, + + // Add content and args + { + type: EventType.TEXT_MESSAGE_CONTENT, + timestamp: Date.now(), + messageId: "thinking_msg", + delta: "Analyzing...", + } as TextMessageContentEvent, + { + type: EventType.TOOL_CALL_ARGS, + timestamp: Date.now(), + toolCallId: "search_tool", + delta: '{"query":"analysis"}', + } as ToolCallArgsEvent, + + // End operations + { + type: EventType.TEXT_MESSAGE_END, + timestamp: Date.now(), + messageId: "thinking_msg", + } as TextMessageEndEvent, + { + type: EventType.TOOL_CALL_END, + timestamp: Date.now(), + toolCallId: "search_tool", + } as ToolCallEndEvent, + + // End the step + { + type: EventType.STEP_FINISHED, + timestamp: Date.now(), + stepName: "processing", + } as 
StepFinishedEvent, + ]; + + const events = (await convertToLegacyEvents( + defaultParams.threadId, + defaultParams.runId, + defaultParams.agentName, + )(of(...mockEvents)) + .pipe(toArray()) + .toPromise()) as LegacyRuntimeProtocolEvent[]; + + expect(events).toHaveLength(8); + + // Verify the sequence includes step lifecycle and concurrent operations + expect(events[0].type).toBe("AgentStateMessage"); // Step start + expect(events[1].type).toBe("TextMessageStart"); + expect(events[2].type).toBe("ActionExecutionStart"); + expect(events[3].type).toBe("TextMessageContent"); + expect(events[4].type).toBe("ActionExecutionArgs"); + expect(events[5].type).toBe("TextMessageEnd"); + expect(events[6].type).toBe("ActionExecutionEnd"); + expect(events[7].type).toBe("AgentStateMessage"); // Step end + + // Verify step states + const stepStates = events.filter((e) => e.type === "AgentStateMessage"); + expect(stepStates).toHaveLength(2); + if (stepStates[0]?.type === "AgentStateMessage") { + expect(stepStates[0].active).toBe(true); + } + if (stepStates[1]?.type === "AgentStateMessage") { + expect(stepStates[1].active).toBe(false); + } + }); +}); diff --git a/typescript-sdk/packages/client/src/legacy/convert.ts b/typescript-sdk/packages/client/src/legacy/convert.ts index b79335563..4df7e48d6 100644 --- a/typescript-sdk/packages/client/src/legacy/convert.ts +++ b/typescript-sdk/packages/client/src/legacy/convert.ts @@ -127,7 +127,13 @@ export const convertToLegacyEvents = case EventType.TOOL_CALL_ARGS: { const argsEvent = event as ToolCallArgsEvent; - const currentToolCall = currentToolCalls[currentToolCalls.length - 1]; + // Find the tool call by ID instead of using the last one + const currentToolCall = currentToolCalls.find((tc) => tc.id === argsEvent.toolCallId); + if (!currentToolCall) { + console.warn(`TOOL_CALL_ARGS: No tool call found with ID '${argsEvent.toolCallId}'`); + return []; + } + currentToolCall.function.arguments += argsEvent.delta; let didUpdateState = false; 
From 0d755be3cfce948f1494ddecf80fbbe779257e23 Mon Sep 17 00:00:00 2001 From: Markus Ecker Date: Wed, 30 Jul 2025 09:57:43 +0200 Subject: [PATCH 3/4] add CLAUDE.md --- CLAUDE.md | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..9c3f04517 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,103 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Common Development Commands + +### TypeScript SDK (Main Development) +```bash +# Navigate to typescript-sdk directory for all TypeScript work +cd typescript-sdk + +# Install dependencies (using pnpm) +pnpm install + +# Build all packages +pnpm build + +# Run development mode +pnpm dev + +# Run linting +pnpm lint + +# Run type checking +pnpm check-types + +# Run tests +pnpm test + +# Format code +pnpm format + +# Clean build artifacts +pnpm clean + +# Full clean build +pnpm build:clean +``` + +### Python SDK +```bash +# Navigate to python-sdk directory +cd python-sdk + +# Install dependencies (using poetry) +poetry install + +# Run tests +poetry run pytest + +# Build distribution +poetry build +``` + +### Running Specific Integration Tests +```bash +# For TypeScript packages/integrations +cd typescript-sdk/packages/<package-name> +pnpm test + +# For running a single test file +pnpm test path/to/file.test.ts +``` + +## High-Level Architecture + +AG-UI is an event-based protocol that standardizes agent-user interactions.
The codebase is organized as a monorepo with the following structure: + +### Core Protocol Architecture +- **Event-Driven Communication**: All agent-UI communication happens through typed events (BaseEvent and its subtypes) +- **Transport Agnostic**: Protocol supports SSE, WebSockets, HTTP binary, and custom transports +- **Observable Pattern**: Uses RxJS Observables for streaming agent responses + +### Key Abstractions +1. **AbstractAgent**: Abstract base class that all agents extend, implementing a `run(input: RunAgentInput): Observable<BaseEvent>` method +2. **HttpAgent**: Standard HTTP client supporting SSE and binary protocols for connecting to agent endpoints +3. **Event Types**: Lifecycle events (RUN_STARTED/RUN_FINISHED), message events (TEXT_MESSAGE_*), tool events (TOOL_CALL_*), and state management events (STATE_SNAPSHOT/STATE_DELTA) + +### Repository Structure +- `/typescript-sdk/`: Main TypeScript implementation + - `/packages/`: Core protocol packages (@ag-ui/core, @ag-ui/client, @ag-ui/encoder, @ag-ui/proto) + - `/integrations/`: Framework integrations (langgraph, mastra, crewai, etc.) + - `/apps/`: Example applications including the AG-UI Dojo demo viewer +- `/python-sdk/`: Python implementation of the protocol +- `/docs/`: Documentation site content + +### Integration Pattern +Each framework integration follows a similar pattern: +1. Extends the AbstractAgent base class +2. Translates framework-specific events to AG-UI protocol events +3. Provides both TypeScript client and Python server implementations +4. Includes examples demonstrating key AG-UI features (agentic chat, generative UI, human-in-the-loop, etc.) 
+ +### State Management +- Uses STATE_SNAPSHOT for complete state representations +- Uses STATE_DELTA with JSON Patch (RFC 6902) for efficient incremental updates +- MESSAGES_SNAPSHOT provides conversation history + +### Development Workflow +- Turbo is used for monorepo build orchestration +- Each package has independent versioning +- Integration tests demonstrate protocol compliance +- The AG-UI Dojo app showcases all protocol features with live examples \ No newline at end of file From 04fcb87ee02bcaf71cd03d024c3d96ea2f91b2c2 Mon Sep 17 00:00:00 2001 From: Markus Ecker Date: Wed, 30 Jul 2025 09:59:24 +0200 Subject: [PATCH 4/4] feat: export RunAgentResult type from agent module --- typescript-sdk/packages/client/src/agent/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/typescript-sdk/packages/client/src/agent/index.ts b/typescript-sdk/packages/client/src/agent/index.ts index 09541bc42..a72ce2ca6 100644 --- a/typescript-sdk/packages/client/src/agent/index.ts +++ b/typescript-sdk/packages/client/src/agent/index.ts @@ -1,3 +1,4 @@ export { AbstractAgent } from "./agent"; +export type { RunAgentResult } from "./agent"; export { HttpAgent } from "./http"; export type { AgentConfig } from "./types";