Skip to content

Commit 0977fc7

Browse files
PavelPashovnkaradzhov
authored andcommitted
test: add E2E tests for Redis Enterprise maintenance timeout handling (#3)
1 parent 11c1197 commit 0977fc7

File tree

3 files changed

+294
-0
lines changed

3 files changed

+294
-0
lines changed

packages/client/lib/tests/test-scenario/fault-injector-client.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,41 @@ export class FaultInjectorClient {
108108
throw new Error(`Timeout waiting for action ${actionId}`);
109109
}
110110

111+
async migrateAndBindAction({
112+
bdbId,
113+
clusterIndex,
114+
}: {
115+
bdbId: string | number;
116+
clusterIndex: string | number;
117+
}) {
118+
const bdbIdStr = bdbId.toString();
119+
const clusterIndexStr = clusterIndex.toString();
120+
121+
return this.triggerAction<{
122+
action_id: string;
123+
}>({
124+
type: "sequence_of_actions",
125+
parameters: {
126+
bdbId: bdbIdStr,
127+
actions: [
128+
{
129+
type: "migrate",
130+
parameters: {
131+
cluster_index: clusterIndexStr,
132+
},
133+
},
134+
{
135+
type: "bind",
136+
parameters: {
137+
cluster_index: clusterIndexStr,
138+
bdb_id: bdbIdStr,
139+
},
140+
},
141+
],
142+
},
143+
});
144+
}
145+
111146
async #request<T>(
112147
method: string,
113148
path: string,
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import { randomUUID } from "node:crypto";
2+
import { setTimeout } from "node:timers/promises";
3+
import { createClient } from "../../..";
4+
5+
/**
6+
* Options for the `fireCommandsUntilStopSignal` method
7+
*/
8+
type FireCommandsUntilStopSignalOptions = {
9+
/**
10+
* Number of commands to fire in each batch
11+
*/
12+
batchSize: number;
13+
/**
14+
* Timeout between batches in milliseconds
15+
*/
16+
timeoutMs: number;
17+
/**
18+
* Function that creates the commands to be executed
19+
*/
20+
createCommands: (
21+
client: ReturnType<typeof createClient<any, any, any, any>>
22+
) => Array<() => Promise<unknown>>;
23+
};
24+
25+
export class TestCommandRunner {
26+
constructor(
27+
private client: ReturnType<typeof createClient<any, any, any, any>>
28+
) {}
29+
30+
private defaultOptions: FireCommandsUntilStopSignalOptions = {
31+
batchSize: 60,
32+
timeoutMs: 10,
33+
createCommands: (
34+
client: ReturnType<typeof createClient<any, any, any, any>>
35+
) => [
36+
() => client.set(randomUUID(), Date.now()),
37+
() => client.get(randomUUID()),
38+
],
39+
};
40+
41+
#toSettled<T>(p: Promise<T>) {
42+
return p
43+
.then((value) => ({ status: "fulfilled" as const, value, error: null }))
44+
.catch((reason) => ({
45+
status: "rejected" as const,
46+
value: null,
47+
error: reason,
48+
}));
49+
}
50+
51+
async #racePromises<S, T>({
52+
timeout,
53+
stopper,
54+
}: {
55+
timeout: Promise<S>;
56+
stopper: Promise<T>;
57+
}) {
58+
return Promise.race([
59+
this.#toSettled<S>(timeout).then((result) => ({
60+
...result,
61+
stop: false,
62+
})),
63+
this.#toSettled<T>(stopper).then((result) => ({ ...result, stop: true })),
64+
]);
65+
}
66+
67+
/**
68+
* Fires commands until a stop signal is received.
69+
* @param stopSignalPromise Promise that resolves when the command execution should stop
70+
* @param options Options for the command execution
71+
* @returns Promise that resolves when the stop signal is received
72+
*/
73+
async fireCommandsUntilStopSignal(
74+
stopSignalPromise: Promise<unknown>,
75+
options?: Partial<FireCommandsUntilStopSignalOptions>
76+
) {
77+
const executeOptions = {
78+
...this.defaultOptions,
79+
...options,
80+
};
81+
82+
const commandPromises = [];
83+
84+
while (true) {
85+
for (let i = 0; i < executeOptions.batchSize; i++) {
86+
for (const command of executeOptions.createCommands(this.client)) {
87+
commandPromises.push(this.#toSettled(command()));
88+
}
89+
}
90+
91+
const result = await this.#racePromises({
92+
timeout: setTimeout(executeOptions.timeoutMs),
93+
stopper: stopSignalPromise,
94+
});
95+
96+
if (result.stop) {
97+
return {
98+
commandPromises,
99+
stopResult: result,
100+
};
101+
}
102+
}
103+
}
104+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import assert from "node:assert";
2+
import { setTimeout } from "node:timers/promises";
3+
import { FaultInjectorClient } from "./fault-injector-client";
4+
import {
5+
getDatabaseConfig,
6+
getDatabaseConfigFromEnv,
7+
getEnvConfig,
8+
RedisConnectionConfig,
9+
} from "./test-scenario.util";
10+
import { createClient } from "../../../dist";
11+
import { before } from "mocha";
12+
import { TestCommandRunner } from "./test-command-runner";
13+
14+
describe("Timeout Handling During Notifications", () => {
15+
let clientConfig: RedisConnectionConfig;
16+
let client: ReturnType<typeof createClient<any, any, any, 3>>;
17+
let faultInjectorClient: FaultInjectorClient;
18+
let commandRunner: TestCommandRunner;
19+
20+
before(() => {
21+
const envConfig = getEnvConfig();
22+
const redisConfig = getDatabaseConfigFromEnv(
23+
envConfig.redisEndpointsConfigPath
24+
);
25+
26+
faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
27+
clientConfig = getDatabaseConfig(redisConfig);
28+
});
29+
30+
beforeEach(async () => {
31+
client = createClient({
32+
socket: {
33+
host: clientConfig.host,
34+
port: clientConfig.port,
35+
...(clientConfig.tls === true ? { tls: true } : {}),
36+
},
37+
password: clientConfig.password,
38+
username: clientConfig.username,
39+
RESP: 3,
40+
maintPushNotifications: "auto",
41+
maintMovingEndpointType: "auto",
42+
});
43+
44+
client.on("error", (err: Error) => {
45+
throw new Error(`Client error: ${err.message}`);
46+
});
47+
48+
commandRunner = new TestCommandRunner(client);
49+
50+
await client.connect();
51+
});
52+
53+
afterEach(() => {
54+
client.destroy();
55+
});
56+
57+
it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => {
58+
// PART 1
59+
// Set very low timeout to trigger errors
60+
client.options!.maintRelaxedCommandTimeout = 50;
61+
62+
const { action_id: lowTimeoutBindAndMigrateActionId } =
63+
await faultInjectorClient.migrateAndBindAction({
64+
bdbId: clientConfig.bdbId,
65+
clusterIndex: 0,
66+
});
67+
68+
const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
69+
lowTimeoutBindAndMigrateActionId
70+
);
71+
72+
const lowTimeoutCommandPromises =
73+
await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise);
74+
75+
const lowTimeoutRejectedCommands = (
76+
await Promise.all(lowTimeoutCommandPromises.commandPromises)
77+
).filter((result) => result.status === "rejected");
78+
79+
assert.ok(lowTimeoutRejectedCommands.length > 0);
80+
assert.strictEqual(
81+
lowTimeoutRejectedCommands.filter((rejected) => {
82+
return (
83+
// TODO instanceof doesn't work for some reason
84+
rejected.error.constructor.name ===
85+
"CommandTimeoutDuringMaintananceError"
86+
);
87+
}).length,
88+
lowTimeoutRejectedCommands.length
89+
);
90+
91+
// PART 2
92+
// Set high timeout to avoid errors
93+
client.options!.maintRelaxedCommandTimeout = 10000;
94+
95+
const { action_id: highTimeoutBindAndMigrateActionId } =
96+
await faultInjectorClient.migrateAndBindAction({
97+
bdbId: clientConfig.bdbId,
98+
clusterIndex: 0,
99+
});
100+
101+
const highTimeoutWaitPromise = faultInjectorClient.waitForAction(
102+
highTimeoutBindAndMigrateActionId
103+
);
104+
105+
const highTimeoutCommandPromises =
106+
await commandRunner.fireCommandsUntilStopSignal(highTimeoutWaitPromise);
107+
108+
const highTimeoutRejectedCommands = (
109+
await Promise.all(highTimeoutCommandPromises.commandPromises)
110+
).filter((result) => result.status === "rejected");
111+
112+
assert.strictEqual(highTimeoutRejectedCommands.length, 0);
113+
});
114+
115+
// TODO this is WIP
116+
it.skip("should unrelax command timeout after MAINTENANCE", async () => {
117+
client.options!.maintRelaxedCommandTimeout = 10000;
118+
client.options!.commandOptions = {
119+
...client.options!.commandOptions,
120+
timeout: 1, // Set very low timeout to trigger errors
121+
};
122+
123+
const { action_id: bindAndMigrateActionId } =
124+
await faultInjectorClient.migrateAndBindAction({
125+
bdbId: clientConfig.bdbId,
126+
clusterIndex: 0,
127+
});
128+
129+
const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
130+
bindAndMigrateActionId
131+
);
132+
133+
const relaxedTimeoutCommandPromises =
134+
await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise);
135+
136+
const relaxedTimeoutRejectedCommands = (
137+
await Promise.all(relaxedTimeoutCommandPromises.commandPromises)
138+
).filter((result) => result.status === "rejected");
139+
console.log(
140+
"relaxedTimeoutRejectedCommands",
141+
relaxedTimeoutRejectedCommands
142+
);
143+
144+
assert.ok(relaxedTimeoutRejectedCommands.length === 0);
145+
146+
const unrelaxedCommandPromises =
147+
await commandRunner.fireCommandsUntilStopSignal(setTimeout(1 * 1000));
148+
149+
const unrelaxedRejectedCommands = (
150+
await Promise.all(unrelaxedCommandPromises.commandPromises)
151+
).filter((result) => result.status === "rejected");
152+
153+
assert.ok(unrelaxedRejectedCommands.length > 0);
154+
});
155+
});

0 commit comments

Comments
 (0)