From f54b86fef3de4a4ee00ab3da30c2dcd525900855 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Wed, 27 Aug 2025 13:53:56 +0200 Subject: [PATCH 1/2] add a ping loop for http transport clients --- src/common/logger.ts | 2 ++ src/index.ts | 4 +++- src/transports/streamableHttp.ts | 36 ++++++++++++++++++++++++++++++-- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/common/logger.ts b/src/common/logger.ts index 1cdd0c4a..0bfafbb2 100644 --- a/src/common/logger.ts +++ b/src/common/logger.ts @@ -51,6 +51,8 @@ export const LogId = { streamableHttpTransportSessionCloseNotificationFailure: mongoLogId(1_006_004), streamableHttpTransportRequestFailure: mongoLogId(1_006_005), streamableHttpTransportCloseFailure: mongoLogId(1_006_006), + streamableHttpTransportKeepAliveFailure: mongoLogId(1_006_007), + streamableHttpTransportKeepAlive: mongoLogId(1_006_008), exportCleanupError: mongoLogId(1_007_001), exportCreationError: mongoLogId(1_007_002), diff --git a/src/index.ts b/src/index.ts index 31f39346..b1ac4b48 100644 --- a/src/index.ts +++ b/src/index.ts @@ -91,8 +91,10 @@ async function main(): Promise { transportRunner.logger.info({ id: LogId.serverCloseRequested, context: "server", - message: "Closing server", + message: `Closing server due to error: ${error as string}`, + noRedaction: true, }); + try { await transportRunner.close(); transportRunner.logger.info({ diff --git a/src/transports/streamableHttp.ts b/src/transports/streamableHttp.ts index 74ad3062..8fc5458d 100644 --- a/src/transports/streamableHttp.ts +++ b/src/transports/streamableHttp.ts @@ -74,7 +74,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { jsonrpc: "2.0", error: { code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID, - message: `session id is invalid`, + message: "session id is invalid", }, }); return; @@ -85,7 +85,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { jsonrpc: "2.0", error: { code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND, - message: `session not found`, + message: "session not found", }, }); return; @@ -114,12 +114,42 @@ export class StreamableHttpRunner extends TransportRunnerBase { } const server = this.setupServer(); + let keepAliveLoop: NodeJS.Timeout; const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: (): string => randomUUID().toString(), onsessioninitialized: (sessionId): void => { server.session.logger.setAttribute("sessionId", sessionId); this.sessionStore.setSession(sessionId, transport, server.session.logger); + + let failedPings = 0; + // eslint-disable-next-line @typescript-eslint/no-misused-promises + keepAliveLoop = setInterval(async () => { + try { + this.logger.debug({ + id: LogId.streamableHttpTransportKeepAlive, + context: "streamableHttpTransport", + message: "Sending ping", + }); + + await transport.send({ + jsonrpc: "2.0", + method: "ping", + }); + failedPings = 0; + } catch (err) { + this.logger.warning({ + id: LogId.streamableHttpTransportKeepAliveFailure, + context: "streamableHttpTransport", + message: `Error sending ping (attempt #${failedPings + 1}): ${err instanceof Error ? err.message : String(err)}`, + }); + + if (++failedPings > 3) { + clearInterval(keepAliveLoop); + await transport.close(); + } + } + }, 30_000); }, onsessionclosed: async (sessionId): Promise => { try { @@ -135,6 +165,8 @@ export class StreamableHttpRunner extends TransportRunnerBase { }); transport.onclose = (): void => { + clearInterval(keepAliveLoop); + server.close().catch((error) => { this.logger.error({ id: LogId.streamableHttpTransportCloseFailure, From 6930bf1f49652cd209eb0a31a119e5e08c6915b6 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Thu, 28 Aug 2025 11:41:08 +0200 Subject: [PATCH 2/2] Wrap the transport.close in a try-catch --- src/transports/streamableHttp.ts | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/transports/streamableHttp.ts b/src/transports/streamableHttp.ts index 8fc5458d..1718252c 100644 --- a/src/transports/streamableHttp.ts +++ b/src/transports/streamableHttp.ts @@ -138,15 +138,21 @@ export class StreamableHttpRunner extends TransportRunnerBase { }); failedPings = 0; } catch (err) { - this.logger.warning({ - id: LogId.streamableHttpTransportKeepAliveFailure, - context: "streamableHttpTransport", - message: `Error sending ping (attempt #${failedPings + 1}): ${err instanceof Error ? err.message : String(err)}`, - }); - - if (++failedPings > 3) { - clearInterval(keepAliveLoop); - await transport.close(); + try { + failedPings++; + this.logger.warning({ + id: LogId.streamableHttpTransportKeepAliveFailure, + context: "streamableHttpTransport", + message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`, + }); + + if (failedPings > 3) { + clearInterval(keepAliveLoop); + await transport.close(); + } + } catch { + // Ignore the error of the transport close as there's nothing else + // we can do at this point. } } }, 30_000);