Skip to content

Commit afdef6d

Browse files
committed
fix(web): treat fire stream aborts as transient retries
1 parent f3a78c0 commit afdef6d

File tree

7 files changed

+124
-7
lines changed

7 files changed

+124
-7
lines changed

typescript/clients/web-ag-ui/apps/web/src/hooks/useAgentConnection.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import { fireAgentRun, logFireCommandDebug } from '../utils/fireAgentRun';
4747
import { resumeInterruptViaAgent } from '../utils/interruptResolution';
4848
import { scheduleCycleAfterInterruptResolution } from '../utils/interruptAutoCycle';
4949
import { canonicalizeChainLabel } from '../utils/iconResolution';
50-
import { isAgentRunning, isBusyRunError } from '../utils/runConcurrency';
50+
import { isAbortLikeError, isAgentRunning, isBusyRunError } from '../utils/runConcurrency';
5151
import { deriveUiState } from '../utils/deriveUiState';
5252
import {
5353
isAgentInterrupt,
@@ -494,6 +494,7 @@ export function useAgentConnection(agentId: string): UseAgentConnectionResult {
494494
runAgent: async (currentAgent) => copilotkit.runAgent({ agent: currentAgent }),
495495
createId: v7,
496496
isBusyRunError,
497+
isAbortLikeError,
497498
isAgentRunning,
498499
onSyncingChange: (isSyncing) => {
499500
setSyncingState({

typescript/clients/web-ag-ui/apps/web/src/utils/agentCommandScheduler.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export function createAgentCommandScheduler<TAgent extends SchedulableAgent>(par
3838
runAgent: (agent: TAgent) => Promise<unknown>;
3939
createId: () => string;
4040
isBusyRunError: (error: unknown) => boolean;
41+
isAbortLikeError?: (error: unknown) => boolean;
4142
isAgentRunning: (agent: TAgent) => boolean;
4243
onSyncingChange: (isSyncing: boolean) => void;
4344
onCommandError?: (command: string, error: unknown) => void;
@@ -122,7 +123,8 @@ export function createAgentCommandScheduler<TAgent extends SchedulableAgent>(par
122123
syncRunInFlight = false;
123124

124125
const busy = params.isBusyRunError(error) || params.isAgentRunning(agent);
125-
if (busy && syncBusyRetries < syncBusyMaxRetries) {
126+
const aborted = params.isAbortLikeError?.(error) ?? false;
127+
if ((busy || aborted) && syncBusyRetries < syncBusyMaxRetries) {
126128
syncBusyRetries += 1;
127129
pendingSyncIntent = true;
128130
refreshSyncing();
@@ -142,7 +144,8 @@ export function createAgentCommandScheduler<TAgent extends SchedulableAgent>(par
142144
}
143145

144146
const busy = params.isBusyRunError(error) || params.isAgentRunning(agent);
145-
if (busy) {
147+
const aborted = params.isAbortLikeError?.(error) ?? false;
148+
if (busy || aborted) {
146149
params.onCommandBusy?.(command, error);
147150
return;
148151
}

typescript/clients/web-ag-ui/apps/web/src/utils/agentCommandScheduler.unit.test.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ describe('agentCommandScheduler', () => {
4444
createId: () => 'msg-1',
4545
isBusyRunError: (error) =>
4646
error instanceof Error && error.message.includes('already active'),
47+
isAbortLikeError: (error) => error instanceof Error && error.message.includes('aborted'),
4748
isAgentRunning: (value) => value.isRunning === true,
4849
onSyncingChange,
4950
onCommandError,
@@ -140,6 +141,31 @@ describe('agentCommandScheduler', () => {
140141
}
141142
});
142143

144+
it('retries sync on abort-like transport failures before surfacing busy state', async () => {
145+
vi.useFakeTimers();
146+
147+
runAgent = vi
148+
.fn<() => Promise<void>>()
149+
.mockRejectedValueOnce(new Error('BodyStreamBuffer was aborted'))
150+
.mockResolvedValueOnce(undefined);
151+
152+
const scheduler = createScheduler({ syncReplayDelayMs: 10, syncBusyMaxRetries: 2 });
153+
154+
try {
155+
scheduler.dispatch('sync', { allowSyncCoalesce: true });
156+
157+
await Promise.resolve();
158+
await vi.advanceTimersByTimeAsync(20);
159+
160+
expect(runAgent).toHaveBeenCalledTimes(2);
161+
expect(onCommandBusy).not.toHaveBeenCalled();
162+
expect(onCommandError).not.toHaveBeenCalled();
163+
} finally {
164+
scheduler.dispose();
165+
vi.useRealTimers();
166+
}
167+
});
168+
143169
it('rejects non-sync command dispatch while run is in flight', () => {
144170
runInFlight = true;
145171
const scheduler = createScheduler();
@@ -189,6 +215,22 @@ describe('agentCommandScheduler', () => {
189215
scheduler.dispose();
190216
});
191217

218+
it('treats abort-like failures for non-sync commands as transient busy responses', async () => {
219+
runAgent = vi.fn(async () => {
220+
throw new Error('BodyStreamBuffer was aborted');
221+
});
222+
const scheduler = createScheduler();
223+
224+
scheduler.dispatch('hire');
225+
await Promise.resolve();
226+
227+
expect(onCommandBusy).toHaveBeenCalledTimes(1);
228+
expect(onCommandBusy).toHaveBeenCalledWith('hire', expect.any(Error));
229+
expect(onCommandError).not.toHaveBeenCalled();
230+
231+
scheduler.dispose();
232+
});
233+
192234
it('dispatches custom runs through the same scheduler gating path', async () => {
193235
const scheduler = createScheduler();
194236
const runCustom = vi.fn(async () => undefined);

typescript/clients/web-ag-ui/apps/web/src/utils/fireAgentRun.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { v7 as uuidv7 } from 'uuid';
2-
import { isAgentRunning, isBusyRunError } from './runConcurrency';
2+
import { isAbortLikeError, isAgentRunning, isBusyRunError } from './runConcurrency';
33

44
type AgentLike = {
55
addMessage: (message: { id: string; role: 'user'; content: string }) => void;
@@ -122,13 +122,14 @@ const startFireRun = async <TAgent extends AgentLike>(
122122
});
123123
const maxRetries = options?.maxRetries ?? FIRE_RUN_MAX_RETRIES;
124124
const retryDelayMs = options?.retryDelayMs ?? FIRE_RUN_RETRY_DELAY_MS;
125-
if (isBusyRunError(error) && attempt < maxRetries - 1) {
125+
const shouldRetry = isBusyRunError(error) || isAbortLikeError(error);
126+
if (shouldRetry && attempt < maxRetries - 1) {
126127
await sleep(retryDelayMs);
127128
return startFireRun(runAgent, agent, runInFlightRef, options, attempt + 1);
128129
}
129130

130131
runInFlightRef.current = false;
131-
if (isBusyRunError(error)) {
132+
if (isBusyRunError(error) || isAbortLikeError(error)) {
132133
throw new Error('Agent run is still active. Please retry in a moment.');
133134
}
134135
throw error;

typescript/clients/web-ag-ui/apps/web/src/utils/fireAgentRun.unit.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,47 @@ describe('fireAgentRun', () => {
284284
}
285285
});
286286

287+
it('retries aborted fire starts and avoids surfacing transient stream abort errors', async () => {
288+
vi.useFakeTimers();
289+
290+
const onError = vi.fn();
291+
const agent = {
292+
isRunning: false,
293+
addMessage: vi.fn(),
294+
detachActiveRun: vi.fn(async () => undefined),
295+
};
296+
const runInFlightRef = { current: false };
297+
const runAgent = vi
298+
.fn()
299+
.mockRejectedValueOnce(new Error('BodyStreamBuffer was aborted'))
300+
.mockResolvedValueOnce(undefined);
301+
302+
try {
303+
const ok = await fireAgentRun({
304+
agent,
305+
runAgent,
306+
threadId: 'thread-1',
307+
runInFlightRef,
308+
createId: () => 'msg-1',
309+
onError,
310+
busyRunMaxRetries: 4,
311+
busyRunRetryDelayMs: 100,
312+
});
313+
314+
expect(ok).toBe(true);
315+
expect(runAgent).toHaveBeenCalledTimes(1);
316+
317+
await vi.advanceTimersByTimeAsync(120);
318+
319+
expect(runAgent).toHaveBeenCalledTimes(2);
320+
expect(onError).not.toHaveBeenCalled();
321+
expect(runInFlightRef.current).toBe(true);
322+
expect(agent.addMessage).toHaveBeenCalledTimes(1);
323+
} finally {
324+
vi.useRealTimers();
325+
}
326+
});
327+
287328
it('does nothing when threadId is missing', async () => {
288329
const agent = { abortRun: vi.fn(), detachActiveRun: vi.fn(), addMessage: vi.fn() };
289330
const copilotkit = { runAgent: vi.fn() };

typescript/clients/web-ag-ui/apps/web/src/utils/runConcurrency.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ type HttpStatus = number | string;
22

33
type BusyError = {
44
message?: string;
5+
name?: string;
56
status?: HttpStatus;
67
statusCode?: HttpStatus;
78
code?: string;
@@ -41,6 +42,27 @@ export function isBusyRunError(error: unknown): boolean {
4142
);
4243
}
4344

45+
export function isAbortLikeError(error: unknown): boolean {
46+
const maybeError = error as BusyError;
47+
const status = toStatusCode(
48+
maybeError?.status ??
49+
maybeError?.statusCode ??
50+
(typeof maybeError?.response === 'object' ? maybeError.response?.status : undefined),
51+
);
52+
if (status === 499) return true;
53+
54+
const name = `${maybeError?.name ?? ''}`.toLowerCase();
55+
if (name.includes('abort')) return true;
56+
57+
const message = `${maybeError?.message ?? ''}`.toLowerCase();
58+
return (
59+
message.includes('aborterror') ||
60+
message.includes('aborted') ||
61+
message.includes('abort') ||
62+
message.includes('bodystreambuffer')
63+
);
64+
}
65+
4466
export function isAgentRunning(agent: RunAwareAgent): boolean {
4567
const field = agent.isRunning;
4668
if (typeof field === 'function') return field();

typescript/clients/web-ag-ui/apps/web/src/utils/runConcurrency.unit.test.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { describe, expect, it } from 'vitest';
22

3-
import { isAgentRunning, isBusyRunError } from './runConcurrency';
3+
import { isAbortLikeError, isAgentRunning, isBusyRunError } from './runConcurrency';
44

55
describe('runConcurrency', () => {
66
it('recognizes server busy statuses', () => {
@@ -20,6 +20,13 @@ describe('runConcurrency', () => {
2020
expect(isBusyRunError(new Error('network error'))).toBe(false);
2121
});
2222

23+
it('recognizes abort-like transport errors', () => {
24+
expect(isAbortLikeError({ name: 'AbortError' })).toBe(true);
25+
expect(isAbortLikeError({ message: 'BodyStreamBuffer was aborted' })).toBe(true);
26+
expect(isAbortLikeError({ status: 499 })).toBe(true);
27+
expect(isAbortLikeError({ message: 'network error' })).toBe(false);
28+
});
29+
2330
it('detects running agent flag as boolean or function', () => {
2431
expect(isAgentRunning({ isRunning: true })).toBe(true);
2532
expect(isAgentRunning({ isRunning: false })).toBe(false);

0 commit comments

Comments
 (0)