Skip to content

Commit ef8dfb9

Browse files
committed
feat: enhance reconnect logic and add tests for aborting retries on close
1 parent 1148912 commit ef8dfb9

File tree

2 files changed

+65
-6
lines changed

2 files changed

+65
-6
lines changed

nodejs/src/client/wsConnector.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ export class WebSocketConnector {
295295
}
296296

297297
private shouldSkipReconnect(conn: w3cwebsocket): boolean {
298-
if (!this._allowReconnect) {
298+
if (!this.isReconnectAllowed()) {
299299
return true;
300300
}
301301
return this._suppressedSockets.has(conn);
@@ -439,7 +439,14 @@ export class WebSocketConnector {
439439
await new Promise((resolve) => setTimeout(resolve, ms));
440440
}
441441

442+
private isReconnectAllowed(): boolean {
443+
return this._allowReconnect;
444+
}
445+
442446
private async triggerReconnect(): Promise<void> {
447+
if (!this.isReconnectAllowed()) {
448+
return;
449+
}
443450
if (!this._reconnectLock) {
444451
this._reconnectLock = this._doReconnect();
445452
}
@@ -455,6 +462,9 @@ export class WebSocketConnector {
455462
}
456463

457464
private async _doReconnect(): Promise<void> {
465+
if (!this.isReconnectAllowed()) {
466+
return;
467+
}
458468
this._isReconnecting = true;
459469
try {
460470
await this.attemptReconnect();
@@ -485,6 +495,9 @@ export class WebSocketConnector {
485495

486496
for (let i = 0; i < totalAddresses; i++) {
487497
for (let retry = 0; retry < this._retryConfig.retries; retry++) {
498+
if (!this.isReconnectAllowed()) {
499+
return;
500+
}
488501
try {
489502
logger.info(`Reconnecting to ${this.getCurrentAddress()}, attempt ${retry + 1}`);
490503
await this.reconnect();
@@ -539,16 +552,19 @@ export class WebSocketConnector {
539552
}
540553

541554
close() {
555+
this._allowReconnect = false;
556+
this.failAllInflightRequests(
557+
new TDWebSocketClientError(
558+
ErrorCode.ERR_CONNECTION_CLOSED,
559+
"websocket connection closed"
560+
)
561+
);
542562
if (this._conn) {
543-
this._allowReconnect = false;
544563
AddressConnectionTracker.instance().decrement(this.getCurrentAddress());
545564
this._suppressedSockets.add(this._conn);
546565
this._conn.close();
547566
} else {
548-
throw new TDWebSocketClientError(
549-
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
550-
"WebSocket connection is undefined"
551-
);
567+
logger.warn("close() called but websocket connection is undefined");
552568
}
553569
}
554570

nodejs/test/client/wsConnector.failover.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,49 @@ describe("WebSocketConnector failover and retry", () => {
344344
expect(connector._suppressedSockets.has(connector._conn)).toBe(true);
345345
});
346346

347+
test("attemptReconnect aborts silently when close is called mid-retry", async () => {
348+
const connector = createBareConnector("ws://root:taosdata@host1:6041");
349+
connector._retryConfig = new RetryConfig(3, 1, 8);
350+
connector.reconnect = jest.fn(async () => {
351+
throw new Error("host down");
352+
});
353+
connector.sleep = jest.fn(async () => {
354+
connector.close();
355+
});
356+
357+
await expect(connector.attemptReconnect()).resolves.toBeUndefined();
358+
expect(connector.reconnect).toHaveBeenCalledTimes(1);
359+
});
360+
361+
test("close rejects retriable inflight request immediately", async () => {
362+
const connector = createBareConnector();
363+
connector.triggerReconnect = jest.fn(() => new Promise<void>(() => { }));
364+
connector._conn.send = jest.fn(() => {
365+
throw new Error("cannot call send() while not connected");
366+
});
367+
368+
const pending = connector.sendMsg(
369+
JSON.stringify({
370+
action: "insert",
371+
args: {
372+
req_id: 401,
373+
},
374+
})
375+
);
376+
void pending.catch(() => { });
377+
378+
await delay(20);
379+
connector.close();
380+
381+
const state = await Promise.race([
382+
pending.then(() => "resolved").catch(() => "rejected"),
383+
delay(40).then(() => "pending"),
384+
]);
385+
386+
expect(state).toBe("rejected");
387+
expect(hasInflightRequest(connector, 401n)).toBe(false);
388+
});
389+
347390
test("reconnect decrements current address before closing old socket", async () => {
348391
const connector = createBareConnector();
349392
const decrementSpy = jest

0 commit comments

Comments
 (0)