Skip to content

Commit 601ef28

Browse files
committed
wip
1 parent acc8fa8 commit 601ef28

File tree

3 files changed

+48
-27
lines changed

3 files changed

+48
-27
lines changed

typescript-sdk/packages/client/src/agent/__tests__/subscriber.test.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ describe("RunAgentSubscriber", () => {
316316
const mutatingSubscriber: RunAgentSubscriber = {
317317
onStateSnapshotEvent: jest.fn().mockReturnValue({
318318
state: { modifiedBySubscriber: true },
319+
stopPropagation: true, // Prevent the event from applying its snapshot
319320
}),
320321
onStateChanged: jest.fn(),
321322
};
@@ -618,8 +619,8 @@ describe("RunAgentSubscriber", () => {
618619
expect(specificSubscriber.onTextMessageStartEvent).toHaveBeenCalledWith(
619620
expect.objectContaining({
620621
event: textStartEvent,
621-
messages: agent.messages,
622-
state: agent.state,
622+
messages: [{ content: "Hello", id: "msg-1", role: "user" }], // Pre-mutation state
623+
state: { counter: 0 }, // Pre-mutation state
623624
agent,
624625
}),
625626
);
@@ -628,8 +629,8 @@ describe("RunAgentSubscriber", () => {
628629
expect.objectContaining({
629630
event: textContentEvent,
630631
textMessageBuffer: "Hello",
631-
messages: agent.messages,
632-
state: agent.state,
632+
messages: agent.messages, // Can check final state for content events
633+
state: agent.state, // Can check final state for content events
633634
agent,
634635
}),
635636
);

typescript-sdk/packages/client/src/agent/subscriber.ts

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -154,31 +154,38 @@ export async function runSubscribersWithMutation(
154154
let stopPropagation: boolean | undefined = undefined;
155155

156156
for (const subscriber of subscribers) {
157-
const mutation = await executor(
158-
subscriber,
159-
structuredClone_(messages),
160-
structuredClone_(state),
161-
);
162-
163-
if (mutation === undefined) {
164-
// Nothing returned – keep going
157+
try {
158+
const mutation = await executor(
159+
subscriber,
160+
structuredClone_(messages),
161+
structuredClone_(state),
162+
);
163+
164+
if (mutation === undefined) {
165+
// Nothing returned – keep going
166+
continue;
167+
}
168+
169+
// Merge messages/state so next subscriber sees latest view
170+
if (mutation.messages !== undefined) {
171+
messages = mutation.messages;
172+
}
173+
174+
if (mutation.state !== undefined) {
175+
state = mutation.state;
176+
}
177+
178+
stopPropagation = mutation.stopPropagation;
179+
180+
if (stopPropagation === true) {
181+
break;
182+
}
183+
} catch (error) {
184+
// Log subscriber errors but continue processing
185+
console.error("Subscriber error:", error);
186+
// Continue to next subscriber unless we want to stop propagation
165187
continue;
166188
}
167-
168-
// Merge messages/state so next subscriber sees latest view
169-
if (mutation.messages !== undefined) {
170-
messages = mutation.messages;
171-
}
172-
173-
if (mutation.state !== undefined) {
174-
state = mutation.state;
175-
}
176-
177-
stopPropagation = mutation.stopPropagation;
178-
179-
if (stopPropagation === true) {
180-
break;
181-
}
182189
}
183190

184191
return {

typescript-sdk/packages/client/src/apply/default.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,19 @@ export const defaultApplyEvents = (
6868

6969
return events$.pipe(
7070
concatMap(async (event) => {
71+
const mutation = await runSubscribersWithMutation(
72+
subscribers,
73+
messages,
74+
state,
75+
(subscriber, messages, state) =>
76+
subscriber.onEvent?.({ event, agent, input, messages, state }),
77+
);
78+
applyMutation(mutation);
79+
80+
if (mutation.stopPropagation === true) {
81+
return emitUpdates();
82+
}
83+
7184
switch (event.type) {
7285
case EventType.TEXT_MESSAGE_START: {
7386
const mutation = await runSubscribersWithMutation(

0 commit comments

Comments
 (0)