Skip to content

Commit c753a18

Browse files
sjpotterbobymicroby
authored andcommitted
CSC POC ontop of Parser
1 parent 8b4ed00 commit c753a18

File tree

16 files changed

+270
-53
lines changed

16 files changed

+270
-53
lines changed

packages/client/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import RedisSentinel from './lib/sentinel';
2020
export { RedisSentinelOptions, RedisSentinelType } from './lib/sentinel/types';
2121
export const createSentinel = RedisSentinel.create;
2222

23+
import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
24+
export { BasicClientSideCache, BasicPooledClientSideCache };
25+
2326
// export { GeoReplyWith } from './lib/commands/generic-transformers';
2427

2528
// export { SetOptions } from './lib/commands/SET';

packages/client/lib/client/commands-queue.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
5656
return this.#pubSub.isActive;
5757
}
5858

59+
#invalidateCallback?: (key: RedisArgument | null) => unknown;
60+
5961
constructor(
6062
respVersion: RespVersions,
6163
maxLength: number | null | undefined,
@@ -109,13 +111,30 @@ export default class RedisCommandsQueue {
109111
onErrorReply: err => this.#onErrorReply(err),
110112
onPush: push => {
111113
if (!this.#onPush(push)) {
112-
114+
switch (push[0].toString()) {
115+
case "invalidate": {
116+
if (this.#invalidateCallback) {
117+
if (push[1] !== null) {
118+
for (const key of push[1]) {
119+
this.#invalidateCallback(key);
120+
}
121+
} else {
122+
this.#invalidateCallback(null);
123+
}
124+
}
125+
break;
126+
}
127+
}
113128
}
114129
},
115130
getTypeMapping: () => this.#getTypeMapping()
116131
});
117132
}
118133

134+
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
135+
this.#invalidateCallback = callback;
136+
}
137+
119138
addCommand<T>(
120139
args: ReadonlyArray<RedisArgument>,
121140
options?: CommandOptions

packages/client/lib/client/index.ts

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
1616
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
1717
import { RedisPoolOptions, RedisClientPool } from './pool';
1818
import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers';
19+
import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache';
1920
import { BasicCommandParser, CommandParser } from './parser';
2021

2122
export interface RedisClientOptions<
@@ -80,6 +81,10 @@ export interface RedisClientOptions<
8081
* TODO
8182
*/
8283
commandOptions?: CommandOptions<TYPE_MAPPING>;
84+
/**
85+
* TODO
86+
*/
87+
clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig;
8388
}
8489

8590
type WithCommands<
@@ -300,9 +305,8 @@ export default class RedisClient<
300305
private _self = this;
301306
private _commandOptions?: CommandOptions<TYPE_MAPPING>;
302307
#dirtyWatch?: string;
303-
#epoch: number;
304308
#watchEpoch?: number;
305-
309+
#clientSideCache?: ClientSideCacheProvider;
306310
#credentialsSubscription: Disposable | null = null;
307311

308312
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
@@ -321,6 +325,11 @@ export default class RedisClient<
321325
return this._self.#queue.isPubSubActive;
322326
}
323327

328+
get socketEpoch() {
329+
return this._self.#socket.socketEpoch;
330+
}
331+
332+
324333
get isWatching() {
325334
return this._self.#watchEpoch !== undefined;
326335
}
@@ -331,10 +340,20 @@ export default class RedisClient<
331340

332341
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
333342
super();
343+
334344
this.#options = this.#initiateOptions(options);
335345
this.#queue = this.#initiateQueue();
336346
this.#socket = this.#initiateSocket();
337-
this.#epoch = 0;
347+
348+
if (options?.clientSideCache) {
349+
if (options.clientSideCache instanceof ClientSideCacheProvider) {
350+
this.#clientSideCache = options.clientSideCache;
351+
} else {
352+
const cscConfig = options.clientSideCache;
353+
this.#clientSideCache = new BasicClientSideCache(cscConfig);
354+
}
355+
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
356+
}
338357
}
339358

340359
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
@@ -495,6 +514,13 @@ export default class RedisClient<
495514
);
496515
}
497516

517+
if (this.#clientSideCache) {
518+
const tracking = this.#clientSideCache.trackingOn();
519+
if (tracking) {
520+
commands.push(tracking);
521+
}
522+
}
523+
498524
return commands;
499525
}
500526

