Skip to content

Commit d509e71

Browse files
committed
implement queue drain mechanism
1 parent 8e04398 commit d509e71

File tree

3 files changed

+97
-26
lines changed

3 files changed

+97
-26
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
1+
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList, makeEmptyAware } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8+
import EventEmitter from 'events';
89

910
export interface CommandOptions<T = TypeMapping> {
1011
chainId?: symbol;
@@ -54,18 +55,18 @@ export default class RedisCommandsQueue {
5455
readonly #respVersion;
5556
readonly #maxLength;
5657
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
57-
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
58+
readonly #waitingForReply: SinglyLinkedList<CommandWaitingForReply>;
5859
readonly #onShardedChannelMoved;
5960
#chainInExecution: symbol | undefined;
6061
readonly decoder;
6162
readonly #pubSub = new PubSub();
63+
readonly events = new EventEmitter();
6264

6365
get isPubSubActive() {
6466
return this.#pubSub.isActive;
6567
}
6668

6769
#invalidateCallback?: (key: RedisArgument | null) => unknown;
68-
#movingCallback?: (afterMs: number, host: string, port: number) => void;
6970

7071
constructor(
7172
respVersion: RespVersions,
@@ -76,6 +77,9 @@ export default class RedisCommandsQueue {
7677
this.#maxLength = maxLength;
7778
this.#onShardedChannelMoved = onShardedChannelMoved;
7879
this.decoder = this.#initiateDecoder();
80+
const [waitingForReply, emptyEmitter] = makeEmptyAware(new SinglyLinkedList<CommandWaitingForReply>())
81+
this.#waitingForReply = waitingForReply;
82+
emptyEmitter.on('empty', this.events.on.bind(this.events, 'waitingForReplyEmpty'))
7983
}
8084

8185
#onReply(reply: ReplyUnion) {
@@ -137,16 +141,10 @@ export default class RedisCommandsQueue {
137141
break;
138142
}
139143
case 'MOVING': {
140-
if (this.#movingCallback) {
141-
console.log('received moving', push)
142-
const [_, afterMs, url] = push;
143-
let [host, port] = url.toString().split(':');
144-
//['18.200.246.58'] - for some reason the server sends the host this way
145-
if(host.includes('[')) {
146-
host = host.slice(2, -2);
147-
}
148-
this.#movingCallback(afterMs, host, Number(port));
149-
}
144+
console.log('received moving', push)
145+
const [_, afterMs, url] = push;
146+
const [host, port] = url.toString().split(':');
147+
this.events.emit('moving', afterMs, host, Number(port))
150148
break;
151149
}
152150
}
@@ -160,8 +158,8 @@ export default class RedisCommandsQueue {
160158
this.#invalidateCallback = callback;
161159
}
162160

163-
setMovingCallback(callback?: (afterMs: number, host: string, port: number) => void) {
164-
this.#movingCallback = callback;
161+
isWaitingForReply(): boolean {
162+
return this.#waitingForReply.length > 0;
165163
}
166164

167165
addCommand<T>(

packages/client/lib/client/index.ts

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -441,11 +441,16 @@ export default class RedisClient<
441441
#watchEpoch?: number;
442442
#clientSideCache?: ClientSideCacheProvider;
443443
#credentialsSubscription: Disposable | null = null;
444+
// Flag used to pause writing to the socket during maintenance windows.
445+
// When true, prevents new commands from being written while waiting for:
446+
// 1. New socket to be ready after maintenance redirect
447+
// 2. In-flight commands on the old socket to complete
448+
#paused = false;
449+
444450
get clientSideCache() {
445451
return this._self.#clientSideCache;
446452
}
447453

448-
449454
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
450455
return this._self.#options;
451456
}
@@ -494,24 +499,60 @@ export default class RedisClient<
494499
this.#options = this.#initiateOptions(options);
495500
this.#queue = this.#initiateQueue();
496501
this.#socket = this.#initiateSocket(this.#options);
497-
498-
this.#queue.setMovingCallback(async (afterMs: number, host: string, port: number) => {
502+
// Queue
503+
// toWrite [ C D E ]
504+
// waitingForReply [ A B ]
505+
//
506+
// time: ---1-2---3-4-5-6---------------------------
507+
//
508+
// 1. [EVENT] MOVING PN received
509+
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
510+
// 3. [EVENT] New sock connected
511+
// 4. [EVENT] In-flight commands completed
512+
// 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
513+
// 6. [ACTION] Destroy old socket
514+
this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
515+
// 1
499516
console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
517+
518+
// 2
519+
this.#paused = true;
520+
500521
const oldSocket = this.#socket;
501-
const newSocket = this.#initiateSocket({
522+
this.#socket = this.#initiateSocket({
502523
...this.#options,
503524
socket: {
504525
...this.#options?.socket,
505526
host,
506527
port
507528
}
508529
});
509-
newSocket.on('ready', () => {
510-
console.log(`Connected to ${host}:${port}, destroying old socket`);
511-
oldSocket.destroy()
512-
this.#socket = newSocket
530+
531+
// 3
532+
this.#socket.once('ready', () => {
533+
//TODO handshake...???
534+
console.log(`Connected to ${host}:${port}`);
535+
536+
// 4
537+
if(!this.#queue.isWaitingForReply()) {
538+
// 5 and 6
539+
oldSocket.destroy();
540+
this.#paused = false;
541+
}
542+
});
543+
544+
// 4
545+
this.#queue.events.once('waitingForReplyEmpty', () => {
546+
547+
// 3
548+
if(this.#socket.isReady) {
549+
// 5 and 6
550+
oldSocket.destroy();
551+
this.#paused = false;
552+
}
513553
});
514-
await newSocket.connect()
554+
555+
await this.#socket.connect()
515556
});
516557

