diff --git a/docs/sentinel.md b/docs/sentinel.md index f10b2953df5..eccf020f76e 100644 --- a/docs/sentinel.md +++ b/docs/sentinel.md @@ -101,3 +101,27 @@ try { clientLease.release(); } ``` + +## Scan Iterator + +The sentinel client supports `scanIterator` for iterating over keys on the master node: + +```javascript +for await (const keys of sentinel.scanIterator()) { + // ... +} +``` + +If a failover occurs during the scan, the iterator will automatically restart from the beginning on the new master to ensure all keys are covered. This may result in duplicate keys being yielded. If your application requires processing each key exactly once, you should implement a deduplication mechanism (like a `Set` or Bloom filter). + +```javascript +const processed = new Set(); +for await (const keys of sentinel.scanIterator()) { + for (const key of keys) { + if (processed.has(key)) continue; + processed.add(key); + + // process key + } +} +``` diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index ef1702eab13..4090c28ff43 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -1041,4 +1041,235 @@ describe('legacy tests', () => { assert.equal(csc.stats().hitCount, 6); }) }); + + describe('scanIterator tests', () => { + testUtils.testWithClientSentinel('should iterate through all keys in normal operation', async sentinel => { + // Set up test data + const testKeys = new Set(); + const entries: Array = []; + + // Create 50 test keys to ensure we get multiple scan iterations + for (let i = 0; i < 50; i++) { + const key = `scantest:${i}`; + testKeys.add(key); + entries.push(key, `value${i}`); + } + + // Insert all test data + await sentinel.mSet(entries); + + // Collect all keys using scanIterator + const foundKeys = new Set(); + for await (const keyBatch of sentinel.scanIterator({ MATCH: 'scantest:*' })) { + for (const key of keyBatch) { + foundKeys.add(key); + } + } + + // Verify all keys were found + assert.deepEqual(testKeys, foundKeys); + }, GLOBAL.SENTINEL.OPEN); + + testUtils.testWithClientSentinel('should respect MATCH pattern', async sentinel => { + // Set up test data with different patterns + await sentinel.mSet([ + 'match:1', 'value1', + 'match:2', 'value2', + 'nomatch:1', 'value3', + 'nomatch:2', 'value4' + ]); + + const foundKeys = new Set(); + for await (const keyBatch of sentinel.scanIterator({ MATCH: 'match:*' })) { + for (const key of keyBatch) { + foundKeys.add(key); + } + } + + const expectedKeys = new Set(['match:1', 'match:2']); + assert.deepEqual(foundKeys, expectedKeys); + }, GLOBAL.SENTINEL.OPEN); + }); + + describe('scanIterator with master failover', () => { + const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: undefined }; + const frame = new SentinelFramework(config); + let sentinel: RedisSentinelType | undefined; + const tracer: Array = []; + + beforeEach(async function () { + this.timeout(60000); + await frame.spawnRedisSentinel(); + await frame.getAllRunning(); + await steadyState(frame); + }); + + afterEach(async function () { + this.timeout(60000); + if (sentinel !== undefined) { + sentinel.destroy(); + sentinel = undefined; + } + await frame.cleanup(); + }); + + it('should restart scan from beginning when master changes during iteration', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ scanInterval: 1000 }); + sentinel.setTracer(tracer); + sentinel.on("error", () => {}); + await sentinel.connect(); + + // Set up test data + const testKeys = new Set(); + const entries: Array = []; + + for (let i = 0; i < 100; i++) { + const key = `failovertest:${i}`; + testKeys.add(key); + entries.push(key, `value${i}`); + } + + await sentinel.mSet(entries); + // Wait for addded keys to be replicated + await setTimeout(2000); + + let masterChangeDetected = false; + let iterationCount = 0; + const foundKeys = new Set(); + + // Listen for manifest change events + sentinel.on("topology-change", (event: RedisSentinelEvent) => { + if (event.type === "MASTER_CHANGE") { + masterChangeDetected = true; + tracer.push(`Master change detected during scan: ${event.node.port}`); + } + }); + + // Get the current master node before starting scan + const originalMaster = sentinel.getMasterNode(); + tracer.push(`Original master port: ${originalMaster?.port}`); + + // Start scanning with a small COUNT to ensure multiple iterations + const scanIterator = sentinel.scanIterator({ + MATCH: "failovertest:*", + COUNT: 10, + }); + + // Consume the scan iterator + try { + for await (const keyBatch of scanIterator) { + iterationCount++; + if (iterationCount === 1) { + tracer.push( + `Triggering master failover by stopping node ${originalMaster?.port}` + ); + await frame.stopNode(originalMaster!.port.toString()); + tracer.push(`Master node stopped`); + } + tracer.push( + `Scan iteration ${iterationCount}, got ${keyBatch.length} keys` + ); + + for (const key of keyBatch) { + foundKeys.add(key); + } + } + } catch (error) { + tracer.push(`Error during scan: ${error}`); + throw error; + } + + // Verify that master change was detected + assert.equal( + masterChangeDetected, + true, + "Master change should have been detected" + ); + + // Verify that we eventually got all keys despite the master change + assert.equal( + foundKeys.size, + testKeys.size, + "Should find all keys despite master failover" + ); + assert.deepEqual( + foundKeys, + testKeys, + "Found keys should match test keys" + ); + + // Verify that the master actually changed + const newMaster = sentinel.getMasterNode(); + tracer.push(`New master port: ${newMaster?.port}`); + assert.notEqual( + originalMaster?.port, + newMaster?.port, + "Master should have changed" + ); + + tracer.push( + `Test completed successfully with ${iterationCount} scan iterations` + ); + }); + + it('should handle master change at scan start', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ scanInterval: 1000 }); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + // Set up test data + const entries: Array = []; + for (let i = 0; i < 30; i++) { + entries.push(`startfailover:${i}`, `value${i}`); + } + await sentinel.mSet(entries); + + // Wait for addded keys to be replicated + await setTimeout(2000); + + // Get original master and trigger immediate failover + const originalMaster = sentinel.getMasterNode(); + + // Stop master immediately before starting scan + await frame.stopNode(originalMaster!.port.toString()); + + let masterChangeDetected = false; + let masterChangeResolve: () => void; + const masterChangePromise = new Promise((resolve) => { + masterChangeResolve = resolve; + }); + + // Listen for manifest change events + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + if (event.type === "MASTER_CHANGE") { + masterChangeDetected = true; + tracer.push(`Master change detected during scan: ${event.node.port}`); + if (masterChangeResolve) masterChangeResolve(); + } + }); + + await masterChangePromise; + + // Now start scan - should work with new master + const foundKeys = new Set(); + for await (const keyBatch of sentinel.scanIterator({ MATCH: 'startfailover:*' })) { + for (const key of keyBatch) { + foundKeys.add(key); + } + } + + assert.equal(masterChangeDetected, true, 'Master change should have been detected'); + // Should find all keys even though master changed before scan started + assert.equal(foundKeys.size, 30); + + // Verify master actually changed + const newMaster = sentinel.getMasterNode(); + assert.notEqual(originalMaster?.port, newMaster?.port); + }); + }); }); diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index a9a2b9a5e5d..14921c3be6f 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -17,11 +17,17 @@ import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; +import { ScanOptions } from '../commands/SCAN'; +import { RedisArgument } from '../RESP/types'; interface ClientInfo { id: number; } +interface ScanIteratorOptions { + cursor?: RedisArgument; +} + export class RedisSentinelClient< M extends RedisModules, F extends RedisFunctions, @@ -599,6 +605,50 @@ export default class RedisSentinel< this._self.#internal.setTracer(tracer); } + + async *scanIterator( + this: RedisSentinelType, + options?: ScanOptions & ScanIteratorOptions + ) { + // Acquire a master client lease + const masterClient = await this.acquire(); + let cursor = options?.cursor ?? "0"; + let shouldRestart = false; + + // Set up topology change listener + const handleTopologyChange = (event: RedisSentinelEvent) => { + if (event.type === "MASTER_CHANGE") { + shouldRestart = true; + } + }; + + // Listen for master changes + this.on("topology-change", handleTopologyChange); + + try { + do { + // Check if we need to restart due to master change + if (shouldRestart) { + cursor = "0"; + shouldRestart = false; + } + + const reply = await masterClient.scan(cursor, options); + // If a topology change happened during the scan command (which caused a retry), + // the reply is from the new master using the old cursor. We should discard it + // and let the loop restart the scan from cursor "0". + if (shouldRestart) { + continue; + } + cursor = reply.cursor; + yield reply.keys; + } while (cursor !== "0"); + } finally { + // Clean up: remove event listener and release the client + this.removeListener("topology-change", handleTopologyChange); + masterClient.release(); + } + } } class RedisSentinelInternal<