Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions packages/client/lib/sentinel/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,84 @@ describe('legacy tests', () => {
assert.notEqual(sentinelNode!.port, newSentinel.port);
});

it('Should recover after full outage', async function () {
this.timeout(120000);

const allSentinelPorts = frame.getAllSentinelsPort();
const primarySentinelPort = allSentinelPorts[0];
const extraSentinelPorts = allSentinelPorts.slice(1);

// Keep only one sentinel reachable for the test.
await Promise.all(extraSentinelPorts.map(port => frame.stopSentinel(port.toString())));
await setTimeout(1500);

sentinel = RedisSentinel.create({
name: config.sentinelName,
sentinelRootNodes: [{ host: '127.0.0.1', port: primarySentinelPort }],
RESP: 3,
scanInterval: 250
});
sentinel.setTracer(tracer);
sentinel.on("error", () => { });
await sentinel.connect();

await sentinel.set('some-key', 'value');
assert.equal(await sentinel.get('some-key'), 'value');

const allNodePorts = frame.getAllNodesPort();
// Simulate full outage (all Redis nodes + the single configured sentinel).
await Promise.all(allNodePorts.map(port => frame.stopNode(port.toString())));
await frame.stopSentinel(primarySentinelPort.toString());

const timedGet = async () => {
const getPromise = sentinel!.get('some-key');
void getPromise.catch(() => undefined); // Promise.race may timeout first.

return Promise.race([
getPromise,
setTimeout(1000).then(() => {
throw new Error('1s Timeout');
})
]);
};

const pollResults: Array<{ phase: 'outage' | 'recovery'; status: 'success' | 'timeout' | 'error' }> = [];
const pollLoop = async (phase: 'outage' | 'recovery', rounds: number) => {
for (let i = 0; i < rounds; i++) {
try {
await timedGet();
pollResults.push({ phase, status: 'success' });
} catch (err: any) {
pollResults.push({
phase,
status: err?.message === '1s Timeout' ? 'timeout' : 'error'
});
}
await setTimeout(3000);
}
};

// Match the issue's periodic GET calls while outage is active.
await pollLoop('outage', 3);

// Bring only the single configured sentinel back; keep extra sentinels down.
await Promise.all(allNodePorts.map(port => frame.restartNode(port.toString())));
await frame.restartSentinel(primarySentinelPort.toString());

// Continue periodic GET loop and assert recovery.
await pollLoop('recovery', 5);

const sawOutageFailure = pollResults.some(result =>
result.phase === 'outage' && result.status !== 'success'
);
assert.equal(sawOutageFailure, true, 'expected GET failures during outage');

const sawRecoverySuccess = pollResults.some(result =>
result.phase === 'recovery' && result.status === 'success'
);
assert.equal(sawRecoverySuccess, true, 'expected periodic GET to recover after restart');
});

it('timer works, and updates sentinel list', async function () {
this.timeout(60000);

Expand Down
25 changes: 21 additions & 4 deletions packages/client/lib/sentinel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ class RedisSentinelInternal<

#configEpoch: number = 0;

readonly #sentinelSeedNodes: Array<RedisNode>;
#sentinelRootNodes: Array<RedisNode>;
#sentinelClient?: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>;

Expand Down Expand Up @@ -696,7 +697,8 @@ class RedisSentinelInternal<
this.#name = options.name;

this.#RESP = options.RESP;
this.#sentinelRootNodes = Array.from(options.sentinelRootNodes);
this.#sentinelSeedNodes = Array.from(options.sentinelRootNodes);
this.#sentinelRootNodes = Array.from(this.#sentinelSeedNodes);
this.#maxCommandRediscovers = options.maxCommandRediscovers ?? 16;
this.#masterPoolSize = options.masterPoolSize ?? 1;
this.#replicaPoolSize = options.replicaPoolSize ?? 0;
Expand Down Expand Up @@ -951,13 +953,27 @@ class RedisSentinelInternal<
}
}

#sentinelNodeListKey(nodes: Array<RedisNode>) {
return nodes.map(node => `${node.host}:${node.port}`).sort().join('|');
}

#restoreSentinelRootNodesIfEmpty() {
if (this.#sentinelRootNodes.length !== 0) {
return;
}

this.#trace("restoring sentinel roots from seed nodes");
this.#sentinelRootNodes = Array.from(this.#sentinelSeedNodes);
}

#handleSentinelFailure(node: RedisNode) {
const found = this.#sentinelRootNodes.findIndex(
(rootNode) => rootNode.host === node.host && rootNode.port === node.port
);
if (found !== -1) {
this.#sentinelRootNodes.splice(found, 1);
}
this.#restoreSentinelRootNodesIfEmpty();
this.#reset();
}

Expand Down Expand Up @@ -1104,6 +1120,8 @@ class RedisSentinelInternal<

// observe/analyze/transform remediation functions
async observe() {
this.#restoreSentinelRootNodesIfEmpty();

for (const node of this.#sentinelRootNodes) {
let client: RedisClientType<typeof RedisSentinelModule, {}, {}, RespVersions, {}> | undefined;
try {
Expand Down Expand Up @@ -1247,8 +1265,7 @@ class RedisSentinelInternal<
};
this.emit('client-error', event);
this.#handleSentinelFailure(node);
})
.on('end', () => this.#handleSentinelFailure(node));
});
this.#sentinelClient = client;

this.#trace(`transform: adding sentinel client connect() to promise list`);
Expand Down Expand Up @@ -1383,7 +1400,7 @@ class RedisSentinelInternal<
}
}

if (analyzed.sentinelList.length != this.#sentinelRootNodes.length) {
if (this.#sentinelNodeListKey(analyzed.sentinelList) !== this.#sentinelNodeListKey(this.#sentinelRootNodes)) {
this.#sentinelRootNodes = analyzed.sentinelList;
const event: RedisSentinelEvent = {
type: "SENTINE_LIST_CHANGE",
Expand Down
Loading