Skip to content

Commit 399a29a

Browse files
author
Ruben Bridgewater
committed
Improve pipeline logic and fix #897
1 parent 1cb158b commit 399a29a

File tree

3 files changed

+77
-88
lines changed

3 files changed

+77
-88
lines changed

index.js

Lines changed: 55 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,24 @@ function RedisClient(stream, options) {
3939
options = JSON.parse(JSON.stringify(options || {}));
4040
var self = this;
4141

42+
this.pipeline = 0;
43+
var cork;
4244
if (!stream.cork) {
43-
this.pipeline = 0;
44-
this.cork = noop;
45-
this.once('ready', function () {
46-
self.cork = function (len) {
47-
self.pipeline = len;
48-
self.pipeline_queue = new Queue(len);
49-
};
50-
});
51-
stream.uncork = noop;
52-
this.write = this.writeStream;
45+
cork = function (len) {
46+
self.pipeline = len;
47+
self.pipeline_queue = new Queue(len);
48+
};
49+
this.uncork = noop;
50+
} else {
51+
cork = function (len) {
52+
self.pipeline = len;
53+
self.pipeline_queue = new Queue(len);
54+
self.stream.cork();
55+
};
5356
}
57+
this.once('ready', function () {
58+
self.cork = cork;
59+
});
5460

5561
this.stream = stream;
5662
this.connection_id = ++connection_id;
@@ -131,8 +137,9 @@ RedisClient.prototype.install_stream_listeners = function() {
131137
});
132138
};
133139

134-
RedisClient.prototype.cork = function (len) {
135-
this.stream.cork();
140+
RedisClient.prototype.cork = noop;
141+
RedisClient.prototype.uncork = function () {
142+
this.stream.uncork();
136143
};
137144

138145
RedisClient.prototype.initialize_retry_vars = function () {
@@ -377,7 +384,6 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
377384
return;
378385
}
379386

380-
var self = this;
381387
var obj = {};
382388
var lines = res.toString().split('\r\n');
383389
var i = 0;
@@ -422,9 +428,9 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
422428
retry_time = 1000;
423429
}
424430
debug('Redis server still loading, trying again in ' + retry_time);
425-
setTimeout(function () {
431+
setTimeout(function (self) {
426432
self.ready_check();
427-
}, retry_time);
433+
}, retry_time, this);
428434
}
429435
};
430436

@@ -441,12 +447,13 @@ RedisClient.prototype.ready_check = function () {
441447
};
442448

443449
RedisClient.prototype.send_offline_queue = function () {
444-
var command_obj, buffered_writes = 0;
450+
var command_obj;
445451

446452
while (command_obj = this.offline_queue.shift()) {
447453
debug('Sending offline command: ' + command_obj.command);
448-
buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
454+
this.send_command(command_obj.command, command_obj.args, command_obj.callback);
449455
}
456+
this.drain();
450457
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
451458
this.offline_queue = new Queue();
452459
};
@@ -543,7 +550,7 @@ RedisClient.prototype.return_error = function (err) {
543550
err.code = match[1];
544551
}
545552

546-
this.emit_drain_idle(queue_len);
553+
this.emit_idle(queue_len);
547554

