Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import {
StepStartedEvent,
StepFinishedEvent,
} from "@ag-ui/core";
import { RunResumedEvent, RunSuspendedEvent } from "@ag-ui/core/src";

describe("verifyEvents lifecycle", () => {
// Test: RUN_STARTED must be the first event
it("should require RUN_STARTED as the first event", async () => {
it("should require RUN_STARTED as the first event if no RUN_SUSPENDED", async () => {
const source$ = new Subject<BaseEvent>();
const result$ = verifyEvents(false)(source$).pipe(
catchError((err) => {
Expand Down Expand Up @@ -86,6 +87,100 @@ describe("verifyEvents lifecycle", () => {
expect(events[0].type).toBe(EventType.RUN_STARTED);
});

it("should require RUN_RESUMED as the first event RUN_SUSPENDED", async () => {
const source$ = new Subject<BaseEvent>();
const result$ = verifyEvents(false)(source$).pipe(
toArray(),
catchError((err) => {
expect(err).toBeInstanceOf(AGUIError);
expect(err.message).toContain("First event in suspended run must be 'RUN_RESUMED'");
throw err;
}),
);

// Set up subscription
const promise = firstValueFrom(result$).catch((e) => e);

// Start run and send event
source$.next({
type: EventType.RUN_STARTED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunStartedEvent);

// Suspense run
source$.next({
type: EventType.RUN_SUSPENDED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunSuspendedEvent);

// Send another RUN_STARTED event
source$.next({
type: EventType.RUN_STARTED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunStartedEvent);

// Complete the source
source$.complete();

// Await the promise and expect it to be an error
const result = await promise;
expect(result).toBeInstanceOf(AGUIError);
});

it("should not allow multiple RUN_RESUMED events", async () => {
const source$ = new Subject<BaseEvent>();
const events: BaseEvent[] = [];

// Create a subscription that will complete only after an error
const subscription = verifyEvents(false)(source$).subscribe({
next: (event) => events.push(event),
error: (err) => {
expect(err).toBeInstanceOf(AGUIError);
expect(err.message).toContain("Cannot send 'RUN_RESUMED' without a previous 'RUN_SUSPENDED' event");
subscription.unsubscribe();
},
});

// Send first RUN_STARTED (should be accepted)
source$.next({
type: EventType.RUN_STARTED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunStartedEvent);

// Suspend run (should be accepted)
source$.next({
type: EventType.RUN_SUSPENDED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunSuspendedEvent);

// Resume run (should be accepted)
source$.next({
type: EventType.RUN_RESUMED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunResumedEvent);

// Resume run again. This should be rejected.
source$.next({
type: EventType.RUN_RESUMED,
threadId: "test-thread-id",
runId: "test-run-id",
} as RunResumedEvent);

// Complete the source and wait a bit for processing
source$.complete();
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify one event was processed before the error
expect(events.length).toBe(3);
expect(events[2].type).toBe(EventType.RUN_RESUMED);
});

// Test: No events should be allowed after RUN_FINISHED (except RUN_ERROR)
it("should not allow events after RUN_FINISHED (except RUN_ERROR)", async () => {
const source$ = new Subject<BaseEvent>();
Expand Down
30 changes: 30 additions & 0 deletions typescript-sdk/packages/client/src/verify/verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const verifyEvents =
let activeThinkingStep = false;
let activeThinkingStepMessage = false;
let runStarted = false; // Track if a run has started
let runSuspended = false;

// Function to reset state for a new run
const resetRunState = () => {
Expand All @@ -28,6 +29,7 @@ export const verifyEvents =
runFinished = false;
runError = false;
runStarted = true;
runSuspended = false;
};

return source$.pipe(
Expand Down Expand Up @@ -57,6 +59,13 @@ export const verifyEvents =
`Cannot send event type '${eventType}': The run has already finished with 'RUN_FINISHED'. Start a new run with 'RUN_STARTED'.`,
),
);
} else if (runSuspended && eventType !== EventType.RUN_ERROR && eventType !== EventType.RUN_RESUMED) {
return throwError(
() =>
new AGUIError(
`First event in suspended run must be 'RUN_RESUMED'`,
),
);
}

// Handle first event requirement and sequential RUN_STARTED
Expand Down Expand Up @@ -262,6 +271,27 @@ export const verifyEvents =
return of(event);
}

case EventType.RUN_RESUMED: {
if (!runSuspended) {
return throwError(
() =>
new AGUIError(
`Cannot send 'RUN_RESUMED' without a previous 'RUN_SUSPENDED' event`,
),
);
}

runSuspended = false;

return of(event);
}

case EventType.RUN_SUSPENDED: {
runSuspended = true;

return of(event);
}

case EventType.RUN_ERROR: {
// RUN_ERROR can happen at any time
runError = true; // Set flag to prevent any further events
Expand Down
24 changes: 24 additions & 0 deletions typescript-sdk/packages/core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ export enum EventType {
THINKING_END = "THINKING_END",
STATE_SNAPSHOT = "STATE_SNAPSHOT",
STATE_DELTA = "STATE_DELTA",
INTERRUPT = "INTERRUPT",
MESSAGES_SNAPSHOT = "MESSAGES_SNAPSHOT",
RAW = "RAW",
CUSTOM = "CUSTOM",
RUN_STARTED = "RUN_STARTED",
RUN_FINISHED = "RUN_FINISHED",
RUN_SUSPENDED = "RUN_SUSPENDED",
RUN_RESUMED = "RUN_RESUMED",
RUN_ERROR = "RUN_ERROR",
STEP_STARTED = "STEP_STARTED",
STEP_FINISHED = "STEP_FINISHED",
Expand Down Expand Up @@ -134,6 +137,11 @@ export const StateDeltaEventSchema = BaseEventSchema.extend({
delta: z.array(z.any()), // JSON Patch (RFC 6902)
});

export const InterruptEventSchema = BaseEventSchema.extend({
type: z.literal(EventType.INTERRUPT),
content: z.any(),
});

export const MessagesSnapshotEventSchema = BaseEventSchema.extend({
type: z.literal(EventType.MESSAGES_SNAPSHOT),
messages: z.array(MessageSchema),
Expand Down Expand Up @@ -164,6 +172,19 @@ export const RunFinishedEventSchema = BaseEventSchema.extend({
result: z.any().optional(),
});

export const RunResumedEventSchema = BaseEventSchema.extend({
type: z.literal(EventType.RUN_RESUMED),
threadId: z.string(),
runId: z.string(),
});

export const RunSuspendedEventSchema = BaseEventSchema.extend({
type: z.literal(EventType.RUN_SUSPENDED),
threadId: z.string(),
runId: z.string(),
result: z.any().optional(),
});

export const RunErrorEventSchema = BaseEventSchema.extend({
type: z.literal(EventType.RUN_ERROR),
message: z.string(),
Expand Down Expand Up @@ -224,11 +245,14 @@ export type ThinkingStartEvent = z.infer<typeof ThinkingStartEventSchema>;
export type ThinkingEndEvent = z.infer<typeof ThinkingEndEventSchema>;
export type StateSnapshotEvent = z.infer<typeof StateSnapshotEventSchema>;
export type StateDeltaEvent = z.infer<typeof StateDeltaEventSchema>;
export type InterruptEvent = z.infer<typeof InterruptEventSchema>;
export type MessagesSnapshotEvent = z.infer<typeof MessagesSnapshotEventSchema>;
export type RawEvent = z.infer<typeof RawEventSchema>;
export type CustomEvent = z.infer<typeof CustomEventSchema>;
export type RunStartedEvent = z.infer<typeof RunStartedEventSchema>;
export type RunFinishedEvent = z.infer<typeof RunFinishedEventSchema>;
export type RunResumedEvent = z.infer<typeof RunResumedEventSchema>;
export type RunSuspendedEvent = z.infer<typeof RunSuspendedEventSchema>;
export type RunErrorEvent = z.infer<typeof RunErrorEventSchema>;
export type StepStartedEvent = z.infer<typeof StepStartedEventSchema>;
export type StepFinishedEvent = z.infer<typeof StepFinishedEventSchema>;
Loading