Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,74 @@ describe('Client', () => {
assert.deepEqual(map, results);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('scan iterators respect type mapping', async client => {
await Promise.all([
client.mSet(['scan:1', '', 'scan:2', '']),
client.hSet('hash', {
field: 'value'
}),
client.sAdd('set', ['member']),
client.zAdd('zset', {
value: 'member',
score: 1.5
})
]);

const mappedClient = client.withTypeMapping({
[RESP_TYPES.BLOB_STRING]: Buffer,
[RESP_TYPES.DOUBLE]: String
});

const keys = new Set<string>();
for await (const page of mappedClient.scanIterator({ MATCH: 'scan:*' })) {
for (const key of page) {
assert.ok(Buffer.isBuffer(key));
keys.add(key.toString());
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
assert.deepEqual(keys, new Set(['scan:1', 'scan:2']));

const entries = new Map<string, string>();
for await (const page of mappedClient.hScanIterator('hash')) {
for (const { field, value } of page) {
assert.ok(Buffer.isBuffer(field));
assert.ok(Buffer.isBuffer(value));
entries.set(field.toString(), value.toString());
}
}
assert.deepEqual(entries, new Map([['field', 'value']]));

const fields = new Set<string>();
for await (const page of mappedClient.hScanNoValuesIterator('hash')) {
for (const field of page) {
assert.ok(Buffer.isBuffer(field));
fields.add(field.toString());
}
}
assert.deepEqual(fields, new Set(['field']));

const setMembers = new Set<string>();
for await (const page of mappedClient.sScanIterator('set')) {
for (const member of page) {
assert.ok(Buffer.isBuffer(member));
setMembers.add(member.toString());
}
}
assert.deepEqual(setMembers, new Set(['member']));

const sortedSetMembers = new Map<string, string>();
for await (const page of mappedClient.zScanIterator('zset')) {
for (const { value, score } of page) {
assert.ok(Buffer.isBuffer(value));
sortedSetMembers.set(value.toString(), score);
}
}
assert.deepEqual(sortedSetMembers, new Map([['member', '1.5']]));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test asserts string score but gets Buffer

Medium Severity

The zScanIterator section of this test combines BLOB_STRING: Buffer and DOUBLE: String type mappings, then asserts the score equals the string '1.5'. In RESP2 (the default), scores arrive as blob strings, so the decoder converts them to Buffer via the BLOB_STRING: Buffer mapping. When transformDoubleReply[2] then processes this with DOUBLE: String, its case String branch returns the reply as-is — which is already a Buffer, not a string. The assert.deepEqual against new Map([['member', '1.5']]) would fail because the actual score is Buffer.from('1.5'). This test was never run against a real Redis server per the PR description.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 784e02c. Configure here.

}, {
...GLOBAL.SERVERS.OPEN,
minimumDockerVersion: [7, 4]
});

describe('PubSub', () => {
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
function assertStringListener(message: string, channel: string) {
Expand Down
55 changes: 36 additions & 19 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,21 @@ export type RedisClientType<
RedisClientExtensions<M, F, S, RESP, TYPE_MAPPING>
);

type ProxyClient = RedisClient<any, any, any, any, any>;
type ProxyClient = RedisClient<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>;

type NamespaceProxyClient = { _self: ProxyClient };

type RedisClientMultiCommandConstructor = new (
...args: ConstructorParameters<typeof RedisClientMultiCommand>
) => RedisClientMultiCommand;

type ConfiguredRedisClientClass = {
new (options?: RedisClientOptions): ProxyClient;
prototype: ProxyClient & {
Multi?: RedisClientMultiCommandConstructor;
};
};

interface ScanIteratorOptions {
cursor?: RedisArgument;
}
Expand Down Expand Up @@ -320,7 +331,10 @@ export default class RedisClient<
}
}

static #SingleEntryCache = new SingleEntryCache<any, any>()
static #SingleEntryCache = new SingleEntryCache<
CommanderConfig<RedisModules, RedisFunctions, RedisScripts, RespVersions> | undefined,
ConfiguredRedisClientClass
>()

static factory<
M extends RedisModules = {},
Expand All @@ -340,18 +354,21 @@ export default class RedisClient<
createFunctionCommand: RedisClient.#createFunctionCommand,
createScriptCommand: RedisClient.#createScriptCommand,
config
});
}) as ConfiguredRedisClientClass;

Client.prototype.Multi = RedisClientMultiCommand.extend(config);
Client.prototype.Multi = RedisClientMultiCommand.extend(config) as RedisClientMultiCommandConstructor;

RedisClient.#SingleEntryCache.set(config, Client);
}

return <TYPE_MAPPING extends TypeMapping = {}>(
options?: Omit<RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, keyof Exclude<typeof config, undefined>>
) => {
const ClientCtor = Client as unknown as new (
options?: Omit<RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, keyof Exclude<typeof config, undefined>>
) => RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
// returning a "proxy" to prevent the namespaces._self to leak between "proxies"
return Object.create(new Client(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
return Object.create(new ClientCtor(options)) as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
};
}

Expand Down Expand Up @@ -598,11 +615,11 @@ export default class RedisClient<
const cscConfig = this.#options.clientSideCache;
this.#clientSideCache = new BasicClientSideCache(cscConfig);
}
this.#queue.addPushHandler((push: Array<any>): boolean => {
if (push[0].toString() !== 'invalidate') return false;
this.#queue.addPushHandler((push: Array<unknown>): boolean => {
if (String(push[0]) !== 'invalidate') return false;

