Skip to content

Commit 7d25067

Browse files
authored
Merge pull request #7272 from continuedev/nate/cli-fixes
fix: max abort listeners warning
2 parents 49f87bc + dddfb19 commit 7d25067

File tree

4 files changed

+131
-84
lines changed

4 files changed

+131
-84
lines changed

extensions/cli/src/streamChatResponse.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ export async function processStreamingResponse(
300300
} = options;
301301
const requestStartTime = Date.now();
302302

303-
const streamFactory = async () => {
303+
const streamFactory = async (retryAbortSignal: AbortSignal) => {
304304
logger.debug("Creating chat completion stream", {
305305
model,
306306
messageCount: chatHistory.length,
@@ -315,7 +315,7 @@ export async function processStreamingResponse(
315315
tools,
316316
...getDefaultCompletionOptions(model.defaultCompletionOptions),
317317
},
318-
abortController.signal,
318+
retryAbortSignal,
319319
);
320320
};
321321

extensions/cli/src/ui/hooks/useChat.ts

Lines changed: 97 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ export function useChat({
111111
return null;
112112
});
113113

114+
// Clean up abort controller on unmount
115+
useEffect(() => {
116+
return () => {
117+
if (abortController && !abortController.signal.aborted) {
118+
abortController.abort();
119+
}
120+
};
121+
}, [abortController]);
122+
114123
// Remote mode polling
115124
useEffect(() => {
116125
if (!isRemoteMode || !remoteUrl || process.env.NODE_ENV === "test") return;
@@ -158,6 +167,92 @@ export function useChat({
158167
}
159168
}, [initialPrompt, isChatHistoryInitialized]);
160169

170+
const executeStreamingResponse = async (
171+
newHistory: ChatCompletionMessageParam[],
172+
currentCompactionIndex: number | null,
173+
message: string,
174+
) => {
175+
// Clean up previous abort controller if it exists
176+
if (abortController && !abortController.signal.aborted) {
177+
abortController.abort();
178+
}
179+
180+
// Start streaming response
181+
const controller = new AbortController();
182+
setAbortController(controller);
183+
setIsWaitingForResponse(true);
184+
setResponseStartTime(Date.now());
185+
setInputMode(false);
186+
logger.debug("Starting chat response stream", {
187+
messageLength: message.length,
188+
historyLength: newHistory.length,
189+
});
190+
191+
try {
192+
const currentStreamingMessageRef = {
193+
current: null as DisplayMessage | null,
194+
};
195+
const streamCallbacks = createStreamCallbacks(
196+
{ setMessages, setActivePermissionRequest },
197+
currentStreamingMessageRef,
198+
);
199+
200+
// Execute streaming chat response
201+
await executeStreaming({
202+
newHistory,
203+
model,
204+
llmApi,
205+
controller,
206+
streamCallbacks,
207+
currentCompactionIndex,
208+
});
209+
210+
if (
211+
currentStreamingMessageRef.current &&
212+
currentStreamingMessageRef.current.content
213+
) {
214+
const messageContent = currentStreamingMessageRef.current.content;
215+
setMessages((prev) => [
216+
...prev,
217+
{
218+
role: "assistant",
219+
content: messageContent,
220+
isStreaming: false,
221+
},
222+
]);
223+
}
224+
225+
// Update the chat history with the complete conversation after streaming
226+
setChatHistory(newHistory);
227+
logger.debug("Chat history updated", {
228+
finalHistoryLength: newHistory.length,
229+
});
230+
231+
// Save the updated history to session
232+
logger.debug("Saving session", { historyLength: newHistory.length });
233+
saveSession(newHistory);
234+
logger.debug("Session saved");
235+
} catch (error: any) {
236+
const errorMessage = `Error: ${formatError(error)}`;
237+
setMessages((prev) => [
238+
...prev,
239+
{
240+
role: "system",
241+
content: errorMessage,
242+
messageType: "system" as const,
243+
},
244+
]);
245+
} finally {
246+
// Stop active time tracking
247+
telemetryService.stopActiveTime();
248+
249+
setAbortController(null);
250+
setIsWaitingForResponse(false);
251+
setResponseStartTime(null);
252+
setInputMode(true);
253+
}
254+
};
255+
161256
const handleUserMessage = async (message: string) => {
162257
// Handle special commands
163258
const handled = await handleSpecialCommands({
@@ -247,80 +342,8 @@ export function useChat({
247342
setChatHistory(newHistory);
248343
setMessages((prev) => [...prev, { role: "user", content: message }]);
249344

250-
// Start streaming response
251-
const controller = new AbortController();
252-
setAbortController(controller);
253-
setIsWaitingForResponse(true);
254-
setResponseStartTime(Date.now());
255-
setInputMode(false);
256-
logger.debug("Starting chat response stream", {
257-
messageLength: message.length,
258-
historyLength: newHistory.length,
259-
});
260-
261-
try {
262-
const currentStreamingMessageRef = {
263-
current: null as DisplayMessage | null,
264-
};
265-
const streamCallbacks = createStreamCallbacks(
266-
{ setMessages, setActivePermissionRequest },
267-
currentStreamingMessageRef,
268-
);
269-
270-
// Execute streaming chat response
271-
await executeStreaming({
272-
newHistory,
273-
model,
274-
llmApi,
275-
controller,
276-
streamCallbacks,
277-
currentCompactionIndex,
278-
});
279-
280-
if (
281-
currentStreamingMessageRef.current &&
282-
currentStreamingMessageRef.current.content
283-
) {
284-
const messageContent = currentStreamingMessageRef.current.content;
285-
setMessages((prev) => [
286-
...prev,
287-
{
288-
role: "assistant",
289-
content: messageContent,
290-
isStreaming: false,
291-
},
292-
]);
293-
}
294-
295-
// Update the chat history with the complete conversation after streaming
296-
setChatHistory(newHistory);
297-
logger.debug("Chat history updated", {
298-
finalHistoryLength: newHistory.length,
299-
});
300-
301-
// Save the updated history to session
302-
logger.debug("Saving session", { historyLength: newHistory.length });
303-
saveSession(newHistory);
304-
logger.debug("Session saved");
305-
} catch (error: any) {
306-
const errorMessage = `Error: ${formatError(error)}`;
307-
setMessages((prev) => [
308-
...prev,
309-
{
310-
role: "system",
311-
content: errorMessage,
312-
messageType: "system" as const,
313-
},
314-
]);
315-
} finally {
316-
// Stop active time tracking
317-
telemetryService.stopActiveTime();
318-
319-
setAbortController(null);
320-
setIsWaitingForResponse(false);
321-
setResponseStartTime(null);
322-
setInputMode(true);
323-
}
345+
// Execute the streaming response
346+
await executeStreamingResponse(newHistory, currentCompactionIndex, message);
324347
};
325348

326349
const handleInterrupt = () => {

extensions/cli/src/util/exponentialBackoff.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,23 +192,38 @@ export async function chatCompletionStreamWithBackoff(
192192
* If the generator throws an error during iteration, it will retry the entire generator.
193193
*/
194194
export async function* withExponentialBackoff<T>(
195-
generatorFactory: () => Promise<AsyncGenerator<T>>,
195+
generatorFactory: (
196+
retryAbortSignal: AbortSignal,
197+
) => Promise<AsyncGenerator<T>>,
196198
abortSignal: AbortSignal,
197199
options: ExponentialBackoffOptions = {},
198200
): AsyncGenerator<T> {
199201
const opts = { ...DEFAULT_OPTIONS, ...options };
200202
let lastError: any;
201203

202204
for (let attempt = 0; attempt <= opts.maxRetries; attempt++) {
203-
const yieldedValues: T[] = [];
205+
// Create a new AbortController for this retry attempt
206+
// This prevents accumulating listeners on the original signal
207+
const retryAbortController = new AbortController();
208+
209+
// Forward abort from the original signal to the retry signal
210+
const abortListener = () => {
211+
retryAbortController.abort();
212+
};
213+
214+
if (abortSignal.aborted) {
215+
retryAbortController.abort();
216+
} else if (typeof abortSignal.addEventListener === "function") {
217+
abortSignal.addEventListener("abort", abortListener);
218+
}
204219

205220
try {
206221
// Check if we should abort before creating the generator
207222
if (abortSignal.aborted) {
208223
throw new Error("Request aborted");
209224
}
210225

211-
const generator = await generatorFactory();
226+
const generator = await generatorFactory(retryAbortController.signal);
212227

213228
// Iterate through the generator and yield each value
214229
for await (const chunk of generator) {
@@ -217,15 +232,24 @@ export async function* withExponentialBackoff<T>(
217232
throw new Error("Request aborted");
218233
}
219234

220-
yieldedValues.push(chunk);
221235
yield chunk;
222236
}
223237

238+
// Clean up the abort listener since we succeeded
239+
if (typeof abortSignal.removeEventListener === "function") {
240+
abortSignal.removeEventListener("abort", abortListener);
241+
}
242+
224243
// If we successfully completed the generator, we're done
225244
return;
226245
} catch (err: any) {
227246
lastError = err;
228247

248+
// Clean up the abort listener
249+
if (typeof abortSignal.removeEventListener === "function") {
250+
abortSignal.removeEventListener("abort", abortListener);
251+
}
252+
229253
// Don't retry if the request was aborted
230254
if (abortSignal.aborted) {
231255
throw err;

extensions/cli/src/util/withExponentialBackoff.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ describe("withExponentialBackoff", () => {
2020

2121
it("should successfully yield all values from generator", async () => {
2222
const mockData = ["chunk1", "chunk2", "chunk3"];
23-
const generatorFactory = vi.fn(async () => {
23+
const generatorFactory = vi.fn(async (retryAbortSignal: AbortSignal) => {
2424
return (async function* () {
2525
for (const chunk of mockData) {
2626
yield chunk;
@@ -44,7 +44,7 @@ describe("withExponentialBackoff", () => {
4444

4545
it("should retry on retryable errors during generator creation", async () => {
4646
let callCount = 0;
47-
const generatorFactory = vi.fn(async () => {
47+
const generatorFactory = vi.fn(async (retryAbortSignal: AbortSignal) => {
4848
callCount++;
4949
if (callCount === 1) {
5050
const error = new Error("Connection reset");
@@ -75,7 +75,7 @@ describe("withExponentialBackoff", () => {
7575
});
7676

7777
it("should not retry on non-retryable errors", async () => {
78-
const generatorFactory = vi.fn(async () => {
78+
const generatorFactory = vi.fn(async (retryAbortSignal: AbortSignal) => {
7979
const error = new Error("Bad request");
8080
(error as any).status = 400;
8181
throw error;
@@ -96,7 +96,7 @@ describe("withExponentialBackoff", () => {
9696
});
9797

9898
it("should handle abort signal", async () => {
99-
const generatorFactory = vi.fn(async () => {
99+
const generatorFactory = vi.fn(async (retryAbortSignal: AbortSignal) => {
100100
return (async function* () {
101101
yield "should-not-yield";
102102
})();

0 commit comments

Comments
 (0)