Skip to content

Commit 4ffd2bc

Browse files
committed
feat: enhance session recovery and retry configuration handling in WebSocket client
1 parent 9be7e1b commit 4ffd2bc

File tree

7 files changed

+139
-11
lines changed

7 files changed

+139
-11
lines changed

nodejs/src/client/wsClient.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,20 @@ export class WsClient {
9191
return this._dsn.endpoint === WS_SQL_ENDPOINT;
9292
}
9393

94+
private normalizeConnectedDatabase(database?: string | null): string | null {
95+
if (database && database.length > 0) {
96+
return database;
97+
}
98+
return this.isSqlPath() ? "information_schema" : null;
99+
}
100+
94101
private async recoverSqlSessionContext(): Promise<void> {
95102
if (!this._wsConnector) {
96103
return;
97104
}
98-
const connMsg = this.buildConnMessage(this._connectedDatabase);
105+
const connMsg = this.buildConnMessage(
106+
this.normalizeConnectedDatabase(this._connectedDatabase)
107+
);
99108
await this.sendMsgDirect(JSON.stringify(connMsg), false);
100109

101110
if (this._connectionOptions.size <= 0) {
@@ -138,14 +147,14 @@ export class WsClient {
138147
);
139148
this.bindReconnectRecoveryHook();
140149
if (this._wsConnector.readyState() === w3cwebsocket.OPEN) {
141-
this._connectedDatabase = database ?? null;
150+
this._connectedDatabase = this.normalizeConnectedDatabase(database ?? null);
142151
return;
143152
}
144153
try {
145154
await this._wsConnector.ready();
146155
let result: any = await this._wsConnector.sendMsg(JSON.stringify(connMsg));
147156
if (result.msg.code == 0) {
148-
this._connectedDatabase = database ?? null;
157+
this._connectedDatabase = this.normalizeConnectedDatabase(database ?? null);
149158
return;
150159
}
151160
await this.close();

nodejs/src/client/wsConnector.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ export class WebSocketConnector {
155155
private readonly _poolKey: string;
156156
private readonly _dsn: Dsn;
157157
private _currentAddress: Address;
158-
private readonly _retryConfig: RetryConfig;
158+
private _retryConfig: RetryConfig;
159159
private _inflightStore: InflightRequestStore;
160160
private readonly _suppressedSockets: WeakSet<w3cwebsocket> = new WeakSet();
161161
private _reconnectLock: Promise<void> | null = null;
@@ -188,6 +188,10 @@ export class WebSocketConnector {
188188
this.createConnection();
189189
}
190190

191+
public refreshRetryConfig(dsn: Dsn): void {
192+
this._retryConfig = RetryConfig.fromDsn(dsn);
193+
}
194+
191195
private buildUrl(addr: Address): string {
192196
const path = this._dsn.path();
193197
const url = new URL(`${this._dsn.scheme}://${addr.host}:${addr.port}/${path}`);

nodejs/src/client/wsConnectorPool.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,15 @@ export class WebSocketConnectionPool {
3333
return poolKey.replace(/#auth=[^#]*/, "#auth=[REDACTED]");
3434
}
3535

36-
private buildAuth(dsn: Dsn): string {
36+
private buildAuthScope(dsn: Dsn): string {
3737
const token = dsn.params.get("token") || "";
3838
const bearerToken = dsn.params.get("bearer_token") || "";
39-
const raw = `${dsn.username}:${dsn.password}:${token}:${bearerToken}`;
39+
const raw = JSON.stringify({
40+
username: dsn.username,
41+
password: dsn.password,
42+
token,
43+
bearerToken,
44+
});
4045
return createHash("sha256").update(raw).digest("hex");
4146
}
4247

@@ -45,7 +50,7 @@ export class WebSocketConnectionPool {
4550
.sort((a, b) => `${a.host}:${a.port}`.localeCompare(`${b.host}:${b.port}`))
4651
.map((addr) => `${addr.host}:${addr.port}`)
4752
.join(",");
48-
const auth = this.buildAuth(dsn);
53+
const auth = this.buildAuthScope(dsn);
4954
const path = dsn.path();
5055
return `${dsn.scheme}://${addrs}/${path}#auth=${auth}`;
5156
}
@@ -67,6 +72,7 @@ export class WebSocketConnectionPool {
6772
continue;
6873
}
6974
if (candidate.readyState() === w3cwebsocket.OPEN) {
75+
candidate.refreshRetryConfig(dsn);
7076
connector = candidate;
7177
break;
7278
}

nodejs/src/common/dsn.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,27 @@ export function parse(url: string): Dsn {
102102
const scheme = schemeMatch[1].toLowerCase();
103103
let remainder = url.slice(schemeMatch[0].length);
104104

105-
// Extract username:password@ if present
105+
// Isolate authority from path/query before parsing userinfo.
106+
const slashIndex = remainder.indexOf("/");
107+
const queryMarkIndex = remainder.indexOf("?");
108+
let authorityEndIndex = remainder.length;
109+
if (slashIndex !== -1) {
110+
authorityEndIndex = Math.min(authorityEndIndex, slashIndex);
111+
}
112+
if (queryMarkIndex !== -1) {
113+
authorityEndIndex = Math.min(authorityEndIndex, queryMarkIndex);
114+
}
115+
const authority = remainder.slice(0, authorityEndIndex);
116+
const suffix = remainder.slice(authorityEndIndex);
117+
118+
// Extract username:password@ from authority only.
106119
let username = "";
107120
let password = "";
108-
const atIndex = remainder.indexOf("@");
121+
const atIndex = authority.lastIndexOf("@");
122+
let hostPort = authority;
109123
if (atIndex !== -1) {
110-
const userInfo = remainder.slice(0, atIndex);
111-
remainder = remainder.slice(atIndex + 1);
124+
const userInfo = authority.slice(0, atIndex);
125+
hostPort = authority.slice(atIndex + 1);
112126
const colonIndex = userInfo.indexOf(":");
113127
if (colonIndex !== -1) {
114128
username = userInfo.slice(0, colonIndex);
@@ -117,6 +131,7 @@ export function parse(url: string): Dsn {
117131
username = userInfo;
118132
}
119133
}
134+
remainder = `${hostPort}${suffix}`;
120135

121136
// Extract query params (after ?)
122137
let params = new Map<string, string>();

nodejs/test/client/wsClient.recovery.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,26 @@ describe("WsClient recovery hook", () => {
115115
await expect(hook()).rejects.toBeInstanceOf(WebSocketQueryError);
116116
expect(customRecoveryHook).not.toHaveBeenCalled();
117117
});
118+
119+
test("restores default information_schema during sql recovery when no db provided", async () => {
120+
const dsn = parse("ws://root:taosdata@localhost:6041");
121+
const connector = createMockConnector();
122+
jest
123+
.spyOn(WebSocketConnectionPool.instance(), "getConnection")
124+
.mockResolvedValue(connector as any);
125+
126+
const client = new WsClient(dsn, 5000);
127+
await client.connect();
128+
129+
const hookCalls = connector.setSessionRecoveryHook.mock.calls;
130+
const hook = hookCalls[hookCalls.length - 1]?.[0];
131+
expect(hook).toBeTruthy();
132+
await hook();
133+
134+
expect(connector.sendMsgDirect).toHaveBeenCalledTimes(1);
135+
const firstCall = connector.sendMsgDirect.mock.calls[0] as [string];
136+
const connMsg = JSON.parse(firstCall[0]);
137+
expect(connMsg.action).toBe("conn");
138+
expect(connMsg.args.db).toBe("information_schema");
139+
});
118140
});

nodejs/test/client/wsConnectorPool.key.test.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { WebSocketConnectionPool } from "@src/client/wsConnectorPool";
2+
import { RetryConfig } from "@src/client/wsConnector";
23
import { WSConfig } from "@src/common/config";
34
import { parse, WS_TMQ_ENDPOINT } from "@src/common/dsn";
45
import { WsSql } from "@src/sql/wsSql";
56
import { testPassword, testUsername } from "@test-helpers/utils";
7+
import { w3cwebsocket } from "websocket";
68

79
function resetPoolSingleton() {
810
const PoolClass = WebSocketConnectionPool as any;
@@ -52,6 +54,66 @@ describe("WebSocketConnectionPool key generation", () => {
5254
expect(keyA).not.toBe(keyB);
5355
});
5456

57+
test("does not collide auth scope when credentials contain colon", () => {
58+
const pool = WebSocketConnectionPool.instance();
59+
60+
const dsnA = parse("ws://host1:6041/mydb");
61+
dsnA.username = "a:b";
62+
dsnA.password = "c";
63+
64+
const dsnB = parse("ws://host1:6041/mydb");
65+
dsnB.username = "a";
66+
dsnB.password = "b:c";
67+
68+
const keyA = (pool as any).getPoolKey(dsnA);
69+
const keyB = (pool as any).getPoolKey(dsnB);
70+
expect(keyA).not.toBe(keyB);
71+
});
72+
73+
test("does not split pool key by reconnect policy", () => {
74+
const pool = WebSocketConnectionPool.instance();
75+
const lowRetryDsn = parse(
76+
"ws://root:taosdata@host1:6041/mydb?retries=1&retry_backoff_ms=10&retry_backoff_max_ms=20"
77+
);
78+
const highRetryDsn = parse(
79+
"ws://root:taosdata@host1:6041/mydb?retries=60&retry_backoff_ms=100&retry_backoff_max_ms=500"
80+
);
81+
82+
const lowRetryKey = (pool as any).getPoolKey(lowRetryDsn);
83+
const highRetryKey = (pool as any).getPoolKey(highRetryDsn);
84+
85+
expect(lowRetryKey).toBe(highRetryKey);
86+
});
87+
88+
test("updates connector retry policy when reusing pooled connector", async () => {
89+
const pool = WebSocketConnectionPool.instance();
90+
const lowRetryDsn = parse(
91+
"ws://root:taosdata@host1:6041/mydb?retries=1&retry_backoff_ms=10&retry_backoff_max_ms=20"
92+
);
93+
const highRetryDsn = parse(
94+
"ws://root:taosdata@host1:6041/mydb?retries=60&retry_backoff_ms=100&retry_backoff_max_ms=500"
95+
);
96+
const poolKey = (pool as any).getPoolKey(lowRetryDsn);
97+
98+
let retries = 1;
99+
const connector = {
100+
readyState: jest.fn(() => w3cwebsocket.OPEN),
101+
close: jest.fn(),
102+
refreshRetryConfig: jest.fn((dsn) => {
103+
retries = RetryConfig.fromDsn(dsn).retries;
104+
}),
105+
getReconnectRetries: jest.fn(() => retries),
106+
getPoolKey: jest.fn(() => poolKey),
107+
};
108+
109+
(pool as any).pool.set(poolKey, [connector]);
110+
const reused = await pool.getConnection(highRetryDsn, 3000);
111+
112+
expect(reused).toBe(connector);
113+
expect(connector.refreshRetryConfig).toHaveBeenCalledWith(highRetryDsn);
114+
expect((reused as any).getReconnectRetries()).toBe(60);
115+
});
116+
55117
test("includes endpoint-derived websocket path in the pool key scope", () => {
56118
const pool = WebSocketConnectionPool.instance();
57119
const sqlDsn = parse("ws://root:taosdata@host1:6041/mydb");

nodejs/test/common/dsn.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ describe("dsn", () => {
238238
expect(result.params.get("token")).toBe("abc123");
239239
});
240240

241+
test("does not parse @ in query parameter as userinfo delimiter", () => {
242+
const result = parse("ws://localhost:6041?bearer_token=a@b");
243+
expect(result.scheme).toBe("ws");
244+
expect(result.username).toBe("");
245+
expect(result.password).toBe("");
246+
expect(result.addresses).toEqual([{ host: "localhost", port: 6041 }]);
247+
expect(result.database).toBe("");
248+
expect(result.params.get("bearer_token")).toBe("a@b");
249+
});
250+
241251
test("database with query parameters", () => {
242252
const result = parse("ws://root:taosdata@localhost:6041/mydb?timezone=UTC");
243253
expect(result.scheme).toBe("ws");

0 commit comments

Comments
 (0)