Skip to content

Commit 31acdcb

Browse files
authored
fix: prevent responses being sent to wrong client when multiple transports connect (#820)
1 parent ee16173 commit 31acdcb

File tree

2 files changed

+196
-4
lines changed

2 files changed

+196
-4
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import { describe, expect, test, beforeEach } from "@jest/globals";
2+
import { Protocol } from "./protocol.js";
3+
import { Transport } from "./transport.js";
4+
import { Request, Notification, Result, JSONRPCMessage } from "../types.js";
5+
import { z } from "zod";
6+
7+
// Mock Transport class
8+
class MockTransport implements Transport {
9+
id: string;
10+
onclose?: () => void;
11+
onerror?: (error: Error) => void;
12+
onmessage?: (message: unknown) => void;
13+
sentMessages: JSONRPCMessage[] = [];
14+
15+
constructor(id: string) {
16+
this.id = id;
17+
}
18+
19+
async start(): Promise<void> {}
20+
21+
async close(): Promise<void> {
22+
this.onclose?.();
23+
}
24+
25+
async send(message: JSONRPCMessage): Promise<void> {
26+
this.sentMessages.push(message);
27+
}
28+
}
29+
30+
describe("Protocol transport handling bug", () => {
31+
let protocol: Protocol<Request, Notification, Result>;
32+
let transportA: MockTransport;
33+
let transportB: MockTransport;
34+
35+
beforeEach(() => {
36+
protocol = new (class extends Protocol<Request, Notification, Result> {
37+
protected assertCapabilityForMethod(): void {}
38+
protected assertNotificationCapability(): void {}
39+
protected assertRequestHandlerCapability(): void {}
40+
})();
41+
42+
transportA = new MockTransport("A");
43+
transportB = new MockTransport("B");
44+
});
45+
46+
test("should send response to the correct transport when multiple clients are connected", async () => {
47+
// Set up a request handler that simulates processing time
48+
let resolveHandler: (value: Result) => void;
49+
const handlerPromise = new Promise<Result>((resolve) => {
50+
resolveHandler = resolve;
51+
});
52+
53+
const TestRequestSchema = z.object({
54+
method: z.literal("test/method"),
55+
params: z.object({
56+
from: z.string()
57+
}).optional()
58+
});
59+
60+
protocol.setRequestHandler(
61+
TestRequestSchema,
62+
async (request) => {
63+
console.log(`Processing request from ${request.params?.from}`);
64+
return handlerPromise;
65+
}
66+
);
67+
68+
// Client A connects and sends a request
69+
await protocol.connect(transportA);
70+
71+
const requestFromA = {
72+
jsonrpc: "2.0" as const,
73+
method: "test/method",
74+
params: { from: "clientA" },
75+
id: 1
76+
};
77+
78+
// Simulate client A sending a request
79+
transportA.onmessage?.(requestFromA);
80+
81+
// While A's request is being processed, client B connects
82+
// This overwrites the transport reference in the protocol
83+
await protocol.connect(transportB);
84+
85+
const requestFromB = {
86+
jsonrpc: "2.0" as const,
87+
method: "test/method",
88+
params: { from: "clientB" },
89+
id: 2
90+
};
91+
92+
// Client B sends its own request
93+
transportB.onmessage?.(requestFromB);
94+
95+
// Now complete A's request
96+
resolveHandler!({ data: "responseForA" } as Result);
97+
98+
// Wait for async operations to complete
99+
await new Promise(resolve => setTimeout(resolve, 10));
100+
101+
// Check where the responses went
102+
console.log("Transport A received:", transportA.sentMessages);
103+
console.log("Transport B received:", transportB.sentMessages);
104+
105+
// FIXED: Each transport now receives its own response
106+
107+
// Transport A should receive response for request ID 1
108+
expect(transportA.sentMessages.length).toBe(1);
109+
expect(transportA.sentMessages[0]).toMatchObject({
110+
jsonrpc: "2.0",
111+
id: 1,
112+
result: { data: "responseForA" }
113+
});
114+
115+
// Transport B should only receive its own response (when implemented)
116+
expect(transportB.sentMessages.length).toBe(1);
117+
expect(transportB.sentMessages[0]).toMatchObject({
118+
jsonrpc: "2.0",
119+
id: 2,
120+
result: { data: "responseForA" } // Same handler result in this test
121+
});
122+
});
123+
124+
test("demonstrates the timing issue with multiple rapid connections", async () => {
125+
const delays: number[] = [];
126+
const results: { transport: string; response: JSONRPCMessage[] }[] = [];
127+
128+
const DelayedRequestSchema = z.object({
129+
method: z.literal("test/delayed"),
130+
params: z.object({
131+
delay: z.number(),
132+
client: z.string()
133+
}).optional()
134+
});
135+
136+
// Set up handler with variable delay
137+
protocol.setRequestHandler(
138+
DelayedRequestSchema,
139+
async (request, extra) => {
140+
const delay = request.params?.delay || 0;
141+
delays.push(delay);
142+
143+
await new Promise(resolve => setTimeout(resolve, delay));
144+
145+
return {
146+
processedBy: `handler-${extra.requestId}`,
147+
delay: delay
148+
} as Result;
149+
}
150+
);
151+
152+
// Rapid succession of connections and requests
153+
await protocol.connect(transportA);
154+
transportA.onmessage?.({
155+
jsonrpc: "2.0" as const,
156+
method: "test/delayed",
157+
params: { delay: 50, client: "A" },
158+
id: 1
159+
});
160+
161+
// Connect B while A is processing
162+
setTimeout(async () => {
163+
await protocol.connect(transportB);
164+
transportB.onmessage?.({
165+
jsonrpc: "2.0" as const,
166+
method: "test/delayed",
167+
params: { delay: 10, client: "B" },
168+
id: 2
169+
});
170+
}, 10);
171+
172+
// Wait for all processing
173+
await new Promise(resolve => setTimeout(resolve, 100));
174+
175+
// Collect results
176+
if (transportA.sentMessages.length > 0) {
177+
results.push({ transport: "A", response: transportA.sentMessages });
178+
}
179+
if (transportB.sentMessages.length > 0) {
180+
results.push({ transport: "B", response: transportB.sentMessages });
181+
}
182+
183+
console.log("Timing test results:", results);
184+
185+
// FIXED: Each transport receives its own responses
186+
expect(transportA.sentMessages.length).toBe(1);
187+
expect(transportB.sentMessages.length).toBe(1);
188+
});
189+
});

src/shared/protocol.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,11 @@ export abstract class Protocol<
370370
const handler =
371371
this._requestHandlers.get(request.method) ?? this.fallbackRequestHandler;
372372

373+
// Capture the current transport at request time to ensure responses go to the correct client
374+
const capturedTransport = this._transport;
375+
373376
if (handler === undefined) {
374-
this._transport
377+
capturedTransport
375378
?.send({
376379
jsonrpc: "2.0",
377380
id: request.id,
@@ -393,7 +396,7 @@ export abstract class Protocol<
393396

394397
const fullExtra: RequestHandlerExtra<SendRequestT, SendNotificationT> = {
395398
signal: abortController.signal,
396-
sessionId: this._transport?.sessionId,
399+
sessionId: capturedTransport?.sessionId,
397400
_meta: request.params?._meta,
398401
sendNotification:
399402
(notification) =>
@@ -414,7 +417,7 @@ export abstract class Protocol<
414417
return;
415418
}
416419

417-
return this._transport?.send({
420+
return capturedTransport?.send({
418421
result,
419422
jsonrpc: "2.0",
420423
id: request.id,
@@ -425,7 +428,7 @@ export abstract class Protocol<
425428
return;
426429
}
427430

428-
return this._transport?.send({
431+
return capturedTransport?.send({
429432
jsonrpc: "2.0",
430433
id: request.id,
431434
error: {

0 commit comments

Comments
 (0)