@@ -474,7 +474,7 @@ export default class RedisClient<
474474 this . #validateOptions( options )
475475 this . #options = this . #initiateOptions( options ) ;
476476 this . #queue = this . #initiateQueue( ) ;
477- this . #socket = this . #initiateSocket ( this . #options) ;
477+ this . #socket = this . #createSocket ( this . #options) ;
478478 // Queue
479479 // toWrite [ C D E ]
480480 // waitingForReply [ A B ]
@@ -487,54 +487,54 @@ export default class RedisClient<
487487 // 4. [EVENT] In-flight commands completed
488488 // 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on
489489 // 6. [ACTION] Destroy old socket
490- this . options ?. gracefulMaintenance && this . #queue. events . on ( 'moving' , async ( afterMs : number , host : string , port : number ) => {
491- // 1
492- console . log ( `Moving to ${ host } :${ port } before ${ afterMs } ms` ) ;
493-
494- // 2
495- console . log ( `Pausing writing until new socket is ready and all in-flight commands are completed` ) ;
496- // this.#paused = true;
497-
498- const oldSocket = this . #socket;
499- this . #socket = this . #initiateSocket( {
500- ...this . #options,
501- socket : {
502- ...this . #options?. socket ,
503- host,
504- port
505- }
506- } ) ;
507-
508- // 3
509- this . #socket. once ( 'ready' , ( ) => {
510- //TODO handshake...???
511- console . log ( `Connected to ${ host } :${ port } ` ) ;
512-
513- // 4
514- if ( ! this . #queue. isWaitingForReply ( ) ) {
515- // 5 and 6
516- console . log ( `All in-flight commands completed` ) ;
517- console . log ( `Resume writing` )
518- oldSocket . destroy ( ) ;
519- this . #paused = false ;
520- }
521- } ) ;
522-
523- // 4
524- this . #queue. events . once ( 'waitingForReplyEmpty' , ( ) => {
525- console . log ( `All in-flight commands completed` ) ;
526- // 3
527- if ( this . #socket. isReady ) {
528- // 5 and 6
529- console . log ( `Connected to ${ host } :${ port } ` ) ;
530- console . log ( `Resume writing` )
531- oldSocket . destroy ( ) ;
532- this . #paused = false ;
533- }
534- } ) ;
535-
536- await this . #socket. connect ( )
537- } ) ;
490+ // this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => {
491+ // // 1
492+ // console.log(`Moving to ${host}:${port} before ${afterMs}ms`);
493+
494+ // // 2
495+ // console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`);
496+ // // this.#paused = true;
497+
498+ // const oldSocket = this.#socket;
499+ // this.#socket = this.#initiateSocket({
500+ // ...this.#options,
501+ // socket: {
502+ // ...this.#options?.socket,
503+ // host,
504+ // port
505+ // }
506+ // });
507+
508+ // // 3
509+ // this.#socket.once('ready', () => {
510+ // //TODO handshake...???
511+ // console.log(`Connected to ${host}:${port}`);
512+
513+ // // 4
514+ // if(!this.#queue.isWaitingForReply()) {
515+ // // 5 and 6
516+ // console.log(`All in-flight commands completed`);
517+ // console.log(`Resume writing`)
518+ // oldSocket.destroy();
519+ // this.#paused = false;
520+ // }
521+ // });
522+
523+ // // 4
524+ // this.#queue.events.once('waitingForReplyEmpty', () => {
525+ // console.log(`All in-flight commands completed`);
526+ // // 3
527+ // if(this.#socket.isReady) {
528+ // // 5 and 6
529+ // console.log(`Connected to ${host}:${port}`);
530+ // console.log(`Resume writing`)
531+ // oldSocket.destroy();
532+ // this.#paused = false;
533+ // }
534+ // });
535+
536+ // await this.#socket.connect()
537+ // });
538538
539539 if ( options ?. clientSideCache ) {
540540 if ( options . clientSideCache instanceof ClientSideCacheProvider ) {
@@ -764,39 +764,42 @@ export default class RedisClient<
764764 return commands ;
765765 }
766766
767- #initiateSocket( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : RedisSocket {
768- const socketInitiator = async ( ) => {
769- console . log ( 'Initiator...' ) ;
770- const promises = [ ] ,
771- chainId = Symbol ( 'Socket Initiator' ) ;
767+ async #initiateSocket( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : Promise < void > {
768+ await this . #socket. waitForReady ( ) ;
769+ console . log ( 'Initiator...' ) ;
770+ const promises = [ ] ;
771+ const chainId = Symbol ( 'Socket Initiator' ) ;
772+
773+ const resubscribePromise = this . #queue. resubscribe ( chainId ) ;
774+ if ( resubscribePromise ) {
775+ promises . push ( resubscribePromise ) ;
776+ }
772777
773- const resubscribePromise = this . #queue. resubscribe ( chainId ) ;
774- if ( resubscribePromise ) {
775- promises . push ( resubscribePromise ) ;
776- }
778+ if ( this . #monitorCallback) {
779+ promises . push (
780+ this . #queue. monitor (
781+ this . #monitorCallback,
782+ {
783+ typeMapping : this . _commandOptions ?. typeMapping ,
784+ chainId,
785+ asap : true
786+ }
787+ )
788+ ) ;
789+ }
777790
778- if ( this . #monitorCallback) {
779- promises . push (
780- this . #queue. monitor (
781- this . #monitorCallback,
782- {
783- typeMapping : this . _commandOptions ?. typeMapping ,
784- chainId,
785- asap : true
786- }
787- )
788- ) ;
789- }
791+ promises . push ( ...( await this . #handshake( chainId , true ) ) ) ;
790792
791- promises . push ( ... ( await this . #handshake ( chainId , true ) ) ) ;
793+ this . #setPingTimer ( ) ;
792794
793- if ( promises . length ) {
794- this . #write( ) ;
795- return Promise . all ( promises ) ;
796- }
797- } ;
795+ if ( promises . length ) {
796+ this . #write( ) ;
797+ await Promise . all ( promises ) ;
798+ }
799+ }
798800
799- return new RedisSocket ( socketInitiator , options ?. socket )
801+ #createSocket( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : RedisSocket {
802+ return new RedisSocket ( options ?. socket )
800803 . on ( 'data' , chunk => {
801804 try {
802805 this . #queue. decoder . write ( chunk ) ;
@@ -818,8 +821,6 @@ export default class RedisClient<
818821 . on ( 'ready' , ( ) => {
819822 console . log ( 'Socket ready' ) ;
820823 this . emit ( 'ready' ) ;
821- this . #setPingTimer( ) ;
822- this . #maybeScheduleWrite( ) ;
823824 } )
824825 . on ( 'reconnecting' , ( ) => this . emit ( 'reconnecting' ) )
825826 . on ( 'drain' , ( ) => this . #maybeScheduleWrite( ) )
@@ -932,6 +933,7 @@ export default class RedisClient<
932933
933934 async connect ( ) {
934935 await this . _self . #socket. connect ( ) ;
936+ await this . _self . #initiateSocket( this . _self . #options) ;
935937 return this as unknown as RedisClientType < M , F , S , RESP , TYPE_MAPPING > ;
936938 }
937939
0 commit comments