Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/sdk/js/client/abstract-agent.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,42 @@ Creates a deep copy of the agent instance.
clone(): AbstractAgent
```

### connectAgent()

Establishes a persistent connection with an agent that implements the
`connect()` method.

```typescript
connectAgent(parameters?: RunAgentParameters, subscriber?: AgentSubscriber): Promise<RunAgentResult>
```

Similar to `runAgent()` but uses the `connect()` method internally. The agent
must implement `connect()` or this functionality must be provided by a framework
like [CopilotKit](https://copilotkit.ai).

## Observable Properties

### events$

An observable stream of all events emitted during agent execution.

```typescript
events$: Observable<BaseEvent>
```

This property provides direct access to the agent's event stream. Events are
stored using a `ReplaySubject`, allowing late subscribers will receive all
historical events.

## Properties

- `agentId`: Unique identifier for the agent instance
- `description`: Human-readable description
- `threadId`: Conversation thread identifier
- `messages`: Array of conversation messages
- `state`: Current agent state object
- `events$`: Observable stream of all `BaseEvent` objects emitted during agent
execution (replayed for late subscribers)

## Protected Methods

Expand All @@ -131,6 +160,17 @@ Executes the agent and returns an observable event stream.
protected abstract run(input: RunAgentInput): RunAgent
```

### connect()

Establishes a persistent connection and returns an observable event stream.

```typescript
protected connect(input: RunAgentInput): RunAgent
```

Override this method to implement persistent connections. Default implementation
throws `ConnectNotImplementedError`.

### apply()

