Skip to content

Commit b9faec3

Browse files
committed
Merge branch 'ihrpr/fix-streamable-connection-close' into ihrpr/examples
2 parents 6c6df5e + cc7f8e4 commit b9faec3

File tree

1 file changed

+34
-34
lines changed

1 file changed

+34
-34
lines changed

src/server/streamableHttp.test.ts

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -842,8 +842,8 @@ describe("StreamableHTTPServerTransport", () => {
842842
},
843843
body: JSON.stringify(initMessage),
844844
});
845-
846-
await transport.handleRequest(initReq, mockResponse);
845+
const initResponse = createMockResponse();
846+
await transport.handleRequest(initReq, initResponse);
847847
mockResponse.writeHead.mockClear();
848848
});
849849

@@ -934,17 +934,17 @@ describe("StreamableHTTPServerTransport", () => {
934934
// Now stream should be closed
935935
expect(mockResponse.end).toHaveBeenCalled();
936936
});
937-
937+
938938
it("should keep stream open when multiple requests share the same connection", async () => {
939939
// Create a fresh response for this test
940940
const sharedResponse = createMockResponse();
941-
941+
942942
// Send two requests in a batch that will share the same connection
943943
const batchRequests: JSONRPCMessage[] = [
944944
{ jsonrpc: "2.0", method: "method1", params: {}, id: "req1" },
945945
{ jsonrpc: "2.0", method: "method2", params: {}, id: "req2" }
946946
];
947-
947+
948948
const req = createMockRequest({
949949
method: "POST",
950950
headers: {
@@ -954,40 +954,40 @@ describe("StreamableHTTPServerTransport", () => {
954954
},
955955
body: JSON.stringify(batchRequests)
956956
});
957-
957+
958958
await transport.handleRequest(req, sharedResponse);
959-
959+
960960
// Respond to first request
961961
const response1: JSONRPCMessage = {
962-
jsonrpc: "2.0",
962+
jsonrpc: "2.0",
963963
result: { value: "result1" },
964964
id: "req1"
965965
};
966-
966+
967967
await transport.send(response1);
968-
968+
969969
// Connection should remain open because req2 is still pending
970970
expect(sharedResponse.write).toHaveBeenCalledWith(
971971
expect.stringContaining(`event: message\ndata: ${JSON.stringify(response1)}\n\n`)
972972
);
973973
expect(sharedResponse.end).not.toHaveBeenCalled();
974-
974+
975975
// Respond to second request
976976
const response2: JSONRPCMessage = {
977977
jsonrpc: "2.0",
978978
result: { value: "result2" },
979979
id: "req2"
980980
};
981-
981+
982982
await transport.send(response2);
983-
983+
984984
// Now connection should close as all requests are complete
985985
expect(sharedResponse.write).toHaveBeenCalledWith(
986-
expect.stringContaining(`event: message\ndata: ${JSON.stringify(response2)}\n\n`)
986+
expect.stringContaining(`event: message\ndata: ${JSON.stringify(response2)}\n\n`)
987987
);
988988
expect(sharedResponse.end).toHaveBeenCalled();
989989
});
990-
990+
991991
it("should clean up connection tracking when a response is sent", async () => {
992992
const req = createMockRequest({
993993
method: "POST",
@@ -997,32 +997,32 @@ describe("StreamableHTTPServerTransport", () => {
997997
"mcp-session-id": transport.sessionId
998998
},
999999
body: JSON.stringify({
1000-
jsonrpc: "2.0",
1001-
method: "test",
1002-
params: {},
1000+
jsonrpc: "2.0",
1001+
method: "test",
1002+
params: {},
10031003
id: "cleanup-test"
10041004
})
10051005
});
1006-
1006+
10071007
const response = createMockResponse();
10081008
await transport.handleRequest(req, response);
1009-
1009+
10101010
// Verify that the request is tracked in the SSE map
1011-
expect(transport["_sseResponseMapping"].size).toBe(1);
1011+
expect(transport["_sseResponseMapping"].size).toBe(2);
10121012
expect(transport["_sseResponseMapping"].has("cleanup-test")).toBe(true);
1013-
1013+
10141014
// Send a response
10151015
await transport.send({
10161016
jsonrpc: "2.0",
10171017
result: {},
10181018
id: "cleanup-test"
10191019
});
1020-
1020+
10211021
// Verify that the mapping was cleaned up
1022-
expect(transport["_sseResponseMapping"].size).toBe(0);
1022+
expect(transport["_sseResponseMapping"].size).toBe(1);
10231023
expect(transport["_sseResponseMapping"].has("cleanup-test")).toBe(false);
10241024
});
1025-
1025+
10261026
it("should clean up connection tracking when client disconnects", async () => {
10271027
// Setup two requests that share a connection
10281028
const req = createMockRequest({
@@ -1034,12 +1034,12 @@ describe("StreamableHTTPServerTransport", () => {
10341034
},
10351035
body: JSON.stringify([
10361036
{ jsonrpc: "2.0", method: "longRunning1", params: {}, id: "req1" },
1037-
{ jsonrpc: "2.0", method: "longRunning2", params: {}, id: "req2" }
1037+
{ jsonrpc: "2.0", method: "longRunning2", params: {}, id: "req2" }
10381038
])
10391039
});
1040-
1040+
10411041
const response = createMockResponse();
1042-
1042+
10431043
// We need to manually store the callback to trigger it later
10441044
let closeCallback: (() => void) | undefined;
10451045
response.on.mockImplementation((event, callback: () => void) => {
@@ -1048,19 +1048,19 @@ describe("StreamableHTTPServerTransport", () => {
10481048
}
10491049
return response;
10501050
});
1051-
1051+
10521052
await transport.handleRequest(req, response);
1053-
1053+
10541054
// Both requests should be mapped to the same response
1055-
expect(transport["_sseResponseMapping"].size).toBe(2);
1055+
expect(transport["_sseResponseMapping"].size).toBe(3);
10561056
expect(transport["_sseResponseMapping"].get("req1")).toBe(response);
10571057
expect(transport["_sseResponseMapping"].get("req2")).toBe(response);
1058-
1058+
10591059
// Simulate client disconnect by triggering the stored callback
10601060
if (closeCallback) closeCallback();
1061-
1061+
10621062
// All entries using this response should be removed
1063-
expect(transport["_sseResponseMapping"].size).toBe(0);
1063+
expect(transport["_sseResponseMapping"].size).toBe(1);
10641064
expect(transport["_sseResponseMapping"].has("req1")).toBe(false);
10651065
expect(transport["_sseResponseMapping"].has("req2")).toBe(false);
10661066
});

0 commit comments

Comments
 (0)