Skip to content

Commit 1819991

Browse files
whoiskatrinthreepointone
authored andcommitted
Refactor ID generation across agents to use nanoid (same as base agent). improve stream manager with a destroy method for proper cleanup during agent destruction
1 parent 0c872c8 commit 1819991

File tree

5 files changed

+44
-15
lines changed

5 files changed

+44
-15
lines changed

packages/agents/src/ai-chat-agent-http.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type {
33
StreamTextOnFinishCallback,
44
ToolSet
55
} from "ai";
6+
import { nanoid } from "nanoid";
67
import { Agent, type AgentContext } from "./";
78
import { autoTransformMessages } from "./ai-chat-v5-migration";
89
import { ResumableStreamManager } from "./resumable-stream-manager";
@@ -178,9 +179,7 @@ export class AIHttpChatAgent<
178179
}
179180

180181
// Generate or reuse stream ID
181-
const streamId =
182-
requestStreamId ||
183-
`stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
182+
const streamId = requestStreamId || `stream_${nanoid()}`;
184183

185184
// Delegate to stream manager
186185
return this._streamManager.startStream(
@@ -276,4 +275,13 @@ export class AIHttpChatAgent<
276275
async cleanupOldStreams(maxAgeHours = 24): Promise<void> {
277276
await this._streamManager.cleanupOldStreams(maxAgeHours);
278277
}
278+
279+
/**
280+
* Override destroy to properly clean up resumable streaming
281+
*/
282+
override async destroy(): Promise<void> {
283+
// Clean up resumable streaming first
284+
await this._streamManager.destroy();
285+
await super.destroy();
286+
}
279287
}

packages/agents/src/client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
PartySocket,
44
type PartySocketOptions
55
} from "partysocket";
6+
import { nanoid } from "nanoid";
67
import type { RPCRequest, RPCResponse } from "./";
78
import type {
89
SerializableReturnValue,
@@ -181,7 +182,7 @@ export class AgentClient<State = unknown> extends PartySocket {
181182
streamOptions?: StreamOptions
182183
): Promise<T> {
183184
return new Promise<T>((resolve, reject) => {
184-
const id = Math.random().toString(36).slice(2);
185+
const id = nanoid();
185186
this._pendingCalls.set(id, {
186187
reject,
187188
resolve: (value: unknown) => resolve(value as T),

packages/agents/src/mcp/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type ElicitResult
1010
} from "@modelcontextprotocol/sdk/types.js";
1111
import type { Connection, ConnectionContext } from "../";
12+
import { nanoid } from "nanoid";
1213
import { Agent } from "../index";
1314
import type { BaseTransportType, MaybePromise, ServeOptions } from "./types";
1415
import {
@@ -224,7 +225,7 @@ export abstract class McpAgent<
224225
message: string;
225226
requestedSchema: unknown;
226227
}): Promise<ElicitResult> {
227-
const requestId = `elicit_${Math.random().toString(36).substring(2, 11)}`;
228+
const requestId = `elicit_${nanoid()}`;
228229

229230
// Store pending request in durable storage
230231
await this.ctx.storage.put(`elicitation:${requestId}`, {

packages/agents/src/react.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { PartySocket } from "partysocket";
22
import { usePartySocket } from "partysocket/react";
33
import { useCallback, useRef, use, useMemo, useEffect } from "react";
4+
import { nanoid } from "nanoid";
45
import type { Agent, MCPServersState, RPCRequest, RPCResponse } from "./";
56
import type { StreamOptions } from "./client";
67
import type { Method, RPCMethod } from "./serializable";
@@ -390,7 +391,7 @@ export function useAgent<State>(
390391
streamOptions?: StreamOptions
391392
): Promise<T> => {
392393
return new Promise((resolve, reject) => {
393-
const id = Math.random().toString(36).slice(2);
394+
const id = nanoid();
394395
pendingCallsRef.current.set(id, {
395396
reject,
396397
resolve: resolve as (value: unknown) => void,

packages/agents/src/resumable-stream-manager.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,9 @@ import type {
33
StreamTextOnFinishCallback,
44
ToolSet
55
} from "ai";
6+
import { nanoid } from "nanoid";
67
import type { AgentContext } from "./";
78

8-
type SqlQueryFunction = <T = Record<string, string | number | boolean | null>>(
9-
strings: TemplateStringsArray,
10-
...values: (string | number | boolean | null)[]
11-
) => T[];
12-
139
interface StreamStateRow {
1410
stream_id: string;
1511
seq: number;
@@ -53,9 +49,18 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
5349
>;
5450

5551
private ctx: AgentContext;
56-
private sql: SqlQueryFunction;
57-
58-
constructor(ctx: AgentContext, sql: SqlQueryFunction) {
52+
private sql: <T = Record<string, string | number | boolean | null>>(
53+
strings: TemplateStringsArray,
54+
...values: (string | number | boolean | null)[]
55+
) => T[];
56+
57+
constructor(
58+
ctx: AgentContext,
59+
sql: <T = Record<string, string | number | boolean | null>>(
60+
strings: TemplateStringsArray,
61+
...values: (string | number | boolean | null)[]
62+
) => T[]
63+
) {
5964
this.ctx = ctx;
6065
this.sql = sql;
6166
this._activeStreams = new Map();
@@ -264,6 +269,19 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
264269
this._activeStreams.clear();
265270
}
266271

272+
/**
273+
* Destroy all resumable streaming
274+
* Should be called during Agent destruction
275+
*/
276+
async destroy(): Promise<void> {
277+
// Clear in-memory state first
278+
this._activeStreams.clear();
279+
280+
// Drop all tables
281+
this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_streams`;
282+
this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_chunks`;
283+
}
284+
267285
/**
268286
* Clean up old completed streams (call periodically)
269287
*/
@@ -369,7 +387,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
369387
if (!streamState) return;
370388

371389
let assistantMessageText = "";
372-
const assistantMessageId = `assistant_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
390+
const assistantMessageId = `assistant_${nanoid()}`;
373391
let buffer = "";
374392

375393
try {

0 commit comments

Comments
 (0)