Skip to content

Commit 77fef62

Browse files
committed
Sentinel scan iterator
1 parent 7c419e0 commit 77fef62

File tree

3 files changed

+307
-0
lines changed

3 files changed

+307
-0
lines changed

docs/sentinel.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,27 @@ try {
101101
clientLease.release();
102102
}
103103
```
104+
105+
## Scan Iterator
106+
107+
The sentinel client supports `scanIterator` for iterating over keys on the master node:
108+
109+
```javascript
110+
for await (const keys of sentinel.scanIterator()) {
111+
// ...
112+
}
113+
```
114+
115+
If a failover occurs during the scan, the iterator will automatically restart from the beginning on the new master to ensure all keys are covered. This may result in duplicate keys being yielded. If your application requires processing each key exactly once, you should implement a deduplication mechanism (like a `Set` or Bloom filter).
116+
117+
```javascript
118+
const processed = new Set();
119+
for await (const keys of sentinel.scanIterator()) {
120+
for (const key of keys) {
121+
if (processed.has(key)) continue;
122+
processed.add(key);
123+
124+
// process key
125+
}
126+
}
127+
```

packages/client/lib/sentinel/index.spec.ts

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,4 +1041,237 @@ describe('legacy tests', () => {
10411041
assert.equal(csc.stats().hitCount, 6);
10421042
})
10431043
});
1044+
1045+
describe('scanIterator tests', () => {
1046+
testUtils.testWithClientSentinel('should iterate through all keys in normal operation', async sentinel => {
1047+
// Set up test data
1048+
const testKeys = new Set<string>();
1049+
const entries: Array<string> = [];
1050+
1051+
// Create 50 test keys to ensure we get multiple scan iterations
1052+
for (let i = 0; i < 50; i++) {
1053+
const key = `scantest:${i}`;
1054+
testKeys.add(key);
1055+
entries.push(key, `value${i}`);
1056+
}
1057+
1058+
// Insert all test data
1059+
await sentinel.mSet(entries);
1060+
1061+
// Collect all keys using scanIterator
1062+
const foundKeys = new Set<string>();
1063+
for await (const keyBatch of sentinel.scanIterator({ MATCH: 'scantest:*' })) {
1064+
for (const key of keyBatch) {
1065+
foundKeys.add(key);
1066+
}
1067+
}
1068+
1069+
// Verify all keys were found
1070+
assert.deepEqual(testKeys, foundKeys);
1071+
}, GLOBAL.SENTINEL.OPEN);
1072+
1073+
testUtils.testWithClientSentinel('should respect MATCH pattern', async sentinel => {
1074+
// Set up test data with different patterns
1075+
await sentinel.mSet([
1076+
'match:1', 'value1',
1077+
'match:2', 'value2',
1078+
'nomatch:1', 'value3',
1079+
'nomatch:2', 'value4'
1080+
]);
1081+
1082+
const foundKeys = new Set<string>();
1083+
for await (const keyBatch of sentinel.scanIterator({ MATCH: 'match:*' })) {
1084+
for (const key of keyBatch) {
1085+
foundKeys.add(key);
1086+
}
1087+
}
1088+
1089+
const expectedKeys = new Set(['match:1', 'match:2']);
1090+
assert.deepEqual(foundKeys, expectedKeys);
1091+
}, GLOBAL.SENTINEL.OPEN);
1092+
});
1093+
1094+
describe('scanIterator with master failover', () => {
1095+
const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: undefined };
1096+
const frame = new SentinelFramework(config);
1097+
let sentinel: RedisSentinelType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
1098+
const tracer: Array<string> = [];
1099+
1100+
before(async function () {
1101+
this.timeout(60000);
1102+
await frame.spawnRedisSentinel();
1103+
await steadyState(frame);
1104+
});
1105+
1106+
afterEach(async function () {
1107+
if (sentinel !== undefined) {
1108+
sentinel.destroy();
1109+
sentinel = undefined;
1110+
}
1111+
});
1112+
1113+
after(async function () {
1114+
this.timeout(60000);
1115+
await frame.cleanup();
1116+
});
1117+
1118+
it('should restart scan from beginning when master changes during iteration', async function () {
1119+
this.timeout(60000);
1120+
1121+
sentinel = frame.getSentinelClient({ scanInterval: 1000 });
1122+
sentinel.setTracer(tracer);
1123+
sentinel.on("error", () => {});
1124+
await sentinel.connect();
1125+
1126+
// Set up test data
1127+
const testKeys = new Set<string>();
1128+
const entries: Array<string> = [];
1129+
1130+
for (let i = 0; i < 100; i++) {
1131+
const key = `failovertest:${i}`;
1132+
testKeys.add(key);
1133+
entries.push(key, `value${i}`);
1134+
}
1135+
1136+
await sentinel.mSet(entries);
1137+
// Wait for addded keys to be replicated
1138+
await setTimeout(2000);
1139+
1140+
let masterChangeDetected = false;
1141+
let iterationCount = 0;
1142+
const foundKeys = new Set<string>();
1143+
1144+
// Listen for manifest change events
1145+
sentinel.on("topology-change", (event: RedisSentinelEvent) => {
1146+
if (event.type === "MASTER_CHANGE") {
1147+
masterChangeDetected = true;
1148+
tracer.push(`Master change detected during scan: ${event.node.port}`);
1149+
}
1150+
});
1151+
1152+
// Get the current master node before starting scan
1153+
const originalMaster = sentinel.getMasterNode();
1154+
tracer.push(`Original master port: ${originalMaster?.port}`);
1155+
1156+
// Start scanning with a small COUNT to ensure multiple iterations
1157+
const scanIterator = sentinel.scanIterator({
1158+
MATCH: "failovertest:*",
1159+
COUNT: 10,
1160+
});
1161+
1162+
// Consume the scan iterator
1163+
try {
1164+
for await (const keyBatch of scanIterator) {
1165+
iterationCount++;
1166+
if (iterationCount === 1) {
1167+
tracer.push(
1168+
`Triggering master failover by stopping node ${originalMaster?.port}`
1169+
);
1170+
await frame.stopNode(originalMaster!.port.toString());
1171+
tracer.push(`Master node stopped`);
1172+
}
1173+
tracer.push(
1174+
`Scan iteration ${iterationCount}, got ${keyBatch.length} keys`
1175+
);
1176+
1177+
for (const key of keyBatch) {
1178+
foundKeys.add(key);
1179+
}
1180+
}
1181+
} catch (error) {
1182+
tracer.push(`Error during scan: ${error}`);
1183+
throw error;
1184+
}
1185+
1186+
// Verify that master change was detected
1187+
assert.equal(
1188+
masterChangeDetected,
1189+
true,
1190+
"Master change should have been detected"
1191+
);
1192+
1193+
// Verify that we eventually got all keys despite the master change
1194+
assert.equal(
1195+
foundKeys.size,
1196+
testKeys.size,
1197+
"Should find all keys despite master failover"
1198+
);
1199+
assert.deepEqual(
1200+
foundKeys,
1201+
testKeys,
1202+
"Found keys should match test keys"
1203+
);
1204+
1205+
// Verify that the master actually changed
1206+
const newMaster = sentinel.getMasterNode();
1207+
tracer.push(`New master port: ${newMaster?.port}`);
1208+
assert.notEqual(
1209+
originalMaster?.port,
1210+
newMaster?.port,
1211+
"Master should have changed"
1212+
);
1213+
1214+
tracer.push(
1215+
`Test completed successfully with ${iterationCount} scan iterations`
1216+
);
1217+
});
1218+
1219+
it('should handle master change at scan start', async function () {
1220+
this.timeout(60000);
1221+
1222+
sentinel = frame.getSentinelClient({ scanInterval: 1000 });
1223+
sentinel.setTracer(tracer);
1224+
sentinel.on("error", () => { });
1225+
await sentinel.connect();
1226+
1227+
// Set up test data
1228+
const entries: Array<string> = [];
1229+
for (let i = 0; i < 30; i++) {
1230+
entries.push(`startfailover:${i}`, `value${i}`);
1231+
}
1232+
await sentinel.mSet(entries);
1233+
1234+
// Wait for addded keys to be replicated
1235+
await setTimeout(2000);
1236+
1237+
// Get original master and trigger immediate failover
1238+
const originalMaster = sentinel.getMasterNode();
1239+
1240+
// Stop master immediately before starting scan
1241+
await frame.stopNode(originalMaster!.port.toString());
1242+
1243+
let masterChangeDetected = false;
1244+
let masterChangeResolve: () => void;
1245+
const masterChangePromise = new Promise<void>((resolve) => {
1246+
masterChangeResolve = resolve;
1247+
});
1248+
1249+
// Listen for manifest change events
1250+
sentinel.on('topology-change', (event: RedisSentinelEvent) => {
1251+
if (event.type === "MASTER_CHANGE") {
1252+
masterChangeDetected = true;
1253+
tracer.push(`Master change detected during scan: ${event.node.port}`);
1254+
if (masterChangeResolve) masterChangeResolve();
1255+
}
1256+
});
1257+
1258+
await masterChangePromise;
1259+
1260+
// Now start scan - should work with new master
1261+
const foundKeys = new Set<string>();
1262+
for await (const keyBatch of sentinel.scanIterator({ MATCH: 'startfailover:*' })) {
1263+
for (const key of keyBatch) {
1264+
foundKeys.add(key);
1265+
}
1266+
}
1267+
1268+
assert.equal(masterChangeDetected, true, 'Master change should have been detected');
1269+
// Should find all keys even though master changed before scan started
1270+
assert.equal(foundKeys.size, 30);
1271+
1272+
// Verify master actually changed
1273+
const newMaster = sentinel.getMasterNode();
1274+
assert.notEqual(originalMaster?.port, newMaster?.port);
1275+
});
1276+
});
10441277
});

packages/client/lib/sentinel/index.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@ import { WaitQueue } from './wait-queue';
1717
import { TcpNetConnectOpts } from 'node:net';
1818
import { RedisTcpSocketOptions } from '../client/socket';
1919
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
20+
import { ScanOptions } from '../commands/SCAN';
21+
import { RedisArgument } from '../RESP/types';
2022

2123
interface ClientInfo {
2224
id: number;
2325
}
2426

27+
interface ScanIteratorOptions {
28+
cursor?: RedisArgument;
29+
}
30+
2531
export class RedisSentinelClient<
2632
M extends RedisModules,
2733
F extends RedisFunctions,
@@ -599,6 +605,50 @@ export default class RedisSentinel<
599605

600606
this._self.#internal.setTracer(tracer);
601607
}
608+
609+
async *scanIterator(
610+
this: RedisSentinelType<M, F, S, RESP, TYPE_MAPPING>,
611+
options?: ScanOptions & ScanIteratorOptions
612+
) {
613+
// Acquire a master client lease
614+
const masterClient = await this.acquire();
615+
let cursor = options?.cursor ?? "0";
616+
let shouldRestart = false;
617+
618+
// Set up topology change listener
619+
const handleTopologyChange = (event: RedisSentinelEvent) => {
620+
if (event.type === "MASTER_CHANGE") {
621+
shouldRestart = true;
622+
}
623+
};
624+
625+
// Listen for master changes
626+
this.on("topology-change", handleTopologyChange);
627+
628+
try {
629+
do {
630+
// Check if we need to restart due to master change
631+
if (shouldRestart) {
632+
cursor = "0";
633+
shouldRestart = false;
634+
}
635+
636+
const reply = await masterClient.scan(cursor, options);
637+
// If a topology change happened during the scan command (which caused a retry),
638+
// the reply is from the new master using the old cursor. We should discard it
639+
// and let the loop restart the scan from cursor "0".
640+
if (shouldRestart) {
641+
continue;
642+
}
643+
cursor = reply.cursor;
644+
yield reply.keys;
645+
} while (cursor !== "0");
646+
} finally {
647+
// Clean up: remove event listener and release the client
648+
this.removeListener("topology-change", handleTopologyChange);
649+
masterClient.release();
650+
}
651+
}
602652
}
603653

604654
class RedisSentinelInternal<

0 commit comments

Comments
 (0)