Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/bloom/lib/commands/count-min-sketch/MERGE.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ describe('CMS.MERGE', () => {
);
});

it('without WEIGHTS with Uint8Array sources', () => {
// Regression: Uint8Array is a valid RedisArgument but was not matched by
// the `instanceof Buffer` guard in isPlainSketches, causing it to fall
// into the weighted branch and emit wrong arguments.
const source = new TextEncoder().encode('source');
assert.deepEqual(
parseArgs(MERGE, 'destination', [source]),
['CMS.MERGE', 'destination', '1', source]
);
});

it('with WEIGHTS', () => {
assert.deepEqual(
parseArgs(MERGE, 'destination', [{
Expand Down
2 changes: 1 addition & 1 deletion packages/bloom/lib/commands/count-min-sketch/MERGE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ export default {
} as const satisfies Command;

function isPlainSketches(src: BfMergeSketches): src is Array<RedisArgument> {
return typeof src[0] === 'string' || src[0] instanceof Buffer;
return typeof src[0] === 'string' || src[0] instanceof Buffer || src[0] instanceof Uint8Array;
}
30 changes: 27 additions & 3 deletions packages/client/lib/RESP/decoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function assertSpyCalls(spy: SinonSpy, replies?: Array<unknown>) {

assert.equal(spy.callCount, replies.length);
for (const [i, reply] of replies.entries()) {
assert.deepEqual(
assert.deepStrictEqual(
spy.getCall(i).args,
[reply]
);
Expand All @@ -75,13 +75,13 @@ describe('RESP Decoder', () => {
toWrite: Buffer.from('_\r\n'),
replies: [null]
});

describe('Boolean', () => {
test('true', {
toWrite: Buffer.from('#t\r\n'),
replies: [true]
});

test('false', {
toWrite: Buffer.from('#f\r\n'),
replies: [false]
Expand Down Expand Up @@ -290,6 +290,30 @@ describe('RESP Decoder', () => {
replies: [Buffer.from('OK')]
});

test("Simple string 'OK' as Uint8Array", {
typeMapping: {
[RESP_TYPES.SIMPLE_STRING]: Uint8Array
},
toWrite: Buffer.from('+OK\r\n'),
replies: [new Uint8Array([79, 75])]
});

test("Blob string 'OK' as Uint8Array", {
typeMapping: {
[RESP_TYPES.BLOB_STRING]: Uint8Array
},
toWrite: Buffer.from('$2\r\nOK\r\n'),
replies: [new Uint8Array([79, 75])]
});

test("Verbatim string 'OK' as Uint8Array", {
typeMapping: {
[RESP_TYPES.VERBATIM_STRING]: Uint8Array
},
toWrite: Buffer.from('=6\r\ntxt:OK\r\n'),
replies: [new Uint8Array([79, 75])]
});

test("'é'", {
toWrite: Buffer.from('=6\r\ntxt:é\r\n'),
replies: ['é']
Expand Down
20 changes: 20 additions & 0 deletions packages/client/lib/RESP/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ export class Decoder {
}

const slice = chunk.subarray(start, crlfIndex);

if (type === Uint8Array) {
return new Uint8Array(slice.buffer, slice.byteOffset, slice.byteLength);
}

return type === Buffer ?
slice :
slice.toString();
Expand All @@ -507,6 +512,11 @@ export class Decoder {

chunks.push(chunk.subarray(start, crlfIndex));
const buffer = Buffer.concat(chunks);

if (type === Uint8Array) {
return new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength);
}

return type === Buffer ? buffer : buffer.toString();
}

Expand Down Expand Up @@ -555,6 +565,11 @@ export class Decoder {

const slice = chunk.subarray(this.#cursor, end);
this.#cursor = end + skip;

if (type === Uint8Array) {
return new Uint8Array(slice.buffer, slice.byteOffset, slice.byteLength);
}

return type === Buffer ?
slice :
slice.toString();
Expand All @@ -578,6 +593,11 @@ export class Decoder {
chunks.push(chunk.subarray(this.#cursor, end));
this.#cursor = end + skip;
const buffer = Buffer.concat(chunks);

if (type === Uint8Array) {
return new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength);
}

return type === Buffer ? buffer : buffer.toString();
}

Expand Down
9 changes: 9 additions & 0 deletions packages/client/lib/RESP/encoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,13 @@ describe('RESP Encoder', () => {
['*1\r\n$6\r\n', Buffer.from('string'), '\r\n']
);
});

it('uint8array', () => {
const uint8Array = new Uint8Array([115, 116, 114, 105, 110, 103]); // 'string'
assert.deepEqual(
encodeCommand([uint8Array]),
['*1\r\n$6\r\n', Buffer.from('string'), '\r\n']
);
});

});
12 changes: 10 additions & 2 deletions packages/client/lib/RESP/encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,22 @@ export default function encodeCommand(args: ReadonlyArray<RedisArgument>): Reado
const arg = args[i];
if (typeof arg === 'string') {
strings += '$' + Buffer.byteLength(arg) + CRLF + arg + CRLF;
} else if (arg instanceof Buffer) {
} else if (arg instanceof Buffer) { // Buffer must be checked before Uint8Array because Buffer is a subclass;
// the Uint8Array branch below handles plain (non-Buffer) typed arrays only.
toWrite.push(
strings + '$' + arg.length.toString() + CRLF,
arg
);
strings = CRLF;
} else if (arg instanceof Uint8Array) {
const buffer = Buffer.from(arg.buffer, arg.byteOffset, arg.byteLength);
toWrite.push(
strings + '$' + buffer.length.toString() + CRLF,
buffer
);
strings = CRLF;
} else {
throw new TypeError(`"arguments[${i}]" must be of type "string | Buffer", got ${typeof arg} instead.`);
throw new TypeError(`"arguments[${i}]" must be of type "string | Buffer | Uint8Array", got ${typeof arg} instead.`);
}
}

Expand Down
10 changes: 10 additions & 0 deletions packages/client/lib/RESP/types.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ describe('RESP Type Mapping', () => {
Buffer.isBuffer(verbatimRes)
);

const uint8Res = await client
.withTypeMapping({
[RESP_TYPES.BLOB_STRING]: Uint8Array
})
.get('key');
if (uint8Res !== null && typeof uint8Res !== 'string') {
assert.ok(uint8Res instanceof Uint8Array, 'expected Uint8Array instance');
assert.ok(!Buffer.isBuffer(uint8Res), 'expected plain Uint8Array, not Buffer');
}

// Recursive Collections
// ARRAY infers nested mapped types
const arrayRes = await client
Expand Down
18 changes: 10 additions & 8 deletions packages/client/lib/RESP/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ export interface SimpleStringReply<
> extends RespType<
RESP_TYPES['SIMPLE_STRING'],
T,
Buffer,
string | Buffer
Buffer | Uint8Array,
string | Buffer | Uint8Array
> { }

export interface BlobStringReply<
T extends string = string
> extends RespType<
RESP_TYPES['BLOB_STRING'],
T,
Buffer,
string | Buffer
Buffer | Uint8Array,
string | Buffer | Uint8Array
> {
toString(): string
}
Expand All @@ -88,8 +88,8 @@ export interface VerbatimStringReply<
> extends RespType<
RESP_TYPES['VERBATIM_STRING'],
T,
Buffer | VerbatimString,
string | Buffer | VerbatimString
Buffer | Uint8Array | VerbatimString,
string | Buffer | Uint8Array | VerbatimString
> { }

export interface SimpleErrorReply extends RespType<
Expand Down Expand Up @@ -198,6 +198,8 @@ type UnwrapConstructor<T> =
T extends NumberConstructor ? number :
T extends BooleanConstructor ? boolean :
T extends BigIntConstructor ? bigint :
T extends BufferConstructor ? Buffer :
T extends Uint8ArrayConstructor ? Uint8Array :
T;
export type UnwrapReply<REPLY extends RespType<any, any, any, any>> = REPLY['DEFAULT' | 'TYPES'];

Expand All @@ -217,7 +219,7 @@ export type ReplyWithTypeMapping<
REPLY extends Set<infer T> ? Set<ReplyWithTypeMapping<T, TYPE_MAPPING>> :
REPLY extends Map<infer K, infer V> ? Map<MapKey<K, TYPE_MAPPING>, ReplyWithTypeMapping<V, TYPE_MAPPING>> :
// `Date | Buffer | Error` are supersets of `Record`, so they need to be checked first
REPLY extends Date | Buffer | Error ? REPLY :
REPLY extends Date | Buffer | Uint8Array | Error ? REPLY :
REPLY extends Record<PropertyKey, any> ? {
[P in keyof REPLY]: ReplyWithTypeMapping<REPLY[P], TYPE_MAPPING>;
} :
Expand All @@ -228,7 +230,7 @@ export type ReplyWithTypeMapping<

export type TransformReply = (this: void, reply: any, preserve?: any, typeMapping?: TypeMapping) => any; // TODO;

export type RedisArgument = string | Buffer;
export type RedisArgument = string | Buffer | Uint8Array;

export type CommandArguments = Array<RedisArgument> & { preserve?: unknown };

Expand Down
86 changes: 86 additions & 0 deletions packages/client/lib/client/cache.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,92 @@ describe("Client Side Cache", () => {
}
})
});
describe('Uint8Array key normalization', () => {
//
// The server always delivers invalidation keys as Buffers (PUSH_TYPE_MAPPING
// maps RESP3 blob-strings to Buffer). When a command was cached using a
// Uint8Array key, #keyToCacheKeySetMap was indexed with
// new Uint8Array([120]).toString() → "120" (TypedArray byte-list)
// whereas the incoming invalidation looked up
// Buffer.from('x').toString() → "x" (UTF-8)
// The two strings never matched, so the cache entry was never evicted.

it('invalidate(Buffer) must evict an entry whose Redis key was stored as an equivalent Uint8Array', () => {
const localCsc = new BasicClientSideCache();
const uint8Key = new Uint8Array(Buffer.from('x')); // same bytes as 'x'
const cacheKey = 'sentinel-GET-x';

// createValueEntry does not use its first argument; null is safe here.
const entry = localCsc.createValueEntry(null as any, 'value');
localCsc.set(cacheKey, entry, [uint8Key]);

assert.ok(localCsc.has(cacheKey), 'entry should be present before invalidation');

// The server pushes invalidation keys decoded as Buffer (PUSH_TYPE_MAPPING).
localCsc.invalidate(Buffer.from('x'));

assert.equal(
localCsc.has(cacheKey),
false,
'entry must be evicted when server-style Buffer invalidation key matches the stored Uint8Array key'
);
});

it('invalidate(string) must also evict an entry whose Redis key was stored as an equivalent Uint8Array', () => {
const localCsc = new BasicClientSideCache();
const uint8Key = new Uint8Array(Buffer.from('hello'));
const cacheKey = 'sentinel-GET-hello';

const entry = localCsc.createValueEntry(null as any, 'world');
localCsc.set(cacheKey, entry, [uint8Key]);

assert.ok(localCsc.has(cacheKey), 'entry should be present before invalidation');

localCsc.invalidate('hello');

assert.equal(
localCsc.has(cacheKey),
false,
'entry must be evicted when string invalidation key matches the stored Uint8Array key'
);
});

const csc = new BasicClientSideCache({ maxEntries: 10 });

testUtils.testWithClient('Uint8Array key is invalidated by server-side Buffer invalidation (end-to-end)', async client => {
csc.clear();

// The server will deliver the invalidation push for key 'x' encoded as a
// Buffer. If key normalisation for Uint8Array is broken, the cached entry
// that was stored with Uint8Array([120]) will never match the Buffer('x')
// lookup and the stale value '1' will be returned instead of '2'.
const uint8Key = new Uint8Array(Buffer.from('x'));

await client.set('x', 1);
assert.equal(await client.get(uint8Key), '1', 'first get: cache miss, server returns 1');
assert.equal(csc.stats().missCount, 1, 'first get should be a cache miss');

// Confirm the entry is now cached.
assert.equal(await client.get(uint8Key), '1', 'second get: cache hit');
assert.equal(csc.stats().hitCount, 1, 'second get should be a cache hit');

// Writing a new value causes the server to push an invalidation message
// for key 'x' decoded as a Buffer.
await client.set('x', 2);

// The cache entry must be evicted; the next read must reach the server.
assert.equal(await client.get(uint8Key), '2', 'third get: must be a cache miss after server invalidation — stale value means the Uint8Array invalidation path is broken');
assert.equal(csc.stats().missCount, 2, 'third get should be a cache miss');
assert.equal(csc.stats().hitCount, 1, 'hit count must not increase after invalidation');
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
});

describe("CacheStats", () => {
describe("CacheStats.of()", () => {
it("should correctly initialize stats and calculate derived values", () => {
Expand Down
27 changes: 21 additions & 6 deletions packages/client/lib/client/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@ import RedisClient from '.';
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
import { BasicCommandParser } from './parser';

/**
* Converts a Redis key (string, Buffer, or plain Uint8Array) to a UTF-8 string
* suitable for use as a Map key.
*
* Plain Uint8Array.toString() returns comma-separated byte values (e.g. "120"),
* while Buffer.toString() returns the UTF-8 text (e.g. "x"). Using this helper
* ensures that Uint8Array keys produce the same map key as their Buffer equivalents.
*/
function redisKeyToString(key: RedisArgument): string {
if (typeof key === 'string') return key;
if (Buffer.isBuffer(key)) return key.toString();
return Buffer.from(key).toString();
}

/**
* A snapshot of cache statistics.
*
Expand Down Expand Up @@ -622,7 +636,8 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
return;
}

const keySet = this.#keyToCacheKeySetMap.get(key.toString());
const keyStr = redisKeyToString(key);
const keySet = this.#keyToCacheKeySetMap.get(keyStr);
if (keySet) {
for (const cacheKey of keySet) {
const entry = this.#cacheKeyToEntryMap.get(cacheKey);
Expand All @@ -631,7 +646,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
}
this.#cacheKeyToEntryMap.delete(cacheKey);
}
this.#keyToCacheKeySetMap.delete(key.toString());
this.#keyToCacheKeySetMap.delete(keyStr);
}

this.emit('invalidate', key);
Expand Down Expand Up @@ -702,12 +717,12 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry);

for (const key of keys) {
if (!this.#keyToCacheKeySetMap.has(key.toString())) {
this.#keyToCacheKeySetMap.set(key.toString(), new Set<string>());
const keyStr = redisKeyToString(key);
if (!this.#keyToCacheKeySetMap.has(keyStr)) {
this.#keyToCacheKeySetMap.set(keyStr, new Set<string>());
}

const cacheKeySet = this.#keyToCacheKeySetMap.get(key.toString());
cacheKeySet!.add(cacheKey);
this.#keyToCacheKeySetMap.get(keyStr)!.add(cacheKey);
}
}

Expand Down
Loading