@@ -5,6 +5,7 @@ var tls = require('tls');
5
5
var util = require ( 'util' ) ;
6
6
var utils = require ( './lib/utils' ) ;
7
7
var Queue = require ( 'double-ended-queue' ) ;
8
+ var CommandError = require ( './lib/customError' ) ;
8
9
var Command = require ( './lib/command' ) . Command ;
9
10
var OfflineCommand = require ( './lib/command' ) . OfflineCommand ;
10
11
var EventEmitter = require ( 'events' ) ;
@@ -264,11 +265,11 @@ RedisClient.prototype.create_stream = function () {
264
265
} ) ;
265
266
266
267
this . stream . once ( 'close' , function ( hadError ) {
267
- self . connection_gone ( 'close' , new Error ( 'Stream connection closed' + ( hadError ? ' because of a transmission error' : '' ) ) ) ;
268
+ self . connection_gone ( 'close' , hadError ? new Error ( 'Stream connection closed with a transmission error' ) : null ) ;
268
269
} ) ;
269
270
270
271
this . stream . once ( 'end' , function ( ) {
271
- self . connection_gone ( 'end' , new Error ( 'Stream connection ended' ) ) ;
272
+ self . connection_gone ( 'end' , null ) ;
272
273
} ) ;
273
274
274
275
this . stream . on ( 'drain' , function ( ) {
@@ -320,16 +321,29 @@ RedisClient.prototype.warn = function (msg) {
320
321
321
322
// Flush provided queues, erroring any items with a callback first
322
323
RedisClient . prototype . flush_and_error = function ( error , queue_names ) {
324
+ var callbacks_not_called = [ ] ;
323
325
queue_names = queue_names || [ 'offline_queue' , 'command_queue' ] ;
324
326
for ( var i = 0 ; i < queue_names . length ; i ++ ) {
325
327
for ( var command_obj = this [ queue_names [ i ] ] . shift ( ) ; command_obj ; command_obj = this [ queue_names [ i ] ] . shift ( ) ) {
328
+ var err = new CommandError ( error ) ;
329
+ err . command = command_obj . command . toUpperCase ( ) ;
330
+ if ( command_obj . args . length ) {
331
+ err . args = command_obj . args ;
332
+ }
326
333
if ( typeof command_obj . callback === 'function' ) {
327
- error . command = command_obj . command . toUpperCase ( ) ;
328
- command_obj . callback ( error ) ;
334
+ command_obj . callback ( err ) ;
335
+ } else {
336
+ callbacks_not_called . push ( err ) ;
329
337
}
330
338
}
331
339
this [ queue_names [ i ] ] = new Queue ( ) ;
332
340
}
341
+ // Mutate the original error that will be emitted
342
+ // This is fine, as we don't manipulate any user errors
343
+ if ( callbacks_not_called . length !== 0 ) {
344
+ error . errors = callbacks_not_called ;
345
+ }
346
+ return callbacks_not_called . length === 0 ;
333
347
} ;
334
348
335
349
RedisClient . prototype . on_error = function ( err ) {
@@ -546,8 +560,10 @@ RedisClient.prototype.connection_gone = function (why, error) {
546
560
547
561
// If this is a requested shutdown, then don't retry
548
562
if ( this . closing ) {
549
- debug ( 'Connection ended from quit command, not retrying.' ) ;
550
- this . flush_and_error ( new Error ( 'Redis connection gone from ' + why + ' event.' ) ) ;
563
+ debug ( 'Connection ended by quit / end command, not retrying.' ) ;
564
+ error = new Error ( 'Stream connection ended and running command aborted. It might have been processed.' ) ;
565
+ error . code = 'NR_OFFLINE' ;
566
+ this . flush_and_error ( error ) ;
551
567
return ;
552
568
}
553
569
@@ -567,10 +583,18 @@ RedisClient.prototype.connection_gone = function (why, error) {
567
583
if ( typeof this . retry_delay !== 'number' ) {
568
584
// Pass individual error through
569
585
if ( this . retry_delay instanceof Error ) {
570
- error = this . retry_delay ;
586
+ error = new CommandError ( this . retry_delay ) ;
587
+ }
588
+ // Attention: there might be the case where there's no error!
589
+ if ( ! error ) {
590
+ error = new Error ( 'Stream connection ended and running command aborted. It might have been processed.' ) ;
591
+ error . code = 'NR_OFFLINE' ;
592
+ }
593
+ // Only emit an error in case that a running command had no callback
594
+ if ( ! this . flush_and_error ( error ) ) {
595
+ error . message = 'Stream connection ended and all running commands aborted. They might have been processed.' ;
596
+ this . emit ( 'error' , error ) ;
571
597
}
572
- this . flush_and_error ( error ) ;
573
- this . emit ( 'error' , error ) ;
574
598
this . end ( false ) ;
575
599
return ;
576
600
}
@@ -595,11 +619,11 @@ RedisClient.prototype.connection_gone = function (why, error) {
595
619
} else if ( this . command_queue . length !== 0 ) {
596
620
error = new Error ( 'Redis connection lost and command aborted in uncertain state. It might have been processed.' ) ;
597
621
error . code = 'UNCERTAIN_STATE' ;
598
- this . flush_and_error ( error , [ 'command_queue' ] ) ;
599
- error . message = 'Redis connection lost and commands aborted in uncertain state. They might have been processed.' ;
600
- // TODO: Reconsider emitting this always, as each running command is handled anyway
601
- // This should likely be removed in v.3. This is different to the broken connection as we'll reconnect here
602
- this . emit ( 'error' , error ) ;
622
+ if ( ! this . flush_and_error ( error , [ 'command_queue' ] ) ) {
623
+ // Only emit if not all commands had a callback that already handled the error
624
+ error . message = 'Redis connection lost and commands aborted in uncertain state. They might have been processed.' ;
625
+ this . emit ( 'error' , error ) ;
626
+ }
603
627
}
604
628
605
629
if ( this . retry_max_delay !== null && this . retry_delay > this . retry_max_delay ) {
@@ -618,6 +642,9 @@ RedisClient.prototype.return_error = function (err) {
618
642
var command_obj = this . command_queue . shift ( ) ;
619
643
if ( command_obj && command_obj . command && command_obj . command . toUpperCase ) {
620
644
err . command = command_obj . command . toUpperCase ( ) ;
645
+ if ( command_obj . args . length ) {
646
+ err . args = command_obj . args ;
647
+ }
621
648
}
622
649
623
650
var match = err . message . match ( utils . err_code ) ;
@@ -786,6 +813,9 @@ function handle_offline_command (self, command_obj) {
786
813
}
787
814
err = new Error ( command + " can't be processed. " + msg ) ;
788
815
err . command = command ;
816
+ if ( command_obj . args . length ) {
817
+ err . args = command_obj . args ;
818
+ }
789
819
err . code = 'NR_OFFLINE' ;
790
820
utils . reply_in_order ( self , callback , err ) ;
791
821
} else {
0 commit comments