Skip to content

Commit 1ad3dea

Browse files
committed
optionally make RunAgentSubscriber async
1 parent 8a77487 commit 1ad3dea

File tree

3 files changed

+111
-94
lines changed

3 files changed

+111
-94
lines changed

typescript-sdk/packages/client/src/agent/agent.ts

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import { Message, State, RunAgentInput, BaseEvent } from "@ag-ui/core";
44
import { AgentConfig, RunAgentParameters } from "./types";
55
import { v4 as uuidv4 } from "uuid";
66
import { structuredClone_ } from "@/utils";
7-
import { catchError, map, tap } from "rxjs/operators";
7+
import { catchError, map, tap, mergeMap } from "rxjs/operators";
88
import { finalize } from "rxjs/operators";
9-
import { throwError, pipe, Observable } from "rxjs";
9+
import { pipe, Observable, from, of, EMPTY } from "rxjs";
1010
import { verifyEvents } from "@/verify";
1111
import { convertToLegacyEvents } from "@/legacy/convert";
1212
import { LegacyRuntimeProtocolEvent } from "@/legacy/types";
13-
import { lastValueFrom, of } from "rxjs";
13+
import { lastValueFrom } from "rxjs";
1414
import { transformChunks } from "@/chunks";
1515
import { AgentStateMutation, RunAgentSubscriber, runSubscribersWithMutation } from "./subscriber";
1616

@@ -58,7 +58,7 @@ export abstract class AbstractAgent {
5858
const input = this.prepareRunAgentInput(parameters);
5959
const subscribers = [...this.subscribers, subscriber ?? {}];
6060

61-
this.onInitialize(input, subscribers);
61+
await this.onInitialize(input, subscribers);
6262

6363
const pipeline = pipe(
6464
() => this.run(input),
@@ -70,7 +70,7 @@ export abstract class AbstractAgent {
7070
return this.onError(input, error, subscribers);
7171
}),
7272
finalize(() => {
73-
this.onFinalize(input, subscribers);
73+
void this.onFinalize(input, subscribers);
7474
}),
7575
);
7676

@@ -130,8 +130,8 @@ export abstract class AbstractAgent {
130130
};
131131
}
132132

