Skip to content

Commit ece40b5

Browse files
committed
extract socket orchestration in separate class
1 parent 4aab08f commit ece40b5

File tree

3 files changed

+67
-77
lines changed

3 files changed

+67
-77
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,67 @@ export default class EnterpriseMaintenanceManager {
1818
this.commandsQueue.events.on("moving", this.#onMoving);
1919
}
2020

21-
#onMoving = async (_afterMs: number, host: string, port: number) => {
21+
// Queue
22+
// toWrite [ C D E ]
23+
// waitingForReply [ A B ]
24+
//
25+
// time: ---1-2---3-4-5-6---------------------------
26+
//
27+
// 1. [EVENT] MOVING PN received
28+
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
29+
// 3. [EVENT] New socket connected
30+
// 4. [EVENT] In-flight commands completed
31+
// 5. [ACTION] Destroy old socket
32+
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
33+
#onMoving = async (
34+
_afterMs: number,
35+
host: string,
36+
port: number,
37+
): Promise<void> => {
38+
// 1 [EVENT] MOVING PN received
39+
console.log('[EnterpriseMaintenanceManager] Pausing client');
40+
// 2 [ACTION] Pause writing
41+
this.client.pause();
2242

23-
this.client.pause()
24-
25-
const socket = new RedisSocket({
43+
console.log(`[EnterpriseMaintenanceManager] Creating new socket for ${host}:${port}`);
44+
const newSocket = new RedisSocket({
2645
...this.options.socket,
2746
host,
28-
port
47+
port,
2948
});
30-
await socket.connect();
49+
console.log('[EnterpriseMaintenanceManager] Connecting to new socket');
50+
await newSocket.connect();
51+
// 3 [EVENT] New socket connected
52+
console.log('[EnterpriseMaintenanceManager] New socket connected');
3153

32-
//wait until waitingForReply is empty
33-
await new Promise<void>(resolve => {
34-
if(!this.commandsQueue.isWaitingForReply()) {
35-
resolve()
54+
// Wait until waitingForReply is empty
55+
console.log('[EnterpriseMaintenanceManager] Waiting for reply queue to empty');
56+
await new Promise<void>((resolve) => {
57+
if (!this.commandsQueue.isWaitingForReply()) {
58+
console.log('[EnterpriseMaintenanceManager] Reply queue already empty');
59+
resolve();
3660
} else {
37-
this.commandsQueue.events.once('waitingForReplyEmpty', resolve)
61+
console.log('[EnterpriseMaintenanceManager] Reply queue not empty, waiting for empty event');
62+
this.commandsQueue.events.once("waitingForReplyEmpty", () => {
63+
console.log('[EnterpriseMaintenanceManager] Reply queue now empty');
64+
resolve();
65+
});
3866
}
39-
})
67+
});
68+
// 4 [EVENT] Reply queue now empty
4069

41-
const oldSocket = this.client.socket
70+
// 5 [ACTION] Destroy old socket
71+
// Switch to the new socket and clean up the old one
72+
console.log('[EnterpriseMaintenanceManager] Switching to new socket and cleaning up old one');
73+
const oldSocket = this.client.socket;
74+
this.client.socket = newSocket;
4275
oldSocket.removeAllListeners();
4376
oldSocket.destroy();
77+
console.log('[EnterpriseMaintenanceManager] Old socket destroyed');
4478

45-
this.client.socket = socket;
46-
47-
this.client.resume()
79+
// 6 [ACTION] Resume writing
80+
console.log('[EnterpriseMaintenanceManager] Resuming client');
81+
this.client.resume();
82+
console.log('[EnterpriseMaintenanceManager] Socket migration complete');
4883
};
49-
5084
}

packages/client/lib/client/index.ts

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider }
2020
import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
23+
import EnterpriseMaintenanceManager from './enterprise-maintenance-manager';
2324

2425
export interface RedisClientOptions<
2526
M extends RedisModules = RedisModules,
@@ -469,6 +470,15 @@ export default class RedisClient<
469470
this.#initiateSocket();
470471
}
471472

473+
pause() {
474+
this._self.#paused = true;
475+
}
476+
477+
resume() {
478+
this._self.#paused = false;
479+
this._self.#maybeScheduleWrite();
480+
}
481+
472482
/**
473483
* Marks the client's WATCH command as invalidated due to a topology change.
474484
* This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -484,66 +494,10 @@ export default class RedisClient<
484494
this.#options = this.#initiateOptions(options);
485495
this.#queue = this.#initiateQueue();
486496
this.#socket = this.#createSocket(this.#options);
487-
// Queue
488-
// toWrite [ C D E ]
489-
// waitingForReply [ A B ]
490-
//
491-
// time: ---1-2---3-4-5-6---------------------------
492-
//
493-
// 1. [EVENT] MOVING PN received
494-
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
495-
// 3. [EVENT] New sock connected
496-
// 4. [EVENT] In-flight commands completed
497-
// 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
498-
// 6. [ACTION] Destroy old socket
499-
// this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
500-
// // 1
501-
// console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
502-
503-
// // 2
504-
// console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
505-
// // this.#paused = true;
506-
507-
// const oldSocket = this.#socket;
508-
// this.#socket = this.#initiateSocket({
509-
// ...this.#options,
510-
// socket: {
511-
// ...this.#options?.socket,
512-
// host,
513-
// port
514-
// }
515-
// });
516-
517-
// // 3
518-
// this.#socket.once('ready', () => {
519-
// //TODO handshake...???
520-
// console.log(`Connected to ${host}:${port}`);
521-
522-
// // 4
523-
// if(!this.#queue.isWaitingForReply()) {
524-
// // 5 and 6
525-
// console.log(`All in-flight commands completed`);
526-
// console.log(`Resume writing`)
527-
// oldSocket.destroy();
528-
// this.#paused = false;
529-
// }
530-
// });
531-
532-
// // 4
533-
// this.#queue.events.once('waitingForReplyEmpty', () => {
534-
// console.log(`All in-flight commands completed`);
535-
// // 3
536-
// if(this.#socket.isReady) {
537-
// // 5 and 6
538-
// console.log(`Connected to ${host}:${port}`);
539-
// console.log(`Resume writing`)
540-
// oldSocket.destroy();
541-
// this.#paused = false;
542-
// }
543-
// });
544-
545-
// await this.#socket.connect()
546-
// });
497+
498+
if(options?.gracefulMaintenance) {
499+
new EnterpriseMaintenanceManager(this, this.#queue, this.#options!);
500+
}
547501

548502
if (options?.clientSideCache) {
549503
if (options.clientSideCache instanceof ClientSideCacheProvider) {

packages/client/lib/client/socket.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ export default class RedisSocket extends EventEmitter {
250250

251251
let onTimeout;
252252
if (this.#connectTimeout !== undefined) {
253+
console.log('#connectTimeout',this.#connectTimeout)
253254
onTimeout = () => socket.destroy(new ConnectionTimeoutError());
254255
socket.once('timeout', onTimeout);
255256
socket.setTimeout(this.#connectTimeout);
@@ -266,6 +267,7 @@ export default class RedisSocket extends EventEmitter {
266267
}
267268

268269
if (this.#socketTimeout) {
270+
console.log('#socketTimeout',this.#socketTimeout)
269271
socket.once('timeout', () => {
270272
socket.destroy(new SocketTimeoutError(this.#socketTimeout!));
271273
});

0 commit comments

Comments
 (0)