Skip to content

Commit b0babba

Browse files
committed
implement queue drain mechanism
1 parent 8557f04 commit b0babba

File tree

3 files changed

+97
-26
lines changed

3 files changed

+97
-26
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
1+
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList, makeEmptyAware } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8+
import EventEmitter from 'events';
89

910
export interface CommandOptions<T = TypeMapping> {
1011
chainId?: symbol;
@@ -54,18 +55,18 @@ export default class RedisCommandsQueue {
5455
readonly #respVersion;
5556
readonly #maxLength;
5657
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
57-
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
58+
readonly #waitingForReply: SinglyLinkedList<CommandWaitingForReply>;
5859
readonly #onShardedChannelMoved;
5960
#chainInExecution: symbol | undefined;
6061
readonly decoder;
6162
readonly #pubSub = new PubSub();
63+
readonly events = new EventEmitter();
6264

6365
get isPubSubActive() {
6466
return this.#pubSub.isActive;
6567
}
6668

6769
#invalidateCallback?: (key: RedisArgument | null) => unknown;
68-
#movingCallback?: (afterMs: number, host: string, port: number) => void;
6970

7071
constructor(
7172
respVersion: RespVersions,
@@ -76,6 +77,9 @@ export default class RedisCommandsQueue {
7677
this.#maxLength = maxLength;
7778
this.#onShardedChannelMoved = onShardedChannelMoved;
7879
this.decoder = this.#initiateDecoder();
80+
const [waitingForReply, emptyEmitter] = makeEmptyAware(new SinglyLinkedList<CommandWaitingForReply>())
81+
this.#waitingForReply = waitingForReply;
82+
emptyEmitter.on('empty', this.events.on.bind(this.events, 'waitingForReplyEmpty'))
7983
}
8084

8185
#onReply(reply: ReplyUnion) {
@@ -137,16 +141,10 @@ export default class RedisCommandsQueue {
137141
break;
138142
}
139143
case 'MOVING': {
140-
if (this.#movingCallback) {
141-
console.log('received moving', push)
142-
const [_, afterMs, url] = push;
143-
let [host, port] = url.toString().split(':');
144-
//['18.200.246.58'] - for some reason the server sends the host this way
145-
if(host.includes('[')) {
146-
host = host.slice(2, -2);
147-
}
148-
this.#movingCallback(afterMs, host, Number(port));
149-
}
144+
console.log('received moving', push)
145+
const [_, afterMs, url] = push;
146+
const [host, port] = url.toString().split(':');
147+
this.events.emit('moving', afterMs, host, Number(port))
150148
break;
151149
}
152150
}
@@ -160,8 +158,8 @@ export default class RedisCommandsQueue {
160158
this.#invalidateCallback = callback;
161159
}
162160

163-
setMovingCallback(callback?: (afterMs: number, host: string, port: number) => void) {
164-
this.#movingCallback = callback;
161+
isWaitingForReply(): boolean {
162+
return this.#waitingForReply.length > 0;
165163
}
166164

167165
addCommand<T>(

packages/client/lib/client/index.ts

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -417,11 +417,16 @@ export default class RedisClient<
417417
#watchEpoch?: number;
418418
#clientSideCache?: ClientSideCacheProvider;
419419
#credentialsSubscription: Disposable | null = null;
420+
// Flag used to pause writing to the socket during maintenance windows.
421+
// When true, prevents new commands from being written while waiting for:
422+
// 1. New socket to be ready after maintenance redirect
423+
// 2. In-flight commands on the old socket to complete
424+
#paused = false;
425+
420426
get clientSideCache() {
421427
return this._self.#clientSideCache;
422428
}
423429

424-
425430
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
426431
return this._self.#options;
427432
}
@@ -470,24 +475,60 @@ export default class RedisClient<
470475
this.#options = this.#initiateOptions(options);
471476
this.#queue = this.#initiateQueue();
472477
this.#socket = this.#initiateSocket(this.#options);
473-
474-
this.#queue.setMovingCallback(async (afterMs: number, host: string, port: number) => {
478+
// Queue
479+
// toWrite [ C D E ]
480+
// waitingForReply [ A B ]
481+
//
482+
// time: ---1-2---3-4-5-6---------------------------
483+
//
484+
// 1. [EVENT] MOVING PN received
485+
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
486+
// 3. [EVENT] New sock connected
487+
// 4. [EVENT] In-flight commands completed
488+
// 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
489+
// 6. [ACTION] Destroy old socket
490+
this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
491+
// 1
475492
console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
493+
494+
// 2
495+
this.#paused = true;
496+
476497
const oldSocket = this.#socket;
477-
const newSocket = this.#initiateSocket({
498+
this.#socket = this.#initiateSocket({
478499
...this.#options,
479500
socket: {
480501
...this.#options?.socket,
481502
host,
482503
port
483504
}
484505
});
485-
newSocket.on('ready', () => {
486-
console.log(`Connected to ${host}:${port}, destroying old socket`);
487-
oldSocket.destroy()
488-
this.#socket = newSocket
506+
507+
// 3
508+
this.#socket.once('ready', () => {
509+
//TODO handshake...???
510+
console.log(`Connected to ${host}:${port}`);
511+
512+
// 4
513+
if(!this.#queue.isWaitingForReply()) {
514+
// 5 and 6
515+
oldSocket.destroy();
516+
this.#paused = false;
517+
}
518+
});
519+
520+
// 4
521+
this.#queue.events.once('waitingForReplyEmpty', () => {
522+
523+
// 3
524+
if(this.#socket.isReady) {
525+
// 5 and 6
526+
oldSocket.destroy();
527+
this.#paused = false;
528+
}
489529
});
490-
await newSocket.connect()
530+
531+
await this.#socket.connect()
491532
});
492533

