Skip to content

Commit e36fbac

Browse files
committed
feat: implement network error handling and recovery for stmt2 operations
1 parent 8e7e1aa commit e36fbac

File tree

4 files changed

+737
-65
lines changed

4 files changed

+737
-65
lines changed

nodejs/src/client/wsClient.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,14 @@ export class WsClient {
337337
}
338338
}
339339

340+
isNetworkError(err: unknown): boolean {
341+
return this.getWsConnector().isNetworkError(err);
342+
}
343+
344+
async waitForReady(): Promise<void> {
345+
await this.getWsConnector().ready();
346+
}
347+
340348
async close(): Promise<void> {
341349
if (this._wsConnector) {
342350
this._wsConnector.setSessionRecoveryHook(null);

nodejs/src/client/wsConnector.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ export class WebSocketConnector {
329329
return BINARY_RETRIABLE_ACTIONS.has(action);
330330
}
331331

332-
private isNetworkError(err: unknown): boolean {
332+
public isNetworkError(err: unknown): boolean {
333333
if (!this._conn || this._conn.readyState !== w3cwebsocket.OPEN) {
334334
return true;
335335
}

nodejs/src/stmt/wsStmt2.ts

Lines changed: 154 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ import { TableInfo } from "./wsTableInfo";
2222
import { WSRows } from "../sql/wsRows";
2323
import { FieldBindParams } from "./FieldBindParams";
2424

25+
enum StmtStep {
26+
INIT,
27+
PREPARE,
28+
BIND,
29+
EXEC,
30+
RESULT,
31+
}
32+
2533
export class WsStmt2 implements WsStmt {
2634
private _wsClient: WsClient;
2735
private _stmt_id: bigint | undefined | null;
@@ -35,6 +43,8 @@ export class WsStmt2 implements WsStmt {
3543
private _toBeBindColCount: number;
3644
private _toBeBindTableNameIndex: number | undefined | null;
3745
private _isInsert: boolean = false;
46+
private _savedSql: string | undefined;
47+
private _savedBindBytes: ArrayBuffer | undefined;
3848

3949
private constructor(wsClient: WsClient, precision?: number) {
4050
this._wsClient = wsClient;
@@ -69,17 +79,13 @@ export class WsStmt2 implements WsStmt {
6979
await this._wsClient.connect();
7080
await this._wsClient.checkVersion();
7181
}
72-
const msg = {
73-
action: "stmt2_init",
74-
args: {
75-
req_id: ReqId.getReqID(reqId),
76-
single_stb_insert: true,
77-
single_table_bind_once: true,
78-
},
79-
};
80-
await this.execute(msg);
82+
await this.doInit(reqId);
8183
return this;
8284
} catch (e: any) {
85+
if (this._wsClient.isNetworkError(e)) {
86+
await this.rebuildContext(StmtStep.INIT);
87+
return this;
88+
}
8389
logger.error(`stmt init filed, ${e.code}, ${e.message}`);
8490
throw e;
8591
}
@@ -95,41 +101,12 @@ export class WsStmt2 implements WsStmt {
95101
}
96102

97103
async prepare(sql: string): Promise<void> {
98-
const msg = {
99-
action: "stmt2_prepare",
100-
args: {
101-
req_id: ReqId.getReqID(),
102-
sql: sql,
103-
stmt_id: this._stmt_id,
104-
get_fields: true,
105-
},
106-
};
107-
const resp = await this.execute(msg);
108-
if (this._isInsert && this.fields) {
109-
this._precision = this.fields[0].precision
110-
? this.fields[0].precision
111-
: 0;
112-
this._toBeBindColCount = 0;
113-
this._toBeBindTagCount = 0;
114-
this.fields?.forEach((field, index) => {
115-
if (field.bind_type == FieldBindType.TAOS_FIELD_TBNAME) {
116-
this._toBeBindTableNameIndex = index;
117-
} else if (field.bind_type == FieldBindType.TAOS_FIELD_TAG) {
118-
this._toBeBindTagCount++;
119-
} else if (field.bind_type == FieldBindType.TAOS_FIELD_COL) {
120-
this._toBeBindColCount++;
121-
}
122-
});
123-
} else {
124-
if (resp && resp.fields_count && resp.fields_count > 0) {
125-
this._stmtTableInfoList = [this._currentTableInfo];
126-
this._toBeBindColCount = resp.fields_count;
127-
} else {
128-
throw new TaosResultError(
129-
ErrorCode.ERR_INVALID_PARAMS,
130-
"prepare No columns to bind!"
131-
);
132-
}
104+
this._savedSql = sql;
105+
try {
106+
await this.doPrepare(sql);
107+
} catch (err: any) {
108+
if (!this._wsClient.isNetworkError(err)) throw err;
109+
await this.rebuildContext(StmtStep.PREPARE);
133110
}
134111
}
135112

@@ -288,30 +265,125 @@ export class WsStmt2 implements WsStmt {
288265
this._toBeBindTagCount,
289266
this._toBeBindColCount
290267
);
291-
await this.sendBinaryMsg(reqId, "stmt2_bind", bytes);
292-
293-
const execMsg = {
294-
action: "stmt2_exec",
295-
args: {
296-
req_id: ReqId.getReqID(),
297-
stmt_id: this._stmt_id,
298-
},
299-
};
300-
await this.execute(execMsg);
301-
this.cleanup();
268+
this._savedBindBytes = bytes;
269+
try {
270+
await this.doSendBindBytes(bytes);
271+
await this.doExec();
272+
} catch (err: any) {
273+
if (!this._wsClient.isNetworkError(err)) throw err;
274+
await this.rebuildContext(StmtStep.EXEC);
275+
}
276+
if (this._isInsert) this.cleanup();
302277
}
303278

304279
private cleanup() {
305280
this._stmtTableInfo.clear();
306281
this._stmtTableInfoList = [];
307282
this._currentTableInfo = new TableInfo();
283+
this._savedSql = undefined;
284+
this._savedBindBytes = undefined;
308285
}
309286

310287
getLastAffected(): number | null | undefined {
311288
return this.lastAffected;
312289
}
313290

314291
async resultSet(): Promise<WSRows> {
292+
try {
293+
return await this.doResult();
294+
} catch (err: any) {
295+
if (!this._wsClient.isNetworkError(err)) throw err;
296+
return await this.rebuildContext(StmtStep.RESULT) as WSRows;
297+
} finally {
298+
this.cleanup();
299+
}
300+
}
301+
302+
async close(): Promise<void> {
303+
let queryMsg = {
304+
action: "stmt2_close",
305+
args: {
306+
req_id: ReqId.getReqID(),
307+
stmt_id: this._stmt_id,
308+
},
309+
};
310+
try {
311+
await this.execute(queryMsg);
312+
} catch (_e) {
313+
// close failures are ignored per design
314+
}
315+
}
316+
317+
// --- Internal send methods (no retry logic) ---
318+
319+
private async doInit(reqId?: number): Promise<void> {
320+
const msg = {
321+
action: "stmt2_init",
322+
args: {
323+
req_id: ReqId.getReqID(reqId),
324+
single_stb_insert: true,
325+
single_table_bind_once: true,
326+
},
327+
};
328+
await this.execute(msg);
329+
}
330+
331+
private async doPrepare(sql: string): Promise<void> {
332+
const msg = {
333+
action: "stmt2_prepare",
334+
args: {
335+
req_id: ReqId.getReqID(),
336+
sql: sql,
337+
stmt_id: this._stmt_id,
338+
get_fields: true,
339+
},
340+
};
341+
const resp = await this.execute(msg);
342+
if (this._isInsert && this.fields) {
343+
this._precision = this.fields[0].precision
344+
? this.fields[0].precision
345+
: 0;
346+
this._toBeBindColCount = 0;
347+
this._toBeBindTagCount = 0;
348+
this.fields?.forEach((field, index) => {
349+
if (field.bind_type == FieldBindType.TAOS_FIELD_TBNAME) {
350+
this._toBeBindTableNameIndex = index;
351+
} else if (field.bind_type == FieldBindType.TAOS_FIELD_TAG) {
352+
this._toBeBindTagCount++;
353+
} else if (field.bind_type == FieldBindType.TAOS_FIELD_COL) {
354+
this._toBeBindColCount++;
355+
}
356+
});
357+
} else {
358+
if (resp && resp.fields_count && resp.fields_count > 0) {
359+
this._stmtTableInfoList = [this._currentTableInfo];
360+
this._toBeBindColCount = resp.fields_count;
361+
} else {
362+
throw new TaosResultError(
363+
ErrorCode.ERR_INVALID_PARAMS,
364+
"prepare No columns to bind!"
365+
);
366+
}
367+
}
368+
}
369+
370+
private async doSendBindBytes(bytes: ArrayBuffer): Promise<void> {
371+
const reqId = BigInt(ReqId.getReqID());
372+
await this.sendBinaryMsg(reqId, "stmt2_bind", bytes);
373+
}
374+
375+
private async doExec(): Promise<void> {
376+
const execMsg = {
377+
action: "stmt2_exec",
378+
args: {
379+
req_id: ReqId.getReqID(),
380+
stmt_id: this._stmt_id,
381+
},
382+
};
383+
await this.execute(execMsg);
384+
}
385+
386+
private async doResult(): Promise<WSRows> {
315387
const msg = {
316388
action: "stmt2_result",
317389
args: {
@@ -329,17 +401,35 @@ export class WsStmt2 implements WsStmt {
329401
return new WSRows(this._wsClient, resp);
330402
}
331403

332-
async close(): Promise<void> {
333-
let queryMsg = {
334-
action: "stmt2_close",
335-
args: {
336-
req_id: ReqId.getReqID(),
337-
stmt_id: this._stmt_id,
338-
},
339-
};
340-
await this.execute(queryMsg);
404+
// --- Unified rebuild method ---
405+
406+
private async rebuildContext(failedStep: StmtStep): Promise<any> {
407+
while (true) {
408+
try {
409+
await this._wsClient.waitForReady();
410+
411+
await this.doInit();
412+
if (failedStep === StmtStep.INIT) return;
413+
414+
await this.doPrepare(this._savedSql!);
415+
if (failedStep === StmtStep.PREPARE) return;
416+
417+
await this.doSendBindBytes(this._savedBindBytes!);
418+
if (failedStep === StmtStep.BIND) return;
419+
420+
await this.doExec();
421+
if (failedStep === StmtStep.EXEC) return;
422+
423+
return await this.doResult();
424+
} catch (err: any) {
425+
if (!this._wsClient.isNetworkError(err)) throw err;
426+
// Network error during rebuild, retry the entire rebuild
427+
}
428+
}
341429
}
342430

431+
// --- Low-level transport ---
432+
343433
private async execute(
344434
stmtMsg: StmtMessageInfo | ArrayBuffer,
345435
register: boolean = true

0 commit comments

Comments
 (0)