diff --git a/docs/sdk/js/client/abstract-agent.mdx b/docs/sdk/js/client/abstract-agent.mdx index 3738b0aae..cf322d214 100644 --- a/docs/sdk/js/client/abstract-agent.mdx +++ b/docs/sdk/js/client/abstract-agent.mdx @@ -111,6 +111,33 @@ Creates a deep copy of the agent instance. clone(): AbstractAgent ``` +### connectAgent() + +Establishes a persistent connection with an agent that implements the +`connect()` method. + +```typescript +connectAgent(parameters?: RunAgentParameters, subscriber?: AgentSubscriber): Promise +``` + +Similar to `runAgent()` but uses the `connect()` method internally. The agent +must implement `connect()` or this functionality must be provided by a framework +like [CopilotKit](https://copilotkit.ai). + +## Observable Properties + +### events$ + +An observable stream of all events emitted during agent execution. + +```typescript +events$: Observable +``` + +This property provides direct access to the agent's event stream. Events are +stored using a `ReplaySubject`, allowing late subscribers will receive all +historical events. + ## Properties - `agentId`: Unique identifier for the agent instance @@ -118,6 +145,8 @@ clone(): AbstractAgent - `threadId`: Conversation thread identifier - `messages`: Array of conversation messages - `state`: Current agent state object +- `events$`: Observable stream of all `BaseEvent` objects emitted during agent + execution (replayed for late subscribers) ## Protected Methods @@ -131,6 +160,17 @@ Executes the agent and returns an observable event stream. protected abstract run(input: RunAgentInput): RunAgent ``` +### connect() + +Establishes a persistent connection and returns an observable event stream. + +```typescript +protected connect(input: RunAgentInput): RunAgent +``` + +Override this method to implement persistent connections. Default implementation +throws `ConnectNotImplementedError`. + ### apply() Processes events from the run and updates the agent state. diff --git a/typescript-sdk/integrations/mastra/src/mastra.ts b/typescript-sdk/integrations/mastra/src/mastra.ts index f8310767b..9de9e70dc 100644 --- a/typescript-sdk/integrations/mastra/src/mastra.ts +++ b/typescript-sdk/integrations/mastra/src/mastra.ts @@ -52,13 +52,18 @@ export class MastraAgent extends AbstractAgent { resourceId?: string; runtimeContext?: RuntimeContext; - constructor({ agent, resourceId, runtimeContext, ...rest }: MastraAgentConfig) { + constructor(private config: MastraAgentConfig) { + const { agent, resourceId, runtimeContext, ...rest } = config; super(rest); this.agent = agent; this.resourceId = resourceId; this.runtimeContext = runtimeContext ?? new RuntimeContext(); } + public clone() { + return new MastraAgent(this.config); + } + protected run(input: RunAgentInput): Observable { let messageId = randomUUID(); @@ -250,7 +255,7 @@ export class MastraAgent extends AbstractAgent { ); const resourceId = this.resourceId ?? threadId; const convertedMessages = convertAGUIMessagesToMastra(messages); - this.runtimeContext?.set('ag-ui', { context: inputContext }); + this.runtimeContext?.set("ag-ui", { context: inputContext }); const runtimeContext = this.runtimeContext; if (this.isLocalMastraAgent(this.agent)) { diff --git a/typescript-sdk/integrations/vercel-ai-sdk/src/index.ts b/typescript-sdk/integrations/vercel-ai-sdk/src/index.ts index b06664f34..602e45e14 100644 --- a/typescript-sdk/integrations/vercel-ai-sdk/src/index.ts +++ b/typescript-sdk/integrations/vercel-ai-sdk/src/index.ts @@ -48,13 +48,18 @@ export class VercelAISDKAgent extends AbstractAgent { model: LanguageModelV1; maxSteps: number; toolChoice: ToolChoice>; - constructor({ model, maxSteps, toolChoice, ...rest }: VercelAISDKAgentConfig) { + constructor(private config: VercelAISDKAgentConfig) { + const { model, maxSteps, toolChoice, ...rest } = config; super({ ...rest }); this.model = model; this.maxSteps = maxSteps ?? 1; this.toolChoice = toolChoice ?? "auto"; } + public clone() { + return new VercelAISDKAgent(this.config); + } + protected run(input: RunAgentInput): Observable { const finalMessages: Message[] = input.messages; diff --git a/typescript-sdk/packages/cli/package.json b/typescript-sdk/packages/cli/package.json index b8e37cb32..629b098c4 100644 --- a/typescript-sdk/packages/cli/package.json +++ b/typescript-sdk/packages/cli/package.json @@ -1,7 +1,7 @@ { "name": "create-ag-ui-app", "author": "Markus Ecker ", - "version": "0.0.39", + "version": "0.0.40-alpha.2", "private": false, "publishConfig": { "access": "public" diff --git a/typescript-sdk/packages/client/package.json b/typescript-sdk/packages/client/package.json index a5aef3881..a80cf385f 100644 --- a/typescript-sdk/packages/client/package.json +++ b/typescript-sdk/packages/client/package.json @@ -1,7 +1,7 @@ { "name": "@ag-ui/client", "author": "Markus Ecker ", - "version": "0.0.38", + "version": "0.0.40-alpha.2", "private": false, "publishConfig": { "access": "public" diff --git a/typescript-sdk/packages/client/src/agent/__tests__/agent-clone.test.ts b/typescript-sdk/packages/client/src/agent/__tests__/agent-clone.test.ts new file mode 100644 index 000000000..3fb00e8a0 --- /dev/null +++ b/typescript-sdk/packages/client/src/agent/__tests__/agent-clone.test.ts @@ -0,0 +1,81 @@ +import { AbstractAgent } from "../agent"; +import { HttpAgent } from "../http"; +import { BaseEvent, Message, RunAgentInput } from "@ag-ui/core"; +import { EMPTY, Observable } from "rxjs"; + +class CloneableTestAgent extends AbstractAgent { + constructor() { + super({ + agentId: "test-agent", + description: "Cloneable test agent", + threadId: "thread-test", + initialMessages: [ + { + id: "msg-1", + role: "user", + content: "Hello world", + toolCalls: [], + } as Message, + ], + initialState: { stage: "initial" }, + }); + } + + protected run(_: RunAgentInput): Observable { + return EMPTY as Observable; + } +} + +describe("AbstractAgent cloning", () => { + it("clones subclass instances with independent state", () => { + const agent = new CloneableTestAgent(); + + const cloned = agent.clone() as CloneableTestAgent; + + expect(cloned).toBeInstanceOf(CloneableTestAgent); + expect(cloned).not.toBe(agent); + expect(cloned.agentId).toBe(agent.agentId); + expect(cloned.threadId).toBe(agent.threadId); + expect(cloned.messages).toEqual(agent.messages); + expect(cloned.messages).not.toBe(agent.messages); + expect(cloned.state).toEqual(agent.state); + expect(cloned.state).not.toBe(agent.state); + }); +}); + +describe("HttpAgent cloning", () => { + it("produces a new HttpAgent with cloned configuration and abort controller", () => { + const httpAgent = new HttpAgent({ + url: "https://example.com/agent", + headers: { Authorization: "Bearer token" }, + threadId: "thread-http", + initialMessages: [ + { + id: "msg-http", + role: "assistant", + content: "response", + toolCalls: [], + } as Message, + ], + initialState: { status: "ready" }, + }); + + httpAgent.abortController.abort("cancelled"); + + const cloned = httpAgent.clone() as HttpAgent; + + expect(cloned).toBeInstanceOf(HttpAgent); + expect(cloned).not.toBe(httpAgent); + expect(cloned.url).toBe(httpAgent.url); + expect(cloned.headers).toEqual(httpAgent.headers); + expect(cloned.headers).not.toBe(httpAgent.headers); + expect(cloned.messages).toEqual(httpAgent.messages); + expect(cloned.messages).not.toBe(httpAgent.messages); + expect(cloned.state).toEqual(httpAgent.state); + expect(cloned.state).not.toBe(httpAgent.state); + expect(cloned.abortController).not.toBe(httpAgent.abortController); + expect(cloned.abortController).toBeInstanceOf(AbortController); + expect(cloned.abortController.signal.aborted).toBe(true); + expect(cloned.abortController.signal.reason).toBe("cancelled"); + }); +}); diff --git a/typescript-sdk/packages/client/src/agent/agent.ts b/typescript-sdk/packages/client/src/agent/agent.ts index f596a625c..081cd1934 100644 --- a/typescript-sdk/packages/client/src/agent/agent.ts +++ b/typescript-sdk/packages/client/src/agent/agent.ts @@ -6,13 +6,14 @@ import { v4 as uuidv4 } from "uuid"; import { structuredClone_ } from "@/utils"; import { catchError, map, tap } from "rxjs/operators"; import { finalize } from "rxjs/operators"; -import { pipe, Observable, from, of } from "rxjs"; +import { pipe, Observable, from, of, EMPTY } from "rxjs"; import { verifyEvents } from "@/verify"; import { convertToLegacyEvents } from "@/legacy/convert"; import { LegacyRuntimeProtocolEvent } from "@/legacy/types"; import { lastValueFrom } from "rxjs"; import { transformChunks } from "@/chunks"; import { AgentStateMutation, AgentSubscriber, runSubscribersWithMutation } from "./subscriber"; +import { AGUIConnectNotImplementedError } from "@ag-ui/core"; export interface RunAgentResult { result: any; @@ -27,6 +28,7 @@ export abstract class AbstractAgent { public state: State; public debug: boolean = false; public subscribers: AgentSubscriber[] = []; + public isRunning: boolean = false; constructor({ agentId, @@ -59,43 +61,106 @@ export abstract class AbstractAgent { parameters?: RunAgentParameters, subscriber?: AgentSubscriber, ): Promise { - this.agentId = this.agentId ?? uuidv4(); - const input = this.prepareRunAgentInput(parameters); - let result: any = undefined; - const currentMessageIds = new Set(this.messages.map((message) => message.id)); - - const subscribers: AgentSubscriber[] = [ - { - onRunFinishedEvent: (params) => { - result = params.result; + try { + this.isRunning = true; + this.agentId = this.agentId ?? uuidv4(); + const input = this.prepareRunAgentInput(parameters); + let result: any = undefined; + const currentMessageIds = new Set(this.messages.map((message) => message.id)); + + const subscribers: AgentSubscriber[] = [ + { + onRunFinishedEvent: (params) => { + result = params.result; + }, }, - }, - ...this.subscribers, - subscriber ?? {}, - ]; - - await this.onInitialize(input, subscribers); + ...this.subscribers, + subscriber ?? {}, + ]; + + await this.onInitialize(input, subscribers); + + const pipeline = pipe( + () => this.run(input), + transformChunks(this.debug), + verifyEvents(this.debug), + (source$) => this.apply(input, source$, subscribers), + (source$) => this.processApplyEvents(input, source$, subscribers), + catchError((error) => { + this.isRunning = false; + return this.onError(input, error, subscribers); + }), + finalize(() => { + this.isRunning = false; + void this.onFinalize(input, subscribers); + }), + ); - const pipeline = pipe( - () => this.run(input), - transformChunks(this.debug), - verifyEvents(this.debug), - (source$) => this.apply(input, source$, subscribers), - (source$) => this.processApplyEvents(input, source$, subscribers), - catchError((error) => { - return this.onError(input, error, subscribers); - }), - finalize(() => { - void this.onFinalize(input, subscribers); - }), - ); + return lastValueFrom(pipeline(of(null))).then(() => { + const newMessages = structuredClone_(this.messages).filter( + (message: Message) => !currentMessageIds.has(message.id), + ); + return { result, newMessages }; + }); + } finally { + this.isRunning = false; + } + } - return lastValueFrom(pipeline(of(null))).then(() => { - const newMessages = structuredClone_(this.messages).filter( - (message: Message) => !currentMessageIds.has(message.id), + protected connect(input: RunAgentInput): Observable { + throw new AGUIConnectNotImplementedError(); + } + public async connectAgent( + parameters?: RunAgentParameters, + subscriber?: AgentSubscriber, + ): Promise { + try { + this.isRunning = true; + this.agentId = this.agentId ?? uuidv4(); + const input = this.prepareRunAgentInput(parameters); + let result: any = undefined; + const currentMessageIds = new Set(this.messages.map((message) => message.id)); + + const subscribers: AgentSubscriber[] = [ + { + onRunFinishedEvent: (params) => { + result = params.result; + }, + }, + ...this.subscribers, + subscriber ?? {}, + ]; + + await this.onInitialize(input, subscribers); + + const pipeline = pipe( + () => this.connect(input), + transformChunks(this.debug), + verifyEvents(this.debug), + (source$) => this.apply(input, source$, subscribers), + (source$) => this.processApplyEvents(input, source$, subscribers), + catchError((error) => { + this.isRunning = false; + if (!(error instanceof AGUIConnectNotImplementedError)) { + return this.onError(input, error, subscribers); + } + return EMPTY; + }), + finalize(() => { + this.isRunning = false; + void this.onFinalize(input, subscribers); + }), ); - return { result, newMessages }; - }); + + return lastValueFrom(pipeline(of(null))).then(() => { + const newMessages = structuredClone_(this.messages).filter( + (message: Message) => !currentMessageIds.has(message.id), + ); + return { result, newMessages }; + }); + } finally { + this.isRunning = false; + } } public abortRun() {} @@ -281,12 +346,14 @@ export abstract class AbstractAgent { public clone() { const cloned = Object.create(Object.getPrototypeOf(this)); - for (const key of Object.getOwnPropertyNames(this)) { - const value = (this as any)[key]; - if (typeof value !== "function") { - cloned[key] = structuredClone_(value); - } - } + cloned.agentId = this.agentId; + cloned.description = this.description; + cloned.threadId = this.threadId; + cloned.messages = structuredClone_(this.messages); + cloned.state = structuredClone_(this.state); + cloned.debug = this.debug; + cloned.isRunning = this.isRunning; + cloned.subscribers = [...this.subscribers]; return cloned; } diff --git a/typescript-sdk/packages/client/src/agent/http.ts b/typescript-sdk/packages/client/src/agent/http.ts index 49fae2173..f9d9c3002 100644 --- a/typescript-sdk/packages/client/src/agent/http.ts +++ b/typescript-sdk/packages/client/src/agent/http.ts @@ -58,4 +58,19 @@ export class HttpAgent extends AbstractAgent { const httpEvents = runHttpRequest(this.url, this.requestInit(input)); return transformHttpEventStream(httpEvents); } + + public clone(): HttpAgent { + const cloned = super.clone() as HttpAgent; + cloned.url = this.url; + cloned.headers = structuredClone_(this.headers ?? {}); + + const newController = new AbortController(); + const originalSignal = this.abortController.signal as AbortSignal & { reason?: unknown }; + if (originalSignal.aborted) { + newController.abort(originalSignal.reason); + } + cloned.abortController = newController; + + return cloned; + } } diff --git a/typescript-sdk/packages/client/src/agent/index.ts b/typescript-sdk/packages/client/src/agent/index.ts index 945724cd0..046bfa90b 100644 --- a/typescript-sdk/packages/client/src/agent/index.ts +++ b/typescript-sdk/packages/client/src/agent/index.ts @@ -2,3 +2,4 @@ export { AbstractAgent } from "./agent"; export type { RunAgentResult } from "./agent"; export { HttpAgent } from "./http"; export type { AgentConfig, HttpAgentConfig, RunAgentParameters } from "./types"; +export type { AgentSubscriber, AgentStateMutation, AgentSubscriberParams } from "./subscriber"; diff --git a/typescript-sdk/packages/core/package.json b/typescript-sdk/packages/core/package.json index e6eebbddb..9662886fc 100644 --- a/typescript-sdk/packages/core/package.json +++ b/typescript-sdk/packages/core/package.json @@ -1,7 +1,7 @@ { "name": "@ag-ui/core", "author": "Markus Ecker ", - "version": "0.0.38", + "version": "0.0.40-alpha.2", "private": false, "publishConfig": { "access": "public" diff --git a/typescript-sdk/packages/core/src/types.ts b/typescript-sdk/packages/core/src/types.ts index 1abb31a0b..0b5b17224 100644 --- a/typescript-sdk/packages/core/src/types.ts +++ b/typescript-sdk/packages/core/src/types.ts @@ -105,3 +105,9 @@ export class AGUIError extends Error { super(message); } } + +export class AGUIConnectNotImplementedError extends AGUIError { + constructor() { + super("Connect not implemented. This method is not supported by the current agent."); + } +} diff --git a/typescript-sdk/packages/encoder/package.json b/typescript-sdk/packages/encoder/package.json index c1694462f..d5b9e20d0 100644 --- a/typescript-sdk/packages/encoder/package.json +++ b/typescript-sdk/packages/encoder/package.json @@ -1,7 +1,7 @@ { "name": "@ag-ui/encoder", "author": "Markus Ecker ", - "version": "0.0.38", + "version": "0.0.40-alpha.2", "private": false, "publishConfig": { "access": "public" diff --git a/typescript-sdk/packages/proto/package.json b/typescript-sdk/packages/proto/package.json index 87f5db877..8e8afbef0 100644 --- a/typescript-sdk/packages/proto/package.json +++ b/typescript-sdk/packages/proto/package.json @@ -1,7 +1,7 @@ { "name": "@ag-ui/proto", "author": "Markus Ecker ", - "version": "0.0.38", + "version": "0.0.40-alpha.2", "private": false, "publishConfig": { "access": "public"