Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions packages/core/src/__tests__/proxied-runtime-transport.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { ProxiedCopilotRuntimeAgent } from "../agent";
import { CopilotKitCore } from "../core";
import { createSuggestionsConfig, MockAgent } from "./test-utils";

type TransportMatrixEntry = {
label: string;
Expand Down Expand Up @@ -169,6 +170,104 @@ describe("ProxiedCopilotRuntimeAgent transport integration", () => {
});
});

describe("ProxiedCopilotRuntimeAgent cloning", () => {
const originalFetch = global.fetch;
const runtimeUrl = "https://runtime.example/single";

beforeEach(() => {
// @ts-expect-error - Node typings allow reassigning fetch in tests
global.fetch = vi.fn(() => Promise.resolve(createSseResponse()));
});

afterEach(() => {
vi.restoreAllMocks();
global.fetch = originalFetch;
});

it("preserves single-endpoint envelope on cloned agents", async () => {
const agentId = "clone-agent";
const agent = new ProxiedCopilotRuntimeAgent({
runtimeUrl,
agentId,
transport: "single",
});

const cloned = agent.clone();
await expect(cloned.runAgent({})).resolves.toMatchObject({
newMessages: expect.any(Array),
});

const fetchMock = global.fetch as ReturnType<typeof vi.fn>;
expect(fetchMock).toHaveBeenCalledTimes(1);
const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit];
expect(url).toBe(runtimeUrl);
const body = JSON.parse(init.body as string);
expect(body).toMatchObject({
method: "agent/run",
params: {
agentId,
},
});
});
});

describe("Suggestions engine with single-endpoint runtime agents", () => {
const originalFetch = global.fetch;
const runtimeUrl = "https://runtime.example/single";

beforeEach(() => {
// @ts-expect-error - Node typings allow reassigning fetch in tests
global.fetch = vi.fn(() => Promise.resolve(createSseResponse()));
});

afterEach(() => {
vi.restoreAllMocks();
global.fetch = originalFetch;
});

it("envelopes suggestion runs with method and params when cloning runtime agents", async () => {
const providerAgent = new ProxiedCopilotRuntimeAgent({
runtimeUrl,
agentId: "provider",
transport: "single",
});
const consumerAgent = new MockAgent({ agentId: "consumer" }) as unknown as any;

const core = new CopilotKitCore({
runtimeUrl,
runtimeTransport: "single",
agents__unsafe_dev_only: {
provider: providerAgent,
consumer: consumerAgent,
},
});

core.addSuggestionsConfig(
createSuggestionsConfig({
providerAgentId: "provider",
consumerAgentId: "consumer",
}),
);

core.reloadSuggestions("consumer");

const fetchMock = global.fetch as ReturnType<typeof vi.fn>;
await vi.waitFor(() => {
expect(fetchMock).toHaveBeenCalled();
});

const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit];
expect(url).toBe(runtimeUrl);
const body = JSON.parse(init.body as string);
expect(body).toMatchObject({
method: "agent/run",
params: {
agentId: expect.any(String),
},
});
});
});

