Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/sentinel.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,27 @@ try {
clientLease.release();
}
```

## Scan Iterator

The sentinel client supports `scanIterator` for iterating over keys on the master node:

```javascript
for await (const keys of sentinel.scanIterator()) {
// ...
}
```

If a failover occurs during the scan, the iterator will automatically restart from the beginning on the new master to ensure all keys are covered. This may result in duplicate keys being yielded. If your application requires processing each key exactly once, you should implement a deduplication mechanism (like a `Set` or Bloom filter).

```javascript
const processed = new Set();
for await (const keys of sentinel.scanIterator()) {
for (const key of keys) {
if (processed.has(key)) continue;
processed.add(key);

// process key
}
}
```
231 changes: 231 additions & 0 deletions packages/client/lib/sentinel/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,235 @@ describe('legacy tests', () => {
assert.equal(csc.stats().hitCount, 6);
})
});

describe('scanIterator tests', () => {
testUtils.testWithClientSentinel('should iterate through all keys in normal operation', async sentinel => {
// Set up test data
const testKeys = new Set<string>();
const entries: Array<string> = [];

// Create 50 test keys to ensure we get multiple scan iterations
for (let i = 0; i < 50; i++) {
const key = `scantest:${i}`;
testKeys.add(key);
entries.push(key, `value${i}`);
}

// Insert all test data
await sentinel.mSet(entries);

// Collect all keys using scanIterator
const foundKeys = new Set<string>();
for await (const keyBatch of sentinel.scanIterator({ MATCH: 'scantest:*' })) {
for (const key of keyBatch) {
foundKeys.add(key);
}
}

// Verify all keys were found
assert.deepEqual(testKeys, foundKeys);
}, GLOBAL.SENTINEL.OPEN);

testUtils.testWithClientSentinel('should respect MATCH pattern', async sentinel => {
// Set up test data with different patterns
await sentinel.mSet([
'match:1', 'value1',
'match:2', 'value2',
'nomatch:1', 'value3',
'nomatch:2', 'value4'
]);

const foundKeys = new Set<string>();
for await (const keyBatch of sentinel.scanIterator({ MATCH: 'match:*' })) {
for (const key of keyBatch) {
foundKeys.add(key);
}
}

const expectedKeys = new Set(['match:1', 'match:2']);
assert.deepEqual(foundKeys, expectedKeys);
}, GLOBAL.SENTINEL.OPEN);
});

describe('scanIterator with master failover', () => {
const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: undefined };
const frame = new SentinelFramework(config);
let sentinel: RedisSentinelType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
const tracer: Array<string> = [];

beforeEach(async function () {
this.timeout(60000);
await frame.spawnRedisSentinel();
await frame.getAllRunning();
await steadyState(frame);
});

afterEach(async function () {
this.timeout(60000);
if (sentinel !== undefined) {
sentinel.destroy();
sentinel = undefined;
}
await frame.cleanup();
});

it('should restart scan from beginning when master changes during iteration', async function () {
this.timeout(60000);

sentinel = frame.getSentinelClient({ scanInterval: 1000 });
sentinel.setTracer(tracer);
sentinel.on("error", () => {});
await sentinel.connect();

// Set up test data
const testKeys = new Set<string>();
const entries: Array<string> = [];

for (let i = 0; i < 100; i++) {
const key = `failovertest:${i}`;
testKeys.add(key);
entries.push(key, `value${i}`);
}

await sentinel.mSet(entries);
// Wait for addded keys to be replicated
await setTimeout(2000);

let masterChangeDetected = false;
let iterationCount = 0;
const foundKeys = new Set<string>();

// Listen for manifest change events
sentinel.on("topology-change", (event: RedisSentinelEvent) => {
if (event.type === "MASTER_CHANGE") {
masterChangeDetected = true;
tracer.push(`Master change detected during scan: ${event.node.port}`);
}
});

// Get the current master node before starting scan
const originalMaster = sentinel.getMasterNode();
tracer.push(`Original master port: ${originalMaster?.port}`);

// Start scanning with a small COUNT to ensure multiple iterations
const scanIterator = sentinel.scanIterator({
MATCH: "failovertest:*",
COUNT: 10,
});

// Consume the scan iterator
try {
for await (const keyBatch of scanIterator) {
iterationCount++;
if (iterationCount === 1) {
tracer.push(
`Triggering master failover by stopping node ${originalMaster?.port}`
);
await frame.stopNode(originalMaster!.port.toString());
tracer.push(`Master node stopped`);
}
tracer.push(
`Scan iteration ${iterationCount}, got ${keyBatch.length} keys`
);

for (const key of keyBatch) {
foundKeys.add(key);
}
}
} catch (error) {
tracer.push(`Error during scan: ${error}`);
throw error;
}

// Verify that master change was detected
assert.equal(
masterChangeDetected,
true,
"Master change should have been detected"
);

// Verify that we eventually got all keys despite the master change
assert.equal(
foundKeys.size,
testKeys.size,
"Should find all keys despite master failover"
);
assert.deepEqual(
foundKeys,
testKeys,
"Found keys should match test keys"
);

// Verify that the master actually changed
const newMaster = sentinel.getMasterNode();
tracer.push(`New master port: ${newMaster?.port}`);
assert.notEqual(
originalMaster?.port,
newMaster?.port,
"Master should have changed"
);

tracer.push(
`Test completed successfully with ${iterationCount} scan iterations`
);
});

it('should handle master change at scan start', async function () {
this.timeout(60000);

sentinel = frame.getSentinelClient({ scanInterval: 1000 });
sentinel.setTracer(tracer);
sentinel.on("error", () => { });
await sentinel.connect();

// Set up test data
const entries: Array<string> = [];
for (let i = 0; i < 30; i++) {
entries.push(`startfailover:${i}`, `value${i}`);
}
await sentinel.mSet(entries);

// Wait for addded keys to be replicated
await setTimeout(2000);

// Get original master and trigger immediate failover
const originalMaster = sentinel.getMasterNode();

// Stop master immediately before starting scan
await frame.stopNode(originalMaster!.port.toString());

let masterChangeDetected = false;
let masterChangeResolve: () => void;
const masterChangePromise = new Promise<void>((resolve) => {
masterChangeResolve = resolve;
});

// Listen for manifest change events
sentinel.on('topology-change', (event: RedisSentinelEvent) => {
if (event.type === "MASTER_CHANGE") {
masterChangeDetected = true;
tracer.push(`Master change detected during scan: ${event.node.port}`);
if (masterChangeResolve) masterChangeResolve();
}
});

await masterChangePromise;

// Now start scan - should work with new master
const foundKeys = new Set<string>();
for await (const keyBatch of sentinel.scanIterator({ MATCH: 'startfailover:*' })) {
for (const key of keyBatch) {
foundKeys.add(key);
}
}

assert.equal(masterChangeDetected, true, 'Master change should have been detected');
// Should find all keys even though master changed before scan started
assert.equal(foundKeys.size, 30);

// Verify master actually changed
const newMaster = sentinel.getMasterNode();
assert.notEqual(originalMaster?.port, newMaster?.port);
});
});
});
50 changes: 50 additions & 0 deletions packages/client/lib/sentinel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ import { WaitQueue } from './wait-queue';
import { TcpNetConnectOpts } from 'node:net';
import { RedisTcpSocketOptions } from '../client/socket';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
import { ScanOptions } from '../commands/SCAN';
import { RedisArgument } from '../RESP/types';

interface ClientInfo {
id: number;
}

interface ScanIteratorOptions {
cursor?: RedisArgument;
}

export class RedisSentinelClient<
M extends RedisModules,
F extends RedisFunctions,
Expand Down Expand Up @@ -599,6 +605,50 @@ export default class RedisSentinel<

this._self.#internal.setTracer(tracer);
}

async *scanIterator(
this: RedisSentinelType<M, F, S, RESP, TYPE_MAPPING>,
options?: ScanOptions & ScanIteratorOptions
) {
// Acquire a master client lease
const masterClient = await this.acquire();
let cursor = options?.cursor ?? "0";
let shouldRestart = false;

// Set up topology change listener
const handleTopologyChange = (event: RedisSentinelEvent) => {
if (event.type === "MASTER_CHANGE") {
shouldRestart = true;
}
};

// Listen for master changes
this.on("topology-change", handleTopologyChange);

try {
do {
// Check if we need to restart due to master change
if (shouldRestart) {
cursor = "0";
shouldRestart = false;
}

const reply = await masterClient.scan(cursor, options);
// If a topology change happened during the scan command (which caused a retry),
// the reply is from the new master using the old cursor. We should discard it
// and let the loop restart the scan from cursor "0".
if (shouldRestart) {
continue;
}
cursor = reply.cursor;
yield reply.keys;
} while (cursor !== "0");
} finally {
// Clean up: remove event listener and release the client
this.removeListener("topology-change", handleTopologyChange);
masterClient.release();
}
}
}

class RedisSentinelInternal<
Expand Down