Skip to content

Commit d9c4f8b

Browse files
committed
feat: implement failover handling for non-retriable callbacks and enhance tests for disconnect scenarios
1 parent d081e0d commit d9c4f8b

File tree

4 files changed

+114
-5
lines changed

4 files changed

+114
-5
lines changed

nodejs/src/client/wsConnector.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,20 @@ export class WebSocketConnector {
286286
return this._suppressedSockets.has(conn);
287287
}
288288

289+
private async failNonRetriableCallbacksOnDisconnect(): Promise<void> {
290+
const keepReqIds = new Set<bigint>(
291+
this._inflightStore.getRequests().map((req) => req.reqId)
292+
);
293+
await WsEventCallback.instance().rejectCallbacksExceptReqIds(
294+
keepReqIds,
295+
new TDWebSocketClientError(
296+
ErrorCode.ERR_CONNECTION_CLOSED,
297+
"websocket connection closed before response was received"
298+
),
299+
this._poolKey
300+
);
301+
}
302+
289303
private async handleDisconnect(conn: w3cwebsocket, event?: ICloseEvent): Promise<void> {
290304
if (this.shouldSkipReconnect(conn) || this._isReconnecting) {
291305
return;
@@ -294,6 +308,7 @@ export class WebSocketConnector {
294308
logger.info("Websocket closed normally, skipping reconnect.");
295309
return;
296310
}
311+
await this.failNonRetriableCallbacksOnDisconnect();
297312
try {
298313
await this.triggerReconnect();
299314
} catch (err: unknown) {
@@ -335,10 +350,16 @@ export class WebSocketConnector {
335350
}
336351

337352
const errObj = err as { code?: unknown; message?: unknown };
338-
const code = typeof errObj?.code === "string" ? errObj.code.toLowerCase() : "";
339-
if (code.length > 0 && NETWORK_ERROR_CODES.has(code)) {
353+
const code = errObj?.code;
354+
if (typeof code === "number" && code === ErrorCode.ERR_CONNECTION_CLOSED) {
340355
return true;
341356
}
357+
if (typeof code === "string") {
358+
const loweredCode = code.toLowerCase();
359+
if (loweredCode.length > 0 && NETWORK_ERROR_CODES.has(loweredCode)) {
360+
return true;
361+
}
362+
}
342363

343364
const message = err instanceof Error
344365
? err.message
@@ -539,6 +560,7 @@ export class WebSocketConnector {
539560
action: msg.action,
540561
req_id: reqId,
541562
timeout: this._timeout,
563+
poolKey: this._poolKey,
542564
},
543565
resolveResp,
544566
rejectResp
@@ -644,6 +666,7 @@ export class WebSocketConnector {
644666
req_id: reqId,
645667
timeout: this._timeout,
646668
id: callbackId,
669+
poolKey: this._poolKey,
647670
},
648671
safeResolve,
649672
safeReject

nodejs/src/client/wsEventCallback.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ interface MessageId {
1212
req_id: bigint;
1313
id?: bigint;
1414
timeout?: number;
15+
poolKey?: string;
1516
}
1617

1718
interface MessageAction {
@@ -156,4 +157,34 @@ export class WsEventCallback {
156157
release();
157158
}
158159
}
160+
161+
async rejectCallbacksExceptReqIds(
162+
keepReqIds: Set<bigint>,
163+
error: Error,
164+
poolKey?: string
165+
): Promise<void> {
166+
const toReject: Function[] = [];
167+
const release = await eventMutex.acquire();
168+
try {
169+
for (const [k, v] of WsEventCallback._msgActionRegister) {
170+
if (poolKey !== undefined && k.poolKey !== poolKey) {
171+
continue;
172+
}
173+
const keepByReqId = keepReqIds.has(k.req_id);
174+
const keepByCallbackId = k.id !== undefined && keepReqIds.has(k.id);
175+
if (keepByReqId || keepByCallbackId) {
176+
continue;
177+
}
178+
clearTimeout(v.timer);
179+
WsEventCallback._msgActionRegister.delete(k);
180+
toReject.push(v.reject);
181+
}
182+
} finally {
183+
release();
184+
}
185+
186+
for (const reject of toReject) {
187+
reject(error);
188+
}
189+
}
159190
}

nodejs/test/bulkPulling/stmt2.func.test.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,6 @@ describe("TDWebSocket.Stmt()", () => {
323323
"('2024-12-19 17:12:47.642', 72.30000, 206, 0.31000) ";
324324
await wsSql.exec(insertQuery);
325325

326-
// let result = await wsSql.exec("select * from query_meters")
327-
// console.log(result)
328-
329326
let stmt = await wsSql.stmtInit();
330327
expect(stmt).toBeTruthy();
331328
expect(stmt).toBeInstanceOf(WsStmt2);

nodejs/test/bulkPulling/wsConnector.failover.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,64 @@ describe("WebSocketConnector failover and retry", () => {
173173
expect(connector.triggerReconnect).toHaveBeenCalledTimes(1);
174174
});
175175

176+
test("handleDisconnect rejects non-retriable pending callbacks immediately", async () => {
177+
const connector = createBareConnector();
178+
connector.triggerReconnect = jest.fn(() => new Promise<void>(() => { }));
179+
connector._conn.send = jest.fn(() => { });
180+
181+
const pending = connector.sendMsg(
182+
JSON.stringify({
183+
action: "query",
184+
args: {
185+
req_id: 1201,
186+
},
187+
})
188+
);
189+
void connector.handleDisconnect(
190+
connector._conn,
191+
{ code: 1006, reason: "abnormal close" }
192+
);
193+
194+
const state = await Promise.race([
195+
pending.then(() => "resolved").catch(() => "rejected"),
196+
delay(40).then(() => "pending"),
197+
]);
198+
199+
expect(state).toBe("rejected");
200+
expect(connector.triggerReconnect).toHaveBeenCalledTimes(1);
201+
expect(hasInflightRequest(connector, 1201n)).toBe(false);
202+
});
203+
204+
test("handleDisconnect keeps retriable pending callbacks for replay", async () => {
205+
const connector = createBareConnector();
206+
connector.triggerReconnect = jest.fn(() => new Promise<void>(() => { }));
207+
connector._conn.send = jest.fn(() => { });
208+
209+
const pending = connector.sendMsg(
210+
JSON.stringify({
211+
action: "insert",
212+
args: {
213+
req_id: 1202,
214+
},
215+
})
216+
);
217+
void pending.catch(() => { });
218+
void connector.handleDisconnect(
219+
connector._conn,
220+
{ code: 1006, reason: "abnormal close" }
221+
);
222+
223+
const state = await Promise.race([
224+
pending.then(() => "resolved").catch(() => "rejected"),
225+
delay(40).then(() => "pending"),
226+
]);
227+
228+
expect(state).toBe("pending");
229+
expect(connector.triggerReconnect).toHaveBeenCalledTimes(1);
230+
expect(hasInflightRequest(connector, 1202n)).toBe(true);
231+
connector.failAllInflightRequests(new Error("cleanup"));
232+
});
233+
176234
test("_doReconnect fails all inflight requests when reconnect fails", async () => {
177235
const connector = createBareConnector();
178236
const rejectSpy = jest.fn();

0 commit comments

Comments
 (0)