Skip to content

Commit 9a8f4b9

Browse files
committed
refactor - remove reference to client
1 parent 3c7f27e commit 9a8f4b9

File tree

2 files changed

+20
-34
lines changed

2 files changed

+20
-34
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1+
import EventEmitter from "events";
12
import { RedisClientOptions } from ".";
23
import RedisCommandsQueue from "./commands-queue";
34
import RedisSocket from "./socket";
45

5-
export default class EnterpriseMaintenanceManager {
6-
client: any;
6+
export default class EnterpriseMaintenanceManager extends EventEmitter {
77
commandsQueue: RedisCommandsQueue;
88
options: RedisClientOptions;
99
constructor(
10-
client: any,
1110
commandsQueue: RedisCommandsQueue,
1211
options: RedisClientOptions,
1312
) {
14-
this.client = client;
13+
super();
1514
this.commandsQueue = commandsQueue;
1615
this.options = options;
1716

1817
this.commandsQueue.events.on("moving", this.#onMoving);
1918
}
2019

21-
// Queue
20+
// Queue:
2221
// toWrite [ C D E ]
2322
// waitingForReply [ A B ]
2423
//
@@ -27,7 +26,7 @@ export default class EnterpriseMaintenanceManager {
2726
// 1. [EVENT] MOVING PN received
2827
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
2928
// 3. [EVENT] New socket connected
30-
// 4. [EVENT] In-flight commands completed
29+
// 4. [EVENT] WaitingForReply commands completed
3130
// 5. [ACTION] Destroy old socket
3231
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
3332
#onMoving = async (
@@ -38,7 +37,7 @@ export default class EnterpriseMaintenanceManager {
3837
// 1 [EVENT] MOVING PN received
3938
console.log('[EnterpriseMaintenanceManager] Pausing client');
4039
// 2 [ACTION] Pause writing
41-
this.client.pause();
40+
this.emit('pause')
4241

4342
console.log(`[EnterpriseMaintenanceManager] Creating new socket for ${host}:${port}`);
4443
const newSocket = new RedisSocket({
@@ -65,20 +64,10 @@ export default class EnterpriseMaintenanceManager {
6564
});
6665
}
6766
});
68-
// 4 [EVENT] Reply queue now empty
67+
// 4 [EVENT] WaitingForReply commands completed
6968

70-
// 5 [ACTION] Destroy old socket
71-
// Switch to the new socket and clean up the old one
72-
console.log('[EnterpriseMaintenanceManager] Switching to new socket and cleaning up old one');
73-
const oldSocket = this.client.socket;
74-
this.client.socket = newSocket;
75-
oldSocket.removeAllListeners();
76-
oldSocket.destroy();
77-
console.log('[EnterpriseMaintenanceManager] Old socket destroyed');
69+
// 5 + 6
70+
this.emit('resume', newSocket);
7871

79-
// 6 [ACTION] Resume writing
80-
console.log('[EnterpriseMaintenanceManager] Resuming client');
81-
this.client.resume();
82-
console.log('[EnterpriseMaintenanceManager] Socket migration complete');
8372
};
8473
}

packages/client/lib/client/index.ts

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -461,21 +461,16 @@ export default class RedisClient<
461461
return this._self.#dirtyWatch !== undefined
462462
}
463463

464-
get socket() {
465-
return this._self.#socket;
466-
}
467-
468-
set socket(socket: RedisSocket) {
469-
this._self.#socket = socket;
470-
this.#initiateSocket();
471-
}
472-
473-
pause() {
464+
#pauseForMaintenance() {
474465
this._self.#paused = true;
475466
}
476467

477-
resume() {
468+
#resumeFromMaintenance(newSocket: RedisSocket) {
469+
this._self.#socket.removeAllListeners();
470+
this._self.#socket.destroy();
471+
this._self.#socket = newSocket;
478472
this._self.#paused = false;
473+
this._self.#initiateSocket();
479474
this._self.#maybeScheduleWrite();
480475
}
481476

@@ -496,7 +491,9 @@ export default class RedisClient<
496491
this.#socket = this.#createSocket(this.#options);
497492

498493
if(options?.gracefulMaintenance) {
499-
new EnterpriseMaintenanceManager(this, this.#queue, this.#options!);
494+
new EnterpriseMaintenanceManager(this.#queue, this.#options!)
495+
.on('pause', this.#pauseForMaintenance.bind(this))
496+
.on('resume', this.#resumeFromMaintenance.bind(this))
500497
}
501498

502499
if (options?.clientSideCache) {
@@ -728,7 +725,7 @@ export default class RedisClient<
728725
return commands;
729726
}
730727

731-
async #initiateSocket(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): Promise<void> {
728+
async #initiateSocket(): Promise<void> {
732729
await this.#socket.waitForReady();
733730

734731
this.#socket
@@ -900,7 +897,7 @@ export default class RedisClient<
900897

901898
async connect() {
902899
await this._self.#socket.connect();
903-
await this._self.#initiateSocket(this._self.#options);
900+
await this._self.#initiateSocket();
904901
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
905902
}
906903

0 commit comments

Comments
 (0)