@@ -3,6 +3,7 @@ import { EventEmitter } from "events";
3
3
import asCallback from "standard-as-callback" ;
4
4
import Cluster from "./cluster" ;
5
5
import Command from "./Command" ;
6
+ import { DataHandledable , FlushQueueOptions , Condition } from "./DataHandler" ;
6
7
import { StandaloneConnector } from "./connectors" ;
7
8
import AbstractConnector from "./connectors/AbstractConnector" ;
8
9
import SentinelConnector from "./connectors/SentinelConnector" ;
@@ -60,7 +61,7 @@ type RedisStatus =
60
61
* }
61
62
* ```
62
63
*/
63
- class Redis extends Commander {
64
+ class Redis extends Commander implements DataHandledable {
64
65
static Cluster = Cluster ;
65
66
static Command = Command ;
66
67
/**
@@ -89,14 +90,18 @@ class Redis extends Commander {
89
90
*/
90
91
isCluster = false ;
91
92
93
+ /**
94
+ * @ignore
95
+ */
96
+ condition : Condition | null ;
97
+
98
+ /**
99
+ * @ignore
100
+ */
101
+ commandQueue : Deque < CommandItem > ;
102
+
92
103
private connector : AbstractConnector ;
93
104
private reconnectTimeout : ReturnType < typeof setTimeout > | null = null ;
94
- private condition : {
95
- select : number ;
96
- auth ?: string | [ string , string ] ;
97
- subscriber : boolean ;
98
- } ;
99
- private commandQueue : Deque < CommandItem > ;
100
105
private offlineQueue : Deque ;
101
106
private connectionEpoch = 0 ;
102
107
private retryAttempts = 0 ;
@@ -220,9 +225,11 @@ class Redis extends Commander {
220
225
221
226
// Node ignores setKeepAlive before connect, therefore we wait for the event:
222
227
// https://github.com/nodejs/node/issues/31663
223
- if ( typeof options . keepAlive === ' number' ) {
228
+ if ( typeof options . keepAlive === " number" ) {
224
229
if ( stream . connecting ) {
225
- stream . once ( CONNECT_EVENT , ( ) => stream . setKeepAlive ( true , options . keepAlive ) ) ;
230
+ stream . once ( CONNECT_EVENT , ( ) => {
231
+ stream . setKeepAlive ( true , options . keepAlive ) ;
232
+ } ) ;
226
233
} else {
227
234
stream . setKeepAlive ( true , options . keepAlive ) ;
228
235
}
@@ -344,10 +351,10 @@ class Redis extends Commander {
344
351
* One of `"normal"`, `"subscriber"`, or `"monitor"`. When the connection is
345
352
* not in `"normal"` mode, certain commands are not allowed.
346
353
*/
347
- get mode ( ) : "normal" | "subscriber" | "monitor" {
354
+ get mode ( ) : "normal" | "subscriber" | "monitor" {
348
355
return this . options . monitor
349
356
? "monitor"
350
- : this . condition && this . condition . subscriber
357
+ : this . condition ? .subscriber
351
358
? "subscriber"
352
359
: "normal" ;
353
360
}
@@ -421,7 +428,7 @@ class Redis extends Commander {
421
428
return command . promise ;
422
429
}
423
430
if (
424
- this . condition . subscriber &&
431
+ this . condition ? .subscriber &&
425
432
! Command . checkFlag ( "VALID_IN_SUBSCRIBER_MODE" , command . name )
426
433
) {
427
434
command . reject (
@@ -491,7 +498,7 @@ class Redis extends Commander {
491
498
debug (
492
499
"write command[%s]: %d -> %s(%o)" ,
493
500
this . _getDescription ( ) ,
494
- this . condition . select ,
501
+ this . condition ? .select ,
495
502
command . name ,
496
503
command . args
497
504
) ;
@@ -600,45 +607,22 @@ class Redis extends Commander {
600
607
}
601
608
602
609
/**
603
- * Get description of the connection. Used for debugging.
610
+ * @ignore
604
611
*/
605
- private _getDescription ( ) {
606
- let description ;
607
- if ( "path" in this . options && this . options . path ) {
608
- description = this . options . path ;
609
- } else if (
610
- this . stream &&
611
- this . stream . remoteAddress &&
612
- this . stream . remotePort
613
- ) {
614
- description = this . stream . remoteAddress + ":" + this . stream . remotePort ;
615
- } else if ( "host" in this . options && this . options . host ) {
616
- description = this . options . host + ":" + this . options . port ;
617
- } else {
618
- // Unexpected
619
- description = "" ;
620
- }
621
- if ( this . options . connectionName ) {
622
- description += ` (${ this . options . connectionName } )` ;
623
- }
624
- return description ;
625
- }
626
-
627
- private resetCommandQueue ( ) {
628
- this . commandQueue = new Deque ( ) ;
629
- }
630
-
631
- private resetOfflineQueue ( ) {
632
- this . offlineQueue = new Deque ( ) ;
633
- }
634
-
635
- private recoverFromFatalError ( commandError , err : Error | null , options ) {
612
+ recoverFromFatalError (
613
+ _commandError : Error ,
614
+ err : Error ,
615
+ options : FlushQueueOptions
616
+ ) {
636
617
this . flushQueue ( err , options ) ;
637
618
this . silentEmit ( "error" , err ) ;
638
619
this . disconnect ( true ) ;
639
620
}
640
621
641
- private handleReconnection ( err : Error , item : CommandItem ) {
622
+ /**
623
+ * @ignore
624
+ */
625
+ handleReconnection ( err : Error , item : CommandItem ) {
642
626
let needReconnect : ReturnType < ReconnectOnError > = false ;
643
627
if ( this . options . reconnectOnError ) {
644
628
needReconnect = this . options . reconnectOnError ( err ) ;
@@ -657,7 +641,7 @@ class Redis extends Commander {
657
641
this . disconnect ( true ) ;
658
642
}
659
643
if (
660
- this . condition . select !== item . select &&
644
+ this . condition ? .select !== item . select &&
661
645
item . command . name !== "select"
662
646
) {
663
647
this . select ( item . select ) ;
@@ -671,6 +655,39 @@ class Redis extends Commander {
671
655
}
672
656
}
673
657
658
+ /**
659
+ * Get description of the connection. Used for debugging.
660
+ */
661
+ private _getDescription ( ) {
662
+ let description ;
663
+ if ( "path" in this . options && this . options . path ) {
664
+ description = this . options . path ;
665
+ } else if (
666
+ this . stream &&
667
+ this . stream . remoteAddress &&
668
+ this . stream . remotePort
669
+ ) {
670
+ description = this . stream . remoteAddress + ":" + this . stream . remotePort ;
671
+ } else if ( "host" in this . options && this . options . host ) {
672
+ description = this . options . host + ":" + this . options . port ;
673
+ } else {
674
+ // Unexpected
675
+ description = "" ;
676
+ }
677
+ if ( this . options . connectionName ) {
678
+ description += ` (${ this . options . connectionName } )` ;
679
+ }
680
+ return description ;
681
+ }
682
+
683
+ private resetCommandQueue ( ) {
684
+ this . commandQueue = new Deque ( ) ;
685
+ }
686
+
687
+ private resetOfflineQueue ( ) {
688
+ this . offlineQueue = new Deque ( ) ;
689
+ }
690
+
674
691
private parseOptions ( ...args : unknown [ ] ) {
675
692
const options : Record < string , unknown > = { } ;
676
693
let isTls = false ;
@@ -744,7 +761,7 @@ class Redis extends Commander {
744
761
* @param error The error object to send to the commands
745
762
* @param options options
746
763
*/
747
- private flushQueue ( error : Error , options ?: RedisOptions ) {
764
+ private flushQueue ( error : Error , options ?: FlushQueueOptions ) {
748
765
options = defaults ( { } , options , {
749
766
offlineQueue : true ,
750
767
commandQueue : true ,
0 commit comments