Skip to content

Commit f041260

Browse files
committed
refactor - remove reference to client
1 parent c6aa037 commit f041260

File tree

2 files changed

+20
-34
lines changed

2 files changed

+20
-34
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1+
import EventEmitter from "events";
12
import { RedisClientOptions } from ".";
23
import RedisCommandsQueue from "./commands-queue";
34
import RedisSocket from "./socket";
45

5-
export default class EnterpriseMaintenanceManager {
6-
client: any;
6+
export default class EnterpriseMaintenanceManager extends EventEmitter {
77
commandsQueue: RedisCommandsQueue;
88
options: RedisClientOptions;
99
constructor(
10-
client: any,
1110
commandsQueue: RedisCommandsQueue,
1211
options: RedisClientOptions,
1312
) {
14-
this.client = client;
13+
super();
1514
this.commandsQueue = commandsQueue;
1615
this.options = options;
1716

1817
this.commandsQueue.events.on("moving", this.#onMoving);
1918
}
2019

21-
// Queue
20+
// Queue:
2221
// toWrite [ C D E ]
2322
// waitingForReply [ A B ]
2423
//
@@ -27,7 +26,7 @@ export default class EnterpriseMaintenanceManager {
2726
// 1. [EVENT] MOVING PN received
2827
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
2928
// 3. [EVENT] New socket connected
30-
// 4. [EVENT] In-flight commands completed
29+
// 4. [EVENT] WaitingForReply commands completed
3130
// 5. [ACTION] Destroy old socket
3231
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
3332
#onMoving = async (
@@ -38,7 +37,7 @@ export default class EnterpriseMaintenanceManager {
3837
// 1 [EVENT] MOVING PN received
3938
console.log('[EnterpriseMaintenanceManager] Pausing client');
4039
// 2 [ACTION] Pause writing
41-
this.client.pause();
40+
this.emit('pause')
4241

4342
console.log(`[EnterpriseMaintenanceManager] Creating new socket for ${host}:${port}`);
4443
const newSocket = new RedisSocket({
@@ -65,20 +64,10 @@ export default class EnterpriseMaintenanceManager {
6564
});
6665
}
6766
});
68-
// 4 [EVENT] Reply queue now empty
67+
// 4 [EVENT] WaitingForReply commands completed
6968

70-
// 5 [ACTION] Destroy old socket
71-
// Switch to the new socket and clean up the old one
72-
console.log('[EnterpriseMaintenanceManager] Switching to new socket and cleaning up old one');
73-
const oldSocket = this.client.socket;
74-
this.client.socket = newSocket;
75-
oldSocket.removeAllListeners();
76-
oldSocket.destroy();
77-
console.log('[EnterpriseMaintenanceManager] Old socket destroyed');
69+
// 5 + 6
70+
this.emit('resume', newSocket);
7871

79-
// 6 [ACTION] Resume writing
80-
console.log('[EnterpriseMaintenanceManager] Resuming client');
81-
this.client.resume();
82-
console.log('[EnterpriseMaintenanceManager] Socket migration complete');
8372
};
8473
}

packages/client/lib/client/index.ts

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -485,21 +485,16 @@ export default class RedisClient<
485485
return this._self.#dirtyWatch !== undefined
486486
}
487487

488-
get socket() {
489-
return this._self.#socket;
490-
}
491-
492-
set socket(socket: RedisSocket) {
493-
this._self.#socket = socket;
494-
this.#initiateSocket();
495-
}
496-
497-
pause() {
488+
#pauseForMaintenance() {
498489
this._self.#paused = true;
499490
}
500491

501-
resume() {
492+
#resumeFromMaintenance(newSocket: RedisSocket) {
493+
this._self.#socket.removeAllListeners();
494+
this._self.#socket.destroy();
495+
this._self.#socket = newSocket;
502496
this._self.#paused = false;
497+
this._self.#initiateSocket();
503498
this._self.#maybeScheduleWrite();
504499
}
505500

@@ -520,7 +515,9 @@ export default class RedisClient<
520515
this.#socket = this.#createSocket(this.#options);
521516

522517
if(options?.gracefulMaintenance) {
523-
new EnterpriseMaintenanceManager(this, this.#queue, this.#options!);
518+
new EnterpriseMaintenanceManager(this.#queue, this.#options!)
519+
.on('pause', this.#pauseForMaintenance.bind(this))
520+
.on('resume', this.#resumeFromMaintenance.bind(this))
524521
}
525522

526523
if (options?.clientSideCache) {
@@ -747,7 +744,7 @@ export default class RedisClient<
747744
return commands;
748745
}
749746

750-
async #initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): Promise<void> {
747+
async #initiateSocket(): Promise<void> {
751748
await this.#socket.waitForReady();
752749

753750
this.#socket
@@ -919,7 +916,7 @@ export default class RedisClient<
919916

920917
async connect() {
921918
await this._self.#socket.connect();
922-
await this._self.#initiateSocket(this._self.#options);
919+
await this._self.#initiateSocket();
923920
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
924921
}
925922

0 commit comments

Comments
 (0)