Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ export const LogId = {
streamableHttpTransportSessionCloseNotificationFailure: mongoLogId(1_006_004),
streamableHttpTransportRequestFailure: mongoLogId(1_006_005),
streamableHttpTransportCloseFailure: mongoLogId(1_006_006),
streamableHttpTransportKeepAliveFailure: mongoLogId(1_006_007),
streamableHttpTransportKeepAlive: mongoLogId(1_006_008),

exportCleanupError: mongoLogId(1_007_001),
exportCreationError: mongoLogId(1_007_002),
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ async function main(): Promise<void> {
transportRunner.logger.info({
id: LogId.serverCloseRequested,
context: "server",
message: "Closing server",
message: `Closing server due to error: ${error as string}`,
noRedaction: true,
});

try {
await transportRunner.close();
transportRunner.logger.info({
Expand Down
42 changes: 40 additions & 2 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
message: `session id is invalid`,
message: "session id is invalid",
},
});
return;
Expand All @@ -85,7 +85,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
message: `session not found`,
message: "session not found",
},
});
return;
Expand Down Expand Up @@ -114,12 +114,48 @@ export class StreamableHttpRunner extends TransportRunnerBase {
}

const server = this.setupServer();
let keepAliveLoop: NodeJS.Timeout;
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: (): string => randomUUID().toString(),
onsessioninitialized: (sessionId): void => {
server.session.logger.setAttribute("sessionId", sessionId);

this.sessionStore.setSession(sessionId, transport, server.session.logger);

let failedPings = 0;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
keepAliveLoop = setInterval(async () => {
try {
this.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
});

await transport.send({
jsonrpc: "2.0",
method: "ping",
});
failedPings = 0;
} catch (err) {
try {
failedPings++;
this.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
});

if (failedPings > 3) {
clearInterval(keepAliveLoop);
await transport.close();
}
} catch {
// Ignore the error of the transport close as there's nothing else
// we can do at this point.
}
}
}, 30_000);
},
onsessionclosed: async (sessionId): Promise<void> => {
try {
Expand All @@ -135,6 +171,8 @@ export class StreamableHttpRunner extends TransportRunnerBase {
});

transport.onclose = (): void => {
clearInterval(keepAliveLoop);

server.close().catch((error) => {
this.logger.error({
id: LogId.streamableHttpTransportCloseFailure,
Expand Down
Loading