Skip to content

Commit 78f63c3

Browse files
committed
feat: improve callback registration handling during connection closure in WebSocketConnector
1 parent ac14bfa commit 78f63c3

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

nodejs/src/client/wsConnector.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,11 @@ export class WebSocketConnector {
717717
safeReject
718718
);
719719

720+
if (settled) {
721+
await WsEventCallback.instance().unregisterCallback(reqId);
722+
return;
723+
}
724+
720725
try {
721726
this.send(message);
722727
} catch (err) {

nodejs/test/client/wsConnector.failover.test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,21 @@ function resetAddressTracker(): void {
8989
(tracker as any)._counts.clear();
9090
}
9191

92+
function resetCallbackRegistry(): void {
93+
const CallbackClass = WsEventCallback as any;
94+
CallbackClass._msgActionRegister = new Map();
95+
}
96+
9297
describe("WebSocketConnector failover and retry", () => {
9398
beforeEach(() => {
9499
resetAddressTracker();
100+
resetCallbackRegistry();
95101
});
96102

97103
afterEach(() => {
98104
jest.restoreAllMocks();
99105
resetAddressTracker();
106+
resetCallbackRegistry();
100107
});
101108

102109
test("deduplicates concurrent reconnect triggers with reconnect lock", async () => {
@@ -387,6 +394,37 @@ describe("WebSocketConnector failover and retry", () => {
387394
expect(hasInflightRequest(connector, 401n)).toBe(false);
388395
});
389396

397+
test("close during callback registration does not leave callback entry until timeout", async () => {
398+
const connector = createBareConnector();
399+
connector._conn.send = jest.fn();
400+
401+
const callback = WsEventCallback.instance();
402+
const originalRegister = callback.registerCallback.bind(callback);
403+
jest
404+
.spyOn(callback, "registerCallback")
405+
.mockImplementation(async (id: any, res: any, rej: any) => {
406+
await delay(20);
407+
await originalRegister(id, res, rej);
408+
});
409+
410+
const pending = connector.sendMsg(
411+
JSON.stringify({
412+
action: "insert",
413+
args: {
414+
req_id: 402,
415+
},
416+
})
417+
);
418+
void pending.catch(() => { });
419+
420+
await delay(5);
421+
connector.close();
422+
await delay(30);
423+
424+
expect((WsEventCallback as any)._msgActionRegister.size).toBe(0);
425+
expect(connector._conn.send).not.toHaveBeenCalled();
426+
});
427+
390428
test("reconnect decrements current address before closing old socket", async () => {
391429
const connector = createBareConnector();
392430
const decrementSpy = jest

0 commit comments

Comments
 (0)