Skip to content

Commit 3f5f73e

Browse files
committed
add timeout manager split
1 parent be7ab37 commit 3f5f73e

File tree

3 files changed

+281
-77
lines changed

3 files changed

+281
-77
lines changed

packages/core/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export * from './shared/progressManager.js';
66
export * from './shared/protocol.js';
77
export * from './shared/responseMessage.js';
88
export * from './shared/stdio.js';
9+
export * from './shared/timeoutManager.js';
910
export * from './shared/toolNameValidation.js';
1011
export * from './shared/transport.js';
1112
export * from './shared/uriTemplate.js';

packages/core/src/shared/protocol.ts

Lines changed: 31 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import { parseWithCompat } from '../util/zodJsonSchemaCompat.js';
5353
import type { ProgressCallback } from './progressManager.js';
5454
import { ProgressManager } from './progressManager.js';
5555
import type { ResponseMessage } from './responseMessage.js';
56+
import { TimeoutManager } from './timeoutManager.js';
5657
import type { Transport, TransportSendOptions } from './transport.js';
5758

5859
/**
@@ -300,18 +301,6 @@ export type RequestHandlerExtra<SendRequestT extends Request, SendNotificationT
300301
closeStandaloneSSEStream?: () => void;
301302
};
302303

303-
/**
304-
* Information about a request's timeout state
305-
*/
306-
type TimeoutInfo = {
307-
timeoutId: ReturnType<typeof setTimeout>;
308-
startTime: number;
309-
timeout: number;
310-
maxTotalTimeout?: number;
311-
resetTimeoutOnProgress: boolean;
312-
onTimeout: () => void;
313-
};
314-
315304
/**
316305
* Implements MCP protocol framing on top of a pluggable transport, including
317306
* features like request/response linking, notifications, and progress.
@@ -326,8 +315,8 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
326315
private _requestHandlerAbortControllers: Map<RequestId, AbortController> = new Map();
327316
private _notificationHandlers: Map<string, (notification: JSONRPCNotification) => Promise<void>> = new Map();
328317
private _responseHandlers: Map<number, (response: JSONRPCResultResponse | Error) => void> = new Map();
329-
private _progressManager: ProgressManager = new ProgressManager();
330-
private _timeoutInfo: Map<number, TimeoutInfo> = new Map();
318+
readonly #progressManager: ProgressManager = new ProgressManager();
319+
readonly #timeoutManager: TimeoutManager = new TimeoutManager();
331320
private _pendingDebouncedNotifications = new Set<string>();
332321

333322
private _taskStore?: TaskStore;
@@ -550,49 +539,6 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
550539
controller?.abort(notification.params.reason);
551540
}
552541

553-
private _setupTimeout(
554-
messageId: number,
555-
timeout: number,
556-
maxTotalTimeout: number | undefined,
557-
onTimeout: () => void,
558-
resetTimeoutOnProgress: boolean = false
559-
) {
560-
this._timeoutInfo.set(messageId, {
561-
timeoutId: setTimeout(onTimeout, timeout),
562-
startTime: Date.now(),
563-
timeout,
564-
maxTotalTimeout,
565-
resetTimeoutOnProgress,
566-
onTimeout
567-
});
568-
}
569-
570-
private _resetTimeout(messageId: number): boolean {
571-
const info = this._timeoutInfo.get(messageId);
572-
if (!info) return false;
573-
574-
const totalElapsed = Date.now() - info.startTime;
575-
if (info.maxTotalTimeout && totalElapsed >= info.maxTotalTimeout) {
576-
this._timeoutInfo.delete(messageId);
577-
throw McpError.fromError(ErrorCode.RequestTimeout, 'Maximum total timeout exceeded', {
578-
maxTotalTimeout: info.maxTotalTimeout,
579-
totalElapsed
580-
});
581-
}
582-
583-
clearTimeout(info.timeoutId);
584-
info.timeoutId = setTimeout(info.onTimeout, info.timeout);
585-
return true;
586-
}
587-
588-
private _cleanupTimeout(messageId: number) {
589-
const info = this._timeoutInfo.get(messageId);
590-
if (info) {
591-
clearTimeout(info.timeoutId);
592-
this._timeoutInfo.delete(messageId);
593-
}
594-
}
595-
596542
/**
597543
* Attaches to the given transport, starts it, and starts listening for messages.
598544
*
@@ -632,7 +578,7 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
632578
private _onclose(): void {
633579
const responseHandlers = this._responseHandlers;
634580
this._responseHandlers = new Map();
635-
this._progressManager.clear();
581+
this.#progressManager.clear();
636582
this._pendingDebouncedNotifications.clear();
637583

638584
const error = McpError.fromError(ErrorCode.ConnectionClosed, 'Connection closed');
@@ -821,28 +767,31 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
821767
const progressToken = notification.params.progressToken;
822768
const messageId = Number(progressToken);
823769

824-
if (!this._progressManager.hasHandler(messageId)) {
770+
if (!this.#progressManager.hasHandler(messageId)) {
825771
this._onerror(new Error(`Received a progress notification for an unknown token: ${JSON.stringify(notification)}`));
826772
return;
827773
}
828774

829775
const responseHandler = this._responseHandlers.get(messageId);
830-
const timeoutInfo = this._timeoutInfo.get(messageId);
776+
const timeoutInfo = this.#timeoutManager.get(messageId);
831777

832778
if (timeoutInfo && responseHandler && timeoutInfo.resetTimeoutOnProgress) {
833-
try {
834-
this._resetTimeout(messageId);
835-
} catch (error) {
779+
const resetResult = this.#timeoutManager.reset(messageId);
780+
if (!resetResult.success && resetResult.maxTotalTimeoutExceeded) {
836781
// Clean up if maxTotalTimeout was exceeded
837782
this._responseHandlers.delete(messageId);
838-
this._progressManager.removeHandler(messageId);
839-
this._cleanupTimeout(messageId);
840-
responseHandler(error as Error);
783+
this.#progressManager.removeHandler(messageId);
784+
this.#timeoutManager.cleanup(messageId);
785+
const error = McpError.fromError(ErrorCode.RequestTimeout, 'Maximum total timeout exceeded', {
786+
maxTotalTimeout: resetResult.maxTotalTimeoutExceeded.maxTotalTimeout,
787+
totalElapsed: resetResult.maxTotalTimeoutExceeded.elapsed
788+
});
789+
responseHandler(error);
841790
return;
842791
}
843792
}
844793

845-
this._progressManager.handleProgress(notification);
794+
this.#progressManager.handleProgress(notification);
846795
}
847796

848797
private _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void {
@@ -868,7 +817,7 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
868817
}
869818

870819
this._responseHandlers.delete(messageId);
871-
this._cleanupTimeout(messageId);
820+
this.#timeoutManager.cleanup(messageId);
872821

873822
// Keep progress handler alive for CreateTaskResult responses
874823
let isTaskResponse = false;
@@ -878,13 +827,13 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
878827
const task = result.task as Record<string, unknown>;
879828
if (typeof task.taskId === 'string') {
880829
isTaskResponse = true;
881-
this._progressManager.linkTaskToProgressToken(task.taskId, messageId);
830+
this.#progressManager.linkTaskToProgressToken(task.taskId, messageId);
882831
}
883832
}
884833
}
885834

886835
if (!isTaskResponse) {
887-
this._progressManager.removeHandler(messageId);
836+
this.#progressManager.removeHandler(messageId);
888837
}
889838

890839
if (isJSONRPCResultResponse(response)) {
@@ -1107,7 +1056,7 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
11071056
};
11081057

11091058
if (options?.onprogress) {
1110-
this._progressManager.registerHandler(messageId, options.onprogress);
1059+
this.#progressManager.registerHandler(messageId, options.onprogress);
11111060
jsonrpcRequest.params = {
11121061
...request.params,
11131062
_meta: {
@@ -1138,8 +1087,8 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
11381087

11391088
const cancel = (reason: unknown) => {
11401089
this._responseHandlers.delete(messageId);
1141-
this._progressManager.removeHandler(messageId);
1142-
this._cleanupTimeout(messageId);
1090+
this.#progressManager.removeHandler(messageId);
1091+
this.#timeoutManager.cleanup(messageId);
11431092

11441093
this._transport
11451094
?.send(
@@ -1189,7 +1138,12 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
11891138
const timeout = options?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
11901139
const timeoutHandler = () => cancel(McpError.fromError(ErrorCode.RequestTimeout, 'Request timed out', { timeout }));
11911140

1192-
this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false);
1141+
this.#timeoutManager.setup(messageId, {
1142+
timeout,
1143+
maxTotalTimeout: options?.maxTotalTimeout,
1144+
resetTimeoutOnProgress: options?.resetTimeoutOnProgress ?? false,
1145+
onTimeout: timeoutHandler
1146+
});
11931147

11941148
// Queue request if related to a task
11951149
const relatedTaskId = relatedTask?.taskId;
@@ -1211,7 +1165,7 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
12111165
message: jsonrpcRequest,
12121166
timestamp: Date.now()
12131167
}).catch(error => {
1214-
this._cleanupTimeout(messageId);
1168+
this.#timeoutManager.cleanup(messageId);
12151169
reject(error);
12161170
});
12171171

@@ -1220,7 +1174,7 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
12201174
} else {
12211175
// No related task - send through transport normally
12221176
this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
1223-
this._cleanupTimeout(messageId);
1177+
this.#timeoutManager.cleanup(messageId);
12241178
reject(error);
12251179
});
12261180
}
@@ -1450,7 +1404,7 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
14501404
* This should be called when a task reaches a terminal status.
14511405
*/
14521406
private _cleanupTaskProgressHandler(taskId: string): void {
1453-
this._progressManager.cleanupTaskProgressHandler(taskId);
1407+
this.#progressManager.cleanupTaskProgressHandler(taskId);
14541408
}
14551409

14561410
/**

0 commit comments

Comments
 (0)