Skip to content

Commit 9d3bdb8

Browse files
committed
feat: enhance WsStmt2 with improved error handling and recovery logic; add comprehensive tests for failover scenarios
1 parent 4b8e1a9 commit 9d3bdb8

File tree

3 files changed

+166
-122
lines changed

3 files changed

+166
-122
lines changed

nodejs/src/stmt/wsStmt2.ts

Lines changed: 62 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export class WsStmt2 implements WsStmt {
6666
const wsStmt = new WsStmt2(wsClient, precision);
6767
return await wsStmt.init(reqId);
6868
} catch (e: any) {
69-
logger.error(`WsStmt init is failed, ${e.code}, ${e.message}`);
69+
logger.error(`stmt2 init failed, ${e.code}, ${e.message}`);
7070
throw e;
7171
}
7272
}
@@ -81,18 +81,30 @@ export class WsStmt2 implements WsStmt {
8181
await this.doInit(reqId);
8282
return this;
8383
} catch (e: any) {
84-
logger.error(`stmt init filed, ${e.code}, ${e.message}`);
84+
if (this._wsClient.isNetworkError(e)) {
85+
await this.recover(StmtStep.INIT);
86+
return this;
87+
}
88+
logger.error(`stmt2 init failed, ${e.code}, ${e.message}`);
8589
throw e;
8690
}
8791
}
8892
throw new TDWebSocketClientError(
8993
ErrorCode.ERR_CONNECTION_CLOSED,
90-
"stmt connect closed"
94+
"stmt2 connect closed"
9195
);
9296
}
9397