function createSseResponse(): Response {
const stream = new ReadableStream({
start(controller) {
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ export class ProxiedCopilotRuntimeAgent extends HttpAgent {
return super.run(input);
}

public override clone(): ProxiedCopilotRuntimeAgent {
const cloned = super.clone() as ProxiedCopilotRuntimeAgent;
cloned.runtimeUrl = this.runtimeUrl;
cloned.transport = this.transport;
cloned.singleEndpointUrl = this.singleEndpointUrl;
return cloned;
}

private createSingleRouteRequestInit(input: RunAgentInput, method: string, params?: Record<string, string>): RequestInit {
if (!this.agentId) {
throw new Error("ProxiedCopilotRuntimeAgent requires agentId to make runtime requests");
Expand Down
70 changes: 70 additions & 0 deletions packages/runtime/src/__tests__/express-body-order.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import express from "express";
import request from "supertest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { AbstractAgent } from "@ag-ui/client";

import { createCopilotEndpointExpress } from "../express";
import { CopilotRuntime } from "../runtime";

const handleRunAgentMock = vi.fn();

vi.mock("../handlers/handle-run", () => ({
handleRunAgent: (...args: unknown[]) => handleRunAgentMock(...args),
}));

const createRuntime = () =>
new CopilotRuntime({
agents: {
agent: {
clone: () => ({
execute: async () => ({ events: [] }),
}),
} as unknown as AbstractAgent,
},
});

describe("createCopilotEndpointExpress with body parsers", () => {
beforeEach(() => {
handleRunAgentMock.mockReset();
handleRunAgentMock.mockImplementation(async ({ request }: { request: Request }) => {
const body = await request.json();
return new Response(JSON.stringify({ body }), {
headers: { "content-type": "application/json" },
});
});
});

afterEach(() => {
vi.clearAllMocks();
});

const sendRunRequest = (app: express.Express) =>
request(app)
.post("/agent/agent/run")
.set("Content-Type", "application/json")
.send({ hello: "world" });

it("handles requests when CopilotKit router is registered before express.json()", async () => {
const app = express();
app.use(createCopilotEndpointExpress({ runtime: createRuntime(), basePath: "/" }));
app.use(express.json());

const response = await sendRunRequest(app);

expect(response.status).toBe(200);
expect(response.body).toEqual({ body: { hello: "world" } });
expect(handleRunAgentMock).toHaveBeenCalledTimes(1);
});

it("handles requests when express.json() runs before the CopilotKit router", async () => {
const app = express();
app.use(express.json());
app.use(createCopilotEndpointExpress({ runtime: createRuntime(), basePath: "/" }));

const response = await sendRunRequest(app);

expect(response.status).toBe(200);
expect(response.body).toEqual({ body: { hello: "world" } });
expect(handleRunAgentMock).toHaveBeenCalledTimes(1);
});
});
96 changes: 93 additions & 3 deletions packages/runtime/src/endpoints/express-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Request as ExpressRequest, Response as ExpressResponse } from "exp
import { Readable } from "node:stream";
import { pipeline } from "node:stream";
import { promisify } from "node:util";
import { logger } from "@copilotkitnext/shared";

const streamPipeline = promisify(pipeline);

Expand All @@ -27,16 +28,77 @@ export function createFetchRequestFromExpress(req: ExpressRequest): Request {
headers,
};

const hasParsedBody = req.body !== undefined && req.body !== null;
const streamConsumed = isStreamConsumed(req, hasParsedBody);

if (!METHODS_WITHOUT_BODY.has(method)) {
init.body = Readable.toWeb(req) as unknown as BodyInit;
init.duplex = "half";
const canStreamBody = req.readable !== false && !streamConsumed;

if (canStreamBody) {
init.body = Readable.toWeb(req) as unknown as BodyInit;
init.duplex = "half";
} else if (hasParsedBody) {
const { body, contentType } = synthesizeBody(req.body);
if (contentType) {
headers.set("content-type", contentType);
}
headers.delete("content-length");
if (body !== undefined) {
init.body = body;
}
logger.info(
{
url,
method,
readable: req.readable,
readableEnded: req.readableEnded,
complete: req.complete,
},
"Express request stream already consumed; synthesized body from parsed content",
);
} else {
headers.delete("content-length");
logger.warn(
{ url, method },
"Request stream already consumed but no body was available; sending empty body",
);
}
}

const controller = new AbortController();
req.on("close", () => controller.abort());
init.signal = controller.signal;

return new Request(url, init);
try {
return new Request(url, init);
} catch (error) {
if (error instanceof TypeError && /disturbed|locked/i.test(error.message)) {
// Fallback to synthesized/empty body when the stream was already consumed.
headers.delete("content-length");
delete init.duplex;

if (hasParsedBody) {
const { body, contentType } = synthesizeBody(req.body);
if (contentType) {
headers.set("content-type", contentType);
}
init.body = body;
logger.info(
{ url, method },
"Request stream disturbed while constructing Request; reused parsed body",
);
} else {
init.body = undefined;
logger.warn(
{ url, method },
"Request stream was disturbed; falling back to empty body",
);
}

return new Request(url, init);
}
throw error;
}
}

export async function sendFetchResponse(res: ExpressResponse, response: Response): Promise<void> {
Expand Down Expand Up @@ -68,3 +130,31 @@ function buildOrigin(req: ExpressRequest): string {
const host = req.get("host") ?? "localhost";
return `${protocol}://${host}`;
}

function isStreamConsumed(req: ExpressRequest, hasParsedBody: boolean): boolean {
const state = (req as unknown as { _readableState?: { ended?: boolean; endEmitted?: boolean } })
._readableState;
return Boolean(
hasParsedBody ||
req.readableEnded ||
req.complete ||
state?.ended ||
state?.endEmitted,
);
}

function synthesizeBody(body: unknown): { body?: BodyInit; contentType?: string } {
if (Buffer.isBuffer(body) || body instanceof Uint8Array) {
return { body };
}

if (typeof body === "string") {
return { body };
}

if (typeof body === "object" && body !== undefined) {
return { body: JSON.stringify(body), contentType: "application/json" };
}

return {};
}