Skip to content

Commit 9e49969

Browse files
PavelPashovnkaradzhov
authored andcommitted
test(cluster): add fault injector infrastructure for hitless upgrade testing
1 parent c97f43c commit 9e49969

File tree

8 files changed

+526
-27
lines changed

8 files changed

+526
-27
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import assert from "node:assert";
2+
import diagnostics_channel from "node:diagnostics_channel";
3+
4+
import testUtils from "../test-utils";
5+
import { DiagnosticsEvent } from "../client/enterprise-maintenance-manager";
6+
7+
describe("Cluster Maintenance", () => {
8+
const MASTERS_COUNT = 3;
9+
10+
before(() => {
11+
process.env.REDIS_EMIT_DIAGNOSTICS = "1";
12+
});
13+
14+
after(() => {
15+
delete process.env.REDIS_EMIT_DIAGNOSTICS;
16+
});
17+
18+
let diagnosticEvents: DiagnosticsEvent[] = [];
19+
20+
const onMessage = (message: unknown) => {
21+
const event = message as DiagnosticsEvent;
22+
if (["SMIGRATING", "SMIGRATED"].includes(event.type)) {
23+
diagnosticEvents.push(event);
24+
}
25+
};
26+
27+
beforeEach(() => {
28+
diagnostics_channel.subscribe("redis.maintenance", onMessage);
29+
diagnosticEvents = [];
30+
});
31+
32+
afterEach(() => {
33+
diagnostics_channel.unsubscribe("redis.maintenance", onMessage);
34+
});
35+
36+
testUtils.testWithProxiedCluster(
37+
"should handle failover",
38+
async (cluster, { faultInjectorClient }) => {
39+
assert.equal(
40+
diagnosticEvents.length,
41+
0,
42+
"should not have received any notifications yet"
43+
);
44+
assert.equal(
45+
cluster.masters.length,
46+
MASTERS_COUNT,
47+
`should have ${MASTERS_COUNT} masters at start`
48+
);
49+
50+
const { action_id: failoverActionId } =
51+
await faultInjectorClient.triggerAction({
52+
type: "failover",
53+
parameters: {
54+
cluster_index: 0,
55+
},
56+
});
57+
58+
await faultInjectorClient.waitForAction(failoverActionId);
59+
60+
const sMigratingEventCount = diagnosticEvents.filter(
61+
(event) => event.type === "SMIGRATING"
62+
).length;
63+
assert(
64+
sMigratingEventCount >= 1,
65+
"should have received at least one SMIGRATING notification"
66+
);
67+
const sMigratedEventCount = diagnosticEvents.filter(
68+
(event) => event.type === "SMIGRATED"
69+
).length;
70+
assert(
71+
sMigratedEventCount >= 1,
72+
"should have received at least one SMIGRATED notification"
73+
);
74+
assert.equal(
75+
cluster.masters.length,
76+
MASTERS_COUNT - 1,
77+
`should have ${MASTERS_COUNT - 1} masters after failover`
78+
);
79+
},
80+
{
81+
numberOfMasters: MASTERS_COUNT,
82+
clusterConfiguration: {
83+
defaults: {
84+
maintNotifications: "enabled",
85+
maintEndpointType: "auto",
86+
},
87+
RESP: 3,
88+
},
89+
}
90+
);
91+
});

packages/client/lib/tests/test-scenario/fault-injector-client.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { setTimeout } from "node:timers/promises";
22

3+
// TODO remove types and utilize IFaultInjectorClient
34
export type ActionType =
45
| "dmc_restart"
56
| "failover"
@@ -54,20 +55,6 @@ export class FaultInjectorClient {
5455
return this.#request<T>("POST", "/action", action);
5556
}
5657

