@@ -55,6 +55,13 @@ function RedisClient(stream, options) {
55
55
}
56
56
}
57
57
}
58
+ this . options . return_buffers = ! ! this . options . return_buffers ;
59
+ this . options . detect_buffers = ! ! this . options . detect_buffers ;
60
+ // Override the detect_buffers setting if return_buffers is active and print a warning
61
+ if ( this . options . return_buffers && this . options . detect_buffers ) {
62
+ console . warn ( '>> WARNING: You activated return_buffers and detect_buffers at the same time. The return value is always going to be a buffer.' ) ;
63
+ this . options . detect_buffers = false ;
64
+ }
58
65
this . should_buffer = false ;
59
66
this . command_queue_high_water = options . command_queue_high_water || 1000 ;
60
67
this . command_queue_low_water = options . command_queue_low_water || 0 ;
@@ -433,7 +440,7 @@ RedisClient.prototype.send_offline_queue = function () {
433
440
this . offline_queue = new Queue ( ) ;
434
441
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
435
442
436
- if ( ! buffered_writes ) {
443
+ if ( buffered_writes === 0 ) {
437
444
this . should_buffer = false ;
438
445
this . emit ( 'drain' ) ;
439
446
}
@@ -531,21 +538,25 @@ RedisClient.prototype.return_error = function (err) {
531
538
err . code = match [ 1 ] ;
532
539
}
533
540
541
+ this . emit_drain_idle ( queue_len ) ;
542
+
543
+ if ( command_obj . callback ) {
544
+ command_obj . callback ( err ) ;
545
+ } else {
546
+ this . emit ( 'error' , err ) ;
547
+ }
548
+ } ;
549
+
550
+ RedisClient . prototype . emit_drain_idle = function ( queue_len ) {
534
551
if ( this . pub_sub_mode === false && queue_len === 0 ) {
535
- this . command_queue = new Queue ( ) ;
552
+ this . command_queue . clear ( ) ;
536
553
this . emit ( 'idle' ) ;
537
554
}
538
555
539
556
if ( this . should_buffer && queue_len <= this . command_queue_low_water ) {
540
557
this . emit ( 'drain' ) ;
541
558
this . should_buffer = false ;
542
559
}
543
-
544
- if ( command_obj . callback ) {
545
- command_obj . callback ( err ) ;
546
- } else {
547
- this . emit ( 'error' , err ) ;
548
- }
549
560
} ;
550
561
551
562
RedisClient . prototype . return_reply = function ( reply ) {
@@ -566,37 +577,29 @@ RedisClient.prototype.return_reply = function (reply) {
566
577
567
578
queue_len = this . command_queue . length ;
568
579
569
- if ( this . pub_sub_mode === false && queue_len === 0 ) {
570
- this . command_queue = new Queue ( ) ; // explicitly reclaim storage from old Queue
571
- this . emit ( 'idle' ) ;
572
- }
573
- if ( this . should_buffer && queue_len <= this . command_queue_low_water ) {
574
- this . emit ( 'drain' ) ;
575
- this . should_buffer = false ;
576
- }
580
+ this . emit_drain_idle ( queue_len ) ;
577
581
578
582
if ( command_obj && ! command_obj . sub_command ) {
579
583
if ( typeof command_obj . callback === 'function' ) {
580
584
if ( 'exec' !== command_obj . command ) {
581
- if ( this . options . detect_buffers && command_obj . buffer_args === false ) {
585
+ if ( command_obj . buffer_args === false ) {
582
586
// If detect_buffers option was specified, then the reply from the parser will be Buffers.
583
587
// If this command did not use Buffer arguments, then convert the reply to Strings here.
584
588
reply = utils . reply_to_strings ( reply ) ;
585
589
}
586
590
587
591
// TODO - confusing and error-prone that hgetall is special cased in two places
588
- if ( reply && 'hgetall' === command_obj . command ) {
592
+ if ( 'hgetall' === command_obj . command ) {
589
593
reply = utils . reply_to_object ( reply ) ;
590
594
}
591
595
}
592
-
593
596
command_obj . callback ( null , reply ) ;
594
597
} else {
595
598
debug ( 'No callback for reply' ) ;
596
599
}
597
600
} else if ( this . pub_sub_mode || command_obj && command_obj . sub_command ) {
598
601
if ( Array . isArray ( reply ) ) {
599
- if ( ! this . options . return_buffers && ( ! command_obj || this . options . detect_buffers && command_obj . buffer_args === false ) ) {
602
+ if ( ! command_obj || command_obj . buffer_args === false ) {
600
603
reply = utils . reply_to_strings ( reply ) ;
601
604
}
602
605
type = reply [ 0 ] . toString ( ) ;
@@ -620,11 +623,9 @@ RedisClient.prototype.return_reply = function (reply) {
620
623
this . emit ( type , reply [ 1 ] , reply [ 2 ] ) ; // channel, count
621
624
} else {
622
625
this . emit ( 'error' , new Error ( 'subscriptions are active but got unknown reply type ' + type ) ) ;
623
- return ;
624
626
}
625
627
} else if ( ! this . closing ) {
626
628
this . emit ( 'error' , new Error ( 'subscriptions are active but got an invalid reply: ' + reply ) ) ;
627
- return ;
628
629
}
629
630
}
630
631
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
@@ -648,7 +649,12 @@ RedisClient.prototype.return_reply = function (reply) {
648
649
} ;
649
650
650
651
RedisClient . prototype . send_command = function ( command , args , callback ) {
651
- var arg , command_obj , i , elem_count , buffer_args , stream = this . stream , command_str = '' , buffered_writes = 0 , err ;
652
+ var arg , command_obj , i , err ,
653
+ stream = this . stream ,
654
+ command_str = '' ,
655
+ buffered_writes = 0 ,
656
+ buffer_args = false ,
657
+ buffer = this . options . return_buffers ;
652
658
653
659
if ( args === undefined ) {
654
660
args = [ ] ;
@@ -660,7 +666,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
660
666
}
661
667
}
662
668
663
- if ( process . domain && callback ) {
669
+ if ( callback && process . domain ) {
664
670
callback = process . domain . bind ( callback ) ;
665
671
}
666
672
@@ -678,15 +684,17 @@ RedisClient.prototype.send_command = function (command, args, callback) {
678
684
}
679
685
}
680
686
681
- buffer_args = false ;
682
687
for ( i = 0 ; i < args . length ; i += 1 ) {
683
688
if ( Buffer . isBuffer ( args [ i ] ) ) {
684
689
buffer_args = true ;
685
690
break ;
686
691
}
687
692
}
693
+ if ( this . options . detect_buffers ) {
694
+ buffer = buffer_args ;
695
+ }
688
696
689
- command_obj = new Command ( command , args , false , buffer_args , callback ) ;
697
+ command_obj = new Command ( command , args , false , buffer , callback ) ;
690
698
691
699
if ( ! this . ready && ! this . send_anyway || ! stream . writable ) {
692
700
if ( this . closing || ! this . enable_offline_queue ) {
@@ -725,16 +733,14 @@ RedisClient.prototype.send_command = function (command, args, callback) {
725
733
this . command_queue . push ( command_obj ) ;
726
734
this . commands_sent += 1 ;
727
735
728
- elem_count = args . length + 1 ;
729
-
730
736
if ( typeof this . options . rename_commands !== 'undefined' && this . options . rename_commands [ command ] ) {
731
737
command = this . options . rename_commands [ command ] ;
732
738
}
733
739
734
740
// Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg.
735
741
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
736
742
737
- command_str = '*' + elem_count + '\r\n$' + command . length + '\r\n' + command + '\r\n' ;
743
+ command_str = '*' + ( args . length + 1 ) + '\r\n$' + command . length + '\r\n' + command + '\r\n' ;
738
744
739
745
if ( ! buffer_args ) { // Build up a string and send entire command in one write
740
746
for ( i = 0 ; i < args . length ; i += 1 ) {
@@ -752,10 +758,6 @@ RedisClient.prototype.send_command = function (command, args, callback) {
752
758
753
759
for ( i = 0 ; i < args . length ; i += 1 ) {
754
760
arg = args [ i ] ;
755
- if ( ! ( Buffer . isBuffer ( arg ) || typeof arg === 'string' ) ) {
756
- arg = String ( arg ) ;
757
- }
758
-
759
761
if ( Buffer . isBuffer ( arg ) ) {
760
762
if ( arg . length === 0 ) {
761
763
debug ( 'send_command: using empty string for 0 length buffer' ) ;
@@ -767,13 +769,16 @@ RedisClient.prototype.send_command = function (command, args, callback) {
767
769
debug ( 'send_command: buffer send ' + arg . length + ' bytes' ) ;
768
770
}
769
771
} else {
772
+ if ( typeof arg !== 'string' ) {
773
+ arg = String ( arg ) ;
774
+ }
770
775
debug ( 'send_command: string send ' + Buffer . byteLength ( arg ) + ' bytes: ' + arg ) ;
771
776
buffered_writes += ! stream . write ( '$' + Buffer . byteLength ( arg ) + '\r\n' + arg + '\r\n' ) ;
772
777
}
773
778
}
774
779
}
775
- debug ( 'send_command buffered_writes: ' + buffered_writes , ' should_buffer: ' + this . should_buffer ) ;
776
- if ( buffered_writes || this . command_queue . length >= this . command_queue_high_water ) {
780
+ if ( buffered_writes !== 0 || this . command_queue . length >= this . command_queue_high_water ) {
781
+ debug ( 'send_command buffered_writes: ' + buffered_writes , ' should_buffer: ' + this . should_buffer ) ;
777
782
this . should_buffer = true ;
778
783
}
779
784
return ! this . should_buffer ;
@@ -1077,7 +1082,7 @@ Multi.prototype.execute_callback = function (err, replies) {
1077
1082
}
1078
1083
replies [ i ] . command = args [ 0 ] . toUpperCase ( ) ;
1079
1084
} else if ( replies [ i ] ) {
1080
- if ( this . _client . options . detect_buffers && this . wants_buffers [ i + 1 ] === false ) {
1085
+ if ( this . wants_buffers [ i + 1 ] === false ) {
1081
1086
replies [ i ] = utils . reply_to_strings ( replies [ i ] ) ;
1082
1087
}
1083
1088
if ( args [ 0 ] === 'hgetall' ) {
0 commit comments