133-
protected onInitialize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
134-
const onRunInitializedMutation = runSubscribersWithMutation(
133+
protected async onInitialize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
134+
const onRunInitializedMutation = await runSubscribersWithMutation(
135135
subscribers,
136136
this.messages,
137137
this.state,
@@ -168,46 +168,53 @@ export abstract class AbstractAgent {
168168
}
169169

170170
protected onError(input: RunAgentInput, error: Error, subscribers: RunAgentSubscriber[]) {
171-
const onRunFailedMutation = runSubscribersWithMutation(
172-
subscribers,
173-
this.messages,
174-
this.state,
175-
(subscriber, messages, state) =>
176-
subscriber.onRunFailed?.({ error, messages, state, agent: this, input }),
177-
);
171+
return from(
172+
runSubscribersWithMutation(
173+
subscribers,
174+
this.messages,
175+
this.state,
176+
(subscriber, messages, state) =>
177+
subscriber.onRunFailed?.({ error, messages, state, agent: this, input }),
178+
),
179+
).pipe(
180+
map((onRunFailedMutation) => {
181+
const mutation = onRunFailedMutation as AgentStateMutation;
182+
if (mutation.messages !== undefined || mutation.state !== undefined) {
183+
if (mutation.messages !== undefined) {
184+
this.messages = mutation.messages;
185+
subscribers.forEach((subscriber) => {
186+
subscriber.onMessagesChanged?.({
187+
messages: this.messages,
188+
agent: this,
189+
input,
190+
});
191+
});
192+
}
193+
if (mutation.state !== undefined) {
194+
this.state = mutation.state;
195+
subscribers.forEach((subscriber) => {
196+
subscriber.onStateChanged?.({
197+
state: this.state,
198+
agent: this,
199+
input,
200+
});
201+
});
202+
}
203+
}
178204

179-
if (onRunFailedMutation.messages !== undefined || onRunFailedMutation.state !== undefined) {
180-
if (onRunFailedMutation.messages !== undefined) {
181-
this.messages = onRunFailedMutation.messages;
182-
subscribers.forEach((subscriber) => {
183-
subscriber.onMessagesChanged?.({
184-
messages: this.messages,
185-
agent: this,
186-
input,
187-
});
188-
});
189-
}
190-
if (onRunFailedMutation.state !== undefined) {
191-
this.state = onRunFailedMutation.state;
192-
subscribers.forEach((subscriber) => {
193-
subscriber.onStateChanged?.({
194-
state: this.state,
195-
agent: this,
196-
input,
197-
});
198-
});
199-
}
200-
}
201-
if (onRunFailedMutation.stopPropagation !== true) {
202-
console.error("Agent execution failed:", error);
203-
return throwError(() => error);
204-
} else {
205-
return of(null);
206-
}
205+
if (mutation.stopPropagation !== true) {
206+
console.error("Agent execution failed:", error);
207+
throw error;
208+
}
209+
210+
return null as unknown as never;
211+
}),
212+
mergeMap((value) => (value === null ? EMPTY : of(value))),
213+
);
207214
}
208215

209-
protected onFinalize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
210-
const onRunFinalizedMutation = runSubscribersWithMutation(
216+
protected async onFinalize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
217+
const onRunFinalizedMutation = await runSubscribersWithMutation(
211218
subscribers,
212219
this.messages,
213220
this.state,

typescript-sdk/packages/client/src/agent/subscriber.ts

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,116 +37,124 @@ export interface RunAgentSubscriberParams {
3737
input: RunAgentInput;
3838
}
3939

40-
// TODO make shit async
40+
// Utility type to allow callbacks to be implemented either synchronously or asynchronously.
41+
export type MaybePromise<T> = T | Promise<T>;
42+
4143
export interface RunAgentSubscriber {
4244
// Request lifecycle
4345
onRunInitialized?(
4446
params: RunAgentSubscriberParams,
45-
): Omit<AgentStateMutation, "stopPropagation"> | undefined;
47+
): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | undefined>;
4648
onRunFailed?(
4749
params: { error: Error } & RunAgentSubscriberParams,
48-
): Omit<AgentStateMutation, "stopPropagation"> | undefined;
50+
): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | undefined>;
4951
onRunFinalized?(
5052
params: RunAgentSubscriberParams,
51-
): Omit<AgentStateMutation, "stopPropagation"> | undefined;
53+
): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | undefined>;
5254

5355
// Events
54-
onEvent?(params: { event: BaseEvent } & RunAgentSubscriberParams): AgentStateMutation | undefined;
56+
onEvent?(
57+
params: { event: BaseEvent } & RunAgentSubscriberParams,
58+
): MaybePromise<AgentStateMutation | undefined>;
5559

5660
onRunStartedEvent?(
5761
params: { event: RunStartedEvent } & RunAgentSubscriberParams,
58-
): AgentStateMutation | undefined;
62+
): MaybePromise<AgentStateMutation | undefined>;
5963
onRunFinishedEvent?(
6064
params: { event: RunFinishedEvent } & RunAgentSubscriberParams,
61-
): AgentStateMutation | undefined;
65+
): MaybePromise<AgentStateMutation | undefined>;
6266
onRunErrorEvent?(
6367
params: { event: RunErrorEvent } & RunAgentSubscriberParams,
64-
): AgentStateMutation | undefined;
68+
): MaybePromise<AgentStateMutation | undefined>;
6569

6670
onStepStartedEvent?(
6771
params: { event: StepStartedEvent } & RunAgentSubscriberParams,
68-
): AgentStateMutation | undefined;
72+
): MaybePromise<AgentStateMutation | undefined>;
6973
onStepFinishedEvent?(
7074
params: { event: StepFinishedEvent } & RunAgentSubscriberParams,
71-
): AgentStateMutation | undefined;
75+
): MaybePromise<AgentStateMutation | undefined>;
7276

7377
onTextMessageStartEvent?(
7478
params: { event: TextMessageStartEvent } & RunAgentSubscriberParams,
75-
): AgentStateMutation | undefined;
79+
): MaybePromise<AgentStateMutation | undefined>;
7680
onTextMessageContentEvent?(
7781
params: {
7882
event: TextMessageContentEvent;
7983
textMessageBuffer: string;
8084
} & RunAgentSubscriberParams,
81-
): AgentStateMutation | undefined;
85+
): MaybePromise<AgentStateMutation | undefined>;
8286
onTextMessageEndEvent?(
8387
params: { event: TextMessageEndEvent; textMessageBuffer: string } & RunAgentSubscriberParams,
84-
): AgentStateMutation | undefined;
88+
): MaybePromise<AgentStateMutation | undefined>;
8589

