Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion packages/client/lib/sentinel/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from 'node:events';
import { CommandArguments, RedisFunctions, RedisModules, RedisScripts, ReplyUnion, RespVersions, TypeMapping } from '../RESP/types';
import { CommandArguments, RedisArgument, RedisFunctions, RedisModules, RedisScripts, ReplyUnion, RespVersions, TypeMapping } from '../RESP/types';
import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
import { CommandOptions } from '../client/commands-queue';
import { attachConfig } from '../commander';
Expand All @@ -18,11 +18,16 @@ import { TcpNetConnectOpts } from 'node:net';
import { RedisTcpSocketOptions } from '../client/socket';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
import { ClientIdentity, ClientRole, generateClientId } from '../client/identity';
import { ScanOptions } from '../commands/SCAN';

interface ClientInfo {
id: number;
}

interface ScanIteratorOptions {
cursor?: RedisArgument;
}

export class RedisSentinelClient<
M extends RedisModules,
F extends RedisFunctions,
Expand Down Expand Up @@ -160,6 +165,18 @@ export class RedisSentinelClient<
return this._commandOptionsProxy('typeMapping', typeMapping);
}

async* scanIterator(
this: RedisSentinelClientType<M, F, S, RESP, TYPE_MAPPING>,
options?: ScanOptions & ScanIteratorOptions
) {
let cursor = options?.cursor ?? '0';
do {
const reply = await this.scan(cursor, options);
cursor = reply.cursor;
yield reply.keys;
} while (cursor.toString() !== '0');
}

async _execute<T>(
isReadonly: boolean | undefined,
fn: (client: RedisClient<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => Promise<T>
Expand Down Expand Up @@ -425,6 +442,18 @@ export default class RedisSentinel<
return this._commandOptionsProxy('typeMapping', typeMapping);
}

async* scanIterator(
this: RedisSentinelType<M, F, S, RESP, TYPE_MAPPING>,
options?: ScanOptions & ScanIteratorOptions
) {
let cursor = options?.cursor ?? '0';
do {
const reply = await this.use(client => client.scan(cursor, options));
cursor = reply.cursor;
yield reply.keys;
} while (cursor.toString() !== '0');
}

duplicate<
_M extends RedisModules = M,
_F extends RedisFunctions = F,
Expand Down
63 changes: 63 additions & 0 deletions packages/client/lib/sentinel/scan-iterator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL } from '../test-utils';

describe('RedisSentinel scanIterator', () => {
for (const testOptions of [GLOBAL.SENTINEL.OPEN, GLOBAL.SENTINEL.PASSWORD]) {
const passIndex = testOptions.serverArguments.indexOf('--requirepass') + 1;
const password = passIndex === 0 ? undefined : testOptions.serverArguments[passIndex];

describe(`test with password - ${password}`, () => {
testUtils.testWithClientSentinel('scanIterator', async sentinel => {
await Promise.all([
sentinel.set('scan:1', '1'),
sentinel.set('scan:2', '2')
]);

const results = new Set<string>();
for await (const keys of sentinel.scanIterator({ MATCH: 'scan:*', COUNT: 1 })) {
for (const key of keys) {
results.add(key);
}
}

assert.deepEqual(results, new Set(['scan:1', 'scan:2']));
}, testOptions);

testUtils.testWithClientSentinel('leased client scanIterator', async sentinel => {
await Promise.all([
sentinel.set('lease-scan:1', '1'),
sentinel.set('lease-scan:2', '2')
]);

const client = await sentinel.acquire();
try {
const results = new Set<string>();
for await (const keys of client.scanIterator({ MATCH: 'lease-scan:*', COUNT: 1 })) {
for (const key of keys) {
results.add(key);
}
}

assert.deepEqual(results, new Set(['lease-scan:1', 'lease-scan:2']));
} finally {
const release = client.release();
if (release) await release;
}
}, testOptions);
});
}

testUtils.testWithClientSentinel('scanIterator releases master lease before yielding', async sentinel => {
await sentinel.set('scan-deadlock:1', '1');

let didScan = false;
for await (const keys of sentinel.scanIterator({ MATCH: 'scan-deadlock:*', COUNT: 1 })) {
didScan = true;
assert.ok(keys.length > 0);
await sentinel.set('scan-deadlock:seen', '1');
}

assert.equal(didScan, true);
assert.equal(await sentinel.get('scan-deadlock:seen'), '1');
}, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1);
});