@@ -548,6 +574,7 @@ export default class RedisClient<
548574
})
549575
.on('error', err => {
550576
this.emit('error', err);
577+
this.#clientSideCache?.onError();
551578
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
552579
this.#queue.flushWaitingForReply(err);
553580
} else {
@@ -556,7 +583,6 @@ export default class RedisClient<
556583
})
557584
.on('connect', () => this.emit('connect'))
558585
.on('ready', () => {
559-
this.#epoch++;
560586
this.emit('ready');
561587
this.#setPingTimer();
562588
this.#maybeScheduleWrite();
@@ -684,13 +710,21 @@ export default class RedisClient<
684710
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
685711
transformReply: TransformReply | undefined,
686712
) {
687-
const reply = await this.sendCommand(parser.redisArgs, commandOptions);
713+
const csc = this._self.#clientSideCache;
714+
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
688715

689-
if (transformReply) {
690-
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
691-
}
716+
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
692717

693-
return reply;
718+
if (csc && command.CACHEABLE && defaultTypeMapping) {
719+
return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping);
720+
} else {
721+
const reply = await fn();
722+
723+
if (transformReply) {
724+
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
725+
}
726+
return reply;
727+
}
694728
}
695729

696730
/**
@@ -855,7 +889,7 @@ export default class RedisClient<
855889
const reply = await this._self.sendCommand(
856890
pushVariadicArguments(['WATCH'], key)
857891
);
858-
this._self.#watchEpoch ??= this._self.#epoch;
892+
this._self.#watchEpoch ??= this._self.socketEpoch;
859893
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
860894
}
861895

@@ -922,7 +956,7 @@ export default class RedisClient<
922956
}
923957

924958
const chainId = Symbol('Pipeline Chain'),
925-
promise = Promise.all(
959+
promise = Promise.allSettled(
926960
commands.map(({ args }) => this._self.#queue.addCommand(args, {
927961
chainId,
928962
typeMapping: this._commandOptions?.typeMapping
@@ -958,7 +992,7 @@ export default class RedisClient<
958992
throw new WatchError(dirtyWatch);
959993
}
960994

961-
if (watchEpoch && watchEpoch !== this._self.#epoch) {
995+
if (watchEpoch && watchEpoch !== this._self.socketEpoch) {
962996
throw new WatchError('Client reconnected after WATCH');
963997
}
964998

@@ -1182,6 +1216,7 @@ export default class RedisClient<
11821216
return new Promise<void>(resolve => {
11831217
clearTimeout(this._self.#pingTimer);
11841218
this._self.#socket.close();
1219+
this._self.#clientSideCache?.onClose();
11851220

11861221
if (this._self.#queue.isEmpty()) {
11871222
this._self.#socket.destroySocket();
@@ -1208,6 +1243,7 @@ export default class RedisClient<
12081243
clearTimeout(this._self.#pingTimer);
12091244
this._self.#queue.flushAll(new DisconnectsClientError());
12101245
this._self.#socket.destroy();
1246+
this._self.#clientSideCache?.onClose();
12111247
this._self.#credentialsSubscription?.dispose();
12121248
this._self.#credentialsSubscription = null;
12131249
}

packages/client/lib/client/linked-list.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export class DoublyLinkedList<T> {
114114
export interface SinglyLinkedNode<T> {
115115
value: T;
116116
next: SinglyLinkedNode<T> | undefined;
117+
removed: boolean;
117118
}
118119

119120
export class SinglyLinkedList<T> {
@@ -140,7 +141,8 @@ export class SinglyLinkedList<T> {
140141

141142
const node = {
142143
value,
143-
next: undefined
144+
next: undefined,
145+
removed: false
144146
};
145147

146148
if (this.#head === undefined) {
@@ -151,6 +153,9 @@ export class SinglyLinkedList<T> {
151153
}
152154

153155
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
156+
if (node.removed) {
157+
throw new Error("node already removed");
158+
}
154159
--this.#length;
155160

156161
if (this.#head === node) {
@@ -165,6 +170,8 @@ export class SinglyLinkedList<T> {
165170
} else {
166171
parent!.next = node.next;
167172
}
173+
174+
node.removed = true;
168175
}
169176

170177
shift() {
@@ -177,6 +184,7 @@ export class SinglyLinkedList<T> {
177184
this.#head = node.next;
178185
}
179186

187+
node.removed = true;
180188
return node.value;
181189
}
182190

packages/client/lib/client/parser.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser {
3333
return this.#keys[0];
3434
}
3535

36+
get cacheKey() {
37+
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
38+
return cacheKey + '_' + this.#redisArgs.join('_');
39+
}
40+
3641
push(...arg: Array<RedisArgument>) {
3742
this.#redisArgs.push(...arg);
3843
};

0 commit comments

Comments
 (0)