548555
if (command_obj.callback) {
549556
command_obj.callback(err);
@@ -557,16 +564,12 @@ RedisClient.prototype.drain = function () {
557564
this.should_buffer = false;
558565
};
559566

560-
RedisClient.prototype.emit_drain_idle = function (queue_len) {
567+
RedisClient.prototype.emit_idle = function (queue_len) {
561568
if (this.pub_sub_mode === false && queue_len === 0) {
562569
// Free the queue capacity memory by using a new queue
563570
this.command_queue = new Queue();
564571
this.emit('idle');
565572
}
566-
567-
if (this.should_buffer && queue_len <= this.command_queue_low_water) {
568-
this.drain();
569-
}
570573
};
571574

572575
RedisClient.prototype.return_reply = function (reply) {
@@ -587,7 +590,7 @@ RedisClient.prototype.return_reply = function (reply) {
587590

588591
queue_len = this.command_queue.length;
589592

590-
this.emit_drain_idle(queue_len);
593+
this.emit_idle(queue_len);
591594

592595
if (command_obj && !command_obj.sub_command) {
593596
if (typeof command_obj.callback === 'function') {
@@ -640,7 +643,7 @@ RedisClient.prototype.return_reply = function (reply) {
640643
}
641644
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
642645
else if (this.monitoring) {
643-
if (Buffer.isBuffer(reply)) {
646+
if (typeof reply !== 'string') {
644647
reply = reply.toString();
645648
}
646649
// If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK
@@ -662,8 +665,8 @@ RedisClient.prototype.send_command = function (command, args, callback) {
662665
var arg, command_obj, i, err,
663666
stream = this.stream,
664667
command_str = '',
665-
buffered_writes = 0,
666668
buffer_args = false,
669+
big_data = false,
667670
buffer = this.options.return_buffers;
668671

669672
if (args === undefined) {
@@ -695,7 +698,12 @@ RedisClient.prototype.send_command = function (command, args, callback) {
695698
for (i = 0; i < args.length; i += 1) {
696699
if (Buffer.isBuffer(args[i])) {
697700
buffer_args = true;
698-
break;
701+
} else if (typeof args[i] !== 'string') {
702+
arg = String(arg);
703+
// 30000 seemed to be a good value to switch to buffers after testing this with and checking the pros and cons
704+
} else if (args[i].length > 30000) {
705+
big_data = true;
706+
args[i] = new Buffer(args[i]);
699707
}
700708
}
701709
if (this.options.detect_buffers) {
@@ -741,74 +749,53 @@ RedisClient.prototype.send_command = function (command, args, callback) {
741749

742750
// Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg.
743751
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
744-
745752
command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n';
746753

747-
if (!buffer_args) { // Build up a string and send entire command in one write
754+
if (!buffer_args && !big_data) { // Build up a string and send entire command in one write
748755
for (i = 0; i < args.length; i += 1) {
749-
arg = args[i];
750-
if (typeof arg !== 'string') {
751-
arg = String(arg);
752-
}
756+
arg = String(args[i]);
753757
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
754758
}
755759
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
756-
buffered_writes += !this.write(command_str);
760+
this.write(command_str);
757761
} else {
758762
debug('Send command (' + command_str + ') has Buffer arguments');
759-
buffered_writes += !this.write(command_str);
763+
this.write(command_str);
760764

761765
for (i = 0; i < args.length; i += 1) {
762766
arg = args[i];
763-
if (Buffer.isBuffer(arg)) {
764-
if (arg.length === 0) {
765-
debug('send_command: using empty string for 0 length buffer');
766-
buffered_writes += !this.write('$0\r\n\r\n');
767-
} else {
768-
buffered_writes += !this.write('$' + arg.length + '\r\n');
769-
buffered_writes += !this.write(arg);
770-
buffered_writes += !this.write('\r\n');
771-
debug('send_command: buffer send ' + arg.length + ' bytes');
772-
}
767+
if (!Buffer.isBuffer(arg)) {
768+
arg = String(arg);
769+
this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
773770
} else {
774-
if (typeof arg !== 'string') {
775-
arg = String(arg);
776-
}
777-
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
778-
buffered_writes += !this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
771+
this.write('$' + arg.length + '\r\n');
772+
this.write(arg);
773+
this.write('\r\n');
779774
}
775+
debug('send_command: buffer send ' + arg.length + ' bytes');
780776
}
781777
}
782-
if (buffered_writes !== 0 || this.command_queue.length >= this.command_queue_high_water) {
783-
debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer);
784-
this.should_buffer = true;
785-
}
786778
return !this.should_buffer;
787779
};
788780

789781
RedisClient.prototype.write = function (data) {
790-
return this.stream.write(data);
791-
};
792-
793-
RedisClient.prototype.writeStream = function (data) {
794-
var nr = 0;
795-
796782
if (this.pipeline === 0) {
797-
return this.stream.write(data);
783+
this.should_buffer = !this.stream.write(data);
784+
return;
798785
}
799786

800787
this.pipeline--;
801788
if (this.pipeline === 0) {
802-
var command;
789+
var command, str = '';
803790
while (command = this.pipeline_queue.shift()) {
804-
nr += !this.stream.write(command);
791+
str += command;
805792
}
806-
nr += !this.stream.write(data);
807-
return !nr;
793+
this.should_buffer = !this.stream.write(str + data);
794+
return;
808795
}
809796

810797
this.pipeline_queue.push(data);
811-
return true;
798+
return;
812799
};
813800

814801
RedisClient.prototype.pub_sub_command = function (command_obj) {
@@ -1102,7 +1089,7 @@ Multi.prototype.exec_transaction = function (callback) {
11021089
this.send_command(command, args, index, cb);
11031090
}
11041091

1105-
this._client.stream.uncork();
1092+
this._client.uncork();
11061093
return this._client.send_command('exec', [], function(err, replies) {
11071094
self.execute_callback(err, replies);
11081095
});
@@ -1210,7 +1197,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
12101197
this._client.send_command(command, args, cb);
12111198
index++;
12121199
}
1213-
this._client.stream.uncork();
1200+
this._client.uncork();
12141201
return this._client.should_buffer;
12151202
};
12161203

test/batch.spec.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ describe("The 'batch' method", function () {
162162
});
163163
});
164164

165-
it('handles batchple operations being applied to a set', function (done) {
165+
it('handles multiple operations being applied to a set', function (done) {
166166
client.sadd("some set", "mem 1");
167167
client.sadd(["some set", "mem 2"]);
168168
client.sadd("some set", "mem 3");
@@ -189,7 +189,7 @@ describe("The 'batch' method", function () {
189189
});
190190
});
191191

192-
it('allows batchple operations to be performed using constructor with all kinds of syntax', function (done) {
192+
it('allows multiple operations to be performed using constructor with all kinds of syntax', function (done) {
193193
var now = Date.now();
194194
var arr = ["batchhmset", "batchbar", "batchbaz"];
195195
var arr2 = ['some manner of key', 'otherTypes'];
@@ -253,7 +253,7 @@ describe("The 'batch' method", function () {
253253
assert.strictEqual(buffering, true);
254254
});
255255

256-
it('allows batchple operations to be performed using a chaining API', function (done) {
256+
it('allows multiple operations to be performed using a chaining API', function (done) {
257257
client.batch()
258258
.mset('some', '10', 'keys', '20')
259259
.incr('some')
@@ -270,7 +270,7 @@ describe("The 'batch' method", function () {
270270
});
271271
});
272272

273-
it('allows batchple commands to work the same as normal to be performed using a chaining API', function (done) {
273+
it('allows multiple commands to work the same as normal to be performed using a chaining API', function (done) {
274274
client.batch()
275275
.mset(['some', '10', 'keys', '20'])
276276
.incr(['some', helper.isNumber(11)])
@@ -287,7 +287,7 @@ describe("The 'batch' method", function () {
287287
});
288288
});
289289

290-
it('allows batchple commands to work the same as normal to be performed using a chaining API promisified', function () {
290+
it('allows multiple commands to work the same as normal to be performed using a chaining API promisified', function () {
291291
return client.batch()
292292
.mset(['some', '10', 'keys', '20'])
293293
.incr(['some', helper.isNumber(11)])
@@ -303,7 +303,7 @@ describe("The 'batch' method", function () {
303303
});
304304
});
305305

306-
it('allows an array to be provided indicating batchple operations to perform', function (done) {
306+
it('allows an array to be provided indicating multiple operations to perform', function (done) {
307307
// test nested batch-bulk replies with nulls.
308308
client.batch([
309309
["mget", ["batchfoo", "some", "random value", "keys"]],

test/node_redis.spec.js

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,22 @@ describe("The node_redis client", function () {
4343
});
4444
});
4545

46+
describe('big data', function () {
47+
48+
// Check if the fast mode for big strings is working correct
49+
it('safe strings that are bigger than 30000 characters', function(done) {
50+
var str = 'foo ಠ_ಠ bar ';
51+
while (str.length < 111111) {
52+
str += str;
53+
}
54+
client.set('foo', str);
55+
client.get('foo', function (err, res) {
56+
assert.strictEqual(res, str);
57+
done();
58+
});
59+
});
60+
});
61+
4662
describe("send_command", function () {
4763

4864
it("omitting args should be fine in some cases", function (done) {
@@ -470,20 +486,6 @@ describe("The node_redis client", function () {
470486

471487
describe('enable_offline_queue', function () {
472488
describe('true', function () {
473-
it("should emit drain after info command and nothing to buffer", function (done) {
474-
client = redis.createClient({
475-
parser: parser
476-
});
477-
client.set('foo', 'bar');
478-
client.get('foo', function () {
479-
assert(!client.should_buffer);
480-
setTimeout(done, 25);
481-
});
482-
client.on('drain', function() {
483-
assert(client.offline_queue.length === 2);
484-
});
485-
});
486-
487489
it("should emit drain if offline queue is flushed and nothing to buffer", function (done) {
488490
client = redis.createClient({
489491
parser: parser,

0 commit comments

Comments
 (0)