diff --git a/typescript-sdk/packages/client/src/verify/__tests__/verify.lifecycle.test.ts b/typescript-sdk/packages/client/src/verify/__tests__/verify.lifecycle.test.ts index 849cec6b4..e9e9e6557 100644 --- a/typescript-sdk/packages/client/src/verify/__tests__/verify.lifecycle.test.ts +++ b/typescript-sdk/packages/client/src/verify/__tests__/verify.lifecycle.test.ts @@ -18,10 +18,11 @@ import { StepStartedEvent, StepFinishedEvent, } from "@ag-ui/core"; +import { RunResumedEvent, RunSuspendedEvent } from "@ag-ui/core/src"; describe("verifyEvents lifecycle", () => { // Test: RUN_STARTED must be the first event - it("should require RUN_STARTED as the first event", async () => { + it("should require RUN_STARTED as the first event if no RUN_SUSPENDED", async () => { const source$ = new Subject(); const result$ = verifyEvents(false)(source$).pipe( catchError((err) => { @@ -86,6 +87,100 @@ describe("verifyEvents lifecycle", () => { expect(events[0].type).toBe(EventType.RUN_STARTED); }); + it("should require RUN_RESUMED as the first event RUN_SUSPENDED", async () => { + const source$ = new Subject(); + const result$ = verifyEvents(false)(source$).pipe( + toArray(), + catchError((err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain("First event in suspended run must be 'RUN_RESUMED'"); + throw err; + }), + ); + + // Set up subscription + const promise = firstValueFrom(result$).catch((e) => e); + + // Start run and send event + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Suspense run + source$.next({ + type: EventType.RUN_SUSPENDED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunSuspendedEvent); + + // Send another RUN_STARTED event + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Complete the source + source$.complete(); + + // Await the promise and expect it to be an error + const result = await promise; + expect(result).toBeInstanceOf(AGUIError); + }); + + it("should not allow multiple RUN_RESUMED events", async () => { + const source$ = new Subject(); + const events: BaseEvent[] = []; + + // Create a subscription that will complete only after an error + const subscription = verifyEvents(false)(source$).subscribe({ + next: (event) => events.push(event), + error: (err) => { + expect(err).toBeInstanceOf(AGUIError); + expect(err.message).toContain("Cannot send 'RUN_RESUMED' without a previous 'RUN_SUSPENDED' event"); + subscription.unsubscribe(); + }, + }); + + // Send first RUN_STARTED (should be accepted) + source$.next({ + type: EventType.RUN_STARTED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunStartedEvent); + + // Suspend run (should be accepted) + source$.next({ + type: EventType.RUN_SUSPENDED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunSuspendedEvent); + + // Resume run (should be accepted) + source$.next({ + type: EventType.RUN_RESUMED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunResumedEvent); + + // Resume run again. This should be rejected. + source$.next({ + type: EventType.RUN_RESUMED, + threadId: "test-thread-id", + runId: "test-run-id", + } as RunResumedEvent); + + // Complete the source and wait a bit for processing + source$.complete(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify one event was processed before the error + expect(events.length).toBe(3); + expect(events[2].type).toBe(EventType.RUN_RESUMED); + }); + // Test: No events should be allowed after RUN_FINISHED (except RUN_ERROR) it("should not allow events after RUN_FINISHED (except RUN_ERROR)", async () => { const source$ = new Subject(); diff --git a/typescript-sdk/packages/client/src/verify/verify.ts b/typescript-sdk/packages/client/src/verify/verify.ts index 217d6e423..3690353c7 100644 --- a/typescript-sdk/packages/client/src/verify/verify.ts +++ b/typescript-sdk/packages/client/src/verify/verify.ts @@ -17,6 +17,7 @@ export const verifyEvents = let activeThinkingStep = false; let activeThinkingStepMessage = false; let runStarted = false; // Track if a run has started + let runSuspended = false; // Function to reset state for a new run const resetRunState = () => { @@ -28,6 +29,7 @@ export const verifyEvents = runFinished = false; runError = false; runStarted = true; + runSuspended = false; }; return source$.pipe( @@ -57,6 +59,13 @@ export const verifyEvents = `Cannot send event type '${eventType}': The run has already finished with 'RUN_FINISHED'. Start a new run with 'RUN_STARTED'.`, ), ); + } else if (runSuspended && eventType !== EventType.RUN_ERROR && eventType !== EventType.RUN_RESUMED) { + return throwError( + () => + new AGUIError( + `First event in suspended run must be 'RUN_RESUMED'`, + ), + ); } // Handle first event requirement and sequential RUN_STARTED @@ -262,6 +271,27 @@ export const verifyEvents = return of(event); } + case EventType.RUN_RESUMED: { + if (!runSuspended) { + return throwError( + () => + new AGUIError( + `Cannot send 'RUN_RESUMED' without a previous 'RUN_SUSPENDED' event`, + ), + ); + } + + runSuspended = false; + + return of(event); + } + + case EventType.RUN_SUSPENDED: { + runSuspended = true; + + return of(event); + } + case EventType.RUN_ERROR: { // RUN_ERROR can happen at any time runError = true; // Set flag to prevent any further events diff --git a/typescript-sdk/packages/core/src/events.ts b/typescript-sdk/packages/core/src/events.ts index a95fc8e15..e019efb07 100644 --- a/typescript-sdk/packages/core/src/events.ts +++ b/typescript-sdk/packages/core/src/events.ts @@ -26,11 +26,14 @@ export enum EventType { THINKING_END = "THINKING_END", STATE_SNAPSHOT = "STATE_SNAPSHOT", STATE_DELTA = "STATE_DELTA", + INTERRUPT = "INTERRUPT", MESSAGES_SNAPSHOT = "MESSAGES_SNAPSHOT", RAW = "RAW", CUSTOM = "CUSTOM", RUN_STARTED = "RUN_STARTED", RUN_FINISHED = "RUN_FINISHED", + RUN_SUSPENDED = "RUN_SUSPENDED", + RUN_RESUMED = "RUN_RESUMED", RUN_ERROR = "RUN_ERROR", STEP_STARTED = "STEP_STARTED", STEP_FINISHED = "STEP_FINISHED", @@ -134,6 +137,11 @@ export const StateDeltaEventSchema = BaseEventSchema.extend({ delta: z.array(z.any()), // JSON Patch (RFC 6902) }); +export const InterruptEventSchema = BaseEventSchema.extend({ + type: z.literal(EventType.INTERRUPT), + content: z.any(), +}); + export const MessagesSnapshotEventSchema = BaseEventSchema.extend({ type: z.literal(EventType.MESSAGES_SNAPSHOT), messages: z.array(MessageSchema), @@ -164,6 +172,19 @@ export const RunFinishedEventSchema = BaseEventSchema.extend({ result: z.any().optional(), }); +export const RunResumedEventSchema = BaseEventSchema.extend({ + type: z.literal(EventType.RUN_RESUMED), + threadId: z.string(), + runId: z.string(), +}); + +export const RunSuspendedEventSchema = BaseEventSchema.extend({ + type: z.literal(EventType.RUN_SUSPENDED), + threadId: z.string(), + runId: z.string(), + result: z.any().optional(), +}); + export const RunErrorEventSchema = BaseEventSchema.extend({ type: z.literal(EventType.RUN_ERROR), message: z.string(), @@ -224,11 +245,14 @@ export type ThinkingStartEvent = z.infer; export type ThinkingEndEvent = z.infer; export type StateSnapshotEvent = z.infer; export type StateDeltaEvent = z.infer; +export type InterruptEvent = z.infer; export type MessagesSnapshotEvent = z.infer; export type RawEvent = z.infer; export type CustomEvent = z.infer; export type RunStartedEvent = z.infer; export type RunFinishedEvent = z.infer; +export type RunResumedEvent = z.infer; +export type RunSuspendedEvent = z.infer; export type RunErrorEvent = z.infer; export type StepStartedEvent = z.infer; export type StepFinishedEvent = z.infer;