1+ import EventEmitter from "events" ;
12import { RedisClientOptions } from "." ;
23import RedisCommandsQueue from "./commands-queue" ;
34import RedisSocket from "./socket" ;
45
5- export default class EnterpriseMaintenanceManager {
6- client : any ;
6+ export default class EnterpriseMaintenanceManager extends EventEmitter {
77 commandsQueue : RedisCommandsQueue ;
88 options : RedisClientOptions ;
99 constructor (
10- client : any ,
1110 commandsQueue : RedisCommandsQueue ,
1211 options : RedisClientOptions ,
1312 ) {
14- this . client = client ;
13+ super ( ) ;
1514 this . commandsQueue = commandsQueue ;
1615 this . options = options ;
1716
1817 this . commandsQueue . events . on ( "moving" , this . #onMoving) ;
1918 }
2019
21- // Queue
20+ // Queue:
2221 // toWrite [ C D E ]
2322 // waitingForReply [ A B ]
2423 //
@@ -27,7 +26,7 @@ export default class EnterpriseMaintenanceManager {
2726 // 1. [EVENT] MOVING PN received
2827 // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
2928 // 3. [EVENT] New socket connected
30- // 4. [EVENT] In-flight commands completed
29+ // 4. [EVENT] WaitingForReply commands completed
3130 // 5. [ACTION] Destroy old socket
3231 // 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
3332 #onMoving = async (
@@ -38,7 +37,7 @@ export default class EnterpriseMaintenanceManager {
3837 // 1 [EVENT] MOVING PN received
3938 console . log ( '[EnterpriseMaintenanceManager] Pausing client' ) ;
4039 // 2 [ACTION] Pause writing
41- this . client . pause ( ) ;
40+ this . emit ( 'pause' )
4241
4342 console . log ( `[EnterpriseMaintenanceManager] Creating new socket for ${ host } :${ port } ` ) ;
4443 const newSocket = new RedisSocket ( {
@@ -65,20 +64,10 @@ export default class EnterpriseMaintenanceManager {
6564 } ) ;
6665 }
6766 } ) ;
68- // 4 [EVENT] Reply queue now empty
67+ // 4 [EVENT] WaitingForReply commands completed
6968
70- // 5 [ACTION] Destroy old socket
71- // Switch to the new socket and clean up the old one
72- console . log ( '[EnterpriseMaintenanceManager] Switching to new socket and cleaning up old one' ) ;
73- const oldSocket = this . client . socket ;
74- this . client . socket = newSocket ;
75- oldSocket . removeAllListeners ( ) ;
76- oldSocket . destroy ( ) ;
77- console . log ( '[EnterpriseMaintenanceManager] Old socket destroyed' ) ;
69+ // 5 + 6
70+ this . emit ( 'resume' , newSocket ) ;
7871
79- // 6 [ACTION] Resume writing
80- console . log ( '[EnterpriseMaintenanceManager] Resuming client' ) ;
81- this . client . resume ( ) ;
82- console . log ( '[EnterpriseMaintenanceManager] Socket migration complete' ) ;
8372 } ;
8473}
0 commit comments