94-
getStmtId(): bigint | undefined | null {
95-
return this._stmt_id;
98+
private async doInit(reqId?: number): Promise<void> {
99+
const msg = {
100+
action: "stmt2_init",
101+
args: {
102+
req_id: ReqId.getReqID(reqId),
103+
single_stb_insert: true,
104+
single_table_bind_once: true,
105+
},
106+
};
107+
await this.execute(msg);
96108
}
97109

98110
async prepare(sql: string): Promise<void> {
@@ -103,7 +115,7 @@ export class WsStmt2 implements WsStmt {
103115
if (!this._wsClient.isNetworkError(err)) {
104116
throw err;
105117
}
106-
await this.rebuildContext(StmtStep.PREPARE);
118+
await this.recover(StmtStep.PREPARE);
107119
}
108120
}
109121

@@ -126,9 +138,7 @@ export class WsStmt2 implements WsStmt {
126138
}
127139

128140
if (this._isInsert && this.fields) {
129-
this._precision = this.fields[0].precision
130-
? this.fields[0].precision
131-
: 0;
141+
this._precision = this.fields[0].precision ? this.fields[0].precision : 0;
132142
this._toBeBindColCount = 0;
133143
this._toBeBindTagCount = 0;
134144
this.fields.forEach((field, index) => {
@@ -178,13 +188,11 @@ export class WsStmt2 implements WsStmt {
178188
"SetBinaryTags paramArray is invalid!"
179189
);
180190
}
181-
182191
if (!this._currentTableInfo) {
183192
this._currentTableInfo = new TableInfo();
184193
this._stmtTableInfoList.push(this._currentTableInfo);
185194
}
186195
await this._currentTableInfo.setTags(paramsArray);
187-
188196
return Promise.resolve();
189197
}
190198

@@ -213,11 +221,7 @@ export class WsStmt2 implements WsStmt {
213221
);
214222
}
215223

216-
if (
217-
this._isInsert &&
218-
this.fields &&
219-
paramsArray.getBindCount() == this.fields.length
220-
) {
224+
if (this._isInsert && this.fields && paramsArray.getBindCount() == this.fields.length) {
221225
const tableNameIndex = this._toBeBindTableNameIndex;
222226
if (tableNameIndex === null || tableNameIndex === undefined) {
223227
throw new TaosResultError(
@@ -299,9 +303,8 @@ export class WsStmt2 implements WsStmt {
299303
);
300304
}
301305

302-
const reqId = BigInt(ReqId.getReqID());
303306
const bytes = stmt2BinaryBlockEncode(
304-
reqId,
307+
BigInt(ReqId.getReqID()),
305308
this._stmtTableInfoList,
306309
this._stmt_id,
307310
this._toBeBindTableNameIndex,
@@ -317,35 +320,28 @@ export class WsStmt2 implements WsStmt {
317320
if (!this._wsClient.isNetworkError(err)) {
318321
throw err;
319322
}
320-
await this.rebuildContext(StmtStep.EXEC);
323+
await this.recover(StmtStep.EXEC);
324+
} finally {
325+
if (this._isInsert) {
326+
this.cleanup();
327+
}
321328
}
329+
}
322330

323-
if (this._isInsert) {
324-
this.cleanup();
325-
}
331+
private async doSendBindBytes(bytes: ArrayBuffer): Promise<void> {
332+
const reqId = new DataView(bytes).getBigUint64(0, true);
333+
await this.sendBinaryMsg(reqId, "stmt2_bind", bytes);
326334
}
327335

328336
private async doExec(): Promise<void> {
329-
const execMsg = {
337+
const msg = {
330338
action: "stmt2_exec",
331339
args: {
332340
req_id: ReqId.getReqID(),
333341
stmt_id: this._stmt_id,
334342
},
335343
};
336-
await this.execute(execMsg);
337-
}
338-
339-
private cleanup() {
340-
this._stmtTableInfo.clear();
341-
this._stmtTableInfoList = [];
342-
this._currentTableInfo = new TableInfo();
343-
this._savedSql = undefined;
344-
this._savedBindBytes = undefined;
345-
}
346-
347-
getLastAffected(): number | null | undefined {
348-
return this.lastAffected;
344+
await this.execute(msg);
349345
}
350346

351347
async resultSet(): Promise<WSRows> {
@@ -355,7 +351,7 @@ export class WsStmt2 implements WsStmt {
355351
if (!this._wsClient.isNetworkError(err)) {
356352
throw err;
357353
}
358-
return await this.rebuildContext(StmtStep.RESULT);
354+
return await this.recover(StmtStep.RESULT);
359355
} finally {
360356
this.cleanup();
361357
}
@@ -380,47 +376,46 @@ export class WsStmt2 implements WsStmt {
380376
}
381377

382378
async close(): Promise<void> {
383-
const queryMsg = {
379+
const msg = {
384380
action: "stmt2_close",
385381
args: {
386382
req_id: ReqId.getReqID(),
387383
stmt_id: this._stmt_id,
388384
},
389385
};
390386
try {
391-
await this.execute(queryMsg);
387+
await this.execute(msg);
392388
} catch (err: any) {
393-
logger.warn(`stmt2 close failed and ignored: ${err.message}`);
389+
logger.warn("stmt2 close failed: " + err.message);
390+
} finally {
391+
this.cleanup();
394392
}
395393
}
396394

397-
private async doInit(reqId?: number): Promise<void> {
398-
const msg = {
399-
action: "stmt2_init",
400-
args: {
401-
req_id: ReqId.getReqID(reqId),
402-
single_stb_insert: true,
403-
single_table_bind_once: true,
404-
},
405-
};
406-
await this.execute(msg);
395+
private cleanup() {
396+
this._stmtTableInfo.clear();
397+
this._stmtTableInfoList = [];
398+
this._currentTableInfo = new TableInfo();
399+
this._savedSql = undefined;
400+
this._savedBindBytes = undefined;
407401
}
408402

409-
private extractReqId(bytes: ArrayBuffer): bigint {
410-
if (bytes.byteLength < 8) {
403+
private buildBindBytes(): ArrayBuffer {
404+
if (this._savedBindBytes === undefined) {
411405
throw new TaosResultError(
412406
ErrorCode.ERR_INVALID_PARAMS,
413-
"Invalid stmt2 bind message bytes"
407+
"bind bytes are missing for stmt2 rebuild"
414408
);
415409
}
416-
return new DataView(bytes).getBigUint64(0, true);
417-
}
418410

419-
private async doSendBindBytes(bytes: ArrayBuffer): Promise<void> {
420-
await this.sendBinaryMsg(this.extractReqId(bytes), "stmt2_bind", bytes);
411+
const bytes = this._savedBindBytes.slice(0);
412+
const view = new DataView(bytes);
413+
view.setBigUint64(0, BigInt(ReqId.getReqID()), true);
414+
view.setBigUint64(8, this._stmt_id!, true);
415+
return bytes;
421416
}
422417

423-
private async rebuildContext(failedStep: StmtStep): Promise<any> {
418+
private async recover(failedStep: StmtStep): Promise<any> {
424419
while (true) {
425420
try {
426421
await this._wsClient.waitForReady();
@@ -441,13 +436,7 @@ export class WsStmt2 implements WsStmt {
441436
return;
442437
}
443438

444-
if (this._savedBindBytes === undefined) {
445-
throw new TaosResultError(
446-
ErrorCode.ERR_INVALID_PARAMS,
447-
"bind bytes are missing for stmt2 rebuild"
448-
);
449-
}
450-
await this.doSendBindBytes(this._savedBindBytes);
439+
await this.doSendBindBytes(this.buildBindBytes());
451440
if (failedStep === StmtStep.BIND) {
452441
return;
453442
}
@@ -466,18 +455,19 @@ export class WsStmt2 implements WsStmt {
466455
}
467456
}
468457

458+
getStmtId(): bigint | undefined | null {
459+
return this._stmt_id;
460+
}
461+
462+
getLastAffected(): number | null | undefined {
463+
return this.lastAffected;
464+
}
465+
469466
private async execute(
470467
stmtMsg: StmtMessageInfo | ArrayBuffer,
471468
register: boolean = true
472469
): Promise<WsStmtQueryResponse | void> {
473470
try {
474-
if (this._wsClient.getState() <= 0) {
475-
throw new TDWebSocketClientError(
476-
ErrorCode.ERR_CONNECTION_CLOSED,
477-
"websocket connect has closed!"
478-
);
479-
}
480-
481471
const reqMsg = JSONBig.stringify(stmtMsg);
482472

483473
if (register) {
@@ -511,13 +501,6 @@ export class WsStmt2 implements WsStmt {
511501
action: string,
512502
message: ArrayBuffer
513503
): Promise<void> {
514-
if (this._wsClient.getState() <= 0) {
515-
throw new TDWebSocketClientError(
516-
ErrorCode.ERR_CONNECTION_CLOSED,
517-
"websocket connect has closed!"
518-
);
519-
}
520-
521504
const result = await this._wsClient.sendBinaryMsg(
522505
reqId,
523506
action,

nodejs/src/tmq/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export class TmqConfig {
1313
topics?: Array<string>;
1414
auto_commit: boolean = true;
1515
auto_commit_interval_ms: number = 5 * 1000;
16-
timeout: number = 5000;
16+
timeout: number = 60000;
1717
otherConfigs: Map<string, any>;
1818

1919
constructor(wsConfig: Map<string, any>) {

0 commit comments

Comments
 (0)