8690
onToolCallStartEvent?(
8791
params: { event: ToolCallStartEvent } & RunAgentSubscriberParams,
88-
): AgentStateMutation | undefined;
92+
): MaybePromise<AgentStateMutation | undefined>;
8993
onToolCallArgsEvent?(
9094
params: {
9195
event: ToolCallArgsEvent;
9296
toolCallBuffer: string;
9397
toolCallName: string;
9498
} & RunAgentSubscriberParams,
95-
): AgentStateMutation | undefined;
99+
): MaybePromise<AgentStateMutation | undefined>;
96100
onToolCallEndEvent?(
97101
params: {
98102
event: ToolCallEndEvent;
99103
toolCallBuffer: string;
100104
toolCallName: string;
101105
} & RunAgentSubscriberParams,
102-
): AgentStateMutation | undefined;
106+
): MaybePromise<AgentStateMutation | undefined>;
103107

104108
onToolCallResultEvent?(
105109
params: { event: ToolCallResultEvent } & RunAgentSubscriberParams,
106-
): AgentStateMutation | undefined;
110+
): MaybePromise<AgentStateMutation | undefined>;
107111

108112
onStateSnapshotEvent?(
109113
params: { event: StateSnapshotEvent } & RunAgentSubscriberParams,
110-
): AgentStateMutation | undefined;
114+
): MaybePromise<AgentStateMutation | undefined>;
111115

112116
onStateDeltaEvent?(
113117
params: { event: StateDeltaEvent } & RunAgentSubscriberParams,
114-
): AgentStateMutation | undefined;
118+
): MaybePromise<AgentStateMutation | undefined>;
115119

116120
onMessagesSnapshotEvent?(
117121
params: { event: MessagesSnapshotEvent } & RunAgentSubscriberParams,
118-
): AgentStateMutation | undefined;
122+
): MaybePromise<AgentStateMutation | undefined>;
119123

120124
onRawEvent?(
121125
params: { event: RawEvent } & RunAgentSubscriberParams,
122-
): AgentStateMutation | undefined;
126+
): MaybePromise<AgentStateMutation | undefined>;
123127

124128
onCustomEvent?(
125129
params: { event: CustomEvent } & RunAgentSubscriberParams,
126-
): AgentStateMutation | undefined;
130+
): MaybePromise<AgentStateMutation | undefined>;
127131

128132
// State changes
129-
onMessagesChanged?(params: Omit<RunAgentSubscriberParams, "state">): void;
130-
onStateChanged?(params: Omit<RunAgentSubscriberParams, "messages">): void;
133+
onMessagesChanged?(params: Omit<RunAgentSubscriberParams, "state">): MaybePromise<void>;
134+
onStateChanged?(params: Omit<RunAgentSubscriberParams, "messages">): MaybePromise<void>;
131135
}
132136

133-
export function runSubscribersWithMutation(
137+
export async function runSubscribersWithMutation(
134138
subscribers: RunAgentSubscriber[],
135139
initialMessages: Message[],
136140
initialState: State,
137141
executor: (
138142
subscriber: RunAgentSubscriber,
139143
messages: Message[],
140144
state: State,
141-
) => AgentStateMutation | undefined,
142-
): AgentStateMutation {
145+
) => MaybePromise<AgentStateMutation | undefined>,
146+
): Promise<AgentStateMutation> {
143147
let messages: Message[] = initialMessages;
144148
let state: State = initialState;
145149

146-
let stopPropagation = undefined;
150+
let stopPropagation: boolean | undefined = undefined;
147151

148152
for (const subscriber of subscribers) {
149-
const mutation = executor(subscriber, structuredClone_(messages), structuredClone_(state));
153+
const mutation = await executor(
154+
subscriber,
155+
structuredClone_(messages),
156+
structuredClone_(state),
157+
);
150158

151159
if (mutation === undefined) {
152160
// Nothing returned – keep going

0 commit comments

Comments
 (0)