Skip to content

Commit 425db3a

Browse files
committed
feat: enhance WebSocketConnector to exclude failed addresses during reconnection attempts and update related tests
1 parent b8e4909 commit 425db3a

File tree

2 files changed

+75
-9
lines changed

2 files changed

+75
-9
lines changed

nodejs/src/client/wsConnector.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,10 +403,15 @@ export class WebSocketConnector {
403403
return `${this._currentAddress.host}:${this._currentAddress.port}`;
404404
}
405405

406-
private selectLeastConnectedAddress(): Address {
406+
private selectLeastConnectedAddress(excludedAddresses: Set<string> = new Set()): Address {
407+
const candidates = this._dsn.addresses.filter((address) => {
408+
const addressKey = `${address.host}:${address.port}`;
409+
return !excludedAddresses.has(addressKey);
410+
});
411+
const selectableAddresses = candidates.length > 0 ? candidates : this._dsn.addresses;
407412
const selectedIndex = AddressConnectionTracker.instance()
408-
.selectLeastConnected(this._dsn.addresses);
409-
return this._dsn.addresses[selectedIndex];
413+
.selectLeastConnected(selectableAddresses);
414+
return selectableAddresses[selectedIndex];
410415
}
411416

412417
private async sleep(ms: number): Promise<void> {
@@ -455,6 +460,7 @@ export class WebSocketConnector {
455460

456461
private async attemptReconnect(): Promise<void> {
457462
const totalAddresses = this._dsn.addresses.length;
463+
const failedAddresses = new Set<string>();
458464

459465
for (let i = 0; i < totalAddresses; i++) {
460466
for (let retry = 0; retry < this._retryConfig.retries; retry++) {
@@ -475,7 +481,8 @@ export class WebSocketConnector {
475481
}
476482

477483
if (i < totalAddresses - 1) {
478-
this._currentAddress = this.selectLeastConnectedAddress();
484+
failedAddresses.add(this.getCurrentAddress());
485+
this._currentAddress = this.selectLeastConnectedAddress(failedAddresses);
479486
logger.info(`Switching to least-connected address: ${this.getCurrentAddress()}`);
480487
}
481488
}

nodejs/test/client/wsConnector.failover.test.ts

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ function createInflightStore(): any {
3636
};
3737
}
3838

39-
function createBareConnector(): any {
39+
function createBareConnector(
40+
dsn: string = "ws://root:taosdata@host1:6041,host2:6042"
41+
): any {
4042
const connector = Object.create(WebSocketConnector.prototype) as any;
4143
connector._timeout = 5000;
42-
connector._dsn = parse("ws://root:taosdata@host1:6041,host2:6042");
44+
connector._dsn = parse(dsn);
4345
connector._currentAddress = connector._dsn.addresses[0];
4446
connector._retryConfig = new RetryConfig(1, 1, 8);
4547
connector._reconnectLock = null;
@@ -117,7 +119,7 @@ describe("WebSocketConnector failover and retry", () => {
117119
const connector = createBareConnector();
118120
const leastSelector = jest
119121
.spyOn(AddressConnectionTracker.instance(), "selectLeastConnected")
120-
.mockReturnValue(1);
122+
.mockImplementation(() => 0);
121123
const attempts: string[] = [];
122124
connector.sleep = jest.fn(async () => { });
123125
connector.reconnect = jest.fn(async () => {
@@ -135,16 +137,73 @@ describe("WebSocketConnector failover and retry", () => {
135137
"host1:6041",
136138
"host2:6042",
137139
]);
138-
expect(leastSelector).toHaveBeenCalledWith(connector._dsn.addresses);
140+
expect(leastSelector).toHaveBeenCalledWith([
141+
connector._dsn.addresses[1],
142+
]);
139143
expect(`${connector._currentAddress.host}:${connector._currentAddress.port}`)
140144
.toBe("host2:6042");
141145
});
142146

147+
test("attemptReconnect does not reselect failed addresses in one reconnect round", async () => {
148+
const connector = createBareConnector(
149+
"ws://root:taosdata@host1:6041,host2:6042,host3:6043"
150+
);
151+
const leastSelector = jest
152+
.spyOn(AddressConnectionTracker.instance(), "selectLeastConnected")
153+
.mockImplementation(() => 0);
154+
const attempts: string[] = [];
155+
connector.sleep = jest.fn(async () => { });
156+
connector.reconnect = jest.fn(async () => {
157+
const current = connector._currentAddress;
158+
attempts.push(`${current.host}:${current.port}`);
159+
throw new Error("all down");
160+
});
161+
162+
await expect(connector.attemptReconnect()).rejects.toThrow(
163+
"Failed to reconnect to any available address"
164+
);
165+
166+
expect(attempts).toEqual([
167+
"host1:6041",
168+
"host2:6042",
169+
"host3:6043",
170+
]);
171+
expect(leastSelector).toHaveBeenNthCalledWith(1, [
172+
connector._dsn.addresses[1],
173+
connector._dsn.addresses[2],
174+
]);
175+
expect(leastSelector).toHaveBeenNthCalledWith(2, [
176+
connector._dsn.addresses[2],
177+
]);
178+
});
179+
180+
test("attemptReconnect keeps retrying same address for single-address dsn", async () => {
181+
const connector = createBareConnector("ws://root:taosdata@host1:6041");
182+
connector._retryConfig = new RetryConfig(3, 1, 8);
183+
connector.sleep = jest.fn(async () => { });
184+
const attempts: string[] = [];
185+
connector.reconnect = jest.fn(async () => {
186+
const current = connector._currentAddress;
187+
attempts.push(`${current.host}:${current.port}`);
188+
throw new Error("host1 down");
189+
});
190+
191+
await expect(connector.attemptReconnect()).rejects.toThrow(
192+
"Failed to reconnect to any available address"
193+
);
194+
195+
expect(attempts).toEqual([
196+
"host1:6041",
197+
"host1:6041",
198+
"host1:6041",
199+
]);
200+
});
201+
143202
test("attemptReconnect throws after all addresses and retries are exhausted", async () => {
144203
const connector = createBareConnector();
145204
jest
146205
.spyOn(AddressConnectionTracker.instance(), "selectLeastConnected")
147-
.mockReturnValue(1);
206+
.mockImplementation(() => 0);
148207
connector.sleep = jest.fn(async () => { });
149208
connector.reconnect = jest.fn(async () => {
150209
throw new Error("all down");

0 commit comments

Comments
 (0)