57-
// public async printStatus() {
58-
// const action = {
59-
// type: 'execute_rladmin_command',
60-
// parameters: {
61-
// rladmin_command: "status",
62-
// bdb_id: "1"
63-
// }
64-
// }
65-
// const { action_id } = await this.#request<{action_id: string}>("POST", "/action", action);
66-
// const status = await this.waitForAction(action_id);
67-
// //@ts-ignore
68-
// console.log(status.output.output);
69-
// }
70-
7158
/**
7259
* Gets the status of a specific action.
7360
* @param actionId The ID of the action to check

packages/test-utils/lib/dockers.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ export async function spawnProxiedRedisServerDocker(
159159
"--network", "host",
160160
"-e", `LISTEN_PORT=${ports.join(',')}`,
161161
"-e", `API_PORT=${apiPort}`,
162-
"-e", "TIEOUT=0",
162+
"-e", "TIMEOUT=0",
163+
"-e", "TARGET_HOST=0.0.0.0",
163164
"-e", `DEFAULT_INTERCEPTORS=${config.defaultInterceptors.join(',')}`,
164165
"-e", "ENABLE_LOGGING=true",
165166
"cae-resp-proxy-standalone"
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import { setTimeout } from "node:timers/promises";
2+
3+
import { ActionRequest, ActionStatus, IFaultInjectorClient } from "./types";
4+
5+
export class FaultInjectorClient implements IFaultInjectorClient {
6+
private baseUrl: string;
7+
#fetch: typeof fetch;
8+
9+
constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) {
10+
this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slash
11+
this.#fetch = fetchImpl;
12+
}
13+
14+
/**
15+
* Lists all available actions.
16+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
17+
*/
18+
public listActions<T = unknown>(): Promise<T> {
19+
return this.#request<T>("GET", "/action");
20+
}
21+
22+
/**
23+
* Triggers a specific action.
24+
* @param action The action request to trigger
25+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
26+
*/
27+
public triggerAction<T extends { action_id: string }>(
28+
action: ActionRequest
29+
): Promise<T> {
30+
return this.#request<T>("POST", "/action", action);
31+
}
32+
33+
/**
34+
* Gets the status of a specific action.
35+
* @param actionId The ID of the action to check
36+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
37+
*/
38+
public getActionStatus<T = ActionStatus>(actionId: string): Promise<T> {
39+
return this.#request<T>("GET", `/action/${actionId}`);
40+
}
41+
42+
/**
43+
* Waits for an action to complete.
44+
* @param actionId The ID of the action to wait for
45+
* @param options Optional timeout and max wait time
46+
* @throws {Error} When the action does not complete within the max wait time
47+
*/
48+
public async waitForAction(
49+
actionId: string,
50+
{
51+
timeoutMs,
52+
maxWaitTimeMs,
53+
}: {
54+
timeoutMs?: number;
55+
maxWaitTimeMs?: number;
56+
} = {}
57+
): Promise<ActionStatus> {
58+
const timeout = timeoutMs || 1000;
59+
const maxWaitTime = maxWaitTimeMs || 60000;
60+
61+
const startTime = Date.now();
62+
63+
while (Date.now() - startTime < maxWaitTime) {
64+
const action = await this.getActionStatus<ActionStatus>(actionId);
65+
66+
if (action.status === "failed") {
67+
throw new Error(
68+
`Action id: ${actionId} failed! Error: ${action.error}`
69+
);
70+
}
71+
72+
if (["finished", "success"].includes(action.status)) {
73+
return action;
74+
}
75+
76+
await setTimeout(timeout);
77+
}
78+
79+
throw new Error(`Timeout waiting for action ${actionId}`);
80+
}
81+
82+
async migrateAndBindAction({
83+
bdbId,
84+
clusterIndex,
85+
}: {
86+
bdbId: string | number;
87+
clusterIndex: string | number;
88+
}) {
89+
const bdbIdStr = bdbId.toString();
90+
const clusterIndexStr = clusterIndex.toString();
91+
92+
return this.triggerAction<{
93+
action_id: string;
94+
}>({
95+
type: "sequence_of_actions",
96+
parameters: {
97+
bdbId: bdbIdStr,
98+
actions: [
99+
{
100+
type: "migrate",
101+
params: {
102+
cluster_index: clusterIndexStr,
103+
bdb_id: bdbIdStr,
104+
},
105+
},
106+
{
107+
type: "bind",
108+
params: {
109+
cluster_index: clusterIndexStr,
110+
bdb_id: bdbIdStr,
111+
},
112+
},
113+
],
114+
},
115+
});
116+
}
117+
118+
async #request<T>(
119+
method: string,
120+
path: string,
121+
body?: Object | string
122+
): Promise<T> {
123+
const url = `${this.baseUrl}${path}`;
124+
const headers: Record<string, string> = {
125+
"Content-Type": "application/json",
126+
};
127+
128+
let payload: string | undefined;
129+
130+
if (body) {
131+
if (typeof body === "string") {
132+
headers["Content-Type"] = "text/plain";
133+
payload = body;
134+
} else {
135+
headers["Content-Type"] = "application/json";
136+
payload = JSON.stringify(body);
137+
}
138+
}
139+
140+
const response = await this.#fetch(url, { method, headers, body: payload });
141+
142+
if (!response.ok) {
143+
try {
144+
const text = await response.text();
145+
throw new Error(`HTTP ${response.status} - ${text}`);
146+
} catch {
147+
throw new Error(`HTTP ${response.status}`);
148+
}
149+
}
150+
151+
try {
152+
return (await response.json()) as T;
153+
} catch {
154+
throw new Error(
155+
`HTTP ${response.status} - Unable to parse response as JSON`
156+
);
157+
}
158+
}
159+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from "./fault-injector-client";
2+
export * from "./proxied-fault-injector-cluster";

0 commit comments

Comments
 (0)