if (push[1] !== null) {
for (const key of push[1]) {
for (const key of push[1] as Array<RedisArgument>) {
this.#clientSideCache?.invalidate(key)
}
} else {
Expand All @@ -612,11 +629,11 @@ export default class RedisClient<
return true
});
} else if (options?.emitInvalidate) {
this.#queue.addPushHandler((push: Array<any>): boolean => {
if (push[0].toString() !== 'invalidate') return false;
this.#queue.addPushHandler((push: Array<unknown>): boolean => {
if (String(push[0]) !== 'invalidate') return false;

if (push[1] !== null) {
for (const key of push[1]) {
for (const key of push[1] as Array<RedisArgument>) {
this.emit('invalidate', key);
}
} else {
Expand Down Expand Up @@ -1057,7 +1074,7 @@ export default class RedisClient<
*/
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
// @ts-expect-error temporarily clears the socket before reinserting one
this._self.#socket = null;
socket.removeAllListeners();
return socket;
Expand Down Expand Up @@ -1524,7 +1541,7 @@ export default class RedisClient<

MULTI<isTyped extends MultiMode = MULTI_MODE['TYPED']>() {
type Multi = new (...args: ConstructorParameters<typeof RedisClientMultiCommand>) => RedisClientMultiCommandType<isTyped, [], M, F, S, RESP, TYPE_MAPPING>;
return new ((this as any).Multi as Multi)(
return new ((this as unknown as { Multi: Multi }).Multi)(
this._executeMulti.bind(this),
this._executePipeline.bind(this),
this._commandOptions?.typeMapping
Expand All @@ -1542,7 +1559,7 @@ export default class RedisClient<
const reply = await this.scan(cursor, options);
cursor = reply.cursor;
yield reply.keys;
} while (cursor !== '0');
} while (cursor.toString() !== '0');
}

async* hScanIterator(
Expand All @@ -1555,7 +1572,7 @@ export default class RedisClient<
const reply = await this.hScan(key, cursor, options);
cursor = reply.cursor;
yield reply.entries;
} while (cursor !== '0');
} while (cursor.toString() !== '0');
}

async* hScanValuesIterator(
Expand All @@ -1568,7 +1585,7 @@ export default class RedisClient<
const reply = await this.hScanNoValues(key, cursor, options);
cursor = reply.cursor;
yield reply.fields;
} while (cursor !== '0');
} while (cursor.toString() !== '0');
}

async* hScanNoValuesIterator(
Expand All @@ -1581,7 +1598,7 @@ export default class RedisClient<
const reply = await this.hScanNoValues(key, cursor, options);
cursor = reply.cursor;
yield reply.fields;
} while (cursor !== '0');
} while (cursor.toString() !== '0');
}

async* sScanIterator(
Expand All @@ -1594,7 +1611,7 @@ export default class RedisClient<
const reply = await this.sScan(key, cursor, options);
cursor = reply.cursor;
yield reply.members;
} while (cursor !== '0');
} while (cursor.toString() !== '0');
}

async* zScanIterator(
Expand All @@ -1607,7 +1624,7 @@ export default class RedisClient<
const reply = await this.zScan(key, cursor, options);
cursor = reply.cursor;
yield reply.members;
} while (cursor !== '0');
} while (cursor.toString() !== '0');
}

async MONITOR(callback: MonitorCallback<TYPE_MAPPING>) {
Expand Down
20 changes: 20 additions & 0 deletions packages/client/lib/commands/ZSCAN.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL } from '../test-utils';
import { RESP_TYPES } from '../RESP/decoder';
import { parseArgs } from './generic-transformers';
import ZSCAN from './ZSCAN';

Expand Down Expand Up @@ -50,4 +51,23 @@ describe('ZSCAN', () => {
}
);
}, GLOBAL.SERVERS.OPEN);

it('transformReply with type mapping', () => {
assert.deepEqual(
ZSCAN.transformReply(
['0', ['member', '1.5']],
undefined,
{
[RESP_TYPES.DOUBLE]: String
}
),
{
cursor: '0',
members: [{
value: 'member',
score: '1.5'
}]
}
);
});
});
10 changes: 7 additions & 3 deletions packages/client/lib/commands/ZSCAN.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CommandParser } from '../client/parser';
import { RedisArgument, ArrayReply, BlobStringReply, Command } from '../RESP/types';
import { RedisArgument, ArrayReply, BlobStringReply, Command, TypeMapping } from '../RESP/types';
import { ScanCommonOptions, parseScanArguments } from './SCAN';
import { transformSortedSetReply } from './generic-transformers';

Expand All @@ -20,10 +20,14 @@ export default {
parser.pushKey(key);
parseScanArguments(parser, cursor, options);
},
transformReply([cursor, rawMembers]: [BlobStringReply, ArrayReply<BlobStringReply>]) {
transformReply(
[cursor, rawMembers]: [BlobStringReply, ArrayReply<BlobStringReply>],
preserve?: unknown,
typeMapping?: TypeMapping
) {
return {
cursor,
members: transformSortedSetReply[2](rawMembers)
members: transformSortedSetReply[2](rawMembers, preserve, typeMapping)
};
}
} as const satisfies Command;