diff --git a/examples/resumable-stream-chat/.dev.vars.example b/examples/resumable-stream-chat/.dev.vars.example new file mode 100644 index 00000000..3cac181d --- /dev/null +++ b/examples/resumable-stream-chat/.dev.vars.example @@ -0,0 +1 @@ +OPENAI_API_KEY=your_openai_api_key_here diff --git a/examples/resumable-stream-chat/README.md b/examples/resumable-stream-chat/README.md new file mode 100644 index 00000000..44b4fd28 --- /dev/null +++ b/examples/resumable-stream-chat/README.md @@ -0,0 +1,22 @@ +# Resumable Streams Chat Example + +## Setup + +1. Copy `.dev.vars.example` to `.dev.vars` and add your OpenAI API key: + + ```bash + cp .dev.vars.example .dev.vars + # Edit .dev.vars and add your OPENAI_API_KEY + ``` + +2. Install dependencies: + + ```bash + npm install + ``` + +3. Start the development server: + + ```bash + npm run start + ``` diff --git a/examples/resumable-stream-chat/index.html b/examples/resumable-stream-chat/index.html new file mode 100644 index 00000000..95e707f4 --- /dev/null +++ b/examples/resumable-stream-chat/index.html @@ -0,0 +1,10 @@ + + + + Resumable Chat + + +
+ + + diff --git a/examples/resumable-stream-chat/package.json b/examples/resumable-stream-chat/package.json new file mode 100644 index 00000000..1eba89ca --- /dev/null +++ b/examples/resumable-stream-chat/package.json @@ -0,0 +1,13 @@ +{ + "author": "", + "description": "Resumable Stream Chat Example", + "keywords": [], + "license": "ISC", + "private": true, + "scripts": { + "deploy": "vite build && wrangler deploy", + "start": "vite dev" + }, + "type": "module", + "version": "0.0.0" +} diff --git a/examples/resumable-stream-chat/src/app.tsx b/examples/resumable-stream-chat/src/app.tsx new file mode 100644 index 00000000..6e469551 --- /dev/null +++ b/examples/resumable-stream-chat/src/app.tsx @@ -0,0 +1,111 @@ +import type { UIMessage as Message } from "ai"; +import "./styles.css"; +import { useAgentChatHttp } from "agents/use-agent-chat-http"; +import { useCallback, useEffect, useRef, useState } from "react"; + +export default function Chat() { + const [theme, setTheme] = useState<"dark" | "light">("dark"); + const [input, setInput] = useState(""); + const messagesEndRef = useRef(null); + + const scrollToBottom = useCallback(() => { + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }, []); + + useEffect(() => { + // Set initial theme + document.documentElement.setAttribute("data-theme", theme); + }, [theme]); + + // Scroll to bottom on mount + useEffect(() => { + scrollToBottom(); + }, [scrollToBottom]); + + const toggleTheme = () => { + const newTheme = theme === "dark" ? "light" : "dark"; + setTheme(newTheme); + document.documentElement.setAttribute("data-theme", newTheme); + }; + + const { messages, sendMessage, clearChatHistory } = useAgentChatHttp({ + agent: "resumable-chat-agent", + enableResumableStreams: true + }); + + const handleSubmit = useCallback( + (e: React.FormEvent) => { + e.preventDefault(); + if (input.trim()) { + sendMessage({ role: "user", parts: [{ type: "text", text: input }] }); + setInput(""); + } + }, + [input, sendMessage] + ); + + const handleInputChange = (e: React.ChangeEvent) => { + setInput(e.target.value); + }; + + // Scroll to bottom when messages change + useEffect(() => { + messages.length > 0 && scrollToBottom(); + }, [messages, scrollToBottom]); + + return ( + <> +
+ + +
+ +
+
+ {messages?.map((m: Message) => ( +
+ {`${m.role}: `} + {m.parts?.map((part, i) => { + switch (part.type) { + case "text": + return ( +
+ {part.text} +
+ ); + default: + return null; + } + })} +
+
+ ))} +
+
+ +
+ +
+
+ + ); +} diff --git a/examples/resumable-stream-chat/src/client.tsx b/examples/resumable-stream-chat/src/client.tsx new file mode 100644 index 00000000..e1d3897a --- /dev/null +++ b/examples/resumable-stream-chat/src/client.tsx @@ -0,0 +1,7 @@ +import "./styles.css"; +import { createRoot } from "react-dom/client"; +import App from "./app"; + +const root = createRoot(document.getElementById("root")!); + +root.render(); diff --git a/examples/resumable-stream-chat/src/server.ts b/examples/resumable-stream-chat/src/server.ts new file mode 100644 index 00000000..16a740f6 --- /dev/null +++ b/examples/resumable-stream-chat/src/server.ts @@ -0,0 +1,46 @@ +import { openai } from "@ai-sdk/openai"; +import { routeAgentRequest } from "agents"; +import { AIHttpChatAgent } from "agents/ai-chat-agent-http"; +import { + convertToModelMessages, + createUIMessageStream, + createUIMessageStreamResponse, + type StreamTextOnFinishCallback, + type ToolSet, + streamText +} from "ai"; + +type Env = { + OPENAI_API_KEY: string; +}; + +export class ResumableChatAgent extends AIHttpChatAgent { + async onChatMessage( + onFinish: StreamTextOnFinishCallback, + _options?: { streamId?: string } + ): Promise { + const stream = createUIMessageStream({ + execute: async ({ writer }) => { + const result = streamText({ + messages: convertToModelMessages(this.messages), + model: openai("gpt-4o"), + onFinish + }); + + writer.merge(result.toUIMessageStream()); + } + }); + + const response = createUIMessageStreamResponse({ stream }); + return response; + } +} + +export default { + async fetch(request: Request, env: Env, _ctx: ExecutionContext) { + return ( + (await routeAgentRequest(request, env)) || + new Response("Not found", { status: 404 }) + ); + } +} satisfies ExportedHandler; diff --git a/examples/resumable-stream-chat/src/styles.css b/examples/resumable-stream-chat/src/styles.css new file mode 100644 index 00000000..0c685513 --- /dev/null +++ b/examples/resumable-stream-chat/src/styles.css @@ -0,0 +1,292 @@ +/* Cloudflare color palette and theme variables */ +:root { + --cf-orange: #f48120; + --cf-dark-blue: #001835; + --cf-blue: #0051c3; + --cf-light-blue: #0098ec; + + /* Font stack */ + --font-sans: + -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, + sans-serif, "Apple Color Emoji", "Segoe UI Emoji"; + --font-mono: + ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", + "Courier New", monospace; + + /* Light mode */ + --background: #ffffff; + --text: #001835; + --secondary-text: #5c7291; + --border: #e5e7eb; + --input-bg: #f9fafb; + --code-bg: #f3f4f6; + --button-primary: var(--cf-blue); + --button-primary-hover: var(--cf-dark-blue); + --button-danger: #dc2626; + --button-danger-hover: #991b1b; +} + +/* Dark mode */ +[data-theme="dark"] { + --background: #001835; + --text: #ffffff; + --secondary-text: #94a3b8; + --border: #1e293b; + --input-bg: #0f172a; + --code-bg: #1e293b; + --button-primary: var(--cf-light-blue); + --button-primary-hover: var(--cf-blue); + --button-danger: #ef4444; + --button-danger-hover: #dc2626; +} + +/* Base styles */ +body { + background-color: var(--background); + color: var(--text); + transition: + background-color 0.3s ease, + color 0.3s ease; + font-family: var(--font-sans); + -webkit-font-smoothing: antialiased; + -moz-osx-font-smoothing: grayscale; +} + +/* Controls container */ +.controls-container { + position: fixed; + top: 1rem; + right: 1rem; + display: flex; + gap: 0.75rem; + z-index: 10; + align-items: center; +} + +/* Theme switch */ +.theme-switch { + position: relative; + width: 3.5rem; + height: 1.75rem; + background: var(--input-bg); + border-radius: 1rem; + border: 1px solid var(--border); + cursor: pointer; + transition: all 0.3s ease; + display: flex; + align-items: center; + padding: 0.25rem; +} + +.theme-switch::before { + content: "🌙"; + position: absolute; + left: 0.25rem; + font-size: 1rem; + line-height: 1; + transition: opacity 0.3s ease; + opacity: 0; +} + +.theme-switch::after { + content: "🌞"; + position: absolute; + right: 0.25rem; + font-size: 1rem; + line-height: 1; + transition: opacity 0.3s ease; + opacity: 0; +} + +.theme-switch[data-theme="dark"]::before { + opacity: 1; +} + +.theme-switch[data-theme="light"]::after { + opacity: 1; +} + +.theme-switch-handle { + width: 1.25rem; + height: 1.25rem; + background: var(--text); + border-radius: 50%; + transition: transform 0.3s ease; +} + +.theme-switch[data-theme="light"] .theme-switch-handle { + transform: translateX(1.75rem); +} + +/* Clear history button */ +.clear-history { + padding: 0.5rem; + border-radius: 0.5rem; + background: var(--input-bg); + border: 1px solid var(--button-danger); + color: var(--button-danger); + cursor: pointer; + display: flex; + align-items: center; + gap: 0.5rem; + font-size: 0.875rem; + font-family: var(--font-sans); + transition: all 0.2s ease; +} + +.clear-history:hover { + color: white; + background: var(--button-danger); +} + +/* Chat container */ +.chat-container { + display: flex; + flex-direction: column; + width: 100%; + max-width: 28rem; + height: calc(100vh - 8rem); /* Leave space for top margin and input */ + margin: 4rem auto 0; + padding: 0 1rem; + overflow-y: auto; + position: relative; + scrollbar-width: thin; + scrollbar-color: var(--secondary-text) var(--input-bg); +} + +/* Webkit scrollbar styles */ +.chat-container::-webkit-scrollbar { + width: 8px; +} + +.chat-container::-webkit-scrollbar-track { + background: var(--input-bg); + border-radius: 4px; +} + +.chat-container::-webkit-scrollbar-thumb { + background: var(--secondary-text); + border-radius: 4px; +} + +.chat-container::-webkit-scrollbar-thumb:hover { + background: var(--text); +} + +/* Messages wrapper to ensure proper spacing */ +.messages-wrapper { + flex: 1; + padding-bottom: 5rem; /* Space for input */ +} + +/* Message styles */ +.message { + white-space: pre-wrap; + margin-bottom: 1.5rem; + padding: 1rem; + border-radius: 0.5rem; + background: var(--input-bg); + border: 1px solid var(--border); + font-family: var(--font-sans); +} + +.message strong { + color: var(--cf-orange); + font-weight: 600; + font-family: var(--font-sans); +} + +.message-content { + margin-top: 0.5rem; + color: var(--text); + font-family: var(--font-sans); + line-height: 1.5; +} + +/* Tool invocation styles */ +.dynamic-info { + font-family: var(--font-mono); + background-color: var(--code-bg); + padding: 0.25rem 0.5rem; + border-radius: 0.25rem; + font-size: 0.875rem; + color: var(--cf-orange); +} + +.tool-invocation { + color: var(--secondary-text); + background: var(--input-bg); + padding: 1rem; + border-radius: 0.5rem; + border: 1px solid var(--border); + margin-top: 0.5rem; + font-family: var(--font-sans); +} + +/* Button styles */ +.button-container { + display: flex; + gap: 0.75rem; + padding-top: 1rem; +} + +.button-approve, +.button-reject { + padding: 0.5rem 1rem; + font-weight: 600; + color: white; + border-radius: 0.375rem; + cursor: pointer; + transition: all 0.2s ease; + border: none; + font-size: 0.875rem; + font-family: var(--font-sans); +} + +.button-approve { + background-color: var(--button-primary); +} + +.button-approve:hover { + background-color: var(--button-primary-hover); +} + +.button-reject { + background-color: var(--button-danger); +} + +.button-reject:hover { + background-color: var(--button-danger-hover); +} + +/* Input styles */ +.chat-input { + position: fixed; + bottom: 0; + width: 100%; + max-width: 26rem; + padding: 0.75rem 1rem; + margin-bottom: 2rem; + background: var(--input-bg); + border: 1px solid var(--border); + border-radius: 0.5rem; + color: var(--text); + font-family: var(--font-sans); + font-size: 0.875rem; + box-shadow: + 0 20px 25px -5px rgba(0, 0, 0, 0.1), + 0 8px 10px -6px rgba(0, 0, 0, 0.1); + transition: all 0.2s ease; +} + +.chat-input:focus { + outline: none; + border-color: var(--cf-light-blue); + box-shadow: 0 0 0 3px rgba(0, 152, 236, 0.1); +} + +.chat-input:disabled { + cursor: not-allowed; + opacity: 0.7; + background: var(--code-bg); +} diff --git a/examples/resumable-stream-chat/tsconfig.json b/examples/resumable-stream-chat/tsconfig.json new file mode 100644 index 00000000..9536a0f4 --- /dev/null +++ b/examples/resumable-stream-chat/tsconfig.json @@ -0,0 +1,3 @@ +{ + "extends": "../../tsconfig.base.json" +} diff --git a/examples/resumable-stream-chat/vite.config.ts b/examples/resumable-stream-chat/vite.config.ts new file mode 100644 index 00000000..40b7683e --- /dev/null +++ b/examples/resumable-stream-chat/vite.config.ts @@ -0,0 +1,26 @@ +import { cloudflare } from "@cloudflare/vite-plugin"; +import react from "@vitejs/plugin-react"; +import chalk from "chalk"; +import { defineConfig } from "vite"; + +export default defineConfig({ + plugins: [ + react(), + cloudflare(), + + { + configureServer(server) { + server.middlewares.use((req, _res, next) => { + const timeString = new Date().toLocaleTimeString(); + console.log( + `[${chalk.blue(timeString)}] ${chalk.green( + req.method + )} ${chalk.yellow(req.url)}` + ); + next(); + }); + }, + name: "requestLogger" + } + ] +}); diff --git a/examples/resumable-stream-chat/wrangler.jsonc b/examples/resumable-stream-chat/wrangler.jsonc new file mode 100644 index 00000000..a3b80259 --- /dev/null +++ b/examples/resumable-stream-chat/wrangler.jsonc @@ -0,0 +1,26 @@ +{ + "name": "resumable-chat", + "main": "./src/server.ts", + "compatibility_date": "2025-02-21", + "compatibility_flags": [ + "nodejs_compat", + "nodejs_compat_populate_process_env" + ], + "assets": { + "directory": "public" + }, + "durable_objects": { + "bindings": [ + { + "name": "ResumableChatAgent", + "class_name": "ResumableChatAgent" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": ["ResumableChatAgent"] + } + ] +} diff --git a/package-lock.json b/package-lock.json index c71ce30e..3bb4c3b4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -531,6 +531,10 @@ "node": "^18 || >=20" } }, + "examples/resumable-stream-chat": { + "version": "0.0.0", + "license": "ISC" + }, "examples/tictactoe": { "name": "@cloudflare/agents-tictactoe", "version": "0.0.0", @@ -4075,6 +4079,113 @@ "url": "https://opencollective.com/libvips" } }, + "node_modules/@inquirer/ansi": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@inquirer/ansi/-/ansi-1.0.1.tgz", + "integrity": "sha512-yqq0aJW/5XPhi5xOAL1xRCpe1eh8UFVgYFpFsjEqmIR8rKLyP+HINvFXwUaxYICflJrVlxnp7lLN6As735kVpw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=18" + } + }, + "node_modules/@inquirer/confirm": { + "version": "5.1.19", + "resolved": "https://registry.npmjs.org/@inquirer/confirm/-/confirm-5.1.19.tgz", + "integrity": "sha512-wQNz9cfcxrtEnUyG5PndC8g3gZ7lGDBzmWiXZkX8ot3vfZ+/BLjR8EvyGX4YzQLeVqtAlY/YScZpW7CW8qMoDQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@inquirer/core": "^10.3.0", + "@inquirer/type": "^3.0.9" + }, + "engines": { + "node": ">=18" + }, + "peerDependencies": { + "@types/node": ">=18" + }, + "peerDependenciesMeta": { + "@types/node": { + "optional": true + } + } + }, + "node_modules/@inquirer/core": { + "version": "10.3.0", + "resolved": "https://registry.npmjs.org/@inquirer/core/-/core-10.3.0.tgz", + "integrity": "sha512-Uv2aPPPSK5jeCplQmQ9xadnFx2Zhj9b5Dj7bU6ZeCdDNNY11nhYy4btcSdtDguHqCT2h5oNeQTcUNSGGLA7NTA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@inquirer/ansi": "^1.0.1", + "@inquirer/figures": "^1.0.14", + "@inquirer/type": "^3.0.9", + "cli-width": "^4.1.0", + "mute-stream": "^2.0.0", + "signal-exit": "^4.1.0", + "wrap-ansi": "^6.2.0", + "yoctocolors-cjs": "^2.1.2" + }, + "engines": { + "node": ">=18" + }, + "peerDependencies": { + "@types/node": ">=18" + }, + "peerDependenciesMeta": { + "@types/node": { + "optional": true + } + } + }, + "node_modules/@inquirer/core/node_modules/emoji-regex": { + "version": "8.0.0", + "resolved": "https://registry.npmjs.org/emoji-regex/-/emoji-regex-8.0.0.tgz", + "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", + "dev": true, + "license": "MIT" + }, + "node_modules/@inquirer/core/node_modules/is-fullwidth-code-point": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/is-fullwidth-code-point/-/is-fullwidth-code-point-3.0.0.tgz", + "integrity": "sha512-zymm5+u+sCsSWyD9qNaejV3DFvhCKclKdizYaJUuHA83RLjb7nSuGnddCHGv0hk+KY7BMAlsWeK4Ueg6EV6XQg==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=8" + } + }, + "node_modules/@inquirer/core/node_modules/string-width": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz", + "integrity": "sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==", + "dev": true, + "license": "MIT", + "dependencies": { + "emoji-regex": "^8.0.0", + "is-fullwidth-code-point": "^3.0.0", + "strip-ansi": "^6.0.1" + }, + "engines": { + "node": ">=8" + } + }, + "node_modules/@inquirer/core/node_modules/wrap-ansi": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-6.2.0.tgz", + "integrity": "sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.0.0", + "string-width": "^4.1.0", + "strip-ansi": "^6.0.0" + }, + "engines": { + "node": ">=8" + } + }, "node_modules/@inquirer/external-editor": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/@inquirer/external-editor/-/external-editor-1.0.1.tgz", @@ -4097,6 +4208,34 @@ } } }, + "node_modules/@inquirer/figures": { + "version": "1.0.14", + "resolved": "https://registry.npmjs.org/@inquirer/figures/-/figures-1.0.14.tgz", + "integrity": "sha512-DbFgdt+9/OZYFM+19dbpXOSeAstPy884FPy1KjDu4anWwymZeOYhMY1mdFri172htv6mvc/uvIAAi7b7tvjJBQ==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=18" + } + }, + "node_modules/@inquirer/type": { + "version": "3.0.9", + "resolved": "https://registry.npmjs.org/@inquirer/type/-/type-3.0.9.tgz", + "integrity": "sha512-QPaNt/nmE2bLGQa9b7wwyRJoLZ7pN6rcyXvzU0YCmivmJyq1BVo94G98tStRWkoD1RgDX5C+dPlhhHzNdu/W/w==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=18" + }, + "peerDependencies": { + "@types/node": ">=18" + }, + "peerDependenciesMeta": { + "@types/node": { + "optional": true + } + } + }, "node_modules/@isaacs/balanced-match": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@isaacs/balanced-match/-/balanced-match-4.0.1.tgz", @@ -4926,6 +5065,24 @@ } } }, + "node_modules/@mswjs/interceptors": { + "version": "0.40.0", + "resolved": "https://registry.npmjs.org/@mswjs/interceptors/-/interceptors-0.40.0.tgz", + "integrity": "sha512-EFd6cVbHsgLa6wa4RljGj6Wk75qoHxUSyc5asLyyPSyuhIcdS2Q3Phw6ImS1q+CkALthJRShiYfKANcQMuMqsQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@open-draft/deferred-promise": "^2.2.0", + "@open-draft/logger": "^0.3.0", + "@open-draft/until": "^2.0.0", + "is-node-process": "^1.2.0", + "outvariant": "^1.4.3", + "strict-event-emitter": "^0.5.1" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/@napi-rs/wasm-runtime": { "version": "1.0.7", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-1.0.7.tgz", @@ -5300,6 +5457,31 @@ "@octokit/openapi-types": "^20.0.0" } }, + "node_modules/@open-draft/deferred-promise": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/@open-draft/deferred-promise/-/deferred-promise-2.2.0.tgz", + "integrity": "sha512-CecwLWx3rhxVQF6V4bAgPS5t+So2sTbPgAzafKkVizyi7tlwpcFpdFqq+wqF2OwNBmqFuu6tOyouTuxgpMfzmA==", + "dev": true, + "license": "MIT" + }, + "node_modules/@open-draft/logger": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@open-draft/logger/-/logger-0.3.0.tgz", + "integrity": "sha512-X2g45fzhxH238HKO4xbSr7+wBS8Fvw6ixhTDuvLd5mqh6bJJCFAPwU9mPDxbcrRtfxv4u5IHCEH77BmxvXmmxQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "is-node-process": "^1.2.0", + "outvariant": "^1.4.0" + } + }, + "node_modules/@open-draft/until": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/@open-draft/until/-/until-2.1.0.tgz", + "integrity": "sha512-U69T3ItWHvLwGg5eJ0n3I62nWuE6ilHlmz7zM0npLBRvPRd7e6NYmg54vvRtP5mZG7kZqZCFVdsTWo7BPtBujg==", + "dev": true, + "license": "MIT" + }, "node_modules/@openai/agents": { "version": "0.3.0", "resolved": "https://registry.npmjs.org/@openai/agents/-/agents-0.3.0.tgz", @@ -9181,6 +9363,13 @@ "@types/send": "*" } }, + "node_modules/@types/statuses": { + "version": "2.0.6", + "resolved": "https://registry.npmjs.org/@types/statuses/-/statuses-2.0.6.tgz", + "integrity": "sha512-xMAgYwceFhRA2zY+XbEA7mxYbA093wdiW8Vu6gZPGWy9cmOyU9XesH1tNcEWsKFd5Vzrqx5T3D38PWx1FIIXkA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/trusted-types": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/@types/trusted-types/-/trusted-types-2.0.7.tgz", @@ -12338,6 +12527,16 @@ "url": "https://github.com/chalk/strip-ansi?sponsor=1" } }, + "node_modules/cli-width": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/cli-width/-/cli-width-4.1.0.tgz", + "integrity": "sha512-ouuZd4/dm2Sw5Gmqy6bGyNNNe1qt9RpmxveLSO7KcgsTnU7RXfsw+/bukWGo1abgBiMAic068rclZsO4IWmmxQ==", + "dev": true, + "license": "ISC", + "engines": { + "node": ">= 12" + } + }, "node_modules/cliui": { "version": "8.0.1", "resolved": "https://registry.npmjs.org/cliui/-/cliui-8.0.1.tgz", @@ -14789,6 +14988,16 @@ "dev": true, "license": "ISC" }, + "node_modules/graphql": { + "version": "16.12.0", + "resolved": "https://registry.npmjs.org/graphql/-/graphql-16.12.0.tgz", + "integrity": "sha512-DKKrynuQRne0PNpEbzuEdHlYOMksHSUI8Zc9Unei5gTsMNA2/vMpoMz/yKba50pejK56qj98qM0SjYxAKi13gQ==", + "dev": true, + "license": "MIT", + "engines": { + "node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0" + } + }, "node_modules/gsap": { "version": "3.13.0", "resolved": "https://registry.npmjs.org/gsap/-/gsap-3.13.0.tgz", @@ -15061,6 +15270,13 @@ "url": "https://opencollective.com/unified" } }, + "node_modules/headers-polyfill": { + "version": "4.0.3", + "resolved": "https://registry.npmjs.org/headers-polyfill/-/headers-polyfill-4.0.3.tgz", + "integrity": "sha512-IScLbePpkvO846sIwOtOTDjutRMWdXdJmXdMvk6gCBHxFO8d+QKOQedyZSxFTTFYRSmlgSTDtXqqq4pcenBXLQ==", + "dev": true, + "license": "MIT" + }, "node_modules/hono": { "version": "4.10.4", "resolved": "https://registry.npmjs.org/hono/-/hono-4.10.4.tgz", @@ -15405,6 +15621,13 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/is-node-process": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/is-node-process/-/is-node-process-1.2.0.tgz", + "integrity": "sha512-Vg4o6/fqPxIjtxgUH5QLJhwZ7gW5diGCVlXpuUfELC62CuxM1iHcRe51f2W1FDy04Ai4KJkagKjx3XaqyfRKXw==", + "dev": true, + "license": "MIT" + }, "node_modules/is-number": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", @@ -18177,6 +18400,68 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", "license": "MIT" }, + "node_modules/msw": { + "version": "2.12.0", + "resolved": "https://registry.npmjs.org/msw/-/msw-2.12.0.tgz", + "integrity": "sha512-jzf2eVnd8+iWXN74dccLrHUw3i3hFVvNVQRWS4vBl2KxaUt7Tdur0Eyda/DODGFkZDu2P5MXaeLe/9Qx8PZkrg==", + "dev": true, + "hasInstallScript": true, + "license": "MIT", + "dependencies": { + "@inquirer/confirm": "^5.0.0", + "@mswjs/interceptors": "^0.40.0", + "@open-draft/deferred-promise": "^2.2.0", + "@types/statuses": "^2.0.4", + "cookie": "^1.0.2", + "graphql": "^16.8.1", + "headers-polyfill": "^4.0.2", + "is-node-process": "^1.2.0", + "outvariant": "^1.4.3", + "path-to-regexp": "^6.3.0", + "picocolors": "^1.1.1", + "rettime": "^0.7.0", + "statuses": "^2.0.2", + "strict-event-emitter": "^0.5.1", + "tough-cookie": "^6.0.0", + "type-fest": "^4.26.1", + "until-async": "^3.0.2", + "yargs": "^17.7.2" + }, + "bin": { + "msw": "cli/index.js" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/mswjs" + }, + "peerDependencies": { + "typescript": ">= 4.8.x" + }, + "peerDependenciesMeta": { + "typescript": { + "optional": true + } + } + }, + "node_modules/msw/node_modules/path-to-regexp": { + "version": "6.3.0", + "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-6.3.0.tgz", + "integrity": "sha512-Yhpw4T9C6hPpgPeA28us07OJeqZ5EzQTkbfwuhsUg0c237RomFoETJgmp2sa3F/41gfLE6G5cqcYwznmeEeOlQ==", + "dev": true, + "license": "MIT" + }, + "node_modules/msw/node_modules/statuses": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.2.tgz", + "integrity": "sha512-DvEy55V3DB7uknRo+4iOGT5fP1slR8wQohVdknigZPMpMstaKJQWhwiYBACJE3Ul2pTnATihhBYnRhZQHGBiRw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">= 0.8" + } + }, "node_modules/multiformats": { "version": "9.9.0", "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.9.0.tgz", @@ -18193,6 +18478,16 @@ "mustache": "bin/mustache" } }, + "node_modules/mute-stream": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-2.0.0.tgz", + "integrity": "sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA==", + "dev": true, + "license": "ISC", + "engines": { + "node": "^18.17.0 || >=20.5.0" + } + }, "node_modules/nano-spawn": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/nano-spawn/-/nano-spawn-2.0.0.tgz", @@ -18580,6 +18875,13 @@ "dev": true, "license": "MIT" }, + "node_modules/outvariant": { + "version": "1.4.3", + "resolved": "https://registry.npmjs.org/outvariant/-/outvariant-1.4.3.tgz", + "integrity": "sha512-+Sl2UErvtsoajRDKCE5/dBz4DIvHXQQnAxtQTF04OJxY0+DyZXSo5P5Bb7XYWOh81syohlYL24hbDwxedPUJCA==", + "dev": true, + "license": "MIT" + }, "node_modules/ox": { "version": "0.9.3", "resolved": "https://registry.npmjs.org/ox/-/ox-0.9.3.tgz", @@ -20078,6 +20380,10 @@ "integrity": "sha512-gSfoiOEA0VPE6Tukkrr7I0RBdE0s7H1eFCDBk05l1KIQT1UIKNc5JZy6jdyW6eYH3aR3g5b3PuL77rq0hvwtAw==", "license": "MIT" }, + "node_modules/resumable-stream-chat": { + "resolved": "examples/resumable-stream-chat", + "link": true + }, "node_modules/ret": { "version": "0.5.0", "resolved": "https://registry.npmjs.org/ret/-/ret-0.5.0.tgz", @@ -20149,6 +20455,13 @@ "url": "https://opencollective.com/unified" } }, + "node_modules/rettime": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/rettime/-/rettime-0.7.0.tgz", + "integrity": "sha512-LPRKoHnLKd/r3dVxcwO7vhCW+orkOGj9ViueosEBK6ie89CijnfRlhaDhHq/3Hxu4CkWQtxwlBG0mzTQY6uQjw==", + "dev": true, + "license": "MIT" + }, "node_modules/reusify": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.1.0.tgz", @@ -21245,6 +21558,13 @@ "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", "license": "MIT" }, + "node_modules/strict-event-emitter": { + "version": "0.5.1", + "resolved": "https://registry.npmjs.org/strict-event-emitter/-/strict-event-emitter-0.5.1.tgz", + "integrity": "sha512-vMgjE/GGEPEFnhFub6pa4FmJBRBVOLpIII2hvCZ8Kzb7K0hlHo7mQv6xYrBvCL2LtAIBwFUK8wvuJgTVSQ5MFQ==", + "dev": true, + "license": "MIT" + }, "node_modules/strict-uri-encode": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/strict-uri-encode/-/strict-uri-encode-2.0.0.tgz", @@ -21721,6 +22041,26 @@ "node": ">=14.0.0" } }, + "node_modules/tldts": { + "version": "7.0.17", + "resolved": "https://registry.npmjs.org/tldts/-/tldts-7.0.17.tgz", + "integrity": "sha512-Y1KQBgDd/NUc+LfOtKS6mNsC9CCaH+m2P1RoIZy7RAPo3C3/t8X45+zgut31cRZtZ3xKPjfn3TkGTrctC2TQIQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "tldts-core": "^7.0.17" + }, + "bin": { + "tldts": "bin/cli.js" + } + }, + "node_modules/tldts-core": { + "version": "7.0.17", + "resolved": "https://registry.npmjs.org/tldts-core/-/tldts-core-7.0.17.tgz", + "integrity": "sha512-DieYoGrP78PWKsrXr8MZwtQ7GLCUeLxihtjC1jZsW1DnvSMdKPitJSe8OSYDM2u5H6g3kWJZpePqkp43TfLh0g==", + "dev": true, + "license": "MIT" + }, "node_modules/tmp": { "version": "0.2.5", "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.2.5.tgz", @@ -21805,6 +22145,19 @@ "node": ">=6" } }, + "node_modules/tough-cookie": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-6.0.0.tgz", + "integrity": "sha512-kXuRi1mtaKMrsLUxz3sQYvVl37B0Ns6MzfrtV5DvJceE9bPyspOqk9xxv7XbZWcfLWbFmm997vl83qUWVJA64w==", + "dev": true, + "license": "BSD-3-Clause", + "dependencies": { + "tldts": "^7.0.5" + }, + "engines": { + "node": ">=16" + } + }, "node_modules/tr46": { "version": "0.0.3", "resolved": "https://registry.npmjs.org/tr46/-/tr46-0.0.3.tgz", @@ -22730,6 +23083,16 @@ "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==", "license": "ISC" }, + "node_modules/until-async": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/until-async/-/until-async-3.0.2.tgz", + "integrity": "sha512-IiSk4HlzAMqTUseHHe3VhIGyuFmN90zMTpD3Z3y8jeQbzLIq500MVM7Jq2vUAnTKAFPJrqwkzr6PoTcPhGcOiw==", + "dev": true, + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/kettanaito" + } + }, "node_modules/update-browserslist-db": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.1.3.tgz", @@ -24021,6 +24384,19 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/yoctocolors-cjs": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/yoctocolors-cjs/-/yoctocolors-cjs-2.1.3.tgz", + "integrity": "sha512-U/PBtDf35ff0D8X8D0jfdzHYEPFxAI7jJlxZXwCSez5M3190m+QobIfh+sWDWSHMCWWJN2AWamkegn6vr6YBTw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/youch": { "version": "4.1.0-beta.10", "resolved": "https://registry.npmjs.org/youch/-/youch-4.1.0-beta.10.tgz", @@ -24196,6 +24572,7 @@ "devDependencies": { "@cloudflare/workers-oauth-provider": "^0.1.0", "@types/yargs": "^17.0.34", + "msw": "^2.11.3", "react": "*", "vitest-browser-react": "^1.0.1", "x402": "^0.7.1" diff --git a/packages/agents/package.json b/packages/agents/package.json index 002d8e22..1653d3ae 100644 --- a/packages/agents/package.json +++ b/packages/agents/package.json @@ -41,6 +41,7 @@ "devDependencies": { "@cloudflare/workers-oauth-provider": "^0.1.0", "@types/yargs": "^17.0.34", + "msw": "^2.11.3", "react": "*", "vitest-browser-react": "^1.0.1", "x402": "^0.7.1" @@ -129,6 +130,16 @@ "types": "./dist/mcp/x402.d.ts", "import": "./dist/mcp/x402.js", "require": "./dist/mcp/x402.js" + }, + "./ai-chat-agent-http": { + "types": "./dist/ai-chat-agent-http.d.ts", + "import": "./dist/ai-chat-agent-http.js", + "require": "./dist/ai-chat-agent-http.js" + }, + "./use-agent-chat-http": { + "types": "./dist/use-agent-chat-http.d.ts", + "import": "./dist/use-agent-chat-http.js", + "require": "./dist/use-agent-chat-http.js" } }, "publishConfig": { diff --git a/packages/agents/src/ai-chat-agent-http.ts b/packages/agents/src/ai-chat-agent-http.ts new file mode 100644 index 00000000..6121e38d --- /dev/null +++ b/packages/agents/src/ai-chat-agent-http.ts @@ -0,0 +1,287 @@ +import type { + UIMessage as ChatMessage, + StreamTextOnFinishCallback, + ToolSet +} from "ai"; +import { nanoid } from "nanoid"; +import { Agent, type AgentContext } from "./"; +import { autoTransformMessages } from "./ai-chat-v5-migration"; +import { ResumableStreamManager } from "./resumable-stream-manager"; + +export class AIHttpChatAgent< + Env = unknown, + State = unknown, + Message extends ChatMessage = ChatMessage +> extends Agent { + /** Array of chat messages for the current conversation */ + messages: Message[]; + + /** Resumable stream manager for handling stream persistence and resumption */ + private _streamManager: ResumableStreamManager; + + constructor(ctx: AgentContext, env: Env) { + super(ctx, env); + + // Initialize message storage table + this.sql`create table if not exists cf_ai_http_chat_messages ( + id text primary key, + message text not null, + created_at datetime default current_timestamp + )`; + + // Initialize resumable stream manager + this._streamManager = new ResumableStreamManager(ctx, this.sql); + + // Load messages and automatically transform them to v5 format + const rawMessages = ( + this + .sql`select * from cf_ai_http_chat_messages order by created_at asc` || + [] + ).map((row) => { + return JSON.parse(row.message as string); + }); + + this.messages = autoTransformMessages(rawMessages) as Message[]; + } + + override async onRequest(request: Request): Promise { + const url = new URL(request.url); + const pathname = url.pathname; + + try { + // GET /messages - Retrieve message history with pagination + if (pathname.endsWith("/messages") && request.method === "GET") { + return this._handleGetMessages(request); + } + + // POST /chat - Send message and get streaming response + if (pathname.endsWith("/chat") && request.method === "POST") { + return this._handlePostChat(request); + } + + // POST /stream/{streamId}/cancel - Cancel active stream + if ( + pathname.includes("/stream/") && + pathname.endsWith("/cancel") && + request.method === "POST" + ) { + const streamId = pathname.split("/stream/")[1].replace("/cancel", ""); + return this._handleCancelStream(streamId); + } + + // GET /stream/{streamId}/status - Get stream status + if ( + pathname.includes("/stream/") && + pathname.endsWith("/status") && + request.method === "GET" + ) { + const streamId = pathname.split("/stream/")[1].replace("/status", ""); + return this._handleStreamStatus(streamId); + } + + // GET /stream/{streamId} - Resume interrupted stream + if (pathname.includes("/stream/") && request.method === "GET") { + const streamId = pathname.split("/stream/")[1]; + return this._handleResumeStream(streamId); + } + + // DELETE /messages - Clear message history + if (pathname.endsWith("/messages") && request.method === "DELETE") { + return this._handleClearMessages(); + } + + return super.onRequest(request); + } catch (error) { + console.error("[AIHttpChatAgent] Request error:", error); + return new Response( + JSON.stringify({ + error: "Internal server error", + message: error instanceof Error ? error.message : String(error) + }), + { + status: 500, + headers: { "Content-Type": "application/json" } + } + ); + } + } + + /** + * Handle GET /messages - Retrieve paginated message history + */ + private async _handleGetMessages(request: Request): Promise { + const url = new URL(request.url); + const limit = Math.min( + Number.parseInt(url.searchParams.get("limit") || "50", 10), + 100 + ); + const offset = Number.parseInt(url.searchParams.get("offset") || "0", 10); + + const messages = ( + this.sql`select * from cf_ai_http_chat_messages + order by created_at asc + limit ${limit} offset ${offset}` || [] + ).map((row) => { + return JSON.parse(row.message as string); + }); + + const countResult = this + .sql`select count(*) as count from cf_ai_http_chat_messages`; + const totalCount = (countResult[0] as { count: number }).count; + + return new Response( + JSON.stringify({ + messages: autoTransformMessages(messages), + pagination: { + limit, + offset, + total: totalCount, + hasMore: offset + limit < totalCount + } + }), + { + headers: { "Content-Type": "application/json" } + } + ); + } + + /** + * Handle POST /chat - Send message and get streaming response + */ + private async _handlePostChat(request: Request): Promise { + const body = (await request.json()) as { + messages?: Message[]; + streamId?: string; + includeMessages?: boolean; + }; + const { + messages: incomingMessages, + streamId: requestStreamId, + includeMessages + } = body; + + if (!Array.isArray(incomingMessages)) { + return new Response( + JSON.stringify({ error: "Messages must be an array" }), + { + status: 400, + headers: { "Content-Type": "application/json" } + } + ); + } + + // Transform and persist incoming messages (if not resuming) + if (incomingMessages.length > 0) { + const transformedMessages = autoTransformMessages( + incomingMessages + ) as Message[]; + await this.persistMessages(transformedMessages); + } + + // Generate or reuse stream ID + const streamId = requestStreamId || `stream_${nanoid()}`; + + // Delegate to stream manager + return this._streamManager.startStream( + streamId, + this.onChatMessage.bind(this), + this.persistMessages.bind(this), + this.messages, + includeMessages + ); + } + + /** + * Handle GET /stream/{streamId} - Resume interrupted stream + */ + private async _handleResumeStream(streamId: string): Promise { + return this._streamManager.resumeStream(streamId, this.messages); + } + + /** + * Handle POST /stream/{streamId}/cancel - Cancel active stream + */ + private async _handleCancelStream(streamId: string): Promise { + return this._streamManager.cancelStream(streamId); + } + + /** + * Handle GET /stream/{streamId}/status - Get stream status + */ + private async _handleStreamStatus(streamId: string): Promise { + return this._streamManager.getStreamStatus(streamId); + } + + /** + * Handle DELETE /messages - Clear message history + */ + private async _handleClearMessages(): Promise { + this.sql`delete from cf_ai_http_chat_messages`; + this.messages = [] as Message[]; + await this._streamManager.clearStreams(); + + return new Response( + JSON.stringify({ success: true, message: "Messages cleared" }), + { headers: { "Content-Type": "application/json" } } + ); + } + + /** + * Handle incoming chat messages and generate a response + * @param onFinish Callback to be called when the response is finished + * @param options.streamId The stream ID for resumable streaming + * @returns Response to send to the client or undefined + */ + async onChatMessage( + // biome-ignore lint/correctness/noUnusedFunctionParameters: overridden later + onFinish: StreamTextOnFinishCallback, + // biome-ignore lint/correctness/noUnusedFunctionParameters: overridden later + options?: { streamId?: string } + ): Promise { + throw new Error( + "Received a chat message, override onChatMessage and return a Response to send to the client" + ); + } + + /** + * Save messages following AI SDK patterns + * @param messages Chat messages to save + */ + async saveMessages(messages: Message[]) { + await this.persistMessages(messages); + } + + /** + * Persist messages to database + * @param messages Messages to persist + */ + async persistMessages(messages: Message[]) { + // Clear existing messages and insert new ones + this.sql`delete from cf_ai_http_chat_messages`; + + for (const message of messages) { + this.sql` + insert into cf_ai_http_chat_messages (id, message) + values (${message.id}, ${JSON.stringify(message)}) + `; + } + + this.messages = messages; + } + + /** + * Clean up old completed streams (call periodically) + */ + async cleanupOldStreams(maxAgeHours = 24): Promise { + await this._streamManager.cleanupOldStreams(maxAgeHours); + } + + /** + * Override destroy to properly clean up resumable streaming + */ + override async destroy(): Promise { + // Clean up resumable streaming first + await this._streamManager.destroy(); + await super.destroy(); + } +} diff --git a/packages/agents/src/client.ts b/packages/agents/src/client.ts index e49c5495..ab5f1f02 100644 --- a/packages/agents/src/client.ts +++ b/packages/agents/src/client.ts @@ -3,6 +3,7 @@ import { PartySocket, type PartySocketOptions } from "partysocket"; +import { nanoid } from "nanoid"; import type { RPCRequest, RPCResponse } from "./"; import type { SerializableReturnValue, @@ -181,7 +182,7 @@ export class AgentClient extends PartySocket { streamOptions?: StreamOptions ): Promise { return new Promise((resolve, reject) => { - const id = Math.random().toString(36).slice(2); + const id = nanoid(); this._pendingCalls.set(id, { reject, resolve: (value: unknown) => resolve(value as T), diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index fc9d3fce..41f36988 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -9,6 +9,7 @@ import { type ElicitResult } from "@modelcontextprotocol/sdk/types.js"; import type { Connection, ConnectionContext } from "../"; +import { nanoid } from "nanoid"; import { Agent } from "../index"; import type { BaseTransportType, MaybePromise, ServeOptions } from "./types"; import { @@ -224,7 +225,7 @@ export abstract class McpAgent< message: string; requestedSchema: unknown; }): Promise { - const requestId = `elicit_${Math.random().toString(36).substring(2, 11)}`; + const requestId = `elicit_${nanoid()}`; // Store pending request in durable storage await this.ctx.storage.put(`elicitation:${requestId}`, { diff --git a/packages/agents/src/react.tsx b/packages/agents/src/react.tsx index 9c22ca43..9f92ae80 100644 --- a/packages/agents/src/react.tsx +++ b/packages/agents/src/react.tsx @@ -1,6 +1,7 @@ import type { PartySocket } from "partysocket"; import { usePartySocket } from "partysocket/react"; import { useCallback, useRef, use, useMemo, useEffect } from "react"; +import { nanoid } from "nanoid"; import type { Agent, MCPServersState, RPCRequest, RPCResponse } from "./"; import type { StreamOptions } from "./client"; import type { Method, RPCMethod } from "./serializable"; @@ -390,7 +391,7 @@ export function useAgent( streamOptions?: StreamOptions ): Promise => { return new Promise((resolve, reject) => { - const id = Math.random().toString(36).slice(2); + const id = nanoid(); pendingCallsRef.current.set(id, { reject, resolve: resolve as (value: unknown) => void, diff --git a/packages/agents/src/resumable-stream-manager.ts b/packages/agents/src/resumable-stream-manager.ts new file mode 100644 index 00000000..e31482a0 --- /dev/null +++ b/packages/agents/src/resumable-stream-manager.ts @@ -0,0 +1,762 @@ +import type { + UIMessage as ChatMessage, + StreamTextOnFinishCallback, + ToolSet +} from "ai"; +import { nanoid } from "nanoid"; +import type { AgentContext } from "./"; + +interface StreamStateRow { + stream_id: string; + seq: number; + fetching: number; + completed: number; + created_at?: string; + updated_at?: string; + headers?: string; +} + +interface TextDeltaRow { + stream_id: string; + seq: number; + text_delta: string; + created_at?: string; +} + +interface StreamStatusRow { + seq: number; + fetching: number; + completed: number; + created_at: string; + updated_at: string; +} + +const decoder = new TextDecoder(); + +export class ResumableStreamManager { + /** Map of stream IDs to their current state for resumable streams */ + private _activeStreams: Map< + string, + { + seq: number; // Current chunk sequence number + fetching: boolean; // Is upstream still fetching? + completed: boolean; + upstreamReader?: ReadableStreamDefaultReader; // Reader for upstream response + timestamp: number; + readers: Set< + WritableStreamDefaultWriter | ReadableStreamDefaultController + >; // Active readers/writers + } + >; + + private ctx: AgentContext; + private sql: >( + strings: TemplateStringsArray, + ...values: (string | number | boolean | null)[] + ) => T[]; + + constructor( + ctx: AgentContext, + sql: >( + strings: TemplateStringsArray, + ...values: (string | number | boolean | null)[] + ) => T[] + ) { + this.ctx = ctx; + this.sql = sql; + this._activeStreams = new Map(); + this._initializeTables(); + } + + /** + * Initialize database tables for resumable streaming + */ + private _initializeTables(): void { + this.sql`create table if not exists cf_ai_http_chat_streams ( + stream_id text primary key, + seq integer not null default 0, + fetching integer not null default 0, + completed integer not null default 0, + created_at datetime default current_timestamp, + updated_at datetime default current_timestamp + )`; + + // Initialize stream text deltas table + this.sql`create table if not exists cf_ai_http_chat_text_deltas ( + stream_id text not null, + seq integer not null, + text_delta text not null, + created_at datetime default current_timestamp, + primary key (stream_id, seq) + )`; + } + + /** + * Start a new resumable stream + */ + async startStream( + streamId: string, + onChatMessage: ( + onFinish: StreamTextOnFinishCallback, + options?: { streamId?: string } + ) => Promise, + persistMessages: (messages: Message[]) => Promise, + messages: Message[], + includeMessages = false + ): Promise { + // Generate random stream ID if not provided + const actualStreamId = streamId || crypto.randomUUID(); + + // Check if this stream already exists and is active + let streamState = this._activeStreams.get(actualStreamId); + if (!streamState) { + const dbState = this.sql` + select * from cf_ai_http_chat_streams + where stream_id = ${actualStreamId} + `[0] as unknown as StreamStateRow | undefined; + + if (dbState) { + console.log( + `[ResumableStreamManager] Found existing DB state for ${actualStreamId}:`, + dbState + ); + streamState = { + seq: dbState.seq, + fetching: Boolean(dbState.fetching), + completed: Boolean(dbState.completed), + timestamp: Date.now(), + readers: new Set() + }; + this._activeStreams.set(actualStreamId, streamState); + } + } + + // Start upstream fetch once (in background) if not already fetching + if (!streamState || (!streamState.fetching && !streamState.completed)) { + console.log( + `[ResumableStreamManager] Need to start upstream fetch for ${actualStreamId}` + ); + + await this.ctx.blockConcurrencyWhile(async () => { + streamState = this._activeStreams.get(actualStreamId); + if (streamState?.fetching || streamState?.completed) { + console.log( + `[ResumableStreamManager] Stream ${actualStreamId} already fetching/completed, skipping` + ); + return; + } + + console.log( + `[ResumableStreamManager] Initializing stream state for ${actualStreamId}` + ); + + // Initialize stream state + this._activeStreams.set(actualStreamId, { + seq: 0, + fetching: true, + completed: false, + timestamp: Date.now(), + readers: new Set() + }); + + // Initialize in database + this.sql` + insert into cf_ai_http_chat_streams (stream_id, seq, fetching, completed) + values (${actualStreamId}, 0, 1, 0) + on conflict(stream_id) do update set + fetching = 1, + updated_at = current_timestamp + `; + + console.log( + `[ResumableStreamManager] Starting upstream fetch in background for ${actualStreamId}` + ); + + // Start upstream fetch in background using waitUntil + this.ctx.waitUntil( + this._startUpstreamFetch( + actualStreamId, + onChatMessage, + persistMessages, + messages + ) + ); + }); + } + + console.log( + `[ResumableStreamManager] Creating client stream for ${actualStreamId}` + ); + + // Create response stream for this client + return this._createClientStream(actualStreamId, messages, includeMessages); + } + + /** + * Resume an interrupted stream + */ + async resumeStream(streamId: string, messages: Message[]): Promise { + // Check if stream exists in database + const streamState = this.sql` + select * from cf_ai_http_chat_streams + where stream_id = ${streamId} + `[0] as unknown as StreamStateRow | undefined; + + if (!streamState) { + return new Response(JSON.stringify({ error: "Stream not found" }), { + status: 404, + headers: { "Content-Type": "application/json" } + }); + } + + // Create a new client stream that will replay stored chunks and join live if still fetching + return this._createClientStream(streamId, messages); + } + + /** + * Cancel an active stream + */ + async cancelStream(streamId: string): Promise { + const state = this._activeStreams.get(streamId); + if (state) { + try { + await state.upstreamReader?.cancel(); + } catch {} + } + + this._markStreamCompleted(streamId); + + return new Response( + JSON.stringify({ success: true, message: "Stream cancelled" }), + { headers: { "Content-Type": "application/json" } } + ); + } + + /** + * Get stream status + */ + async getStreamStatus(streamId: string): Promise { + const streamState = this.sql` + select seq, fetching, completed, created_at, updated_at from cf_ai_http_chat_streams + where stream_id = ${streamId} + `[0] as unknown as StreamStatusRow | undefined; + + if (!streamState) { + return new Response(JSON.stringify({ error: "Stream not found" }), { + status: 404, + headers: { "Content-Type": "application/json" } + }); + } + + return new Response( + JSON.stringify({ + streamId, + position: streamState.seq, + completed: Boolean(streamState.completed), + createdAt: streamState.created_at, + updatedAt: streamState.updated_at + }), + { headers: { "Content-Type": "application/json" } } + ); + } + + /** + * Clear all streams and text deltas + */ + async clearStreams(): Promise { + this.sql`delete from cf_ai_http_chat_streams`; + this.sql`delete from cf_ai_http_chat_text_deltas`; + this._activeStreams.clear(); + } + + /** + * Destroy all resumable streaming + * Should be called during Agent destruction + */ + async destroy(): Promise { + // Clear in-memory state first + this._activeStreams.clear(); + + // Drop all tables + this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_streams`; + this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_text_deltas`; + } + + /** + * Clean up old completed streams (call periodically) + */ + async cleanupOldStreams(maxAgeHours = 24): Promise { + const cutoffTime = new Date( + Date.now() - maxAgeHours * 60 * 60 * 1000 + ).toISOString(); + + this.sql` + delete from cf_ai_http_chat_streams + where completed = 1 and updated_at < ${cutoffTime} + `; + + this.sql` + delete from cf_ai_http_chat_text_deltas + where stream_id in ( + select stream_id from cf_ai_http_chat_streams + where completed = 1 and updated_at < ${cutoffTime} + ) + `; + } + + /** + * Start upstream fetch in background + */ + private async _startUpstreamFetch( + streamId: string, + onChatMessage: ( + onFinish: StreamTextOnFinishCallback, + options?: { streamId?: string } + ) => Promise, + persistMessages: (messages: Message[]) => Promise, + messages: Message[] + ): Promise { + try { + console.log( + `[ResumableStreamManager] Starting upstream fetch for stream ${streamId}` + ); + + const response = await onChatMessage( + async () => { + // Mark stream as completed + console.log(`[ResumableStreamManager] Stream ${streamId} finished`); + this._markStreamCompleted(streamId); + }, + { streamId } + ); + + console.log( + "[ResumableStreamManager] onChatMessage response:", + !!response, + !!response?.body + ); + + if (!response || !response.body) { + console.log( + `[ResumableStreamManager] No response or body for stream ${streamId}, marking completed` + ); + this._markStreamCompleted(streamId); + return; + } + + // Headers are captured and handled in the response stream + + console.log( + `[ResumableStreamManager] Starting to pipe upstream response for stream ${streamId}` + ); + + // Process the upstream response + await this._pipeUpstream(response, streamId, persistMessages, messages); + } catch (error) { + console.error( + `[ResumableStreamManager] Error in upstream fetch for stream ${streamId}:`, + error + ); + this._markStreamCompleted(streamId); + } + } + + /** + * Pipe upstream response and store chunks + */ + private async _pipeUpstream( + response: Response, + streamId: string, + persistMessages: (messages: Message[]) => Promise, + messages: Message[] + ): Promise { + if (!response.body) return; + + const reader = response.body.getReader(); + const streamState = this._activeStreams.get(streamId); + if (!streamState) return; + + streamState.upstreamReader = reader; + + let assistantMessageText = ""; + const assistantMessageId = `assistant_${nanoid()}`; + let buffer = ""; + + let completedNaturally = false; + try { + while (true) { + // Check if stream was completed by onFinish callback + const currentState = this._activeStreams.get(streamId); + if (currentState?.completed) { + // Ensure database state is synchronized + try { + this.sql` + update cf_ai_http_chat_streams + set fetching = 0, completed = 1, updated_at = current_timestamp + where stream_id = ${streamId} + `; + } catch (sqlError) { + console.error( + `[ResumableStreamManager] Error syncing completion state for ${streamId}:`, + sqlError + ); + } + + completedNaturally = true; + break; + } + + const { done, value } = await reader.read(); + if (done) { + completedNaturally = true; + break; + } + + // Parse SSE chunk for text content first + const chunk = decoder.decode(value, { stream: true }); + buffer += chunk; + + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + + for (const line of lines) { + if (line.trim() === "") continue; + if (line.startsWith("data: ")) { + const dataStr = line.slice(6); + if (dataStr === "[DONE]") continue; + try { + const data = JSON.parse(dataStr); + if (data.type === "text-delta" && data.delta) { + assistantMessageText += data.delta; + + // Store the parsed text delta + try { + const seqResult = this.sql` + update cf_ai_http_chat_streams + set seq = seq + 1, updated_at = current_timestamp + where stream_id = ${streamId} + returning seq + `; + + const seq = Number(seqResult[0]?.seq) || streamState.seq++; + + this.sql` + insert into cf_ai_http_chat_text_deltas (stream_id, seq, text_delta) + values (${streamId}, ${seq}, ${data.delta}) + `; + + streamState.seq = seq + 1; + } catch (sqlError) { + console.error( + `[ResumableStreamManager] SQL error storing text delta for ${streamId}:`, + sqlError + ); + } + } + } catch { + // Ignore parse errors + } + } + } + + // Broadcast to all active readers (writers) + for (const readerOrWriter of streamState.readers) { + try { + if (readerOrWriter instanceof WritableStreamDefaultWriter) { + readerOrWriter.write(value); + } else { + // Handle ReadableStreamDefaultController + if ( + "enqueue" in readerOrWriter && + typeof readerOrWriter.enqueue === "function" + ) { + readerOrWriter.enqueue(value); + } + } + } catch { + // Reader might be closed + streamState.readers.delete(readerOrWriter); + } + } + } + + // Save assistant message if we collected any text + if (assistantMessageText) { + // Create assistant message with proper typing + const assistantMessage = { + id: assistantMessageId, + role: "assistant" as const, + parts: [{ type: "text" as const, text: assistantMessageText }] + } as Message; + + await persistMessages([...messages, assistantMessage]); + } + } finally { + // Clear the upstream reader reference + const currentState = this._activeStreams.get(streamId); + if (currentState) { + currentState.upstreamReader = undefined; + } + // Only mark as completed if stream finished naturally, not if interrupted + if (completedNaturally) { + this._markStreamCompleted(streamId); + } else { + // Stream was interrupted - update fetching state but don't mark as completed + if (currentState && !currentState.completed) { + currentState.fetching = false; + } + try { + this.sql` + update cf_ai_http_chat_streams + set fetching = 0, updated_at = current_timestamp + where stream_id = ${streamId} + `; + } catch (sqlError) { + console.error( + `[ResumableStreamManager] Error updating fetching state for ${streamId}:`, + sqlError + ); + } + } + } + } + + /** + * Create client stream that replays stored chunks and joins live if active + */ + private _createClientStream( + streamId: string, + messages: Message[], + includeMessages = false + ): Response { + console.log( + `[ResumableStreamManager] Creating client stream for ${streamId}` + ); + + const dbState = this.sql` + select * from cf_ai_http_chat_streams + where stream_id = ${streamId} + `[0] as unknown as StreamStateRow | undefined; + + console.log(`[ResumableStreamManager] DB state for ${streamId}:`, dbState); + + if (!dbState) { + console.log(`[ResumableStreamManager] No DB state found for ${streamId}`); + return new Response(JSON.stringify({ error: "Stream not found" }), { + status: 404, + headers: { "Content-Type": "application/json" } + }); + } + + let streamState = this._activeStreams.get(streamId); + if (!streamState) { + console.log( + `[ResumableStreamManager] Creating new in-memory state for ${streamId}` + ); + streamState = { + seq: dbState.seq, + fetching: Boolean(dbState.fetching), + completed: Boolean(dbState.completed), + timestamp: Date.now(), + readers: new Set() + }; + this._activeStreams.set(streamId, streamState); + } + + console.log(`[ResumableStreamManager] Stream state for ${streamId}:`, { + seq: streamState.seq, + fetching: streamState.fetching, + completed: streamState.completed, + readersCount: streamState.readers.size + }); + + // Create a TransformStream for this client + const { readable, writable } = new TransformStream(); + const writer = writable.getWriter(); + + // Fork the readable stream for cleanup monitoring + const [toClient, toDrain] = readable.tee(); + + // Track writer's last seen sequence + let lastSeenSeq = -1; + + // Replay stored chunks and setup live streaming + (async () => { + try { + // 1. Replay stored text deltas + await this.ctx.blockConcurrencyWhile(async () => { + const textDeltas = this.sql` + select seq, text_delta from cf_ai_http_chat_text_deltas + where stream_id = ${streamId} + order by seq asc + ` as unknown as Pick[]; + + for (const row of textDeltas) { + // Reconstruct SSE format from stored text delta + const sseData = { + type: "text-delta", + delta: row.text_delta + }; + const sseChunk = `data: ${JSON.stringify(sseData)}\n\n`; + const bytes = new TextEncoder().encode(sseChunk); + await writer.write(bytes); + lastSeenSeq = row.seq; + } + }); + + // 2. Check if stream is truly complete by verifying both in-memory and database state + const currentState = this._activeStreams.get(streamId); + + // Get the latest database state to ensure consistency + const dbState = this.sql` + select fetching, completed from cf_ai_http_chat_streams + where stream_id = ${streamId} + `[0] as unknown as + | Pick + | undefined; + + const isStillFetching = + currentState?.fetching || dbState?.fetching === 1; + const isCompleted = currentState?.completed && dbState?.completed === 1; + + if (isStillFetching && !isCompleted) { + // Stream is still active, join as live reader + if (currentState) { + currentState.readers.add(writer); + } + await this._backfillGaps(streamId, writer, lastSeenSeq + 1); + } else { + // Stream is complete, close writer + await writer.close(); + } + } catch (error) { + console.error("Error in client stream:", error); + try { + await writer.close(); + } catch {} + } + })(); + + // Clean up writer when client disconnects + toDrain.pipeTo(new WritableStream()).catch(() => { + const state = this._activeStreams.get(streamId); + if (state) { + state.readers.delete(writer); + } + }); + + // Set standard SSE headers + const headers: Record = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive" + }; + + // Add stream metadata headers + headers["X-Stream-Id"] = streamId; + headers["X-Resumable"] = "true"; + headers["X-Stream-Complete"] = String(Boolean(dbState?.completed)); + + // Include messages in header if requested + if (includeMessages) { + try { + headers["X-Messages"] = encodeURIComponent(JSON.stringify(messages)); + } catch (e) { + console.error("Failed to add messages to header:", e); + } + } + + return new Response(toClient, { headers }); + } + + /** + * Backfill any text deltas that were written while this writer was joining + */ + private async _backfillGaps( + streamId: string, + writer: WritableStreamDefaultWriter, + startSeq: number + ): Promise { + const streamState = this._activeStreams.get(streamId); + if (!streamState) return; + + let cursor = startSeq; + while (cursor < streamState.seq) { + const gaps = this.sql` + select seq, text_delta from cf_ai_http_chat_text_deltas + where stream_id = ${streamId} and seq >= ${cursor} and seq < ${streamState.seq} + order by seq asc + ` as unknown as Pick[]; + + for (const row of gaps) { + try { + // Reconstruct SSE format from stored text delta + const sseData = { + type: "text-delta", + delta: row.text_delta + }; + const sseChunk = `data: ${JSON.stringify(sseData)}\n\n`; + const bytes = new TextEncoder().encode(sseChunk); + await writer.write(bytes); + cursor = row.seq + 1; + } catch { + // Writer closed + return; + } + } + + // Check if more text deltas arrived while we were backfilling + if (cursor >= streamState.seq) break; + } + } + + /** + * Mark stream as completed + */ + private _markStreamCompleted(streamId: string): void { + const streamState = this._activeStreams.get(streamId); + if (streamState) { + streamState.fetching = false; + streamState.completed = true; + streamState.timestamp = Date.now(); + + // Close all readers/writers + for (const readerOrWriter of streamState.readers) { + try { + if (readerOrWriter instanceof WritableStreamDefaultWriter) { + readerOrWriter.close(); + } else { + // Handle ReadableStreamDefaultController + if ( + "close" in readerOrWriter && + typeof readerOrWriter.close === "function" + ) { + readerOrWriter.close(); + } + } + } catch {} + } + streamState.readers.clear(); + } + + // Update database state with error handling + try { + this.sql` + update cf_ai_http_chat_streams + set fetching = 0, completed = 1, updated_at = current_timestamp + where stream_id = ${streamId} + `; + } catch (sqlError) { + console.error( + `[ResumableStreamManager] Error marking stream ${streamId} completed:`, + sqlError + ); + // Stream is still marked as completed in memory even if SQL fails + } + + // Clean up from memory after some time + setTimeout(() => { + this._activeStreams.delete(streamId); + }, 60000); // Keep in memory for 1 minute after completion + } +} diff --git a/packages/agents/src/tests/resumable-streaming.test.ts b/packages/agents/src/tests/resumable-streaming.test.ts new file mode 100644 index 00000000..ba56f433 --- /dev/null +++ b/packages/agents/src/tests/resumable-streaming.test.ts @@ -0,0 +1,1691 @@ +import { createExecutionContext, env } from "cloudflare:test"; +import { describe, it, expect, beforeEach } from "vitest"; +import worker, { type Env } from "./worker"; +import { nanoid } from "nanoid"; + +declare module "cloudflare:test" { + interface ProvidedEnv extends Env {} +} + +async function makeRequest( + agentId: string, + method: string, + path: string, + body?: unknown +) { + const ctx = createExecutionContext(); + const url = `http://example.com/agents/resumable-stream-agent/${agentId}${path}`; + const requestInit: RequestInit = { + method, + headers: body ? { "Content-Type": "application/json" } : {} + }; + + if (body && method !== "GET") { + requestInit.body = JSON.stringify(body); + } + + const request = new Request(url, requestInit); + return await worker.fetch(request, env, ctx); +} + +async function readPartialStreamChunks( + response: Response, + maxChunks = 3 +): Promise<{ + chunks: string[]; + reader: ReadableStreamDefaultReader; +}> { + if (!response.body) throw new Error("No response body"); + + const reader = response.body.getReader(); + const chunks: string[] = []; + const decoder = new TextDecoder(); + + let count = 0; + while (count < maxChunks) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + chunks.push(chunk); + count++; + } + + return { chunks, reader }; +} + +async function readRemainingChunks( + reader: ReadableStreamDefaultReader +): Promise { + const chunks: string[] = []; + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + chunks.push(chunk); + } + } finally { + reader.releaseLock(); + } + + return chunks; +} + +async function readMoreChunksFromReader( + reader: ReadableStreamDefaultReader, + maxChunks = 3 +): Promise { + const chunks: string[] = []; + const decoder = new TextDecoder(); + + let count = 0; + while (count < maxChunks) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + chunks.push(chunk); + count++; + } + + return chunks; +} + +// Helper function to read all chunks from a stream +async function readStreamChunks(response: Response): Promise { + if (!response.body) throw new Error("No response body"); + + const reader = response.body.getReader(); + return await readRemainingChunks(reader); +} + +function extractTextFromSSE(chunks: string[]): string { + let fullText = ""; + let buffer = ""; + + for (const raw of chunks) { + buffer += raw; + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; // keep last partial line + + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + const payload = line.slice(6).trim(); + if (payload === "[DONE]") continue; + try { + const data = JSON.parse(payload); + if (data.type === "text-delta" && typeof data.delta === "string") { + fullText += data.delta; + } + } catch { + // ignore bad frames + } + } + } + return fullText; +} + +describe("Resumable Streaming - Stream Resumption", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should resume a stream from interruption point", async () => { + const customStreamId = `resume-test-${nanoid()}`; + + // Start a stream with long content + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + expect(response1.headers.get("X-Stream-Id")).toBe(customStreamId); + + // Assert SSE headers + expect(response1.headers.get("Content-Type")).toContain( + "text/event-stream" + ); + expect(response1.headers.get("Cache-Control")).toContain("no-cache"); + + // Read only part of the stream to simulate interruption + const { chunks: firstChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + expect(firstChunks.length).toBeGreaterThan(0); + await reader.cancel(); // Important: cancel to simulate dropped client + + // Resume the stream + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(response2.status).toBe(200); + expect(response2.headers.get("X-Stream-Id")).toBe(customStreamId); + expect(response2.headers.get("Content-Type")).toContain( + "text/event-stream" + ); + + // Read the resumed stream + const secondChunks: string[] = []; + if (response2.body) { + const reader2 = response2.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader2.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + secondChunks.push(chunk); + } + } finally { + reader2.releaseLock(); + } + } + + const firstPartialText = extractTextFromSSE(firstChunks); + const resumedCompleteText = extractTextFromSSE(secondChunks); + + const fullExpectedText = + "This is a much longer response that will be streamed in multiple chunks. It contains enough text to demonstrate the chunking behavior of the resumable streaming system. The response continues with more content to ensure we have sufficient data for testing resumption scenarios."; + + expect(resumedCompleteText).toBe(fullExpectedText); + + // the resumed stream should start from the beginning, not where we left off + expect(resumedCompleteText.startsWith(firstPartialText)).toBe(true); + expect(fullExpectedText.startsWith(firstPartialText)).toBe(true); + }); + + it("should maintain exact chunk sequence and prevent duplicates", async () => { + const customStreamId = `sequence-test-${nanoid()}`; + + // Start a stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + // Read first few chunks and track exact content + const { chunks: firstRawChunks, reader } = await readPartialStreamChunks( + response1, + 4 + ); + await reader.cancel(); + + // Resume and read complete stream + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + const secondRawChunks = await readStreamChunks(response2); + + // Verify the first chunks appear at the start of the resumed stream + const firstChunksText = firstRawChunks.join(""); + const secondChunksText = secondRawChunks.join(""); + + // the complete stream must start with exactly what we saw before + expect(secondChunksText.startsWith(firstChunksText)).toBe(true); + + const firstText = extractTextFromSSE(firstRawChunks); + const completeText = extractTextFromSSE(secondRawChunks); + + expect(completeText.startsWith(firstText)).toBe(true); + expect(completeText).toBe( + "This is a much longer response that will be streamed in multiple chunks. It contains enough text to demonstrate the chunking behavior of the resumable streaming system. The response continues with more content to ensure we have sufficient data for testing resumption scenarios." + ); + }); + + it("should handle resumption of completed stream", async () => { + const customStreamId = `completed-test-${nanoid()}`; + + // Start and complete a stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Read the entire stream to completion + if (response1.body) { + const reader = response1.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + } + + // Try to resume the completed stream + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(response2.status).toBe(200); + + // Should get the full content again + const chunks: string[] = []; + if (response2.body) { + const reader = response2.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value); + chunks.push(chunk); + } + } finally { + reader.releaseLock(); + } + } + + const text = extractTextFromSSE(chunks); + expect(text).toContain("This is a test response"); + }); +}); + +describe("Resumable Streaming - Multiple Clients", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should support multiple clients connecting to same stream", async () => { + const customStreamId = `multi-client-${nanoid()}`; + + // Client 1 starts the stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + + // Client 2 connects to the same stream while it's active + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(response2.status).toBe(200); + + // Both should receive content + const readBoth = async () => { + const promises = [response1, response2].map(async (response, index) => { + const chunks: string[] = []; + if (response.body) { + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + try { + // Read a few chunks to verify both are receiving data + for (let i = 0; i < 3; i++) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value); + chunks.push(chunk); + } + } finally { + reader.releaseLock(); + } + } + return { client: index + 1, chunks }; + }); + + return Promise.all(promises); + }; + + const results = await readBoth(); + + // Both clients should receive data + expect(results[0].chunks.length).toBeGreaterThan(0); + expect(results[1].chunks.length).toBeGreaterThan(0); + + // Extract text from both clients + const text1 = extractTextFromSSE(results[0].chunks); + const text2 = extractTextFromSSE(results[1].chunks); + + expect(text1.length).toBeGreaterThan(0); + expect(text2.length).toBeGreaterThan(0); + }); + + it("should ensure all chunks persist in correct sequence across multiple interruptions", async () => { + const customStreamId = `chunk-persistence-${nanoid()}`; + + // Start a stream that will be interrupted multiple times + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + // Interrupt after reading some chunks + const { chunks: chunk1, reader: reader1 } = await readPartialStreamChunks( + response1, + 2 + ); + const text1 = extractTextFromSSE(chunk1); + await reader1.cancel(); + + // Wait a bit to let the stream complete in background + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Resume and read from completed stream (should have all chunks now) + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + const chunk2 = await readStreamChunks(response2); + const text2 = extractTextFromSSE(chunk2); + + // Final resume; should get complete stream + const response3 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + const chunk3 = await readStreamChunks(response3); + const text3 = extractTextFromSSE(chunk3); + + const expectedComplete = + "This is a much longer response that will be streamed in multiple chunks. It contains enough text to demonstrate the chunking behavior of the resumable streaming system. The response continues with more content to ensure we have sufficient data for testing resumption scenarios."; + + // every resume should contain the complete response from the beginning + expect(text3).toBe(expectedComplete); + expect(text2).toBe(expectedComplete); // Second resume was also complete + + // earlier partial reads should be proper prefixes + expect(expectedComplete.startsWith(text1)).toBe(true); + expect(text1.length).toBeGreaterThan(0); + expect(text1.length).toBeLessThan(expectedComplete.length); + }); +}); + +describe("Resumable Streaming - Persistence and Cleanup", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should persist stream data across agent hibernation and restarts", async () => { + const customStreamId = `persist-test-${nanoid()}`; + const originalAgentId = `persist-test-agent-${nanoid()}`; + + // Start a stream and read partial content + const response1 = await makeRequest(originalAgentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + + // Read partial content and track what we received + const { chunks: firstChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + const firstText = extractTextFromSSE(firstChunks); + await reader.cancel(); // Simulate client disconnect + + expect(firstText.length).toBeGreaterThan(0); + + const hibernatedResponse = await makeRequest( + originalAgentId, + "GET", + `/stream/${customStreamId}` + ); + expect(hibernatedResponse.status).toBe(200); + + // the hibernated agent should replay the stream from the beginning + const hibernatedChunks = await readStreamChunks(hibernatedResponse); + const hibernatedText = extractTextFromSSE(hibernatedChunks); + + // stream should contain the content we saw before AND be complete + expect(hibernatedText).toContain(firstText); + expect(hibernatedText).toBe( + "This is a test response for resumable streaming." + ); + + expect(hibernatedText.startsWith(firstText)).toBe(true); + }); + + it("should clean up old streams", async () => { + const streamId1 = `cleanup-test-1-${nanoid()}`; + const streamId2 = `cleanup-test-2-${nanoid()}`; + + // Create multiple streams + await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: streamId1 + }); + + await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg2", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: streamId2 + }); + + // Clear all streams + const clearResponse = await makeRequest(agentId, "DELETE", "/messages"); + expect(clearResponse.status).toBe(200); + + // should return 404 since clearStreams() deletes all stream records + const status1 = await makeRequest( + agentId, + "GET", + `/stream/${streamId1}/status` + ); + const status2 = await makeRequest( + agentId, + "GET", + `/stream/${streamId2}/status` + ); + + expect(status1.status).toBe(404); + expect(status2.status).toBe(404); + + const status1Body = await status1.json(); + const status2Body = await status2.json(); + expect(status1Body).toEqual({ error: "Stream not found" }); + expect(status2Body).toEqual({ error: "Stream not found" }); + }); +}); + +describe("Resumable Streaming - Data Integrity", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should maintain data integrity across resumptions", async () => { + const customStreamId = `integrity-test-${nanoid()}`; + + // Start a stream with known content + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + // Read partial content + const { chunks: partialChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + reader.releaseLock(); + + // Resume and read the rest + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + const fullChunks: string[] = []; + + if (response2.body) { + const reader2 = response2.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader2.read(); + if (done) break; + + const chunk = decoder.decode(value); + fullChunks.push(chunk); + } + } finally { + reader2.releaseLock(); + } + } + + // Start a fresh stream to get the complete content + const response3 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg2", role: "user", parts: [{ type: "text", text: "long" }] } + ] + }); + + const completeChunks: string[] = []; + if (response3.body) { + const reader3 = response3.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader3.read(); + if (done) break; + + const chunk = decoder.decode(value); + completeChunks.push(chunk); + } + } finally { + reader3.releaseLock(); + } + } + + // Extract text from all scenarios + const partialText = extractTextFromSSE(partialChunks); + const resumedFullText = extractTextFromSSE(fullChunks); + const completeText = extractTextFromSSE(completeChunks); + + // The resumed stream should contain the original content + expect(resumedFullText).toContain("This is a much longer response"); + expect(completeText).toContain("This is a much longer response"); + + expect(partialText.length).toBeGreaterThan(0); + expect(resumedFullText.length).toBeGreaterThan(0); + }); + + it("should handle concurrent stream access safely", async () => { + const customStreamId = `concurrent-test-${nanoid()}`; + + // Start multiple concurrent requests to the same stream + const promises = Array.from({ length: 3 }, (_, index) => + makeRequest(agentId, "POST", "/chat", { + messages: [ + { + id: `msg${index}`, + role: "user", + parts: [{ type: "text", text: "test" }] + } + ], + streamId: customStreamId + }) + ); + + const responses = await Promise.all(promises); + + // All should succeed + responses.forEach((response) => { + expect(response.status).toBe(200); + expect(response.headers.get("X-Stream-Id")).toBe(customStreamId); + }); + + // Read content from all responses + const contentPromises = responses.map(async (response) => { + const chunks: string[] = []; + if (response.body) { + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value); + chunks.push(chunk); + } + } finally { + reader.releaseLock(); + } + } + return extractTextFromSSE(chunks); + }); + + const contents = await Promise.all(contentPromises); + + // All should receive content + contents.forEach((content) => { + expect(content.length).toBeGreaterThan(0); + }); + }); +}); + +describe("Resumable Streaming - Cancellation", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should cancel an active stream, mark it completed, and stop further output", async () => { + const customStreamId = `cancel-test-${nanoid()}`; + + // Start a long stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + expect(response1.headers.get("X-Stream-Id")).toBe(customStreamId); + + // Read a few chunks to accumulate partial text + const { chunks: partialChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + const partialText = extractTextFromSSE(partialChunks); + expect(partialText.length).toBeGreaterThan(0); + + // Cancel the stream + const cancelResp = await makeRequest( + agentId, + "POST", + `/stream/${customStreamId}/cancel` + ); + expect(cancelResp.status).toBe(200); + + // Ensure original reader is canceled/closed + try { + await reader.cancel(); + } catch {} + + // Status should show completed + const statusResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + expect(statusResp.status).toBe(200); + const status = (await statusResp.json()) as { completed: boolean }; + expect(status.completed).toBe(true); + + // Resuming should immediately complete and return only persisted partial deltas + const resumeResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(resumeResp.status).toBe(200); + expect(resumeResp.headers.get("X-Stream-Complete")).toBe("true"); + + const resumedChunks = await readStreamChunks(resumeResp); + const resumedText = extractTextFromSSE(resumedChunks); + // After cancellation, no more output should be streamed; however, + // persisted deltas might slightly exceed what this client read before cancel. + // Ensure at least the previously seen partial text is included and no streaming continues beyond persisted data. + expect(resumedText.startsWith(partialText)).toBe(true); + }); +}); + +describe("Resumable Streaming - Error Handling and Edge Cases", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should return 404 for unknown stream ids", async () => { + const response = await makeRequest( + agentId, + "GET", + `/stream/nonexistent-${nanoid()}` + ); + expect(response.status).toBe(404); + + const body = await response.json(); + expect(body).toHaveProperty("error"); + }); + + it("should handle request timeout with AbortController", async () => { + const customStreamId = `timeout-test-${nanoid()}`; + const controller = new AbortController(); + + const ctx = createExecutionContext(); + const url = `http://example.com/agents/resumable-stream-agent/${agentId}/chat`; + const request = new Request(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }), + signal: controller.signal + }); + + const p = worker.fetch(request, env, ctx); + setTimeout(() => controller.abort(), 10); + + await expect(p).resolves.toBeDefined(); + }); + + it("should validate exact text ordering in deterministic streams", async () => { + const customStreamId = `ordering-test-${nanoid()}`; + + const response = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + const chunks: string[] = []; + if (response.body) { + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + chunks.push(chunk); + } + } finally { + reader.releaseLock(); + } + } + + const fullText = extractTextFromSSE(chunks); + expect(fullText).toBe("This is a test response for resumable streaming."); + }); + + it("should provide consistent content when resuming interrupted streams", async () => { + const customStreamId = `dedup-test-${nanoid()}`; + + // Start stream and read partial content + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + const { chunks: firstChunks, reader } = await readPartialStreamChunks( + response1, + 3 + ); + const firstText = extractTextFromSSE(firstChunks); + await reader.cancel(); + + // Resume the stream + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + + const secondChunks: string[] = []; + if (response2.body) { + const reader2 = response2.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader2.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + secondChunks.push(chunk); + } + } finally { + reader2.releaseLock(); + } + } + + const secondText = extractTextFromSSE(secondChunks); + + // Verify both streams provide content and the resumed stream contains the expected response + expect(firstText.length).toBeGreaterThan(0); + expect(secondText.length).toBeGreaterThan(0); + expect(secondText).toContain("This is a much longer response"); + }); + + it("should handle partial SSE frames and reconstruct JSON correctly", async () => { + const customStreamId = `partial-sse-${nanoid()}`; + + // Start a stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + + // Read raw chunks until we find one that ends with a partial JSON line + const rawChunks: string[] = []; + let foundPartialJSON = false; + + if (response1.body) { + const reader = response1.body.getReader(); + const decoder = new TextDecoder(); + + try { + while (!foundPartialJSON) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + rawChunks.push(chunk); + const lines = chunk.split("\n"); + for (const line of lines) { + if (line.startsWith("data: {") && !line.includes("}")) { + foundPartialJSON = true; + break; + } + } + + if (rawChunks.length > 10) break; + } + await reader.cancel(); + } catch (_error) { + // Expected when canceling + } + } + + // Extract what we got from the interrupted stream + const partialText = extractTextFromSSE(rawChunks); + expect(partialText.length).toBeGreaterThan(0); + + // Resume the stream - this should handle the partial JSON correctly + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(response2.status).toBe(200); + + // Read the complete resumed stream + const resumedChunks = await readStreamChunks(response2); + const completeText = extractTextFromSSE(resumedChunks); + + const expectedCompleteText = + "This is a much longer response that will be streamed in multiple chunks. It contains enough text to demonstrate the chunking behavior of the resumable streaming system. The response continues with more content to ensure we have sufficient data for testing resumption scenarios."; + + expect(completeText).toBe(expectedCompleteText); + + expect(expectedCompleteText.startsWith(partialText)).toBe(true); + + expect(completeText.length).toBeGreaterThan(partialText.length); + }); + + it("should handle concurrent POST requests to same streamId", async () => { + const customStreamId = `concurrent-post-${nanoid()}`; + + // Start multiple concurrent POST requests to the same stream ID + const promises = Array.from({ length: 3 }, (_, index) => + makeRequest(agentId, "POST", "/chat", { + messages: [ + { + id: `msg${index}`, + role: "user", + parts: [{ type: "text", text: "test" }] + } + ], + streamId: customStreamId + }) + ); + + const responses = await Promise.all(promises); + const successCount = responses.filter((r) => r.status === 200).length; + expect(successCount).toBeGreaterThan(0); + + // All responses should have the same stream ID + responses.forEach((response) => { + if (response.status === 200) { + expect(response.headers.get("X-Stream-Id")).toBe(customStreamId); + } + }); + }); +}); + +describe("Resumable Streaming - Stream Status", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should return stream status for active stream", async () => { + const customStreamId = `status-test-${nanoid()}`; + + // Start a stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + + // Check status while stream is active + const statusResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + expect(statusResp.status).toBe(200); + + const status = (await statusResp.json()) as { + position: number; + completed: boolean; + }; + expect(status).toHaveProperty("position"); + expect(status).toHaveProperty("completed"); + expect(typeof status.position).toBe("number"); + expect(typeof status.completed).toBe("boolean"); + }); + + it("should return completed status after stream finishes", async () => { + const customStreamId = `status-complete-${nanoid()}`; + + // Start and complete a stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Read to completion + if (response1.body) { + const reader = response1.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + } + + // Wait a bit for status to update + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Check status + const statusResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + expect(statusResp.status).toBe(200); + + const status = (await statusResp.json()) as { completed: boolean }; + expect(status.completed).toBe(true); + }); + + it("should track position correctly during streaming", async () => { + const customStreamId = `position-test-${nanoid()}`; + + // Start a stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + // Read some chunks + const { chunks: firstChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + const _firstText = extractTextFromSSE(firstChunks); + + // Check position + const statusResp1 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + const status1 = (await statusResp1.json()) as { position: number }; + expect(status1.position).toBeGreaterThan(0); + + // Read more chunks from the same reader + const secondChunks = await readMoreChunksFromReader(reader, 2); + const _secondText = extractTextFromSSE(secondChunks); + + // Position should have increased + const statusResp2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + const status2 = (await statusResp2.json()) as { position: number }; + expect(status2.position).toBeGreaterThanOrEqual(status1.position); + + await reader.cancel(); + }); + + it("should return 404 for status of non-existent stream", async () => { + const response = await makeRequest( + agentId, + "GET", + `/stream/nonexistent-${nanoid()}/status` + ); + expect(response.status).toBe(404); + + const body = await response.json(); + expect(body).toHaveProperty("error"); + }); +}); + +describe("Resumable Streaming - Stream ID Management", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should auto-generate stream ID when not provided", async () => { + // Start stream without streamId + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ] + }); + + expect(response1.status).toBe(200); + const streamId = response1.headers.get("X-Stream-Id"); + expect(streamId).toBeDefined(); + expect(streamId?.length).toBeGreaterThan(0); + }); + + it("should use provided stream ID when specified", async () => { + const customStreamId = `custom-${nanoid()}`; + + const response = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + expect(response.status).toBe(200); + expect(response.headers.get("X-Stream-Id")).toBe(customStreamId); + }); + + it("should handle multiple streams with different IDs", async () => { + const streamId1 = `multi-1-${nanoid()}`; + const streamId2 = `multi-2-${nanoid()}`; + + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: streamId1 + }); + + const response2 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg2", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: streamId2 + }); + + expect(response1.status).toBe(200); + expect(response2.status).toBe(200); + expect(response1.headers.get("X-Stream-Id")).toBe(streamId1); + expect(response2.headers.get("X-Stream-Id")).toBe(streamId2); + + // Both should be independently resumable + const resume1 = await makeRequest(agentId, "GET", `/stream/${streamId1}`); + const resume2 = await makeRequest(agentId, "GET", `/stream/${streamId2}`); + + expect(resume1.status).toBe(200); + expect(resume2.status).toBe(200); + }); + + it("should handle stream ID reuse correctly", async () => { + const customStreamId = `reuse-${nanoid()}`; + + // First stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Read to completion + if (response1.body) { + const reader = response1.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + } + + // Wait for completion + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Reuse same stream ID for new stream + const response2 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg2", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + expect(response2.status).toBe(200); + expect(response2.headers.get("X-Stream-Id")).toBe(customStreamId); + }); +}); + +describe("Resumable Streaming - Multiple Resumptions", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should allow multiple sequential resumptions", async () => { + const customStreamId = `multi-resume-${nanoid()}`; + + // Start stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + // Read partial and cancel + const { chunks: chunks1, reader: reader1 } = await readPartialStreamChunks( + response1, + 2 + ); + const text1 = extractTextFromSSE(chunks1); + await reader1.cancel(); + + // First resume - read partial again + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + const { chunks: chunks2, reader: reader2 } = await readPartialStreamChunks( + response2, + 2 + ); + const text2 = extractTextFromSSE(chunks2); + await reader2.cancel(); + + // Second resume - read partial again + const response3 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + const { chunks: chunks3, reader: reader3 } = await readPartialStreamChunks( + response3, + 2 + ); + const text3 = extractTextFromSSE(chunks3); + await reader3.cancel(); + + // All should start from beginning + expect(text1.length).toBeGreaterThan(0); + expect(text2.length).toBeGreaterThan(0); + expect(text3.length).toBeGreaterThan(0); + }); + + it("should handle concurrent resumptions of same stream", async () => { + const customStreamId = `concurrent-resume-${nanoid()}`; + + // Start and complete stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Read to completion + if (response1.body) { + const reader = response1.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Concurrent resumptions + const promises = Array.from({ length: 3 }, () => + makeRequest(agentId, "GET", `/stream/${customStreamId}`) + ); + + const responses = await Promise.all(promises); + + // All should succeed + responses.forEach((response) => { + expect(response.status).toBe(200); + expect(response.headers.get("X-Stream-Id")).toBe(customStreamId); + }); + + // All should return same content + const contents = await Promise.all( + responses.map(async (response) => { + const chunks = await readStreamChunks(response); + return extractTextFromSSE(chunks); + }) + ); + + contents.forEach((content, index) => { + expect(content.length).toBeGreaterThan(0); + if (index > 0) { + expect(content).toBe(contents[0]); + } + }); + }); +}); + +describe("Resumable Streaming - Message Persistence", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should persist messages with stream", async () => { + const customStreamId = `messages-${nanoid()}`; + + // Start stream with messages + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "hello" }] } + ], + streamId: customStreamId + }); + + expect(response1.status).toBe(200); + + // Read partial and cancel + const { reader } = await readPartialStreamChunks(response1, 1); + await reader.cancel(); + + // Get messages + const messagesResp = await makeRequest(agentId, "GET", "/messages"); + expect(messagesResp.status).toBe(200); + + const data = (await messagesResp.json()) as { messages?: unknown[] }; + expect(data.messages).toBeDefined(); + expect(Array.isArray(data.messages)).toBe(true); + }); + + it("should maintain message history after stream completion", async () => { + const customStreamId = `history-${nanoid()}`; + + // Start and complete stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Read to completion + if (response1.body) { + const reader = response1.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Get messages + const messagesResp = await makeRequest(agentId, "GET", "/messages"); + expect(messagesResp.status).toBe(200); + + const data = (await messagesResp.json()) as { messages?: unknown[] }; + expect(data.messages).toBeDefined(); + expect(Array.isArray(data.messages)).toBe(true); + expect(data.messages?.length).toBeGreaterThan(0); + }); + + it("should clear messages and streams together", async () => { + const customStreamId = `clear-${nanoid()}`; + + // Create stream + await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Clear messages + const clearResp = await makeRequest(agentId, "DELETE", "/messages"); + expect(clearResp.status).toBe(200); + + // Stream should no longer exist + const statusResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + expect(statusResp.status).toBe(404); + + // Messages should be cleared + const messagesResp = await makeRequest(agentId, "GET", "/messages"); + expect(messagesResp.status).toBe(200); + const data = (await messagesResp.json()) as { messages?: unknown[] }; + expect(data.messages?.length ?? 0).toBe(0); + }); +}); + +describe("Resumable Streaming - Response Headers and Metadata", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should include correct SSE headers", async () => { + const response = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ] + }); + + expect(response.status).toBe(200); + expect(response.headers.get("Content-Type")).toContain("text/event-stream"); + expect(response.headers.get("Cache-Control")).toContain("no-cache"); + expect(response.headers.get("X-Stream-Id")).toBeDefined(); + }); + + it("should include X-Stream-Complete header for completed streams", async () => { + const customStreamId = `complete-header-${nanoid()}`; + + // Start and complete stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId + }); + + // Read to completion + if (response1.body) { + const reader = response1.body.getReader(); + try { + while (true) { + const { done } = await reader.read(); + if (done) break; + } + } finally { + reader.releaseLock(); + } + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Resume completed stream + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(response2.status).toBe(200); + + // Should have completion header + const isComplete = response2.headers.get("X-Stream-Complete"); + expect(isComplete).toBe("true"); + }); + + it("should include X-Messages header when requested", async () => { + const customStreamId = `messages-header-${nanoid()}`; + + // Start stream with includeMessages + const response = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "test" }] } + ], + streamId: customStreamId, + includeMessages: true + }); + + expect(response.status).toBe(200); + // Note: X-Messages header may or may not be present depending on implementation + // This test verifies the request is handled correctly + }); +}); + +describe("Resumable Streaming - Invalid Requests and Edge Cases", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should handle malformed JSON in request body", async () => { + const ctx = createExecutionContext(); + const url = `http://example.com/agents/resumable-stream-agent/${agentId}/chat`; + const request = new Request(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "{ invalid json }" + }); + + const response = await worker.fetch(request, env, ctx); + // Should handle error gracefully + expect(response.status).toBeGreaterThanOrEqual(400); + }); + + it("should handle missing messages array", async () => { + const response = await makeRequest(agentId, "POST", "/chat", { + streamId: "test-id" + }); + + // Should handle missing messages gracefully + expect(response.status).toBeGreaterThanOrEqual(400); + }); + + it("should handle empty messages array", async () => { + const response = await makeRequest(agentId, "POST", "/chat", { + messages: [], + streamId: "test-id" + }); + + // Should handle empty messages + expect(response.status).toBeGreaterThanOrEqual(200); + }); + + it("should handle cancel on non-existent stream", async () => { + const response = await makeRequest( + agentId, + "POST", + `/stream/nonexistent-${nanoid()}/cancel` + ); + + // Should return appropriate status + expect([200, 404]).toContain(response.status); + }); + + // skipping this atm + it.skip("should handle invalid stream ID format", async () => { + // Try with empty string + const response1 = await makeRequest(agentId, "GET", "/stream/"); + expect(response1.status).toBeGreaterThanOrEqual(400); + + // Try with special characters + const response2 = await makeRequest( + agentId, + "GET", + "/stream/../../../etc/passwd" + ); + expect(response2.status).toBeGreaterThanOrEqual(400); + }); + + it("should handle very long stream IDs", async () => { + const longStreamId = "a".repeat(1000); + const response = await makeRequest( + agentId, + "GET", + `/stream/${longStreamId}` + ); + // Should handle gracefully (either 404 or reject) + expect(response.status).toBeGreaterThanOrEqual(400); + }); +}); + +describe("Resumable Streaming - Stream Lifecycle", () => { + let agentId: string; + + beforeEach(async () => { + agentId = `test-${nanoid()}`; + }); + + it("should complete full lifecycle: start -> interrupt -> resume -> complete", async () => { + const customStreamId = `lifecycle-${nanoid()}`; + + // 1. Start stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + expect(response1.status).toBe(200); + expect(response1.headers.get("X-Stream-Id")).toBe(customStreamId); + + // 2. Interrupt + const { chunks: partialChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + const partialText = extractTextFromSSE(partialChunks); + expect(partialText.length).toBeGreaterThan(0); + await reader.cancel(); + + // 3. Check status (should be active) + const statusResp1 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + expect(statusResp1.status).toBe(200); + const status1 = (await statusResp1.json()) as { completed: boolean }; + expect(status1.completed).toBe(false); + + // 4. Resume + const response2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(response2.status).toBe(200); + + // 5. Read to completion + const completeChunks = await readStreamChunks(response2); + const completeText = extractTextFromSSE(completeChunks); + + // 6. Verify completion + await new Promise((resolve) => setTimeout(resolve, 100)); + const statusResp2 = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + const status2 = (await statusResp2.json()) as { completed: boolean }; + expect(status2.completed).toBe(true); + + // 7. Verify content + expect(completeText.length).toBeGreaterThan(partialText.length); + expect(completeText.startsWith(partialText)).toBe(true); + }); + + it("should handle cancel in middle of lifecycle", async () => { + const customStreamId = `cancel-lifecycle-${nanoid()}`; + + // Start stream + const response1 = await makeRequest(agentId, "POST", "/chat", { + messages: [ + { id: "msg1", role: "user", parts: [{ type: "text", text: "long" }] } + ], + streamId: customStreamId + }); + + // Read partial + const { chunks: partialChunks, reader } = await readPartialStreamChunks( + response1, + 2 + ); + const partialText = extractTextFromSSE(partialChunks); + + // Cancel + const cancelResp = await makeRequest( + agentId, + "POST", + `/stream/${customStreamId}/cancel` + ); + expect(cancelResp.status).toBe(200); + + await reader.cancel(); + + // Status should show completed + await new Promise((resolve) => setTimeout(resolve, 100)); + const statusResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}/status` + ); + const status = (await statusResp.json()) as { completed: boolean }; + expect(status.completed).toBe(true); + + // Resume should return only persisted content + const resumeResp = await makeRequest( + agentId, + "GET", + `/stream/${customStreamId}` + ); + expect(resumeResp.headers.get("X-Stream-Complete")).toBe("true"); + const resumedChunks = await readStreamChunks(resumeResp); + const resumedText = extractTextFromSSE(resumedChunks); + expect(resumedText.startsWith(partialText)).toBe(true); + }); +}); diff --git a/packages/agents/src/tests/worker.ts b/packages/agents/src/tests/worker.ts index bbc16e55..4988750c 100644 --- a/packages/agents/src/tests/worker.ts +++ b/packages/agents/src/tests/worker.ts @@ -11,8 +11,13 @@ import { type WSMessage } from "../index.ts"; import { AIChatAgent } from "../ai-chat-agent.ts"; -import type { UIMessage as ChatMessage } from "ai"; +import type { + UIMessage as ChatMessage, + StreamTextOnFinishCallback, + ToolSet +} from "ai"; import type { MCPClientConnection } from "../mcp/client-connection"; +import { AIHttpChatAgent } from "../ai-chat-agent-http.ts"; interface ToolCallPart { type: string; @@ -30,6 +35,7 @@ export type Env = { TestChatAgent: DurableObjectNamespace; TestOAuthAgent: DurableObjectNamespace; TEST_MCP_JURISDICTION: DurableObjectNamespace; + ResumableStreamAgent: DurableObjectNamespace; }; type State = unknown; @@ -390,6 +396,132 @@ export class TestMcpJurisdiction extends McpAgent { } } +// Test agent for resumable streaming functionality +export class TestResumableStreamAgent extends AIHttpChatAgent< + Env, + unknown, + ChatMessage +> { + // Mock AI response for testing + private mockResponses: Map = new Map(); + + // Track requests for testing + requestHistory: Array<{ method: string; url: string; body?: unknown }> = []; + + constructor(ctx: DurableObjectState, env: Env) { + super(ctx, env); + + // Set up some mock responses + this.mockResponses.set("hello", "Hello! How can I help you today?"); + this.mockResponses.set( + "test", + "This is a test response for resumable streaming." + ); + this.mockResponses.set( + "long", + "This is a much longer response that will be streamed in multiple chunks. It contains enough text to demonstrate the chunking behavior of the resumable streaming system. The response continues with more content to ensure we have sufficient data for testing resumption scenarios." + ); + } + + async onChatMessage( + onFinish: StreamTextOnFinishCallback, + options?: { streamId?: string } + ): Promise { + // Track the request + this.requestHistory.push({ + method: "chat", + url: options?.streamId || "unknown", + body: { messages: this.messages } + }); + + // Get the last user message + const lastMessage = this.messages.filter((m) => m.role === "user").pop(); + + if (!lastMessage || !lastMessage.parts) { + return new Response("No message content", { status: 400 }); + } + + const content = lastMessage.parts + .filter((p) => p.type === "text") + .map((p) => p.text) + .join(" "); + + // Find mock response or use default + const responseText = + this.mockResponses.get(content.toLowerCase()) || `Echo: ${content}`; + + // Create a streaming response + const stream = new ReadableStream({ + async start(controller) { + const chunks = responseText.match(/.{1,10}/g) || [responseText]; + + for (let i = 0; i < chunks.length; i++) { + const chunk = chunks[i]; + const sseData = `data: ${JSON.stringify({ + type: "text-delta", + delta: chunk + })}\n\n`; + + controller.enqueue(new TextEncoder().encode(sseData)); + + // Small delay to simulate real streaming + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + // End the stream + controller.enqueue(new TextEncoder().encode("data: [DONE]\n\n")); + controller.close(); + + // Call onFinish with mock result for testing + await onFinish({ + text: responseText, + toolCalls: [], + toolResults: [], + finishReason: "stop", + usage: { + promptTokens: 0, + completionTokens: 0, + totalTokens: 0 + }, + rawResponse: { + headers: {} + }, + warnings: [] + } as unknown as Parameters[0]); + } + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive" + } + }); + } + + // Helper method to set mock response for testing + setMockResponse(input: string, response: string) { + this.mockResponses.set(input.toLowerCase(), response); + } + + // Helper method to get request history for testing + getRequestHistory() { + return [...this.requestHistory]; + } + + // Helper method to clear history + clearHistory() { + this.requestHistory = []; + this.messages = []; + } + + override onError(error: unknown): void { + // Don't console.error in tests to avoid queueMicrotask issues + throw error; + } +} + export default { async fetch(request: Request, env: Env, ctx: ExecutionContext) { const url = new URL(request.url); diff --git a/packages/agents/src/tests/wrangler.jsonc b/packages/agents/src/tests/wrangler.jsonc index 25a8f3e1..fcd6651e 100644 --- a/packages/agents/src/tests/wrangler.jsonc +++ b/packages/agents/src/tests/wrangler.jsonc @@ -8,6 +8,11 @@ "enable_nodejs_http_modules", "enable_nodejs_perf_hooks_module" ], + + "observability": { + "enabled": true, + "head_sampling_rate": 1 + }, "durable_objects": { "bindings": [ { @@ -38,6 +43,10 @@ { "class_name": "TestMcpJurisdiction", "name": "TEST_MCP_JURISDICTION" + }, + { + "class_name": "TestResumableStreamAgent", + "name": "ResumableStreamAgent" } ] }, @@ -52,7 +61,8 @@ "TestRaceAgent", "TestChatAgent", "TestOAuthAgent", - "TestMcpJurisdiction" + "TestMcpJurisdiction", + "TestResumableStreamAgent" ], "tag": "v1" } diff --git a/packages/agents/src/use-agent-chat-http.tsx b/packages/agents/src/use-agent-chat-http.tsx new file mode 100644 index 00000000..63194310 --- /dev/null +++ b/packages/agents/src/use-agent-chat-http.tsx @@ -0,0 +1,779 @@ +import { useChat, type UseChatOptions } from "@ai-sdk/react"; +import { getToolName, isToolUIPart } from "ai"; +import type { + ChatInit, + ChatTransport, + UIMessage as Message, + UIMessage +} from "ai"; +import { DefaultChatTransport } from "ai"; +import { use, useEffect, useRef, useState, useCallback } from "react"; +import { detectToolsRequiringConfirmation, type AITool } from "./ai-react"; + +type GetInitialMessagesOptions = { + agent: string; + name: string; + url: string; +}; + +// v5 useChat parameters +type UseChatParams = ChatInit & + UseChatOptions; + +/** + * Options for the useAgentChatHttp hook + */ +type UseAgentChatHttpOptions< + _State, + ChatMessage extends UIMessage = UIMessage +> = Omit, "fetch"> & { + /** Agent name for HTTP requests */ + agent: string; + /** Agent instance ID (defaults to "default") */ + id?: string; + /** Polling interval for "poke and pull" updates in milliseconds */ + pollingInterval?: number; + /** Enable resumable streams */ + enableResumableStreams?: boolean; + getInitialMessages?: + | undefined + | null + | ((options: GetInitialMessagesOptions) => Promise); + /** Request credentials */ + credentials?: RequestCredentials; + /** Request headers */ + headers?: HeadersInit; + /** + * @description Whether to automatically resolve tool calls that do not require human interaction. + * @experimental + */ + experimental_automaticToolResolution?: boolean; + /** + * @description Tools object for automatic detection of confirmation requirements. + * Tools without execute function will require confirmation. + */ + tools?: Record>; + /** + * @description Manual override for tools requiring confirmation. + * If not provided, will auto-detect from tools object. + */ + toolsRequiringConfirmation?: string[]; + /** + * When true (default), automatically sends the next message only after + * all pending confirmation-required tool calls have been resolved. + * @default true + */ + autoSendAfterAllConfirmationsResolved?: boolean; +}; + +const requestCache = new Map>(); + +/** + * HTTP-based React hook for building AI chat interfaces with resumable streams + * Uses "poke and pull" pattern instead of WebSockets for real-time updates + * @param options Chat options including the agent URL + * @returns Chat interface controls and state with added stream management + */ +export function useAgentChatHttp< + State = unknown, + ChatMessage extends UIMessage = UIMessage +>( + options: UseAgentChatHttpOptions +): ReturnType> & { + clearChatHistory: () => Promise; + resumeStream: (streamId: string) => Promise; + cancelStream: (streamId: string) => Promise; +} { + const { + agent, + id = "default", + pollingInterval = 2000, // Default 2 second polling + enableResumableStreams = true, + getInitialMessages, + messages: optionsInitialMessages, + experimental_automaticToolResolution, + tools, + toolsRequiringConfirmation: manualToolsRequiringConfirmation, + autoSendAfterAllConfirmationsResolved = true, + credentials, + headers, + ...rest + } = options; + + // Auto-detect tools requiring confirmation, or use manual override + const toolsRequiringConfirmation = + manualToolsRequiringConfirmation ?? detectToolsRequiringConfirmation(tools); + + const [currentStreamId, setCurrentStreamId] = useState(null); + const pollingIntervalRef = useRef(null); + const lastPollingTime = useRef(0); + const chatHelpersRef = useRef> | null>( + null + ); + + // Construct agent URL from agent name and id + const normalizedAgentUrl = `/agents/${agent}/${id}`; + + // Storage key for stream persistence + const storageKey = `agent_stream_${agent}_${id}`; + + async function defaultGetInitialMessagesFetch({ + url + }: GetInitialMessagesOptions) { + const getMessagesUrl = `${url}/messages`; + const response = await fetch(getMessagesUrl, { + credentials, + headers + }); + + if (!response.ok) { + console.warn( + `Failed to fetch initial messages: ${response.status} ${response.statusText}` + ); + return []; + } + + const data = (await response.json()) as { messages?: ChatMessage[] }; + return data.messages || []; + } + + const getInitialMessagesFetch = + getInitialMessages || defaultGetInitialMessagesFetch; + + function doGetInitialMessages( + getInitialMessagesOptions: GetInitialMessagesOptions + ) { + const cacheKey = `${normalizedAgentUrl}_messages`; + if (requestCache.has(cacheKey)) { + return requestCache.get(cacheKey)! as Promise; + } + const promise = getInitialMessagesFetch(getInitialMessagesOptions); + requestCache.set(cacheKey, promise); + return promise; + } + + const initialMessagesPromise = + getInitialMessages === null + ? null + : doGetInitialMessages({ + agent: "http-chat", // Generic agent name for HTTP mode + name: "http-chat", + url: normalizedAgentUrl + }); + + const initialMessages = initialMessagesPromise + ? use(initialMessagesPromise) + : (optionsInitialMessages ?? []); + + useEffect(() => { + if (!initialMessagesPromise) { + return; + } + const cacheKey = `${normalizedAgentUrl}_messages`; + requestCache.set(cacheKey, initialMessagesPromise!); + return () => { + if (requestCache.get(cacheKey) === initialMessagesPromise) { + requestCache.delete(cacheKey); + } + }; + }, [normalizedAgentUrl, initialMessagesPromise]); + + /** + * Custom fetch function for HTTP-based chat + */ + async function httpChatFetch( + _request: RequestInfo | URL, + options: RequestInit = {} + ) { + const chatUrl = `${normalizedAgentUrl}/chat`; + + // Parse the request body to check if we should include a stream ID + let body = {}; + if (options.body) { + try { + body = JSON.parse(options.body as string); + } catch { + // Not JSON, use as is + } + } + + // Include current stream ID if we have one (for reconnection) + if (currentStreamId && typeof body === "object") { + body = { ...body, streamId: currentStreamId }; + } + + // Modify the request to use our HTTP endpoint + const modifiedOptions = { + ...options, + body: JSON.stringify(body), + credentials, + headers: { + ...headers, + ...options.headers, + "Content-Type": "application/json" + } + }; + + const response = await fetch(chatUrl, modifiedOptions); + + if (enableResumableStreams) { + const streamId = response.headers.get("X-Stream-Id"); + if (streamId) { + // Store stream ID for persistence + setCurrentStreamId(streamId); + if (typeof window !== "undefined") { + sessionStorage.setItem(storageKey, streamId); + } + // Polling will start automatically via useEffect watching currentStreamId + } + } + + return response; + } + + const customTransport: ChatTransport = { + sendMessages: async ( + options: Parameters[0] + ) => { + const transport = new DefaultChatTransport({ + api: normalizedAgentUrl, + fetch: httpChatFetch + }); + return transport.sendMessages(options); + }, + reconnectToStream: async ( + options: Parameters< + typeof DefaultChatTransport.prototype.reconnectToStream + >[0] + ) => { + const transport = new DefaultChatTransport({ + api: normalizedAgentUrl, + fetch: httpChatFetch + }); + return transport.reconnectToStream(options); + } + }; + + const useChatHelpers = useChat({ + ...rest, + messages: initialMessages, + transport: customTransport + }); + + // Keep ref updated + chatHelpersRef.current = useChatHelpers; + + /** + * Stop polling for updates + */ + const stopPolling = useCallback(() => { + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + }, []); + + /** + * Start polling for updates using "poke and pull" pattern + */ + const startPolling = useCallback(() => { + if (pollingIntervalRef.current) { + return; // Already polling + } + + pollingIntervalRef.current = setInterval(async () => { + const now = Date.now(); + + // Avoid too frequent polling + if (now - lastPollingTime.current < pollingInterval) { + return; + } + + lastPollingTime.current = now; + + try { + // Check for message updates + const messagesResponse = await fetch(`${normalizedAgentUrl}/messages`, { + credentials, + headers + }); + + if (messagesResponse.ok) { + const data = (await messagesResponse.json()) as { + messages?: ChatMessage[]; + }; + const serverMessages = data.messages || []; + + // Update messages if they've changed + const currentMessages = useChatHelpers.messages; + if ( + serverMessages.length !== currentMessages.length || + (serverMessages.length > 0 && + currentMessages.length > 0 && + serverMessages[serverMessages.length - 1].id !== + currentMessages[currentMessages.length - 1].id) + ) { + useChatHelpers.setMessages(serverMessages); + } + } + + // Check if current stream is still active + if (enableResumableStreams && currentStreamId) { + try { + const statusResponse = await fetch( + `${normalizedAgentUrl}/stream/${currentStreamId}/status`, + { credentials, headers } + ); + + if (statusResponse.ok) { + const status = (await statusResponse.json()) as { + position: number; + completed: boolean; + }; + + // Stop polling if stream is completed + if (status.completed) { + setCurrentStreamId(null); + stopPolling(); + } + } + } catch (error) { + console.warn(`Failed to check stream ${currentStreamId}:`, error); + } + } else { + // No active stream, stop polling + stopPolling(); + } + } catch (error) { + console.warn("Polling error:", error); + } + }, pollingInterval); + }, [ + normalizedAgentUrl, + pollingInterval, + credentials, + headers, + enableResumableStreams, + currentStreamId, + stopPolling, + useChatHelpers + ]); + + /** + * Automatically start/stop polling based on active stream + */ + useEffect(() => { + if (enableResumableStreams && currentStreamId && pollingInterval > 0) { + // Start polling when we have an active stream + startPolling(); + } else { + // Stop polling when no active stream + stopPolling(); + } + }, [ + enableResumableStreams, + currentStreamId, + pollingInterval, + startPolling, + stopPolling + ]); + + /** + * Resume an interrupted stream + */ + const resumeStream = useCallback( + async (streamId: string) => { + try { + console.log(`Attempting to resume stream ${streamId}`); + + // Simply fetch the stream endpoint to check if it's still active + const response = await fetch( + `${normalizedAgentUrl}/stream/${streamId}`, + { + credentials, + headers + } + ); + + if (response.ok) { + // Stream exists, just track it + const isComplete = + response.headers.get("X-Stream-Complete") === "true"; + if (!isComplete) { + setCurrentStreamId(streamId); + } + + // Consume the stream but don't process it here + // Let the polling mechanism handle message updates + if (response.body) { + response.body.cancel(); + } + } + } catch (error) { + console.error(`Failed to resume stream ${streamId}:`, error); + } + }, + [normalizedAgentUrl, credentials, headers] + ); + + /** + * Cancel an active stream + */ + const cancelStream = useCallback( + async (streamId: string) => { + try { + const response = await fetch( + `${normalizedAgentUrl}/stream/${streamId}/cancel`, + { + method: "POST", + credentials, + headers + } + ); + + if (response.ok) { + if (currentStreamId === streamId) { + setCurrentStreamId(null); + } + } + } catch (error) { + console.error(`Failed to cancel stream ${streamId}:`, error); + } + }, + [normalizedAgentUrl, credentials, headers, currentStreamId] + ); + + const processedToolCalls = useRef(new Set()); + + // Calculate pending confirmations for the latest assistant message + const lastMessage = + useChatHelpers.messages[useChatHelpers.messages.length - 1]; + + const pendingConfirmations = (() => { + if (!lastMessage || lastMessage.role !== "assistant") { + return { messageId: undefined, toolCallIds: new Set() }; + } + + const pendingIds = new Set(); + for (const part of lastMessage.parts ?? []) { + if ( + isToolUIPart(part) && + part.state === "input-available" && + toolsRequiringConfirmation.includes(getToolName(part)) + ) { + pendingIds.add(part.toolCallId); + } + } + return { messageId: lastMessage.id, toolCallIds: pendingIds }; + })(); + + const pendingConfirmationsRef = useRef(pendingConfirmations); + pendingConfirmationsRef.current = pendingConfirmations; + + // Tool resolution logic (same as original useAgentChat) + useEffect(() => { + if (!experimental_automaticToolResolution) { + return; + } + + const lastMessage = + useChatHelpers.messages[useChatHelpers.messages.length - 1]; + if (!lastMessage || lastMessage.role !== "assistant") { + return; + } + + const toolCalls = lastMessage.parts.filter( + (part) => + isToolUIPart(part) && + part.state === "input-available" && + !processedToolCalls.current.has(part.toolCallId) + ); + + if (toolCalls.length > 0) { + (async () => { + const toolCallsToResolve = toolCalls.filter( + (part) => + isToolUIPart(part) && + !toolsRequiringConfirmation.includes(getToolName(part)) && + tools?.[getToolName(part)]?.execute + ); + + if (toolCallsToResolve.length > 0) { + for (const part of toolCallsToResolve) { + if (isToolUIPart(part)) { + processedToolCalls.current.add(part.toolCallId); + let toolOutput = null; + const toolName = getToolName(part); + const tool = tools?.[toolName]; + + if (tool?.execute && part.input) { + try { + toolOutput = await tool.execute(part.input); + } catch (error) { + toolOutput = `Error executing tool: ${error instanceof Error ? error.message : String(error)}`; + } + } + + await useChatHelpers.addToolResult({ + toolCallId: part.toolCallId, + tool: toolName, + output: toolOutput + }); + } + } + // If there are NO pending confirmations for the latest assistant message, + // we can continue the conversation. Otherwise, wait for the UI to resolve + // those confirmations; the addToolResult wrapper will send when the last + // pending confirmation is resolved. + if (pendingConfirmationsRef.current.toolCallIds.size === 0) { + useChatHelpers.sendMessage(); + } + } + })(); + } + }, [ + useChatHelpers.messages, + experimental_automaticToolResolution, + useChatHelpers.addToolResult, + useChatHelpers.sendMessage, + toolsRequiringConfirmation, + tools + ]); + + // Load persisted stream ID on mount and resume if active + useEffect(() => { + if (enableResumableStreams && typeof window !== "undefined") { + const savedStreamId = sessionStorage.getItem(storageKey); + if (savedStreamId) { + console.log(`Found saved stream ID: ${savedStreamId}, reconnecting...`); + setCurrentStreamId(savedStreamId); + + // replaying the stored chunks + (async () => { + try { + // Reconnect to the stream - server will provide messages in response + const response = await fetch(`${normalizedAgentUrl}/chat`, { + method: "POST", + credentials, + headers: { + ...headers, + "Content-Type": "application/json" + }, + body: JSON.stringify({ + messages: [], // Empty - server will use its persisted messages + streamId: savedStreamId, + includeMessages: true // Request server to include messages in metadata + }) + }); + + if (response.ok && response.body) { + console.log( + `Successfully reconnected to stream ${savedStreamId}` + ); + + // Optionally hydrate messages from header if present + const messagesHeader = response.headers.get("X-Messages"); + let existingMessages: ChatMessage[] = []; + if (messagesHeader) { + try { + existingMessages = JSON.parse( + decodeURIComponent(messagesHeader) + ); + chatHelpersRef.current?.setMessages(existingMessages); + } catch (e) { + console.warn("Failed to parse messages from header:", e); + } + } + + // Initialize assistant message accumulation for resumed stream + let assistantContent = ""; + const assistantMessageId = `assistant_${Date.now()}`; + + // Process the resumed stream + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + const processStream = async () => { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + console.log("Stream completed"); + // Clear the saved stream ID only if truly complete + sessionStorage.removeItem(storageKey); + setCurrentStreamId(null); + break; + } + + const chunk = decoder.decode(value, { stream: true }); + buffer += chunk; + + // Parse SSE events + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + + for (const line of lines) { + if (line.trim() === "") continue; + if (line.startsWith("data: ")) { + const dataStr = line.slice(6); + if (dataStr === "[DONE]") { + console.log("Stream done marker received"); + continue; + } + + try { + const data = JSON.parse(dataStr); + if (data.type === "text-delta" && data.delta) { + assistantContent += data.delta; + const updatedMessages = [...existingMessages]; + const lastMessage = + updatedMessages[updatedMessages.length - 1]; + + if ( + lastMessage && + lastMessage.role === "assistant" + ) { + // const existingParts = lastMessage.parts || []; + // Currently not used but kept for potential future use + // const textPart = existingParts.find( + // (p) => p.type === "text" + // ); + + updatedMessages[updatedMessages.length - 1] = { + ...lastMessage, + id: assistantMessageId, + parts: [ + { + type: "text", + text: assistantContent + } + ] + } as ChatMessage; + } else { + updatedMessages.push({ + id: assistantMessageId, + role: "assistant", + parts: [ + { + type: "text", + text: assistantContent + } + ] + } as unknown as ChatMessage); + } + + chatHelpersRef.current?.setMessages( + updatedMessages + ); + } + } catch (e) { + console.warn("Failed to parse SSE data:", e); + } + } + } + } + } catch (error) { + console.error("Stream processing error:", error); + // Don't clear stream ID on error + } + }; + + processStream(); + } else { + console.log(`Stream ${savedStreamId} not found or completed`); + sessionStorage.removeItem(storageKey); + setCurrentStreamId(null); + } + } catch (error) { + console.error("Failed to reconnect to stream:", error); + sessionStorage.removeItem(storageKey); + setCurrentStreamId(null); + } + })(); + } + } + // Only run once on mount + }, [ + enableResumableStreams, + storageKey, + normalizedAgentUrl, + credentials, + headers + ]); + + // Cleanup polling on unmount + useEffect(() => { + return () => { + stopPolling(); + }; + }, [stopPolling]); + + const { addToolResult } = useChatHelpers; + + // Wrapper that sends only when the last pending confirmation is resolved + const addToolResultAndSendMessage: typeof addToolResult = async (args) => { + const { toolCallId } = args; + + await addToolResult(args); + + if (!autoSendAfterAllConfirmationsResolved) { + // always send immediately + useChatHelpers.sendMessage(); + return; + } + + // wait for all confirmations + const pending = pendingConfirmationsRef.current?.toolCallIds; + if (!pending) { + useChatHelpers.sendMessage(); + return; + } + + const wasLast = pending.size === 1 && pending.has(toolCallId); + if (pending.has(toolCallId)) { + pending.delete(toolCallId); + } + + if (wasLast || pending.size === 0) { + useChatHelpers.sendMessage(); + } + }; + + const clearChatHistory = async () => { + try { + await fetch(`${normalizedAgentUrl}/messages`, { + method: "DELETE", + credentials, + headers + }); + useChatHelpers.setMessages([]); + setCurrentStreamId(null); + if (typeof window !== "undefined") { + sessionStorage.removeItem(storageKey); + } + } catch (error) { + console.error("Failed to clear history:", error); + // Fallback to local clear + useChatHelpers.setMessages([]); + setCurrentStreamId(null); + if (typeof window !== "undefined") { + sessionStorage.removeItem(storageKey); + } + } + }; + + return { + // Core chat functionality from useChat + ...useChatHelpers, + // Override with custom implementations + addToolResult: addToolResultAndSendMessage, + clearChatHistory, + resumeStream, + cancelStream + } as ReturnType> & { + clearChatHistory: () => Promise; + resumeStream: (streamId: string) => Promise; + cancelStream: (streamId: string) => Promise; + }; +} diff --git a/patches/@modelcontextprotocol+sdk+1.21.0.patch b/patches/@modelcontextprotocol+sdk+1.21.0.patch index 0ac54bdd..d6ec9ef4 100644 --- a/patches/@modelcontextprotocol+sdk+1.21.0.patch +++ b/patches/@modelcontextprotocol+sdk+1.21.0.patch @@ -1,5 +1,5 @@ diff --git a/node_modules/@modelcontextprotocol/sdk/dist/esm/validation/ajv-provider.js b/node_modules/@modelcontextprotocol/sdk/dist/esm/validation/ajv-provider.js -index 02762f0..7deca89 100644 +index 02762f0..5b06fff 100644 --- a/node_modules/@modelcontextprotocol/sdk/dist/esm/validation/ajv-provider.js +++ b/node_modules/@modelcontextprotocol/sdk/dist/esm/validation/ajv-provider.js @@ -1,7 +1,7 @@ @@ -7,7 +7,7 @@ index 02762f0..7deca89 100644 * AJV-based JSON Schema validator provider */ -import { Ajv } from 'ajv'; -+import Ajv from 'ajv'; // modifying this because vitest is struggling to pick up the import ++import Ajv from 'ajv'; import _addFormats from 'ajv-formats'; function createDefaultAjvInstance() { const ajv = new Ajv({