@@ -52,15 +52,11 @@ function RedisClient(stream, options) {
52
52
this . should_buffer = false ;
53
53
this . command_queue_high_water = this . options . command_queue_high_water || 1000 ;
54
54
this . command_queue_low_water = this . options . command_queue_low_water || 0 ;
55
- if ( options . max_attempts && options . max_attempts > 0 ) {
56
- this . max_attempts = + options . max_attempts ;
57
- }
55
+ this . max_attempts = + options . max_attempts || 0 ;
58
56
this . command_queue = new Queue ( ) ; // holds sent commands to de-pipeline them
59
57
this . offline_queue = new Queue ( ) ; // holds commands issued but not able to be sent
60
58
this . commands_sent = 0 ;
61
- if ( options . connect_timeout && options . connect_timeout > 0 ) {
62
- this . connect_timeout = + options . connect_timeout ;
63
- }
59
+ this . connect_timeout = + options . connect_timeout || 86400000 ; // 24 * 60 * 60 * 1000 ms
64
60
this . enable_offline_queue = true ;
65
61
if ( this . options . enable_offline_queue === false ) {
66
62
this . enable_offline_queue = false ;
@@ -123,7 +119,7 @@ RedisClient.prototype.install_stream_listeners = function() {
123
119
RedisClient . prototype . initialize_retry_vars = function ( ) {
124
120
this . retry_timer = null ;
125
121
this . retry_totaltime = 0 ;
126
- this . retry_delay = 150 ;
122
+ this . retry_delay = 200 ;
127
123
this . retry_backoff = 1.7 ;
128
124
this . attempts = 1 ;
129
125
} ;
@@ -141,21 +137,17 @@ RedisClient.prototype.unref = function () {
141
137
} ;
142
138
143
139
// flush offline_queue and command_queue, erroring any items with a callback first
144
- RedisClient . prototype . flush_and_error = function ( message ) {
145
- var command_obj , error ;
146
-
147
- error = new Error ( message ) ;
140
+ RedisClient . prototype . flush_and_error = function ( error ) {
141
+ var command_obj ;
148
142
149
- while ( this . offline_queue . length > 0 ) {
150
- command_obj = this . offline_queue . shift ( ) ;
143
+ while ( command_obj = this . offline_queue . shift ( ) ) {
151
144
if ( typeof command_obj . callback === "function" ) {
152
145
command_obj . callback ( error ) ;
153
146
}
154
147
}
155
148
this . offline_queue = new Queue ( ) ;
156
149
157
- while ( this . command_queue . length > 0 ) {
158
- command_obj = this . command_queue . shift ( ) ;
150
+ while ( command_obj = this . command_queue . shift ( ) ) {
159
151
if ( typeof command_obj . callback === "function" ) {
160
152
command_obj . callback ( error ) ;
161
153
}
@@ -172,8 +164,6 @@ RedisClient.prototype.on_error = function (msg) {
172
164
173
165
debug ( message ) ;
174
166
175
- this . flush_and_error ( message ) ;
176
-
177
167
this . connected = false ;
178
168
this . ready = false ;
179
169
@@ -399,8 +389,8 @@ RedisClient.prototype.ready_check = function () {
399
389
RedisClient . prototype . send_offline_queue = function ( ) {
400
390
var command_obj , buffered_writes = 0 ;
401
391
402
- while ( this . offline_queue . length > 0 ) {
403
- command_obj = this . offline_queue . shift ( ) ;
392
+ // TODO: Implement queue.pop() as it should be faster than shift and evaluate petka antonovs queue
393
+ while ( command_obj = this . offline_queue . shift ( ) ) {
404
394
debug ( "Sending offline command: " + command_obj . command ) ;
405
395
buffered_writes += ! this . send_command ( command_obj . command , command_obj . args , command_obj . callback ) ;
406
396
}
@@ -438,56 +428,54 @@ RedisClient.prototype.connection_gone = function (why) {
438
428
}
439
429
440
430
// since we are collapsing end and close, users don't expect to be called twice
441
- if ( ! this . emitted_end ) {
431
+ if ( ! this . emitted_end ) {
442
432
this . emit ( "end" ) ;
443
433
this . emitted_end = true ;
444
434
}
445
435
446
- this . flush_and_error ( "Redis connection gone from " + why + " event." ) ;
447
-
448
436
// If this is a requested shutdown, then don't retry
449
437
if ( this . closing ) {
450
- this . retry_timer = null ;
451
- debug ( "Connection ended from quit command, not retrying." ) ;
438
+ debug ( "connection ended from quit command, not retrying." ) ;
439
+ this . flush_and_error ( new Error ( "Redis connection gone from " + why + " event." ) ) ;
440
+ return ;
441
+ }
442
+
443
+ if ( this . max_attempts !== 0 && this . attempts >= this . max_attempts || this . retry_totaltime >= this . connect_timeout ) {
444
+ var message = this . retry_totaltime >= this . connect_timeout ?
445
+ 'connection timeout exceeded.' :
446
+ 'maximum connection attempts exceeded.' ;
447
+ var error = new Error ( "Redis connection in broken state: " + message ) ;
448
+ error . code = 'CONNECTION_BROKEN' ;
449
+ this . flush_and_error ( error ) ;
450
+ this . emit ( 'error' , error ) ;
451
+ this . end ( ) ;
452
452
return ;
453
453
}
454
454
455
- var nextDelay = Math . floor ( this . retry_delay * this . retry_backoff ) ;
456
- if ( this . retry_max_delay !== null && nextDelay > this . retry_max_delay ) {
455
+ if ( this . retry_max_delay !== null && this . retry_delay > this . retry_max_delay ) {
457
456
this . retry_delay = this . retry_max_delay ;
458
- } else {
459
- this . retry_delay = nextDelay ;
457
+ } else if ( this . retry_totaltime + this . retry_delay > this . connect_timeout ) {
458
+ // Do not exceed the maximum
459
+ this . retry_delay = this . connect_timeout - this . retry_totaltime ;
460
460
}
461
461
462
462
debug ( "Retry connection in " + this . retry_delay + " ms" ) ;
463
463
464
- if ( this . max_attempts && this . attempts >= this . max_attempts ) {
465
- this . retry_timer = null ;
466
- // TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
467
- // want the program to exit. Right now, we just log, which doesn't really help in either case.
468
- debug ( "Couldn't get Redis connection after " + this . max_attempts + " attempts." ) ;
469
- return ;
470
- }
471
-
472
- this . attempts += 1 ;
473
- this . emit ( "reconnecting" , {
474
- delay : self . retry_delay ,
475
- attempt : self . attempts
476
- } ) ;
477
464
this . retry_timer = setTimeout ( function ( ) {
478
465
debug ( "Retrying connection..." ) ;
479
466
480
- self . retry_totaltime += self . retry_delay ;
467
+ self . emit ( "reconnecting" , {
468
+ delay : self . retry_delay ,
469
+ attempt : self . attempts
470
+ } ) ;
481
471
482
- if ( self . connect_timeout && self . retry_totaltime >= self . connect_timeout ) {
483
- self . retry_timer = null ;
484
- // TODO - engage Redis is Broken mode for future commands, or whatever
485
- debug ( "Couldn't get Redis connection after " + self . retry_totaltime + "ms." ) ;
486
- return ;
487
- }
472
+ self . retry_totaltime += self . retry_delay ;
473
+ self . attempts += 1 ;
474
+ self . retry_delay = Math . round ( self . retry_delay * self . retry_backoff ) ;
488
475
489
476
self . stream = net . createConnection ( self . connectionOption ) ;
490
477
self . install_stream_listeners ( ) ;
478
+
491
479
self . retry_timer = null ;
492
480
} , this . retry_delay ) ;
493
481
} ;
@@ -836,12 +824,12 @@ RedisClient.prototype.pub_sub_command = function (command_obj) {
836
824
RedisClient . prototype . end = function ( ) {
837
825
this . stream . _events = { } ;
838
826
839
- //clear retry_timer
840
- if ( this . retry_timer ) {
827
+ // Clear retry_timer
828
+ if ( this . retry_timer ) {
841
829
clearTimeout ( this . retry_timer ) ;
842
- this . retry_timer = null ;
830
+ this . retry_timer = null ;
843
831
}
844
- this . stream . on ( "error" , function ( ) { } ) ;
832
+ this . stream . on ( "error" , function noop ( ) { } ) ;
845
833
846
834
this . connected = false ;
847
835
this . ready = false ;
@@ -1047,7 +1035,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
1047
1035
1048
1036
// TODO - make this callback part of Multi.prototype instead of creating it each time
1049
1037
return this . _client . send_command ( "exec" , [ ] , function ( err , replies ) {
1050
- if ( err ) {
1038
+ if ( err && ! err . code ) {
1051
1039
if ( callback ) {
1052
1040
errors . push ( err ) ;
1053
1041
callback ( errors ) ;
@@ -1083,6 +1071,9 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
1083
1071
1084
1072
if ( callback ) {
1085
1073
callback ( null , replies ) ;
1074
+ } else if ( err && err . code !== 'CONNECTION_BROKEN' ) {
1075
+ // Exclude CONNECTION_BROKEN so that error won't be emitted twice
1076
+ self . _client . emit ( 'error' , err ) ;
1086
1077
}
1087
1078
} ) ;
1088
1079
} ;
0 commit comments