Skip to content

Commit ac67640

Browse files
committed
refactor(commands-queue): Improve push notification handling
- Replaced `setInvalidateCallback` with a more flexible `addPushHandler` method, allowing multiple handlers for push notifications. - Introduced the `PushHandler` type to standardize push notification processing. - Refactored `RedisCommandsQueue` to use a `#pushHandlers` array, enabling dynamic and modular handling of push notifications. - Updated `RedisClient` to leverage the new handler mechanism for `invalidate` push notifications, simplifying and decoupling logic.
1 parent 91afa09 commit ac67640

File tree

2 files changed

+28
-19
lines changed

2 files changed

+28
-19
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ const RESP2_PUSH_TYPE_MAPPING = {
5050
[RESP_TYPES.SIMPLE_STRING]: Buffer
5151
};
5252

53+
// Try to handle a push notification. Return whether you
54+
// successfully consumed the notification or not. This is
55+
// important in order for the queue to be able to pass the
56+
// notification to another handler if the current one did not
57+
// succeed.
58+
type PushHandler = (pushItems: Array<any>) => boolean;
59+
5360
export default class RedisCommandsQueue {
5461
readonly #respVersion;
5562
readonly #maxLength;
@@ -60,6 +67,7 @@ export default class RedisCommandsQueue {
6067
readonly decoder;
6168
readonly #pubSub = new PubSub();
6269

70+
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
6371
get isPubSubActive() {
6472
return this.#pubSub.isActive;
6573
}
@@ -107,6 +115,7 @@ export default class RedisCommandsQueue {
107115
}
108116
return true;
109117
}
118+
return false
110119
}
111120

112121
#getTypeMapping() {
@@ -119,30 +128,18 @@ export default class RedisCommandsQueue {
119128
onErrorReply: err => this.#onErrorReply(err),
120129
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
121130
onPush: push => {
122-
if (!this.#onPush(push)) {
123-
// currently only supporting "invalidate" over RESP3 push messages
124-
switch (push[0].toString()) {
125-
case "invalidate": {
126-
if (this.#invalidateCallback) {
127-
if (push[1] !== null) {
128-
for (const key of push[1]) {
129-
this.#invalidateCallback(key);
130-
}
131-
} else {
132-
this.#invalidateCallback(null);
133-
}
134-
}
135-
break;
136-
}
137-
}
131+
for(const pushHandler of this.#pushHandlers) {
132+
if(pushHandler(push)) return
138133
}
139134
},
140135
getTypeMapping: () => this.#getTypeMapping()
141136
});
142137
}
143138

144-
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
145-
this.#invalidateCallback = callback;
139+
addPushHandler(handler: PushHandler): void {
140+
this.#pushHandlers.push(handler);
141+
}
142+
146143
}
147144

148145
addCommand<T>(

packages/client/lib/client/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,19 @@ export default class RedisClient<
464464
const cscConfig = options.clientSideCache;
465465
this.#clientSideCache = new BasicClientSideCache(cscConfig);
466466
}
467-
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
467+
this.#queue.addPushHandler((push: Array<any>): boolean => {
468+
if (push[0].toString() !== 'invalidate') return false;
469+
470+
if (push[1] !== null) {
471+
for (const key of push[1]) {
472+
this.#clientSideCache?.invalidate(key)
473+
}
474+
} else {
475+
this.#clientSideCache?.invalidate(null)
476+
}
477+
478+
return true
479+
});
468480
}
469481
}
470482

0 commit comments

Comments
 (0)