Skip to content

Commit 341e880

Browse files
Emit final snapshot before run finished event
1 parent 8e4a640 commit 341e880

File tree

2 files changed

+108
-1
lines changed

2 files changed

+108
-1
lines changed

sdks/typescript/packages/client/src/apply/__tests__/default.text-message.test.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Subject } from "rxjs";
22
import { toArray } from "rxjs/operators";
3-
import { firstValueFrom } from "rxjs";
3+
import { firstValueFrom, of } from "rxjs";
44
import {
55
BaseEvent,
66
EventType,
@@ -9,6 +9,7 @@ import {
99
TextMessageContentEvent,
1010
TextMessageEndEvent,
1111
RunAgentInput,
12+
RunFinishedEvent,
1213
} from "@ag-ui/core";
1314
import { defaultApplyEvents } from "../default";
1415
import { AbstractAgent } from "@/agent";
@@ -188,4 +189,59 @@ describe("defaultApplyEvents with text messages", () => {
188189
// Verify no additional updates after either TEXT_MESSAGE_END
189190
expect(stateUpdates.length).toBe(4);
190191
});
192+
193+
it("should emit a messages snapshot when the run finishes", async () => {
194+
const initialState: RunAgentInput = {
195+
messages: [],
196+
state: {},
197+
threadId: "test-thread",
198+
runId: "test-run",
199+
tools: [],
200+
context: [],
201+
};
202+
203+
const subscriber = {
204+
onMessagesSnapshotEvent: jest.fn(),
205+
onEvent: jest.fn(),
206+
};
207+
208+
const events: BaseEvent[] = [
209+
{
210+
type: EventType.TEXT_MESSAGE_START,
211+
messageId: "msg-1",
212+
role: "assistant",
213+
} as TextMessageStartEvent,
214+
{
215+
type: EventType.TEXT_MESSAGE_CONTENT,
216+
messageId: "msg-1",
217+
delta: "Hello world!",
218+
} as TextMessageContentEvent,
219+
{
220+
type: EventType.TEXT_MESSAGE_END,
221+
messageId: "msg-1",
222+
} as TextMessageEndEvent,
223+
{
224+
type: EventType.RUN_FINISHED,
225+
threadId: "test-thread",
226+
runId: "test-run",
227+
} as RunFinishedEvent,
228+
];
229+
230+
const result$ = defaultApplyEvents(initialState, of(...events), FAKE_AGENT, [subscriber as any]);
231+
const mutations = await firstValueFrom(result$.pipe(toArray()));
232+
233+
expect(subscriber.onMessagesSnapshotEvent).toHaveBeenCalledTimes(1);
234+
const snapshotArgs = subscriber.onMessagesSnapshotEvent.mock.calls[0][0];
235+
expect(snapshotArgs.event.type).toBe(EventType.MESSAGES_SNAPSHOT);
236+
expect(snapshotArgs.event.messages).toHaveLength(1);
237+
expect(snapshotArgs.event.messages?.[0]?.content).toBe("Hello world!");
238+
239+
const eventTypes = subscriber.onEvent.mock.calls.map((call) => call[0].event.type);
240+
expect(eventTypes[eventTypes.length - 2]).toBe(EventType.MESSAGES_SNAPSHOT);
241+
expect(eventTypes[eventTypes.length - 1]).toBe(EventType.RUN_FINISHED);
242+
243+
const finalMutation = mutations[mutations.length - 1];
244+
expect(finalMutation.messages).toHaveLength(1);
245+
expect(finalMutation.messages?.[0]?.content).toBe("Hello world!");
246+
});
191247
});

sdks/typescript/packages/client/src/apply/default.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,59 @@ export const defaultApplyEvents = (
6969
return EMPTY;
7070
};
7171

72+
const emitMessagesSnapshot = async () => {
73+
const snapshotEvent: MessagesSnapshotEvent = {
74+
type: EventType.MESSAGES_SNAPSHOT,
75+
messages: structuredClone_(messages),
76+
};
77+
78+
const snapshotOnEventMutation = await runSubscribersWithMutation(
79+
subscribers,
80+
messages,
81+
state,
82+
(subscriber, messages, state) =>
83+
subscriber.onEvent?.({
84+
event: snapshotEvent,
85+
agent,
86+
input,
87+
messages,
88+
state,
89+
}),
90+
);
91+
applyMutation(snapshotOnEventMutation);
92+
93+
if (snapshotOnEventMutation.stopPropagation === true) {
94+
return;
95+
}
96+
97+
snapshotEvent.messages = structuredClone_(messages);
98+
99+
const snapshotMutation = await runSubscribersWithMutation(
100+
subscribers,
101+
messages,
102+
state,
103+
(subscriber, messages, state) =>
104+
subscriber.onMessagesSnapshotEvent?.({
105+
event: snapshotEvent,
106+
messages,
107+
state,
108+
agent,
109+
input,
110+
}),
111+
);
112+
applyMutation(snapshotMutation);
113+
114+
if (snapshotMutation.stopPropagation !== true && snapshotMutation.messages === undefined) {
115+
applyMutation({ messages: structuredClone_(messages) });
116+
}
117+
};
118+
72119
return events$.pipe(
73120
concatMap(async (event) => {
121+
if (event.type === EventType.RUN_FINISHED) {
122+
await emitMessagesSnapshot();
123+
}
124+
74125
const mutation = await runSubscribersWithMutation(
75126
subscribers,
76127
messages,

0 commit comments

Comments
 (0)