Skip to content

Commit 6219242

Browse files
committed
enh: improve token handling and logging in TmqConfig and WsConsumer
1 parent 61a17a1 commit 6219242

File tree

4 files changed

+86
-35
lines changed

4 files changed

+86
-35
lines changed

nodejs/src/common/utils.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ export function decimalToString(
197197
return decimalStr;
198198
}
199199

200-
const SENSITIVE_FIELD_REGEX = /("(?:password|bearer_token)"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g;
200+
const SENSITIVE_FIELD_REGEX =
201+
/("(?:password|bearer_token|td\.connect\.token)"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g;
201202

202203
export function maskSensitiveForLog(message: string): string {
203204
return message.replace(SENSITIVE_FIELD_REGEX, '$1"[REDACTED]"');
@@ -219,19 +220,19 @@ export function maskUrlForLog(url: URL | null): string {
219220
return masked.toString().replace(/%5BREDACTED%5D/g, "[REDACTED]");
220221
}
221222

222-
export function maskTmqConfigForLog(config: TmqConfig): object {
223-
const masked = { ...config, otherConfigs: Object.fromEntries(config.otherConfigs) };
224-
if (masked.url) {
225-
masked.url = new URL(maskUrlForLog(masked.url));
226-
}
227-
if (masked.sql_url) {
228-
masked.sql_url = new URL(maskUrlForLog(masked.sql_url));
229-
}
223+
export function maskTmqConfigForLog(config: TmqConfig): string {
224+
const masked = {
225+
...config,
226+
otherConfigs: Object.fromEntries(config.otherConfigs)
227+
};
230228
if (masked.token) {
231229
masked.token = "[REDACTED]";
232230
}
233231
if (masked.password) {
234232
masked.password = "[REDACTED]";
235233
}
236-
return masked;
234+
return JSON.stringify(masked, (key, value) =>
235+
(key === "url" || key === "sql_url")
236+
? maskUrlForLog(value) : (key === "td.connect.token" ? "[REDACTED]" : value)
237+
);
237238
}

nodejs/src/tmq/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class TmqConfig {
3030
break;
3131
case TMQConstants.CONNECT_TOKEN:
3232
this.token = value;
33+
this.otherConfigs.set(key, value);
3334
break;
3435
case TMQConstants.GROUP_ID:
3536
this.group_id = value;
@@ -71,6 +72,7 @@ export class TmqConfig {
7172
const bearerToken = this.url.searchParams.get("bearer_token");
7273
if (bearerToken) {
7374
this.token = bearerToken;
75+
this.otherConfigs.set(TMQConstants.CONNECT_TOKEN, bearerToken);
7476
} else {
7577
this.url.searchParams.delete("bearer_token");
7678
}

nodejs/src/tmq/wsTmq.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class WsConsumer {
3434
private constructor(wsConfig: Map<string, any>) {
3535
this._wsConfig = new TmqConfig(wsConfig);
3636
if (logger.isDebugEnabled()) {
37-
logger.debug(maskTmqConfigForLog(this._wsConfig));
37+
logger.debug("WsConsumer config: " + maskTmqConfigForLog(this._wsConfig));
3838
}
3939
if (wsConfig.size == 0 || !this._wsConfig.url) {
4040
throw new WebSocketInterfaceError(
@@ -102,14 +102,13 @@ export class WsConsumer {
102102
req_id: ReqId.getReqID(reqId),
103103
user: this._wsConfig.user,
104104
password: this._wsConfig.password,
105-
...(this._wsConfig.token && { bearer_token: this._wsConfig.token }),
106105
group_id: this._wsConfig.group_id,
107106
client_id: this._wsConfig.client_id,
108107
topics: topics,
109108
offset_rest: this._wsConfig.offset_rest,
110109
auto_commit: this._wsConfig.auto_commit,
111110
auto_commit_interval_ms: this._wsConfig.auto_commit_interval_ms,
112-
config: this._wsConfig.otherConfigs,
111+
config: Object.fromEntries(this._wsConfig.otherConfigs),
113112
},
114113
};
115114
this._topics = topics;

nodejs/test/bulkPulling/tmq.test.ts

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ beforeAll(async () => {
248248
let insertRes = await ws.exec(insert);
249249
insert = insertStable(tableCNValues, stableTags, stable);
250250
insertRes = await ws.exec(insert);
251-
await ws.exec("create user tmq_token_user pass 'token_pass_1'");
252251
await ws.exec(`create topic if not exists ${tokenTopic} as select * from ${db}.${stable}`);
253252
await ws.close();
254253
});
@@ -398,20 +397,69 @@ describe("TDWebSocket.Tmq()", () => {
398397

399398
testEnterprise("connect with token", async () => {
400399
const conf = new WSConfig(dsn);
401-
conf.setUser(testUsername());
402-
conf.setPwd(testPassword());
403400
const wsSql = await WsSql.open(conf);
404-
const wsRows = await wsSql.query("create token test_tmq_token from user tmq_token_user");
401+
await wsSql.exec("drop token if exists test_tmq_token");
402+
const wsRows = await wsSql.query(`create token test_tmq_token from user ${testUsername()}`);
405403
await wsRows.next();
406404
const token = wsRows.getData()?.[0] as string;
407405
expect(token).toBeTruthy();
408406
await wsRows.close();
407+
408+
const tmqConf = new Map([
409+
[TMQConstants.WS_URL, "ws://localhost:6041"],
410+
[TMQConstants.CONNECT_USER, "invalid_user"],
411+
[TMQConstants.CONNECT_PASS, "invalid_pass"],
412+
[TMQConstants.CONNECT_TOKEN, token],
413+
[TMQConstants.GROUP_ID, "g1101"],
414+
[TMQConstants.CLIENT_ID, "c1101"],
415+
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
416+
[TMQConstants.ENABLE_AUTO_COMMIT, "false"],
417+
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, "1000"],
418+
]);
419+
const consumer = await WsConsumer.newConsumer(tmqConf);
420+
await consumer.subscribe([tokenTopic]);
421+
422+
let count: number = 0;
423+
for (let i = 0; i < 5; i++) {
424+
const res = await consumer.poll(500);
425+
for (const [, value] of res) {
426+
const data = value.getData();
427+
if (data == null || data.length == 0) {
428+
break;
429+
}
430+
count += data.length;
431+
}
432+
}
433+
expect(count).toEqual(10);
434+
435+
await Sleep(3000);
436+
await consumer.unsubscribe();
437+
await consumer.close();
438+
await wsSql.exec("drop token if exists test_tmq_token");
409439
await wsSql.close();
440+
});
441+
442+
testEnterprise("connect with token url", async () => {
443+
const conf = new WSConfig(dsn);
444+
const wsSql = await WsSql.open(conf);
445+
await wsSql.exec("drop token if exists test_tmq_token_url");
446+
const wsRows = await wsSql.query(`create token test_tmq_token_url from user ${testUsername()}`);
447+
await wsRows.next();
448+
const token = wsRows.getData()?.[0] as string;
449+
expect(token).toBeTruthy();
450+
await wsRows.close();
410451

411-
const tokenConfigMap = new Map(configMap);
412-
tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, token);
413-
tokenConfigMap.set(TMQConstants.GROUP_ID, "token_group");
414-
const consumer = await WsConsumer.newConsumer(tokenConfigMap);
452+
const tmqConf = new Map([
453+
[TMQConstants.WS_URL, `ws://localhost:6041?bearer_token=${token}`],
454+
[TMQConstants.CONNECT_USER, "invalid_user"],
455+
[TMQConstants.CONNECT_PASS, "invalid_pass"],
456+
[TMQConstants.GROUP_ID, "g1103"],
457+
[TMQConstants.CLIENT_ID, "c1103"],
458+
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
459+
[TMQConstants.ENABLE_AUTO_COMMIT, "false"],
460+
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, "1000"],
461+
]);
462+
const consumer = await WsConsumer.newConsumer(tmqConf);
415463
await consumer.subscribe([tokenTopic]);
416464

417465
let count: number = 0;
@@ -430,33 +478,35 @@ describe("TDWebSocket.Tmq()", () => {
430478
await Sleep(3000);
431479
await consumer.unsubscribe();
432480
await consumer.close();
481+
await wsSql.exec("drop token if exists test_tmq_token_url");
482+
await wsSql.close();
433483
});
434484

435485
testEnterprise("connect with invalid token", async () => {
436-
const tokenConfigMap = new Map([
437-
[TMQConstants.GROUP_ID, "token_group1"],
438-
[TMQConstants.CLIENT_ID, "token_client1"],
486+
const conf = new Map([
487+
[TMQConstants.GROUP_ID, "g1102"],
488+
[TMQConstants.CLIENT_ID, "c1102"],
439489
[TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=invalid_token"],
440490
]);
441-
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
491+
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
442492
message: expect.stringMatching(/invalid token/i),
443493
});
444494

445-
tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041");
446-
tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, "invalid_token1");
447-
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
495+
conf.set(TMQConstants.WS_URL, "ws://localhost:6041");
496+
conf.set(TMQConstants.CONNECT_TOKEN, "invalid_token1");
497+
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
448498
message: expect.stringMatching(/invalid token/i),
449499
});
450500

451-
tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=");
452-
tokenConfigMap.delete(TMQConstants.CONNECT_TOKEN);
453-
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
501+
conf.set(TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=");
502+
conf.delete(TMQConstants.CONNECT_TOKEN);
503+
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
454504
message: expect.stringMatching(/invalid url/i),
455505
});
456506

457-
tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041");
458-
tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, "");
459-
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
507+
conf.set(TMQConstants.WS_URL, "ws://localhost:6041");
508+
conf.set(TMQConstants.CONNECT_TOKEN, "");
509+
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
460510
message: expect.stringMatching(/invalid url/i),
461511
});
462512
});
@@ -469,7 +519,6 @@ afterAll(async () => {
469519
await ws.exec(dropTopic);
470520
await ws.exec(`drop topic if exists ${tokenTopic}`);
471521
await ws.exec(dropDB);
472-
await ws.exec("drop user tmq_token_user");
473522
await ws.close();
474523
WebSocketConnectionPool.instance().destroyed();
475524
});

0 commit comments

Comments
 (0)