Skip to content

Commit f20d903

Browse files
committed
refactor(commands-queue): Improve push notification handling
- Replaced `setInvalidateCallback` with a more flexible `addPushHandler` method, allowing multiple handlers for push notifications. - Introduced the `PushHandler` type to standardize push notification processing. - Refactored `RedisCommandsQueue` to use a `#pushHandlers` array, enabling dynamic and modular handling of push notifications. - Updated `RedisClient` to leverage the new handler mechanism for `invalidate` push notifications, simplifying and decoupling logic.
1 parent 45c79f7 commit f20d903

File tree

2 files changed

+27
-22
lines changed

2 files changed

+27
-22
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ const RESP2_PUSH_TYPE_MAPPING = {
5050
[RESP_TYPES.SIMPLE_STRING]: Buffer
5151
};
5252

53+
// Try to handle a push notification. Return whether you
54+
// successfully consumed the notification or not. This is
55+
// important in order for the queue to be able to pass the
56+
// notification to another handler if the current one did not
57+
// succeed.
58+
type PushHandler = (pushItems: Array<any>) => boolean;
59+
5360
export default class RedisCommandsQueue {
5461
readonly #respVersion;
5562
readonly #maxLength;
@@ -60,12 +67,11 @@ export default class RedisCommandsQueue {
6067
readonly decoder;
6168
readonly #pubSub = new PubSub();
6269

70+
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
6371
get isPubSubActive() {
6472
return this.#pubSub.isActive;
6573
}
6674

67-
#invalidateCallback?: (key: RedisArgument | null) => unknown;
68-
6975
constructor(
7076
respVersion: RespVersions,
7177
maxLength: number | null | undefined,
@@ -107,6 +113,7 @@ export default class RedisCommandsQueue {
107113
}
108114
return true;
109115
}
116+
return false
110117
}
111118

112119
#getTypeMapping() {
@@ -119,30 +126,16 @@ export default class RedisCommandsQueue {
119126
onErrorReply: err => this.#onErrorReply(err),
120127
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
121128
onPush: push => {
122-
if (!this.#onPush(push)) {
123-
// currently only supporting "invalidate" over RESP3 push messages
124-
switch (push[0].toString()) {
125-
case "invalidate": {
126-
if (this.#invalidateCallback) {
127-
if (push[1] !== null) {
128-
for (const key of push[1]) {
129-
this.#invalidateCallback(key);
130-
}
131-
} else {
132-
this.#invalidateCallback(null);
133-
}
134-
}
135-
break;
136-
}
137-
}
129+
for(const pushHandler of this.#pushHandlers) {
130+
if(pushHandler(push)) return
138131
}
139132
},
140133
getTypeMapping: () => this.#getTypeMapping()
141134
});
142135
}
143136

144-
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
145-
this.#invalidateCallback = callback;
137+
addPushHandler(handler: PushHandler): void {
138+
this.#pushHandlers.push(handler);
146139
}
147140

148141
addCommand<T>(
@@ -432,7 +425,7 @@ export default class RedisCommandsQueue {
432425
}
433426

434427
static #removeTimeoutListener(command: CommandToWrite) {
435-
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
428+
command.timeout?.signal.removeEventListener('abort', command.timeout!.listener);
436429
}
437430

438431
static #flushToWrite(toBeSent: CommandToWrite, err: Error) {

packages/client/lib/client/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,19 @@ export default class RedisClient<
464464
const cscConfig = options.clientSideCache;
465465
this.#clientSideCache = new BasicClientSideCache(cscConfig);
466466
}
467-
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
467+
this.#queue.addPushHandler((push: Array<any>): boolean => {
468+
if (push[0].toString() !== 'invalidate') return false;
469+
470+
if (push[1] !== null) {
471+
for (const key of push[1]) {
472+
this.#clientSideCache?.invalidate(key)
473+
}
474+
} else {
475+
this.#clientSideCache?.invalidate(null)
476+
}
477+
478+
return true
479+
});
468480
}
469481
}
470482

0 commit comments

Comments
 (0)