Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/concepts/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,14 @@ The `RunFinished` event indicates that an agent has successfully completed all
its work for the current run. Upon receiving this event, frontends should
finalize any UI states that were waiting on the agent's completion. This event
marks a clean termination point and indicates that no further processing will
occur in this run unless explicitly requested.
occur in this run unless explicitly requested. The optional `result` field can
contain any output data produced by the agent run.

| Property | Description |
| ---------- | ----------------------------- |
| `threadId` | ID of the conversation thread |
| `runId` | ID of the agent run |
| `result` | Optional result data from run |

### RunError

Expand Down
10 changes: 6 additions & 4 deletions docs/sdk/js/core/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ type RunFinishedEvent = BaseEvent & {
type: EventType.RUN_FINISHED
threadId: string
runId: string
result?: any
}
```

| Property | Type | Description |
| ---------- | -------- | ----------------------------- |
| `threadId` | `string` | ID of the conversation thread |
| `runId` | `string` | ID of the agent run |
| Property | Type | Description |
| ---------- | ---------------- | ------------------------------ |
| `threadId` | `string` | ID of the conversation thread |
| `runId` | `string` | ID of the agent run |
| `result` | `any` (optional) | Result data from the agent run |

### RunErrorEvent

Expand Down
10 changes: 6 additions & 4 deletions docs/sdk/python/core/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ class RunFinishedEvent(BaseEvent):
type: Literal[EventType.RUN_FINISHED]
thread_id: str
run_id: str
result: Optional[Any] = None
```

| Property | Type | Description |
| ----------- | ----- | ----------------------------- |
| `thread_id` | `str` | ID of the conversation thread |
| `run_id` | `str` | ID of the agent run |
| Property | Type | Description |
| ----------- | --------------- | ------------------------------ |
| `thread_id` | `str` | ID of the conversation thread |
| `run_id` | `str` | ID of the agent run |
| `result` | `Optional[Any]` | Result data from the agent run |

### RunErrorEvent

Expand Down
1 change: 1 addition & 0 deletions python-sdk/ag_ui/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class RunFinishedEvent(BaseEvent):
type: Literal[EventType.RUN_FINISHED]
thread_id: str
run_id: str
result: Optional[Any] = None


class RunErrorEvent(BaseEvent):
Expand Down
195 changes: 176 additions & 19 deletions typescript-sdk/packages/client/src/agent/agent.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { defaultApplyEvents } from "@/apply/default";
import { Message, State, RunAgentInput, RunAgent, ApplyEvents, BaseEvent } from "@ag-ui/core";
import { Message, State, RunAgentInput, BaseEvent } from "@ag-ui/core";

import { AgentConfig, RunAgentParameters } from "./types";
import { v4 as uuidv4 } from "uuid";
import { structuredClone_ } from "@/utils";
import { catchError, map, tap } from "rxjs/operators";
import { catchError, map, tap, mergeMap } from "rxjs/operators";
import { finalize } from "rxjs/operators";
import { throwError, pipe, Observable } from "rxjs";
import { pipe, Observable, from, of, EMPTY } from "rxjs";
import { verifyEvents } from "@/verify";
import { convertToLegacyEvents } from "@/legacy/convert";
import { LegacyRuntimeProtocolEvent } from "@/legacy/types";
import { lastValueFrom, of } from "rxjs";
import { lastValueFrom } from "rxjs";
import { transformChunks } from "@/chunks";
import { AgentStateMutation, RunAgentSubscriber, runSubscribersWithMutation } from "./subscriber";

