Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 17 additions & 33 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/bloom/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@redis/bloom",
"version": "5.8.2",
"version": "5.9.0-beta.0",
"license": "MIT",
"main": "./dist/lib/index.js",
"types": "./dist/lib/index.d.ts",
Expand All @@ -13,7 +13,7 @@
"release": "release-it"
},
"peerDependencies": {
"@redis/client": "^5.8.2"
"@redis/client": "^5.9.0-beta.0"
},
"devDependencies": {
"@redis/test-utils": "*"
Expand Down
111 changes: 84 additions & 27 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply, TimeoutError } from '../errors';
import { AbortError, ErrorReply, CommandTimeoutDuringMaintenanceError, TimeoutError } from '../errors';
import { MonitorCallback } from '.';
import { dbgMaintenance } from './enterprise-maintenance-manager';

export interface CommandOptions<T = TypeMapping> {
chainId?: symbol;
Expand All @@ -30,6 +31,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
timeout: {
signal: AbortSignal;
listener: () => unknown;
originalTimeout: number | undefined;
} | undefined;
}

Expand All @@ -50,22 +52,74 @@ const RESP2_PUSH_TYPE_MAPPING = {
[RESP_TYPES.SIMPLE_STRING]: Buffer
};

// Try to handle a push notification. Return whether you
// successfully consumed the notification or not. This is
// important in order for the queue to be able to pass the
// notification to another handler if the current one did not
// succeed.
type PushHandler = (pushItems: Array<any>) => boolean;

export default class RedisCommandsQueue {
readonly #respVersion;
readonly #maxLength;
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
readonly #waitingForReply = new EmptyAwareSinglyLinkedList<CommandWaitingForReply>();
readonly #onShardedChannelMoved;
#chainInExecution: symbol | undefined;
readonly decoder;
readonly #pubSub = new PubSub();

#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];

#maintenanceCommandTimeout: number | undefined

setMaintenanceCommandTimeout(ms: number | undefined) {
// Prevent possible api misuse
if (this.#maintenanceCommandTimeout === ms) {
dbgMaintenance(`Queue already set maintenanceCommandTimeout to ${ms}, skipping`);
return;
};

dbgMaintenance(`Setting maintenance command timeout to ${ms}`);
this.#maintenanceCommandTimeout = ms;

if(this.#maintenanceCommandTimeout === undefined) {
dbgMaintenance(`Queue will keep maintenanceCommandTimeout for exisitng commands, just to be on the safe side. New commands will receive normal timeouts`);
return;
}

let counter = 0;
const total = this.#toWrite.length;

// Overwrite timeouts of all eligible toWrite commands
for(const node of this.#toWrite.nodes()) {
const command = node.value;

// Remove timeout listener if it exists
RedisCommandsQueue.#removeTimeoutListener(command)

counter++;
const newTimeout = this.#maintenanceCommandTimeout;

// Overwrite the command's timeout
const signal = AbortSignal.timeout(newTimeout);
command.timeout = {
signal,
listener: () => {
this.#toWrite.remove(node);
command.reject(new CommandTimeoutDuringMaintenanceError(newTimeout));
},
originalTimeout: command.timeout?.originalTimeout
};
signal.addEventListener('abort', command.timeout.listener, { once: true });
};
dbgMaintenance(`Total of ${counter} of ${total} timeouts reset to ${ms}`);
}

get isPubSubActive() {
return this.#pubSub.isActive;
}

#invalidateCallback?: (key: RedisArgument | null) => unknown;

constructor(
respVersion: RespVersions,
maxLength: number | null | undefined,
Expand Down Expand Up @@ -107,6 +161,7 @@ export default class RedisCommandsQueue {
}
return true;
}
return false
}

#getTypeMapping() {
Expand All @@ -119,30 +174,27 @@ export default class RedisCommandsQueue {
onErrorReply: err => this.#onErrorReply(err),
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
onPush: push => {
if (!this.#onPush(push)) {
// currently only supporting "invalidate" over RESP3 push messages
switch (push[0].toString()) {
case "invalidate": {
if (this.#invalidateCallback) {
if (push[1] !== null) {
for (const key of push[1]) {
this.#invalidateCallback(key);
}
} else {
this.#invalidateCallback(null);
}
}
break;
}
}
for(const pushHandler of this.#pushHandlers) {
if(pushHandler(push)) return
}
},
getTypeMapping: () => this.#getTypeMapping()
});
}

setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
this.#invalidateCallback = callback;
addPushHandler(handler: PushHandler): void {
this.#pushHandlers.push(handler);
}

async waitForInflightCommandsToComplete(): Promise<void> {
// In-flight commands already completed
if(this.#waitingForReply.length === 0) {
return
};
// Otherwise wait for in-flight commands to fire `empty` event
return new Promise(resolve => {
this.#waitingForReply.events.on('empty', resolve)
});
}

addCommand<T>(
Expand All @@ -168,15 +220,20 @@ export default class RedisCommandsQueue {
typeMapping: options?.typeMapping
};

const timeout = options?.timeout;
// If #maintenanceCommandTimeout was explicitly set, we should
// use it instead of the timeout provided by the command
const timeout = this.#maintenanceCommandTimeout ?? options?.timeout;
const wasInMaintenance = this.#maintenanceCommandTimeout !== undefined;
if (timeout) {

const signal = AbortSignal.timeout(timeout);
value.timeout = {
signal,
listener: () => {
this.#toWrite.remove(node);
value.reject(new TimeoutError());
}
value.reject(wasInMaintenance ? new CommandTimeoutDuringMaintenanceError(timeout) : new TimeoutError());
},
originalTimeout: options?.timeout
};
signal.addEventListener('abort', value.timeout.listener, { once: true });
}
Expand Down Expand Up @@ -432,7 +489,7 @@ export default class RedisCommandsQueue {
}

static #removeTimeoutListener(command: CommandToWrite) {
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
command.timeout?.signal.removeEventListener('abort', command.timeout!.listener);
}

static #flushToWrite(toBeSent: CommandToWrite, err: Error) {
Expand Down
Loading