493534
if (options?.clientSideCache) {
@@ -1120,6 +1161,9 @@ export default class RedisClient<
11201161
}
11211162

11221163
#write() {
1164+
if(this.#paused) {
1165+
return
1166+
}
11231167
this.#socket.write(this.#queue.commandsToWrite());
11241168
}
11251169

packages/client/lib/client/linked-list.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import EventEmitter from "events";
2+
13
export interface DoublyLinkedNode<T> {
24
value: T;
35
previous: DoublyLinkedNode<T> | undefined;
@@ -32,7 +34,7 @@ export class DoublyLinkedList<T> {
3234
next: undefined,
3335
value
3436
};
35-
}
37+
}
3638

3739
return this.#tail = this.#tail.next = {
3840
previous: this.#tail,
@@ -93,7 +95,7 @@ export class DoublyLinkedList<T> {
9395
node.previous!.next = node.next;
9496
node.previous = undefined;
9597
}
96-
98+
9799
node.next = undefined;
98100
}
99101

@@ -111,6 +113,33 @@ export class DoublyLinkedList<T> {
111113
}
112114
}
113115

116+
// This function takes an object that has a `length` property
117+
// and returns both a proxy and an event emitter.
118+
// The proxy will act the same as the original object.
119+
// And the event emitter will emit an `empty` event whenever the `length` becomes zero.
120+
export const makeEmptyAware = <T extends { length: number }>(obj: T): [T, EventEmitter] => {
121+
const eventEmitter = new EventEmitter();
122+
const proxy = new Proxy(obj, {
123+
get(target, prop, receiver) {
124+
const original = Reflect.get(target, prop, receiver);
125+
if (typeof original === 'function') {
126+
return function (...args: any[]) {
127+
const oldLength = target.length;
128+
const ret = original.apply(target, args);
129+
const newLength = target.length;
130+
if(oldLength !== newLength && newLength === 0) {
131+
eventEmitter.emit('empty')
132+
}
133+
return ret
134+
};
135+
} else {
136+
return original;
137+
};
138+
},
139+
});
140+
return [ proxy, eventEmitter ];
141+
}
142+
114143
export interface SinglyLinkedNode<T> {
115144
value: T;
116145
next: SinglyLinkedNode<T> | undefined;

0 commit comments

Comments
 (0)