Skip to content

Commit 82bac56

Browse files
committed
complete subscriber handling
1 parent 9fbb4fe commit 82bac56

File tree

3 files changed

+667
-192
lines changed

3 files changed

+667
-192
lines changed

typescript-sdk/packages/client/src/agent/agent.ts

Lines changed: 138 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { convertToLegacyEvents } from "@/legacy/convert";
1212
import { LegacyRuntimeProtocolEvent } from "@/legacy/types";
1313
import { lastValueFrom, of } from "rxjs";
1414
import { transformChunks } from "@/chunks";
15-
import { AgentStateMutation, RunAgentSubscriber } from "./subscriber";
15+
import { AgentStateMutation, RunAgentSubscriber, runSubscribersWithMutation } from "./subscriber";
1616

1717
export abstract class AbstractAgent {
1818
public agentId?: string;
@@ -50,22 +50,27 @@ export abstract class AbstractAgent {
5050

5151
protected abstract run(input: RunAgentInput): Observable<BaseEvent>;
5252

53-
public async runAgent(parameters?: RunAgentParameters): Promise<void> {
53+
public async runAgent(
54+
parameters?: RunAgentParameters,
55+
subscriber?: RunAgentSubscriber,
56+
): Promise<void> {
5457
this.agentId = this.agentId ?? uuidv4();
5558
const input = this.prepareRunAgentInput(parameters);
59+
const subscribers = [...this.subscribers, subscriber ?? {}];
60+
61+
this.onInitialize(input, subscribers);
5662

5763
const pipeline = pipe(
5864
() => this.run(input),
5965
transformChunks(this.debug),
6066
verifyEvents(this.debug),
61-
(source$) => this.apply(input, source$),
62-
(source$) => this.processApplyEvents(input, source$),
67+
(source$) => this.apply(input, source$, subscribers),
68+
(source$) => this.processApplyEvents(input, source$, subscribers),
6369
catchError((error) => {
64-
this.onError(error);
65-
return throwError(() => error);
70+
return this.onError(input, error, subscribers);
6671
}),
6772
finalize(() => {
68-
this.onFinalize();
73+
this.onFinalize(input, subscribers);
6974
}),
7075
);
7176

@@ -77,21 +82,37 @@ export abstract class AbstractAgent {
7782
protected apply(
7883
input: RunAgentInput,
7984
events$: Observable<BaseEvent>,
85+
subscribers: RunAgentSubscriber[],
8086
): Observable<AgentStateMutation> {
81-
return defaultApplyEvents(input, events$);
87+
return defaultApplyEvents(input, events$, this, subscribers);
8288
}
8389

8490
protected processApplyEvents(
8591
input: RunAgentInput,
8692
events$: Observable<AgentStateMutation>,
93+
subscribers: RunAgentSubscriber[],
8794
): Observable<AgentStateMutation> {
8895
return events$.pipe(
8996
tap((event) => {
9097
if (event.messages) {
9198
this.messages = event.messages;
99+
subscribers.forEach((subscriber) => {
100+
subscriber.onMessagesChanged?.({
101+
messages: this.messages,
102+
agent: this,
103+
input,
104+
});
105+
});
92106
}
93107
if (event.state) {
94108
this.state = event.state;
109+
subscribers.forEach((subscriber) => {
110+
subscriber.onStateChanged?.({
111+
state: this.state,
112+
agent: this,
113+
input,
114+
});
115+
});
95116
}
96117
}),
97118
);
@@ -109,11 +130,117 @@ export abstract class AbstractAgent {
109130
};
110131
}
111132

112-
protected onError(error: Error) {
113-
console.error("Agent execution failed:", error);
133+
protected onInitialize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
134+
const onRunInitializedMutation = runSubscribersWithMutation(
135+
subscribers,
136+
this.messages,
137+
this.state,
138+
(subscriber, messages, state) =>
139+
subscriber.onRunInitialized?.({ messages, state, agent: this, input }),
140+
);
141+
if (
142+
onRunInitializedMutation.messages !== undefined ||
143+
onRunInitializedMutation.state !== undefined
144+
) {
145+
if (onRunInitializedMutation.messages) {
146+
this.messages = onRunInitializedMutation.messages;
147+
input.messages = onRunInitializedMutation.messages;
148+
subscribers.forEach((subscriber) => {
149+
subscriber.onMessagesChanged?.({
150+
messages: this.messages,
151+
agent: this,
152+
input,
153+
});
154+
});
155+
}
156+
if (onRunInitializedMutation.state) {
157+
this.state = onRunInitializedMutation.state;
158+
input.state = onRunInitializedMutation.state;
159+
subscribers.forEach((subscriber) => {
160+
subscriber.onStateChanged?.({
161+
state: this.state,
162+
agent: this,
163+
input,
164+
});
165+
});
166+
}
167+
}
114168
}
115169

116-
protected onFinalize() {}
170+
protected onError(input: RunAgentInput, error: Error, subscribers: RunAgentSubscriber[]) {
171+
const onRunFailedMutation = runSubscribersWithMutation(
172+
subscribers,
173+
this.messages,
174+
this.state,
175+
(subscriber, messages, state) =>
176+
subscriber.onRunFailed?.({ error, messages, state, agent: this, input }),
177+
);
178+
179+
if (onRunFailedMutation.messages !== undefined || onRunFailedMutation.state !== undefined) {
180+
if (onRunFailedMutation.messages !== undefined) {
181+
this.messages = onRunFailedMutation.messages;
182+
subscribers.forEach((subscriber) => {
183+
subscriber.onMessagesChanged?.({
184+
messages: this.messages,
185+
agent: this,
186+
input,
187+
});
188+
});
189+
}
190+
if (onRunFailedMutation.state !== undefined) {
191+
this.state = onRunFailedMutation.state;
192+
subscribers.forEach((subscriber) => {
193+
subscriber.onStateChanged?.({
194+
state: this.state,
195+
agent: this,
196+
input,
197+
});
198+
});
199+
}
200+
}
201+
if (onRunFailedMutation.stopPropagation !== true) {
202+
console.error("Agent execution failed:", error);
203+
return throwError(() => error);
204+
} else {
205+
return of(null);
206+
}
207+
}
208+
209+
protected onFinalize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
210+
const onRunFinalizedMutation = runSubscribersWithMutation(
211+
subscribers,
212+
this.messages,
213+
this.state,
214+
(subscriber, messages, state) =>
215+
subscriber.onRunFinalized?.({ messages, state, agent: this, input }),
216+
);
217+
218+
if (
219+
onRunFinalizedMutation.messages !== undefined ||
220+
onRunFinalizedMutation.state !== undefined
221+
) {
222+
if (onRunFinalizedMutation.messages !== undefined) {
223+
this.messages = onRunFinalizedMutation.messages;
224+
subscribers.forEach((subscriber) => {
225+
subscriber.onMessagesChanged?.({
226+
messages: this.messages,
227+
agent: this,
228+
input,
229+
});
230+
});
231+
}
232+
if (onRunFinalizedMutation.state !== undefined) {
233+
this.state = onRunFinalizedMutation.state;
234+
subscribers.forEach((subscriber) => {
235+
subscriber.onStateChanged?.({
236+
state: this.state,
237+
agent: this,
238+
input,
239+
});
240+
});
241+
}
242+
}
243+
}
117244

118245
public clone() {
119246
const cloned = Object.create(Object.getPrototypeOf(this));

typescript-sdk/packages/client/src/agent/subscriber.ts

Lines changed: 80 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ import {
1919
ToolCallStartEvent,
2020
MessagesSnapshotEvent,
2121
RawEvent,
22+
CustomEvent,
2223
} from "@ag-ui/core";
2324
import { AbstractAgent } from "./agent";
25+
import { structuredClone_ } from "@/utils";
2426

2527
export interface AgentStateMutation {
2628
messages?: Message[];
@@ -35,88 +37,141 @@ export interface RunAgentSubscriberParams {
3537
input: RunAgentInput;
3638
}
3739

40+
// TODO make shit async
3841
export interface RunAgentSubscriber {
39-
// Lifecycle
40-
onInitializeRun(params: RunAgentSubscriberParams): AgentStateMutation | undefined;
41-
onError(params: { error: Error } & RunAgentSubscriberParams): AgentStateMutation | undefined;
42-
onFinalizeRun(params: RunAgentSubscriberParams): AgentStateMutation | undefined;
42+
// Request lifecycle
43+
onRunInitialized?(
44+
params: RunAgentSubscriberParams,
45+
): Omit<AgentStateMutation, "stopPropagation"> | undefined;
46+
onRunFailed?(
47+
params: { error: Error } & RunAgentSubscriberParams,
48+
): Omit<AgentStateMutation, "stopPropagation"> | undefined;
49+
onRunFinalized?(
50+
params: RunAgentSubscriberParams,
51+
): Omit<AgentStateMutation, "stopPropagation"> | undefined;
4352

4453
// Events
45-
onEvent(params: { event: BaseEvent } & RunAgentSubscriberParams): AgentStateMutation | undefined;
54+
onEvent?(params: { event: BaseEvent } & RunAgentSubscriberParams): AgentStateMutation | undefined;
4655

47-
onRunStartedEvent(
56+
onRunStartedEvent?(
4857
params: { event: RunStartedEvent } & RunAgentSubscriberParams,
4958
): AgentStateMutation | undefined;
50-
onRunFinishedEvent(
59+
onRunFinishedEvent?(
5160
params: { event: RunFinishedEvent } & RunAgentSubscriberParams,
5261
): AgentStateMutation | undefined;
53-
onRunErrorEvent(
62+
onRunErrorEvent?(
5463
params: { event: RunErrorEvent } & RunAgentSubscriberParams,
5564
): AgentStateMutation | undefined;
5665

57-
onStepStartedEvent(
66+
onStepStartedEvent?(
5867
params: { event: StepStartedEvent } & RunAgentSubscriberParams,
5968
): AgentStateMutation | undefined;
60-
onStepFinishedEvent(
69+
onStepFinishedEvent?(
6170
params: { event: StepFinishedEvent } & RunAgentSubscriberParams,
6271
): AgentStateMutation | undefined;
6372

64-
onTextMessageStartEvent(
73+
onTextMessageStartEvent?(
6574
params: { event: TextMessageStartEvent } & RunAgentSubscriberParams,
6675
): AgentStateMutation | undefined;
67-
onTextMessageContentEvent(
76+
onTextMessageContentEvent?(
6877
params: {
6978
event: TextMessageContentEvent;
7079
textMessageBuffer: string;
7180
} & RunAgentSubscriberParams,
7281
): AgentStateMutation | undefined;
73-
onTextMessageEndEvent(
82+
onTextMessageEndEvent?(
7483
params: { event: TextMessageEndEvent; textMessageBuffer: string } & RunAgentSubscriberParams,
7584
): AgentStateMutation | undefined;
7685

77-
onToolCallStartEvent(
86+
onToolCallStartEvent?(
7887
params: { event: ToolCallStartEvent } & RunAgentSubscriberParams,
7988
): AgentStateMutation | undefined;
80-
onToolCallArgsEvent(
89+
onToolCallArgsEvent?(
8190
params: {
8291
event: ToolCallArgsEvent;
8392
toolCallBuffer: string;
8493
toolCallName: string;
8594
} & RunAgentSubscriberParams,
8695
): AgentStateMutation | undefined;
87-
onToolCallEndEvent(
96+
onToolCallEndEvent?(
8897
params: {
8998
event: ToolCallEndEvent;
9099
toolCallBuffer: string;
91100
toolCallName: string;
92101
} & RunAgentSubscriberParams,
93102
): AgentStateMutation | undefined;
94103

95-
onToolCallResultEvent(
104+
onToolCallResultEvent?(
96105
params: { event: ToolCallResultEvent } & RunAgentSubscriberParams,
97106
): AgentStateMutation | undefined;
98107

99-
onStateSnapshotEvent(
108+
onStateSnapshotEvent?(
100109
params: { event: StateSnapshotEvent } & RunAgentSubscriberParams,
101110
): AgentStateMutation | undefined;
102111

103-
onStateDeltaEvent(
104-
params: { event: StateDeltaEvent; previousState: State } & RunAgentSubscriberParams,
112+
onStateDeltaEvent?(
113+
params: { event: StateDeltaEvent } & RunAgentSubscriberParams,
105114
): AgentStateMutation | undefined;
106115

107-
onMessagesSnapshotEvent(
116+
onMessagesSnapshotEvent?(
108117
params: { event: MessagesSnapshotEvent } & RunAgentSubscriberParams,
109118
): AgentStateMutation | undefined;
110119

111-
onRawEvent(
120+
onRawEvent?(
112121
params: { event: RawEvent } & RunAgentSubscriberParams,
113122
): AgentStateMutation | undefined;
114123

115-
onCustomEvent(
124+
onCustomEvent?(
116125
params: { event: CustomEvent } & RunAgentSubscriberParams,
117126
): AgentStateMutation | undefined;
118127

119128
// State changes
120-
onMessagesChanged(params: { messages: Message[] } & RunAgentSubscriberParams): void;
121-
onStateChanged(params: { state: State } & RunAgentSubscriberParams): void;
129+
onMessagesChanged?(params: Omit<RunAgentSubscriberParams, "state">): void;
130+
onStateChanged?(params: Omit<RunAgentSubscriberParams, "messages">): void;
131+
}
132+
133+
export function runSubscribersWithMutation(
134+
subscribers: RunAgentSubscriber[],
135+
initialMessages: Message[],
136+
initialState: State,
137+
executor: (
138+
subscriber: RunAgentSubscriber,
139+
messages: Message[],
140+
state: State,
141+
) => AgentStateMutation | undefined,
142+
): AgentStateMutation {
143+
let messages: Message[] = initialMessages;
144+
let state: State = initialState;
145+
146+
let stopPropagation = undefined;
147+
148+
for (const subscriber of subscribers) {
149+
const mutation = executor(subscriber, structuredClone_(messages), structuredClone_(state));
150+
151+
if (mutation === undefined) {
152+
// Nothing returned – keep going
153+
continue;
154+
}
155+
156+
// Merge messages/state so next subscriber sees latest view
157+
if (mutation.messages !== undefined) {
158+
messages = mutation.messages;
159+
}
160+
161+
if (mutation.state !== undefined) {
162+
state = mutation.state;
163+
}
164+
165+
stopPropagation = mutation.stopPropagation;
166+
167+
if (stopPropagation === true) {
168+
break;
169+
}
170+
}
171+
172+
return {
173+
...(JSON.stringify(messages) !== JSON.stringify(initialMessages) ? { messages } : {}),
174+
...(JSON.stringify(state) !== JSON.stringify(initialState) ? { state } : {}),
175+
...(stopPropagation !== undefined ? { stopPropagation } : {}),
176+
};
122177
}

0 commit comments

Comments
 (0)