@@ -460,6 +460,15 @@ export default class RedisClient<
460460 return this . _self . #dirtyWatch !== undefined
461461 }
462462
463+ get socket ( ) {
464+ return this . _self . #socket;
465+ }
466+
467+ set socket ( socket : RedisSocket ) {
468+ this . _self . #socket = socket ;
469+ this . #initiateSocket( ) ;
470+ }
471+
463472 /**
464473 * Marks the client's WATCH command as invalidated due to a topology change.
465474 * This will cause any subsequent EXEC in a transaction to fail with a WatchError.
@@ -557,6 +566,7 @@ export default class RedisClient<
557566 }
558567
559568 }
569+
560570 #initiateOptions( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > | undefined {
561571
562572 // Convert username/password to credentialsProvider if no credentialsProvider is already in place
@@ -766,6 +776,29 @@ export default class RedisClient<
766776
767777 async #initiateSocket( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : Promise < void > {
768778 await this . #socket. waitForReady ( ) ;
779+
780+ this . #socket
781+ . on ( 'data' , chunk => {
782+ try {
783+ this . #queue. decoder . write ( chunk ) ;
784+ } catch ( err ) {
785+ this . #queue. resetDecoder ( ) ;
786+ this . emit ( 'error' , err ) ;
787+ }
788+ } )
789+ . on ( 'error' , err => {
790+ this . emit ( 'error' , err ) ;
791+ this . #clientSideCache?. onError ( ) ;
792+ if ( this . #socket. isOpen && ! this . #options?. disableOfflineQueue ) {
793+ this . #queue. flushWaitingForReply ( err ) ;
794+ } else {
795+ this . #queue. flushAll ( err ) ;
796+ }
797+ } )
798+ . on ( 'reconnecting' , ( ) => this . emit ( 'reconnecting' ) )
799+ . on ( 'drain' , ( ) => this . #maybeScheduleWrite( ) )
800+ . on ( 'end' , ( ) => this . emit ( 'end' ) ) ;
801+
769802 console . log ( 'Initiator...' ) ;
770803 const promises = [ ] ;
771804 const chainId = Symbol ( 'Socket Initiator' ) ;
@@ -800,31 +833,11 @@ export default class RedisClient<
800833
801834 #createSocket( options ?: RedisClientOptions < M , F , S , RESP , TYPE_MAPPING > ) : RedisSocket {
802835 return new RedisSocket ( options ?. socket )
803- . on ( 'data' , chunk => {
804- try {
805- this . #queue. decoder . write ( chunk ) ;
806- } catch ( err ) {
807- this . #queue. resetDecoder ( ) ;
808- this . emit ( 'error' , err ) ;
809- }
810- } )
811- . on ( 'error' , err => {
812- this . emit ( 'error' , err ) ;
813- this . #clientSideCache?. onError ( ) ;
814- if ( this . #socket. isOpen && ! this . #options?. disableOfflineQueue ) {
815- this . #queue. flushWaitingForReply ( err ) ;
816- } else {
817- this . #queue. flushAll ( err ) ;
818- }
819- } )
820836 . on ( 'connect' , ( ) => this . emit ( 'connect' ) )
821837 . on ( 'ready' , ( ) => {
822838 console . log ( 'Socket ready' ) ;
823839 this . emit ( 'ready' ) ;
824- } )
825- . on ( 'reconnecting' , ( ) => this . emit ( 'reconnecting' ) )
826- . on ( 'drain' , ( ) => this . #maybeScheduleWrite( ) )
827- . on ( 'end' , ( ) => this . emit ( 'end' ) ) ;
840+ } ) ;
828841 }
829842
830843 #pingTimer?: NodeJS . Timeout ;
0 commit comments