@@ -20,6 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider }
2020import { BasicCommandParser , CommandParser } from './parser' ;
2121import SingleEntryCache from '../single-entry-cache' ;
2222import { version } from '../../package.json'
23+ import EnterpriseMaintenanceManager from './enterprise-maintenance-manager' ;
2324
2425export interface RedisClientOptions <
2526 M extends RedisModules = RedisModules ,
@@ -469,6 +470,15 @@ export default class RedisClient<
469470 this . #initiateSocket( ) ;
470471 }
471472
473+ pause ( ) {
474+ this . _self . #paused = true ;
475+ }
476+
477+ resume ( ) {
478+ this . _self . #paused = false ;
479+ this . _self . #maybeScheduleWrite( ) ;
480+ }
481+
472482 /**
473483 * Marks the client's WATCH command as invalidated due to a topology change.
474484 * This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -484,66 +494,10 @@ export default class RedisClient<
484494 this . #options = this . #initiateOptions( options ) ;
485495 this . #queue = this . #initiateQueue( ) ;
486496 this . #socket = this . #createSocket( this . #options) ;
487- // Queue
488- // toWrite [ C D E ]
489- // waitingForReply [ A B ]
490- //
491- // time: ---1-2---3-4-5-6---------------------------
492- //
493- // 1. [EVENT] MOVING PN received
494- // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
495- // 3. [EVENT] New sock connected
496- // 4. [EVENT] In-flight commands completed
497- // 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
498- // 6. [ACTION] Destroy old socket
499- // this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
500- // // 1
501- // console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
502-
503- // // 2
504- // console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
505- // // this.#paused = true;
506-
507- // const oldSocket = this.#socket;
508- // this.#socket = this.#initiateSocket({
509- // ...this.#options,
510- // socket: {
511- // ...this.#options?.socket,
512- // host,
513- // port
514- // }
515- // });
516-
517- // // 3
518- // this.#socket.once('ready', () => {
519- // //TODO handshake...???
520- // console.log(`Connected to ${host}:${port}`);
521-
522- // // 4
523- // if(!this.#queue.isWaitingForReply()) {
524- // // 5 and 6
525- // console.log(`All in-flight commands completed`);
526- // console.log(`Resume writing`)
527- // oldSocket.destroy();
528- // this.#paused = false;
529- // }
530- // });
531-
532- // // 4
533- // this.#queue.events.once('waitingForReplyEmpty', () => {
534- // console.log(`All in-flight commands completed`);
535- // // 3
536- // if(this.#socket.isReady) {
537- // // 5 and 6
538- // console.log(`Connected to ${host}:${port}`);
539- // console.log(`Resume writing`)
540- // oldSocket.destroy();
541- // this.#paused = false;
542- // }
543- // });
544-
545- // await this.#socket.connect()
546- // });
497+
498+ if ( options ?. gracefulMaintenance ) {
499+ new EnterpriseMaintenanceManager ( this , this . #queue, this . #options! ) ;
500+ }
547501
548502 if ( options ?. clientSideCache ) {
549503 if ( options . clientSideCache instanceof ClientSideCacheProvider ) {
0 commit comments