diff --git a/src/common/logger.ts b/src/common/logger.ts index 1cdd0c4a..0bfafbb2 100644 --- a/src/common/logger.ts +++ b/src/common/logger.ts @@ -51,6 +51,8 @@ export const LogId = { streamableHttpTransportSessionCloseNotificationFailure: mongoLogId(1_006_004), streamableHttpTransportRequestFailure: mongoLogId(1_006_005), streamableHttpTransportCloseFailure: mongoLogId(1_006_006), + streamableHttpTransportKeepAliveFailure: mongoLogId(1_006_007), + streamableHttpTransportKeepAlive: mongoLogId(1_006_008), exportCleanupError: mongoLogId(1_007_001), exportCreationError: mongoLogId(1_007_002), diff --git a/src/index.ts b/src/index.ts index 31f39346..b1ac4b48 100644 --- a/src/index.ts +++ b/src/index.ts @@ -91,8 +91,10 @@ async function main(): Promise { transportRunner.logger.info({ id: LogId.serverCloseRequested, context: "server", - message: "Closing server", + message: `Closing server due to error: ${error as string}`, + noRedaction: true, }); + try { await transportRunner.close(); transportRunner.logger.info({ diff --git a/src/transports/streamableHttp.ts b/src/transports/streamableHttp.ts index 74ad3062..1718252c 100644 --- a/src/transports/streamableHttp.ts +++ b/src/transports/streamableHttp.ts @@ -74,7 +74,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { jsonrpc: "2.0", error: { code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID, - message: `session id is invalid`, + message: "session id is invalid", }, }); return; @@ -85,7 +85,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { jsonrpc: "2.0", error: { code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND, - message: `session not found`, + message: "session not found", }, }); return; @@ -114,12 +114,48 @@ export class StreamableHttpRunner extends TransportRunnerBase { } const server = this.setupServer(); + let keepAliveLoop: NodeJS.Timeout; const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: (): string => randomUUID().toString(), onsessioninitialized: (sessionId): void => { server.session.logger.setAttribute("sessionId", sessionId); this.sessionStore.setSession(sessionId, transport, server.session.logger); + + let failedPings = 0; + // eslint-disable-next-line @typescript-eslint/no-misused-promises + keepAliveLoop = setInterval(async () => { + try { + this.logger.debug({ + id: LogId.streamableHttpTransportKeepAlive, + context: "streamableHttpTransport", + message: "Sending ping", + }); + + await transport.send({ + jsonrpc: "2.0", + method: "ping", + }); + failedPings = 0; + } catch (err) { + try { + failedPings++; + this.logger.warning({ + id: LogId.streamableHttpTransportKeepAliveFailure, + context: "streamableHttpTransport", + message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`, + }); + + if (failedPings > 3) { + clearInterval(keepAliveLoop); + await transport.close(); + } + } catch { + // Ignore the error of the transport close as there's nothing else + // we can do at this point. + } + } + }, 30_000); }, onsessionclosed: async (sessionId): Promise => { try { @@ -135,6 +171,8 @@ export class StreamableHttpRunner extends TransportRunnerBase { }); transport.onclose = (): void => { + clearInterval(keepAliveLoop); + server.close().catch((error) => { this.logger.error({ id: LogId.streamableHttpTransportCloseFailure,