Processes events from the run and updates the agent state.
Expand Down
9 changes: 7 additions & 2 deletions typescript-sdk/integrations/mastra/src/mastra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,18 @@ export class MastraAgent extends AbstractAgent {
resourceId?: string;
runtimeContext?: RuntimeContext;

constructor({ agent, resourceId, runtimeContext, ...rest }: MastraAgentConfig) {
constructor(private config: MastraAgentConfig) {
const { agent, resourceId, runtimeContext, ...rest } = config;
super(rest);
this.agent = agent;
this.resourceId = resourceId;
this.runtimeContext = runtimeContext ?? new RuntimeContext();
}

public clone() {
return new MastraAgent(this.config);
}

protected run(input: RunAgentInput): Observable<BaseEvent> {
let messageId = randomUUID();

Expand Down Expand Up @@ -250,7 +255,7 @@ export class MastraAgent extends AbstractAgent {
);
const resourceId = this.resourceId ?? threadId;
const convertedMessages = convertAGUIMessagesToMastra(messages);
this.runtimeContext?.set('ag-ui', { context: inputContext });
this.runtimeContext?.set("ag-ui", { context: inputContext });
const runtimeContext = this.runtimeContext;

if (this.isLocalMastraAgent(this.agent)) {
Expand Down
7 changes: 6 additions & 1 deletion typescript-sdk/integrations/vercel-ai-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ export class VercelAISDKAgent extends AbstractAgent {
model: LanguageModelV1;
maxSteps: number;
toolChoice: ToolChoice<Record<string, unknown>>;
constructor({ model, maxSteps, toolChoice, ...rest }: VercelAISDKAgentConfig) {
constructor(private config: VercelAISDKAgentConfig) {
const { model, maxSteps, toolChoice, ...rest } = config;
super({ ...rest });
this.model = model;
this.maxSteps = maxSteps ?? 1;
this.toolChoice = toolChoice ?? "auto";
}

public clone() {
return new VercelAISDKAgent(this.config);
}

protected run(input: RunAgentInput): Observable<BaseEvent> {
const finalMessages: Message[] = input.messages;

Expand Down
2 changes: 1 addition & 1 deletion typescript-sdk/packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "create-ag-ui-app",
"author": "Markus Ecker <[email protected]>",
"version": "0.0.39",
"version": "0.0.40-alpha.2",
"private": false,
"publishConfig": {
"access": "public"
Expand Down
2 changes: 1 addition & 1 deletion typescript-sdk/packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ag-ui/client",
"author": "Markus Ecker <[email protected]>",
"version": "0.0.38",
"version": "0.0.40-alpha.2",
"private": false,
"publishConfig": {
"access": "public"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { AbstractAgent } from "../agent";
import { HttpAgent } from "../http";
import { BaseEvent, Message, RunAgentInput } from "@ag-ui/core";
import { EMPTY, Observable } from "rxjs";

class CloneableTestAgent extends AbstractAgent {
constructor() {
super({
agentId: "test-agent",
description: "Cloneable test agent",
threadId: "thread-test",
initialMessages: [
{
id: "msg-1",
role: "user",
content: "Hello world",
toolCalls: [],
} as Message,
],
initialState: { stage: "initial" },
});
}

protected run(_: RunAgentInput): Observable<BaseEvent> {
return EMPTY as Observable<BaseEvent>;
}
}

describe("AbstractAgent cloning", () => {
it("clones subclass instances with independent state", () => {
const agent = new CloneableTestAgent();

const cloned = agent.clone() as CloneableTestAgent;

expect(cloned).toBeInstanceOf(CloneableTestAgent);
expect(cloned).not.toBe(agent);
expect(cloned.agentId).toBe(agent.agentId);
expect(cloned.threadId).toBe(agent.threadId);
expect(cloned.messages).toEqual(agent.messages);
expect(cloned.messages).not.toBe(agent.messages);
expect(cloned.state).toEqual(agent.state);
expect(cloned.state).not.toBe(agent.state);
});
});

describe("HttpAgent cloning", () => {
it("produces a new HttpAgent with cloned configuration and abort controller", () => {
const httpAgent = new HttpAgent({
url: "https://example.com/agent",
headers: { Authorization: "Bearer token" },
threadId: "thread-http",
initialMessages: [
{
id: "msg-http",
role: "assistant",
content: "response",
toolCalls: [],
} as Message,
],
initialState: { status: "ready" },
});

httpAgent.abortController.abort("cancelled");

const cloned = httpAgent.clone() as HttpAgent;

expect(cloned).toBeInstanceOf(HttpAgent);
expect(cloned).not.toBe(httpAgent);
expect(cloned.url).toBe(httpAgent.url);
expect(cloned.headers).toEqual(httpAgent.headers);
expect(cloned.headers).not.toBe(httpAgent.headers);
expect(cloned.messages).toEqual(httpAgent.messages);
expect(cloned.messages).not.toBe(httpAgent.messages);
expect(cloned.state).toEqual(httpAgent.state);
expect(cloned.state).not.toBe(httpAgent.state);
expect(cloned.abortController).not.toBe(httpAgent.abortController);
expect(cloned.abortController).toBeInstanceOf(AbortController);
expect(cloned.abortController.signal.aborted).toBe(true);
expect(cloned.abortController.signal.reason).toBe("cancelled");
});
});
147 changes: 107 additions & 40 deletions typescript-sdk/packages/client/src/agent/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import { v4 as uuidv4 } from "uuid";
import { structuredClone_ } from "@/utils";
import { catchError, map, tap } from "rxjs/operators";
import { finalize } from "rxjs/operators";
import { pipe, Observable, from, of } from "rxjs";
import { pipe, Observable, from, of, EMPTY } from "rxjs";
import { verifyEvents } from "@/verify";
import { convertToLegacyEvents } from "@/legacy/convert";
import { LegacyRuntimeProtocolEvent } from "@/legacy/types";
import { lastValueFrom } from "rxjs";
import { transformChunks } from "@/chunks";
import { AgentStateMutation, AgentSubscriber, runSubscribersWithMutation } from "./subscriber";
import { AGUIConnectNotImplementedError } from "@ag-ui/core";

export interface RunAgentResult {
result: any;
Expand All @@ -27,6 +28,7 @@ export abstract class AbstractAgent {
public state: State;
public debug: boolean = false;
public subscribers: AgentSubscriber[] = [];
public isRunning: boolean = false;

constructor({
agentId,
Expand Down Expand Up @@ -59,43 +61,106 @@ export abstract class AbstractAgent {
parameters?: RunAgentParameters,
subscriber?: AgentSubscriber,
): Promise<RunAgentResult> {
this.agentId = this.agentId ?? uuidv4();
const input = this.prepareRunAgentInput(parameters);
let result: any = undefined;
const currentMessageIds = new Set(this.messages.map((message) => message.id));

const subscribers: AgentSubscriber[] = [
{
onRunFinishedEvent: (params) => {
result = params.result;
try {
this.isRunning = true;
this.agentId = this.agentId ?? uuidv4();
const input = this.prepareRunAgentInput(parameters);
let result: any = undefined;
const currentMessageIds = new Set(this.messages.map((message) => message.id));

const subscribers: AgentSubscriber[] = [
{
onRunFinishedEvent: (params) => {
result = params.result;
},
},
},
...this.subscribers,
subscriber ?? {},
];

await this.onInitialize(input, subscribers);
...this.subscribers,
subscriber ?? {},
];

await this.onInitialize(input, subscribers);

const pipeline = pipe(
() => this.run(input),
transformChunks(this.debug),
verifyEvents(this.debug),
(source$) => this.apply(input, source$, subscribers),
(source$) => this.processApplyEvents(input, source$, subscribers),
catchError((error) => {
this.isRunning = false;
return this.onError(input, error, subscribers);
}),
finalize(() => {
this.isRunning = false;
void this.onFinalize(input, subscribers);
}),
);

const pipeline = pipe(
() => this.run(input),
transformChunks(this.debug),
verifyEvents(this.debug),
(source$) => this.apply(input, source$, subscribers),
(source$) => this.processApplyEvents(input, source$, subscribers),
catchError((error) => {
return this.onError(input, error, subscribers);
}),
finalize(() => {
void this.onFinalize(input, subscribers);
}),
);
return lastValueFrom(pipeline(of(null))).then(() => {
const newMessages = structuredClone_(this.messages).filter(
(message: Message) => !currentMessageIds.has(message.id),
);
return { result, newMessages };
});
} finally {
this.isRunning = false;
}
}

return lastValueFrom(pipeline(of(null))).then(() => {
const newMessages = structuredClone_(this.messages).filter(
(message: Message) => !currentMessageIds.has(message.id),
protected connect(input: RunAgentInput): Observable<BaseEvent> {
throw new AGUIConnectNotImplementedError();
}
public async connectAgent(
parameters?: RunAgentParameters,
subscriber?: AgentSubscriber,
): Promise<RunAgentResult> {
try {
this.isRunning = true;
this.agentId = this.agentId ?? uuidv4();
const input = this.prepareRunAgentInput(parameters);
let result: any = undefined;
const currentMessageIds = new Set(this.messages.map((message) => message.id));

const subscribers: AgentSubscriber[] = [
{
onRunFinishedEvent: (params) => {
result = params.result;
},
},
...this.subscribers,
subscriber ?? {},
];

await this.onInitialize(input, subscribers);

const pipeline = pipe(
() => this.connect(input),
transformChunks(this.debug),
verifyEvents(this.debug),
(source$) => this.apply(input, source$, subscribers),
(source$) => this.processApplyEvents(input, source$, subscribers),
catchError((error) => {
this.isRunning = false;
if (!(error instanceof AGUIConnectNotImplementedError)) {
return this.onError(input, error, subscribers);
}
return EMPTY;
}),
finalize(() => {
this.isRunning = false;
void this.onFinalize(input, subscribers);
}),
);
return { result, newMessages };
});

return lastValueFrom(pipeline(of(null))).then(() => {
const newMessages = structuredClone_(this.messages).filter(
(message: Message) => !currentMessageIds.has(message.id),
);
return { result, newMessages };
});
} finally {
this.isRunning = false;
}
}

public abortRun() {}
Expand Down Expand Up @@ -281,12 +346,14 @@ export abstract class AbstractAgent {
public clone() {
const cloned = Object.create(Object.getPrototypeOf(this));

for (const key of Object.getOwnPropertyNames(this)) {
const value = (this as any)[key];
if (typeof value !== "function") {
cloned[key] = structuredClone_(value);
}
}
cloned.agentId = this.agentId;
cloned.description = this.description;
cloned.threadId = this.threadId;
cloned.messages = structuredClone_(this.messages);
cloned.state = structuredClone_(this.state);
cloned.debug = this.debug;
cloned.isRunning = this.isRunning;
cloned.subscribers = [...this.subscribers];

return cloned;
}
Expand Down
Loading