517558
if (options?.clientSideCache) {
@@ -1139,6 +1180,9 @@ export default class RedisClient<
11391180
}
11401181

11411182
#write() {
1183+
if(this.#paused) {
1184+
return
1185+
}
11421186
this.#socket.write(this.#queue.commandsToWrite());
11431187
}
11441188

packages/client/lib/client/linked-list.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import EventEmitter from "events";
2+
13
export interface DoublyLinkedNode<T> {
24
value: T;
35
previous: DoublyLinkedNode<T> | undefined;
@@ -32,7 +34,7 @@ export class DoublyLinkedList<T> {
3234
next: undefined,
3335
value
3436
};
35-
}
37+
}
3638

3739
return this.#tail = this.#tail.next = {
3840
previous: this.#tail,
@@ -93,7 +95,7 @@ export class DoublyLinkedList<T> {
9395
node.previous!.next = node.next;
9496
node.previous = undefined;
9597
}
96-
98+
9799
node.next = undefined;
98100
}
99101

@@ -111,6 +113,33 @@ export class DoublyLinkedList<T> {
111113
}
112114
}
113115

116+
// This function takes an object that has a `length` property
117+
// and returns both a proxy and an event emitter.
118+
// The proxy will act the same as the original object.
119+
// And the event emitter will emit an `empty` event whenever the `length` becomes zero.
120+
export const makeEmptyAware = <T extends { length: number }>(obj: T): [T, EventEmitter] => {
121+
const eventEmitter = new EventEmitter();
122+
const proxy = new Proxy(obj, {
123+
get(target, prop, receiver) {
124+
const original = Reflect.get(target, prop, receiver);
125+
if (typeof original === 'function') {
126+
return function (...args: any[]) {
127+
const oldLength = target.length;
128+
const ret = original.apply(target, args);
129+
const newLength = target.length;
130+
if(oldLength !== newLength && newLength === 0) {
131+
eventEmitter.emit('empty')
132+
}
133+
return ret
134+
};
135+
} else {
136+
return original;
137+
};
138+
},
139+
});
140+
return [ proxy, eventEmitter ];
141+
}
142+
114143
export interface SinglyLinkedNode<T> {
115144
value: T;
116145
next: SinglyLinkedNode<T> | undefined;

0 commit comments

Comments
 (0)