export abstract class AbstractAgent {
public agentId?: string;
Expand All @@ -20,6 +21,7 @@ export abstract class AbstractAgent {
public messages: Message[];
public state: State;
public debug: boolean = false;
public subscribers: RunAgentSubscriber[] = [];

constructor({
agentId,
Expand All @@ -37,47 +39,89 @@ export abstract class AbstractAgent {
this.debug = debug ?? false;
}

protected abstract run(...args: Parameters<RunAgent>): ReturnType<RunAgent>;
public subscribe(subscriber: RunAgentSubscriber) {
this.subscribers.push(subscriber);
return {
unsubscribe: () => {
this.subscribers = this.subscribers.filter((s) => s !== subscriber);
},
};
}

public async runAgent(parameters?: RunAgentParameters): Promise<void> {
protected abstract run(input: RunAgentInput): Observable<BaseEvent>;

public async runAgent(
parameters?: RunAgentParameters,
subscriber?: RunAgentSubscriber,
): Promise<void> {
this.agentId = this.agentId ?? uuidv4();
const input = this.prepareRunAgentInput(parameters);
let result: any = undefined;
const subscribers: RunAgentSubscriber[] = [
{
onRunFinishedEvent: (params) => {
result = params.result;
},
},
...this.subscribers,
subscriber ?? {},
];

await this.onInitialize(input, subscribers);

const pipeline = pipe(
() => this.run(input),
transformChunks(this.debug),
verifyEvents(this.debug),
(source$) => this.apply(input, source$),
(source$) => this.processApplyEvents(input, source$),
(source$) => this.apply(input, source$, subscribers),
(source$) => this.processApplyEvents(input, source$, subscribers),
catchError((error) => {
this.onError(error);
return throwError(() => error);
return this.onError(input, error, subscribers);
}),
finalize(() => {
this.onFinalize();
void this.onFinalize(input, subscribers);
}),
);

return lastValueFrom(pipeline(of(null))).then(() => {});
return lastValueFrom(pipeline(of(null))).then(() => result);
}

public abortRun() {}

protected apply(...args: Parameters<ApplyEvents>): ReturnType<ApplyEvents> {
return defaultApplyEvents(...args);
protected apply(
input: RunAgentInput,
events$: Observable<BaseEvent>,
subscribers: RunAgentSubscriber[],
): Observable<AgentStateMutation> {
return defaultApplyEvents(input, events$, this, subscribers);
}

protected processApplyEvents(
input: RunAgentInput,
events$: ReturnType<ApplyEvents>,
): ReturnType<ApplyEvents> {
events$: Observable<AgentStateMutation>,
subscribers: RunAgentSubscriber[],
): Observable<AgentStateMutation> {
return events$.pipe(
tap((event) => {
if (event.messages) {
this.messages = event.messages;
subscribers.forEach((subscriber) => {
subscriber.onMessagesChanged?.({
messages: this.messages,
agent: this,
input,
});
});
}
if (event.state) {
this.state = event.state;
subscribers.forEach((subscriber) => {
subscriber.onStateChanged?.({
state: this.state,
agent: this,
input,
});
});
}
}),
);
Expand All @@ -95,11 +139,124 @@ export abstract class AbstractAgent {
};
}

protected onError(error: Error) {
console.error("Agent execution failed:", error);
protected async onInitialize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
const onRunInitializedMutation = await runSubscribersWithMutation(
subscribers,
this.messages,
this.state,
(subscriber, messages, state) =>
subscriber.onRunInitialized?.({ messages, state, agent: this, input }),
);
if (
onRunInitializedMutation.messages !== undefined ||
onRunInitializedMutation.state !== undefined
) {
if (onRunInitializedMutation.messages) {
this.messages = onRunInitializedMutation.messages;
input.messages = onRunInitializedMutation.messages;
subscribers.forEach((subscriber) => {
subscriber.onMessagesChanged?.({
messages: this.messages,
agent: this,
input,
});
});
}
if (onRunInitializedMutation.state) {
this.state = onRunInitializedMutation.state;
input.state = onRunInitializedMutation.state;
subscribers.forEach((subscriber) => {
subscriber.onStateChanged?.({
state: this.state,
agent: this,
input,
});
});
}
}
}

protected onError(input: RunAgentInput, error: Error, subscribers: RunAgentSubscriber[]) {
return from(
runSubscribersWithMutation(
subscribers,
this.messages,
this.state,
(subscriber, messages, state) =>
subscriber.onRunFailed?.({ error, messages, state, agent: this, input }),
),
).pipe(
map((onRunFailedMutation) => {
const mutation = onRunFailedMutation as AgentStateMutation;
if (mutation.messages !== undefined || mutation.state !== undefined) {
if (mutation.messages !== undefined) {
this.messages = mutation.messages;
subscribers.forEach((subscriber) => {
subscriber.onMessagesChanged?.({
messages: this.messages,
agent: this,
input,
});
});
}
if (mutation.state !== undefined) {
this.state = mutation.state;
subscribers.forEach((subscriber) => {
subscriber.onStateChanged?.({
state: this.state,
agent: this,
input,
});
});
}
}

if (mutation.stopPropagation !== true) {
console.error("Agent execution failed:", error);
throw error;
}

return null as unknown as never;
}),
mergeMap((value) => (value === null ? EMPTY : of(value))),
);
}

protected onFinalize() {}
protected async onFinalize(input: RunAgentInput, subscribers: RunAgentSubscriber[]) {
const onRunFinalizedMutation = await runSubscribersWithMutation(
subscribers,
this.messages,
this.state,
(subscriber, messages, state) =>
subscriber.onRunFinalized?.({ messages, state, agent: this, input }),
);

if (
onRunFinalizedMutation.messages !== undefined ||
onRunFinalizedMutation.state !== undefined
) {
if (onRunFinalizedMutation.messages !== undefined) {
this.messages = onRunFinalizedMutation.messages;
subscribers.forEach((subscriber) => {
subscriber.onMessagesChanged?.({
messages: this.messages,
agent: this,
input,
});
});
}
if (onRunFinalizedMutation.state !== undefined) {
this.state = onRunFinalizedMutation.state;
subscribers.forEach((subscriber) => {
subscriber.onStateChanged?.({
state: this.state,
agent: this,
input,
});
});
}
}
}

public clone() {
const cloned = Object.create(Object.getPrototypeOf(this));
Expand Down
4 changes: 2 additions & 2 deletions typescript-sdk/packages/client/src/agent/http.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbstractAgent } from "./agent";
import { runHttpRequest, HttpEvent } from "@/run/http-request";
import { runHttpRequest } from "@/run/http-request";
import { HttpAgentConfig, RunAgentParameters } from "./types";
import { RunAgent, RunAgentInput, BaseEvent } from "@ag-ui/core";
import { RunAgentInput, BaseEvent } from "@ag-ui/core";
import { structuredClone_ } from "@/utils";
import { transformHttpEventStream } from "@/transform/http";
import { Observable } from "rxjs";
Expand Down
Loading