diff --git a/packages/core/src/__tests__/proxied-runtime-transport.test.ts b/packages/core/src/__tests__/proxied-runtime-transport.test.ts index fd629e5..4b242ed 100644 --- a/packages/core/src/__tests__/proxied-runtime-transport.test.ts +++ b/packages/core/src/__tests__/proxied-runtime-transport.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { ProxiedCopilotRuntimeAgent } from "../agent"; import { CopilotKitCore } from "../core"; +import { createSuggestionsConfig, MockAgent } from "./test-utils"; type TransportMatrixEntry = { label: string; @@ -169,6 +170,104 @@ describe("ProxiedCopilotRuntimeAgent transport integration", () => { }); }); +describe("ProxiedCopilotRuntimeAgent cloning", () => { + const originalFetch = global.fetch; + const runtimeUrl = "https://runtime.example/single"; + + beforeEach(() => { + // @ts-expect-error - Node typings allow reassigning fetch in tests + global.fetch = vi.fn(() => Promise.resolve(createSseResponse())); + }); + + afterEach(() => { + vi.restoreAllMocks(); + global.fetch = originalFetch; + }); + + it("preserves single-endpoint envelope on cloned agents", async () => { + const agentId = "clone-agent"; + const agent = new ProxiedCopilotRuntimeAgent({ + runtimeUrl, + agentId, + transport: "single", + }); + + const cloned = agent.clone(); + await expect(cloned.runAgent({})).resolves.toMatchObject({ + newMessages: expect.any(Array), + }); + + const fetchMock = global.fetch as ReturnType; + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toBe(runtimeUrl); + const body = JSON.parse(init.body as string); + expect(body).toMatchObject({ + method: "agent/run", + params: { + agentId, + }, + }); + }); +}); + +describe("Suggestions engine with single-endpoint runtime agents", () => { + const originalFetch = global.fetch; + const runtimeUrl = "https://runtime.example/single"; + + beforeEach(() => { + // @ts-expect-error - Node typings allow reassigning fetch in tests + global.fetch = vi.fn(() => Promise.resolve(createSseResponse())); + }); + + afterEach(() => { + vi.restoreAllMocks(); + global.fetch = originalFetch; + }); + + it("envelopes suggestion runs with method and params when cloning runtime agents", async () => { + const providerAgent = new ProxiedCopilotRuntimeAgent({ + runtimeUrl, + agentId: "provider", + transport: "single", + }); + const consumerAgent = new MockAgent({ agentId: "consumer" }) as unknown as any; + + const core = new CopilotKitCore({ + runtimeUrl, + runtimeTransport: "single", + agents__unsafe_dev_only: { + provider: providerAgent, + consumer: consumerAgent, + }, + }); + + core.addSuggestionsConfig( + createSuggestionsConfig({ + providerAgentId: "provider", + consumerAgentId: "consumer", + }), + ); + + core.reloadSuggestions("consumer"); + + const fetchMock = global.fetch as ReturnType; + await vi.waitFor(() => { + expect(fetchMock).toHaveBeenCalled(); + }); + + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toBe(runtimeUrl); + const body = JSON.parse(init.body as string); + expect(body).toMatchObject({ + method: "agent/run", + params: { + agentId: expect.any(String), + }, + }); + }); +}); + function createSseResponse(): Response { const stream = new ReadableStream({ start(controller) { diff --git a/packages/core/src/agent.ts b/packages/core/src/agent.ts index 30d493f..b5fb3eb 100644 --- a/packages/core/src/agent.ts +++ b/packages/core/src/agent.ts @@ -126,6 +126,14 @@ export class ProxiedCopilotRuntimeAgent extends HttpAgent { return super.run(input); } + public override clone(): ProxiedCopilotRuntimeAgent { + const cloned = super.clone() as ProxiedCopilotRuntimeAgent; + cloned.runtimeUrl = this.runtimeUrl; + cloned.transport = this.transport; + cloned.singleEndpointUrl = this.singleEndpointUrl; + return cloned; + } + private createSingleRouteRequestInit(input: RunAgentInput, method: string, params?: Record): RequestInit { if (!this.agentId) { throw new Error("ProxiedCopilotRuntimeAgent requires agentId to make runtime requests"); diff --git a/packages/runtime/src/__tests__/express-body-order.test.ts b/packages/runtime/src/__tests__/express-body-order.test.ts new file mode 100644 index 0000000..0c952bd --- /dev/null +++ b/packages/runtime/src/__tests__/express-body-order.test.ts @@ -0,0 +1,70 @@ +import express from "express"; +import request from "supertest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { AbstractAgent } from "@ag-ui/client"; + +import { createCopilotEndpointExpress } from "../express"; +import { CopilotRuntime } from "../runtime"; + +const handleRunAgentMock = vi.fn(); + +vi.mock("../handlers/handle-run", () => ({ + handleRunAgent: (...args: unknown[]) => handleRunAgentMock(...args), +})); + +const createRuntime = () => + new CopilotRuntime({ + agents: { + agent: { + clone: () => ({ + execute: async () => ({ events: [] }), + }), + } as unknown as AbstractAgent, + }, + }); + +describe("createCopilotEndpointExpress with body parsers", () => { + beforeEach(() => { + handleRunAgentMock.mockReset(); + handleRunAgentMock.mockImplementation(async ({ request }: { request: Request }) => { + const body = await request.json(); + return new Response(JSON.stringify({ body }), { + headers: { "content-type": "application/json" }, + }); + }); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + const sendRunRequest = (app: express.Express) => + request(app) + .post("/agent/agent/run") + .set("Content-Type", "application/json") + .send({ hello: "world" }); + + it("handles requests when CopilotKit router is registered before express.json()", async () => { + const app = express(); + app.use(createCopilotEndpointExpress({ runtime: createRuntime(), basePath: "/" })); + app.use(express.json()); + + const response = await sendRunRequest(app); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ body: { hello: "world" } }); + expect(handleRunAgentMock).toHaveBeenCalledTimes(1); + }); + + it("handles requests when express.json() runs before the CopilotKit router", async () => { + const app = express(); + app.use(express.json()); + app.use(createCopilotEndpointExpress({ runtime: createRuntime(), basePath: "/" })); + + const response = await sendRunRequest(app); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ body: { hello: "world" } }); + expect(handleRunAgentMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/runtime/src/endpoints/express-utils.ts b/packages/runtime/src/endpoints/express-utils.ts index 38f7519..e4a8ae1 100644 --- a/packages/runtime/src/endpoints/express-utils.ts +++ b/packages/runtime/src/endpoints/express-utils.ts @@ -2,6 +2,7 @@ import type { Request as ExpressRequest, Response as ExpressResponse } from "exp import { Readable } from "node:stream"; import { pipeline } from "node:stream"; import { promisify } from "node:util"; +import { logger } from "@copilotkitnext/shared"; const streamPipeline = promisify(pipeline); @@ -27,16 +28,77 @@ export function createFetchRequestFromExpress(req: ExpressRequest): Request { headers, }; + const hasParsedBody = req.body !== undefined && req.body !== null; + const streamConsumed = isStreamConsumed(req, hasParsedBody); + if (!METHODS_WITHOUT_BODY.has(method)) { - init.body = Readable.toWeb(req) as unknown as BodyInit; - init.duplex = "half"; + const canStreamBody = req.readable !== false && !streamConsumed; + + if (canStreamBody) { + init.body = Readable.toWeb(req) as unknown as BodyInit; + init.duplex = "half"; + } else if (hasParsedBody) { + const { body, contentType } = synthesizeBody(req.body); + if (contentType) { + headers.set("content-type", contentType); + } + headers.delete("content-length"); + if (body !== undefined) { + init.body = body; + } + logger.info( + { + url, + method, + readable: req.readable, + readableEnded: req.readableEnded, + complete: req.complete, + }, + "Express request stream already consumed; synthesized body from parsed content", + ); + } else { + headers.delete("content-length"); + logger.warn( + { url, method }, + "Request stream already consumed but no body was available; sending empty body", + ); + } } const controller = new AbortController(); req.on("close", () => controller.abort()); init.signal = controller.signal; - return new Request(url, init); + try { + return new Request(url, init); + } catch (error) { + if (error instanceof TypeError && /disturbed|locked/i.test(error.message)) { + // Fallback to synthesized/empty body when the stream was already consumed. + headers.delete("content-length"); + delete init.duplex; + + if (hasParsedBody) { + const { body, contentType } = synthesizeBody(req.body); + if (contentType) { + headers.set("content-type", contentType); + } + init.body = body; + logger.info( + { url, method }, + "Request stream disturbed while constructing Request; reused parsed body", + ); + } else { + init.body = undefined; + logger.warn( + { url, method }, + "Request stream was disturbed; falling back to empty body", + ); + } + + return new Request(url, init); + } + throw error; + } } export async function sendFetchResponse(res: ExpressResponse, response: Response): Promise { @@ -68,3 +130,31 @@ function buildOrigin(req: ExpressRequest): string { const host = req.get("host") ?? "localhost"; return `${protocol}://${host}`; } + +function isStreamConsumed(req: ExpressRequest, hasParsedBody: boolean): boolean { + const state = (req as unknown as { _readableState?: { ended?: boolean; endEmitted?: boolean } }) + ._readableState; + return Boolean( + hasParsedBody || + req.readableEnded || + req.complete || + state?.ended || + state?.endEmitted, + ); +} + +function synthesizeBody(body: unknown): { body?: BodyInit; contentType?: string } { + if (Buffer.isBuffer(body) || body instanceof Uint8Array) { + return { body }; + } + + if (typeof body === "string") { + return { body }; + } + + if (typeof body === "object" && body !== undefined) { + return { body: JSON.stringify(body), contentType: "application/json" }; + } + + return {}; +}