Skip to content

Commit d081e0d

Browse files
committed
feat: add comprehensive tests for stmt2 failover scenarios with ws proxy
1 parent 469a949 commit d081e0d

File tree

2 files changed

+212
-0
lines changed

2 files changed

+212
-0
lines changed

nodejs/test/bulkPulling/stmt2.failover.mock.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ describe("WsStmt2 failover (mock)", () => {
121121
expect(recoverSpy).toHaveBeenCalledWith(Step.EXEC);
122122
expect(cleanupSpy).not.toHaveBeenCalled();
123123
});
124+
124125
test("resultSet recovers on network error and cleans up", async () => {
125126
const { stmt, wsClient } = createBareStmt();
126127
const networkError = new Error("connection reset");
@@ -136,6 +137,7 @@ describe("WsStmt2 failover (mock)", () => {
136137
expect(recoverSpy).toHaveBeenCalledWith(Step.RESULT);
137138
expect(cleanupSpy).toHaveBeenCalledTimes(1);
138139
});
140+
139141
test("exec only cleans up immediately for insert statements", async () => {
140142
const bindBytes = new Uint8Array([7, 8, 9]).buffer;
141143

@@ -178,6 +180,7 @@ describe("WsStmt2 failover (mock)", () => {
178180
await expect(stmt.exec()).rejects.toThrow("recover failed");
179181
expect(cleanupSpy).toHaveBeenCalledTimes(1);
180182
});
183+
181184
test("non-network errors are rethrown without recover in prepare", async () => {
182185
const { stmt, wsClient } = createBareStmt();
183186
const nonNetworkError = new Error("invalid sql");
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool";
2+
import { WSConfig } from "../../src/common/config";
3+
import { WsSql } from "../../src/sql/wsSql";
4+
import { testPassword, testUsername } from "../helpers/utils";
5+
import { WsProxy, WsProxyEvent, WsProxyMessageEvent } from "../helpers/wsProxy";
6+
7+
interface StageCase {
8+
name: string;
9+
shouldRestart: (event: WsProxyEvent) => boolean;
10+
}
11+
12+
function parseJsonAction(rawData: Buffer | string): string | null {
13+
if (typeof rawData !== "string") {
14+
return null;
15+
}
16+
try {
17+
const parsed = JSON.parse(rawData);
18+
return typeof parsed.action === "string" ? parsed.action : null;
19+
} catch (_err) {
20+
return null;
21+
}
22+
}
23+
24+
function parseBinaryAction(rawData: Buffer | string): bigint | null {
25+
if (typeof rawData === "string" || rawData.byteLength < 24) {
26+
return null;
27+
}
28+
return rawData.readBigInt64LE(16);
29+
}
30+
31+
function asClientToUpstreamMessage(event: WsProxyEvent): WsProxyMessageEvent | null {
32+
if (event.type !== "message") {
33+
return null;
34+
}
35+
if (event.direction !== "client_to_upstream") {
36+
return null;
37+
}
38+
return event;
39+
}
40+
41+
const stageCases: StageCase[] = [
42+
{
43+
name: "init",
44+
shouldRestart: (event) => {
45+
const messageEvent = asClientToUpstreamMessage(event);
46+
return !!messageEvent &&
47+
parseJsonAction(messageEvent.rawData) === "stmt2_init";
48+
},
49+
},
50+
{
51+
name: "prepare",
52+
shouldRestart: (event) => {
53+
const messageEvent = asClientToUpstreamMessage(event);
54+
return !!messageEvent &&
55+
parseJsonAction(messageEvent.rawData) === "stmt2_prepare";
56+
},
57+
},
58+
{
59+
name: "bind",
60+
shouldRestart: (event) => {
61+
const messageEvent = asClientToUpstreamMessage(event);
62+
return !!messageEvent &&
63+
messageEvent.isBinary &&
64+
parseBinaryAction(messageEvent.rawData) === 9n;
65+
},
66+
},
67+
{
68+
name: "exec",
69+
shouldRestart: (event) => {
70+
const messageEvent = asClientToUpstreamMessage(event);
71+
return !!messageEvent &&
72+
parseJsonAction(messageEvent.rawData) === "stmt2_exec";
73+
},
74+
},
75+
{
76+
name: "result",
77+
shouldRestart: (event) => {
78+
const messageEvent = asClientToUpstreamMessage(event);
79+
return !!messageEvent &&
80+
parseJsonAction(messageEvent.rawData) === "stmt2_result";
81+
},
82+
},
83+
];
84+
85+
describe("stmt2 failover with ws proxy", () => {
86+
jest.setTimeout(180 * 1000);
87+
88+
afterEach(async () => {
89+
WebSocketConnectionPool.instance().destroyed();
90+
jest.restoreAllMocks();
91+
});
92+
93+
test.each(stageCases)(
94+
"single-address reconnect recovers stmt2 when network error happens at $name stage",
95+
async ({ name, shouldRestart }) => {
96+
const dbName = `test_1774332664_${name}`;
97+
const tableName = "t0";
98+
const baseTs = 1700100000000;
99+
const localDsn = `ws://${testUsername()}:${testPassword()}@127.0.0.1:6041`;
100+
let setupSql: WsSql | null = null;
101+
let cleanupSql: WsSql | null = null;
102+
let wsSql: WsSql | null = null;
103+
let stmt: any = null;
104+
let wsRows: any = null;
105+
let restartTriggered = false;
106+
let clientConnectedCount = 0;
107+
let matchedStageMessageCount = 0;
108+
let rowCount = 0;
109+
110+
setupSql = await WsSql.open(new WSConfig(localDsn));
111+
try {
112+
await setupSql.exec(`drop database if exists ${dbName}`);
113+
await setupSql.exec(`create database ${dbName}`);
114+
await setupSql.exec(`create table ${dbName}.${tableName}(ts timestamp, c1 int)`);
115+
await setupSql.exec(
116+
`insert into ${dbName}.${tableName} values` +
117+
` (${baseTs}, 1) (${baseTs + 1}, 2) (${baseTs + 2}, 3)`
118+
);
119+
} finally {
120+
await setupSql.close();
121+
setupSql = null;
122+
}
123+
124+
const proxy = await WsProxy.create({
125+
host: "127.0.0.1",
126+
port: 0,
127+
onEvent: (event, control) => {
128+
if (event.type === "client_connected") {
129+
clientConnectedCount += 1;
130+
}
131+
if (restartTriggered) {
132+
return;
133+
}
134+
if (!shouldRestart(event)) {
135+
return;
136+
}
137+
matchedStageMessageCount += 1;
138+
restartTriggered = true;
139+
void control.restart({
140+
downtimeMs: 200,
141+
reason: `trigger stmt2 ${name} failover`,
142+
});
143+
},
144+
});
145+
146+
try {
147+
const dsn =
148+
`ws://${testUsername()}:${testPassword()}@127.0.0.1:${proxy.getPort()}` +
149+
`?retries=20&retry_backoff_ms=15&retry_backoff_max_ms=80`;
150+
const conf = new WSConfig(dsn);
151+
conf.setDb(dbName);
152+
conf.setTimeOut(10000);
153+
wsSql = await WsSql.open(conf);
154+
155+
stmt = await wsSql.stmtInit();
156+
await stmt.prepare(
157+
`select ts, c1 from ${tableName} where ts >= ? and ts <= ? order by ts`
158+
);
159+
const params = stmt.newStmtParam();
160+
params.setTimestamp([BigInt(baseTs)]);
161+
params.setTimestamp([BigInt(baseTs + 2)]);
162+
await stmt.bind(params);
163+
await stmt.exec();
164+
wsRows = await stmt.resultSet();
165+
166+
while (await wsRows.next()) {
167+
const data = wsRows.getData();
168+
expect(data).toBeTruthy();
169+
rowCount += 1;
170+
}
171+
172+
expect(matchedStageMessageCount).toBe(1);
173+
expect(restartTriggered).toBe(true);
174+
expect(clientConnectedCount).toBeGreaterThanOrEqual(2);
175+
expect(rowCount).toBe(3);
176+
} finally {
177+
if (wsRows) {
178+
try {
179+
await wsRows.close();
180+
} catch (_err) {
181+
// ignore cleanup error
182+
}
183+
wsRows = null;
184+
}
185+
if (stmt) {
186+
try {
187+
await stmt.close();
188+
} catch (_err) {
189+
// ignore cleanup error
190+
}
191+
stmt = null;
192+
}
193+
if (wsSql) {
194+
await wsSql.close();
195+
wsSql = null;
196+
}
197+
await proxy.stop("test cleanup");
198+
199+
cleanupSql = await WsSql.open(new WSConfig(localDsn));
200+
try {
201+
await cleanupSql.exec(`drop database if exists ${dbName}`);
202+
} finally {
203+
await cleanupSql.close();
204+
cleanupSql = null;
205+
}
206+
}
207+
}
208+
);
209+
});

0 commit comments

Comments
 (0)