@@ -20,6 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider }
20
20
import { BasicCommandParser , CommandParser } from './parser' ;
21
21
import SingleEntryCache from '../single-entry-cache' ;
22
22
import { version } from '../../package.json'
23
+ import EnterpriseMaintenanceManager from './enterprise-maintenance-manager' ;
23
24
24
25
export interface RedisClientOptions <
25
26
M extends RedisModules = RedisModules ,
@@ -493,6 +494,15 @@ export default class RedisClient<
493
494
this . #initiateSocket( ) ;
494
495
}
495
496
497
+ pause ( ) {
498
+ this . _self . #paused = true ;
499
+ }
500
+
501
+ resume ( ) {
502
+ this . _self . #paused = false ;
503
+ this . _self . #maybeScheduleWrite( ) ;
504
+ }
505
+
496
506
/**
497
507
* Marks the client's WATCH command as invalidated due to a topology change.
498
508
* This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -508,66 +518,10 @@ export default class RedisClient<
508
518
this . #options = this . #initiateOptions( options ) ;
509
519
this . #queue = this . #initiateQueue( ) ;
510
520
this . #socket = this . #createSocket( this . #options) ;
511
- // Queue
512
- // toWrite [ C D E ]
513
- // waitingForReply [ A B ]
514
- //
515
- // time: ---1-2---3-4-5-6---------------------------
516
- //
517
- // 1. [EVENT] MOVING PN received
518
- // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
519
- // 3. [EVENT] New sock connected
520
- // 4. [EVENT] In-flight commands completed
521
- // 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
522
- // 6. [ACTION] Destroy old socket
523
- // this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
524
- // // 1
525
- // console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
526
-
527
- // // 2
528
- // console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
529
- // // this.#paused = true;
530
-
531
- // const oldSocket = this.#socket;
532
- // this.#socket = this.#initiateSocket({
533
- // ...this.#options,
534
- // socket: {
535
- // ...this.#options?.socket,
536
- // host,
537
- // port
538
- // }
539
- // });
540
-
541
- // // 3
542
- // this.#socket.once('ready', () => {
543
- // //TODO handshake...???
544
- // console.log(`Connected to ${host}:${port}`);
545
-
546
- // // 4
547
- // if(!this.#queue.isWaitingForReply()) {
548
- // // 5 and 6
549
- // console.log(`All in-flight commands completed`);
550
- // console.log(`Resume writing`)
551
- // oldSocket.destroy();
552
- // this.#paused = false;
553
- // }
554
- // });
555
-
556
- // // 4
557
- // this.#queue.events.once('waitingForReplyEmpty', () => {
558
- // console.log(`All in-flight commands completed`);
559
- // // 3
560
- // if(this.#socket.isReady) {
561
- // // 5 and 6
562
- // console.log(`Connected to ${host}:${port}`);
563
- // console.log(`Resume writing`)
564
- // oldSocket.destroy();
565
- // this.#paused = false;
566
- // }
567
- // });
568
-
569
- // await this.#socket.connect()
570
- // });
521
+
522
+ if ( options ?. gracefulMaintenance ) {
523
+ new EnterpriseMaintenanceManager ( this , this . #queue, this . #options! ) ;
524
+ }
571
525
572
526
if ( options ?. clientSideCache ) {
573
527
if ( options . clientSideCache instanceof ClientSideCacheProvider ) {
0 commit comments