Skip to content

Commit f8c245e

Browse files
author
Ruben Bridgewater
committed
Add .batch with better pipeline implementation
1 parent 146d881 commit f8c245e

File tree

5 files changed

+188
-22
lines changed

5 files changed

+188
-22
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ channel name as `channel` and the new count of subscriptions for this client as
403403

404404
`MULTI` commands are queued up until an `EXEC` is issued, and then all commands are run atomically by
405405
Redis. The interface in `node_redis` is to return an individual `Multi` object by calling `client.multi()`.
406+
If any command fails to queue, all commands are rolled back and none is going to be executed (For further information look at [transactions](http://redis.io/topics/transactions)).
406407

407408
```js
408409
var redis = require("./index"),
@@ -485,6 +486,12 @@ client.multi([
485486
console.log(replies);
486487
});
487488
```
489+
## client.batch([commands])
490+
491+
`BATCH` commands are queued up until an `EXEC` is issued, and then all commands are run atomically by
492+
Redis. The interface in `node_redis` is to return an individual `Batch` object by calling `client.batch()`.
493+
The only difference between .batch and .multi is that no transaction is going to be used.
494+
Be aware that the errors are - just like in multi statements - in the result. Otherwise both, errors and results could be returned at the same time.
488495

489496
## Monitor mode
490497

changelog.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Changelog
22
=========
33

4-
## v.2.2.0 - xx, 2015
4+
## v.2.2.0 - 07, 2015 - The peregrino falcon
55

66
Features
77

@@ -11,6 +11,13 @@ Features
1111
- exchanging built in queue with [Petka Antonov's](@petkaantonov) [double-ended queue](https://github.com/petkaantonov/deque)
1212
- prevent polymorphism
1313
- optimize statements
14+
- Added .batch command, similar to multi but without transaction (@BridgeAR)
15+
- Improved pipelining to minimize the [RTT](http://redis.io/topics/pipelining) further (@BridgeAR)
16+
17+
This release is mainly focusing on further speed improvements and we can proudly say that node_redis is very likely outperforming any other node redis client.
18+
19+
If you do not rely on transactions but want to reduze the RTT you can use .batch from now on. It'll behave just the same as .multi but it does not have any transaction and therefor won't roll back any failed commands.
20+
Both .multi and .batch are from now on going to fire the commands in bulk without doing any other operation in between.
1421

1522
Bugfixes
1623

index.js

Lines changed: 113 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ function RedisClient(stream, options) {
8585
this.parser_module = null;
8686
this.selected_db = null; // save the selected db here, used when reconnecting
8787
this.old_state = null;
88+
this.pipeline = 0;
89+
this.pipeline_queue = new Queue();
8890

8991
this.install_stream_listeners();
9092
events.EventEmitter.call(this);
@@ -648,6 +650,26 @@ RedisClient.prototype.return_reply = function (reply) {
648650
}
649651
};
650652

653+
RedisClient.prototype.writeStream = function (data) {
654+
var stream = this.stream;
655+
var nr = 0;
656+
657+
// Do not use a pipeline
658+
if (this.pipeline === 0) {
659+
return !stream.write(data);
660+
}
661+
this.pipeline--;
662+
this.pipeline_queue.push(data);
663+
if (this.pipeline === 0) {
664+
var len = this.pipeline_queue.length;
665+
while (len--) {
666+
nr += !stream.write(this.pipeline_queue.shift());
667+
}
668+
return !nr;
669+
}
670+
return true;
671+
};
672+
651673
RedisClient.prototype.send_command = function (command, args, callback) {
652674
var arg, command_obj, i, err,
653675
stream = this.stream,
@@ -753,29 +775,29 @@ RedisClient.prototype.send_command = function (command, args, callback) {
753775
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
754776
}
755777
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
756-
buffered_writes += !stream.write(command_str);
778+
buffered_writes += !this.writeStream(command_str);
757779
} else {
758780
debug('Send command (' + command_str + ') has Buffer arguments');
759-
buffered_writes += !stream.write(command_str);
781+
buffered_writes += !this.writeStream(command_str);
760782

761783
for (i = 0; i < args.length; i += 1) {
762784
arg = args[i];
763785
if (Buffer.isBuffer(arg)) {
764786
if (arg.length === 0) {
765787
debug('send_command: using empty string for 0 length buffer');
766-
buffered_writes += !stream.write('$0\r\n\r\n');
788+
buffered_writes += !this.writeStream('$0\r\n\r\n');
767789
} else {
768-
buffered_writes += !stream.write('$' + arg.length + '\r\n');
769-
buffered_writes += !stream.write(arg);
770-
buffered_writes += !stream.write('\r\n');
790+
buffered_writes += !this.writeStream('$' + arg.length + '\r\n');
791+
buffered_writes += !this.writeStream(arg);
792+
buffered_writes += !this.writeStream('\r\n');
771793
debug('send_command: buffer send ' + arg.length + ' bytes');
772794
}
773795
} else {
774796
if (typeof arg !== 'string') {
775797
arg = String(arg);
776798
}
777799
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
778-
buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
800+
buffered_writes += !this.writeStream('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
779801
}
780802
}
781803
}
@@ -839,9 +861,15 @@ RedisClient.prototype.end = function (flush) {
839861
return this.stream.destroySoon();
840862
};
841863

842-
function Multi(client, args) {
864+
function Multi(client, args, transaction) {
843865
this._client = client;
844-
this.queue = [['multi']];
866+
this.queue = [];
867+
if (transaction) {
868+
this.exec = this.exec_transaction;
869+
this.EXEC = this.exec_transaction;
870+
this.queue.push(['multi']);
871+
}
872+
this._client.pipeline_queue.clear();
845873
var command, tmp_args;
846874
if (Array.isArray(args)) {
847875
while (tmp_args = args.shift()) {
@@ -857,7 +885,11 @@ function Multi(client, args) {
857885
}
858886

859887
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function (args) {
860-
return new Multi(this, args);
888+
return new Multi(this, args, true);
889+
};
890+
891+
RedisClient.prototype.batch = RedisClient.prototype.BATCH = function (args) {
892+
return new Multi(this, args, false);
861893
};
862894

863895
commands.forEach(function (fullCommand) {
@@ -1025,25 +1057,35 @@ Multi.prototype.send_command = function (command, args, index, cb) {
10251057
});
10261058
};
10271059

1028-
Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
1060+
Multi.prototype.exec_transaction = function (callback) {
10291061
var self = this;
1062+
var len = this.queue.length;
1063+
var cb;
10301064
this.errors = [];
10311065
this.callback = callback;
1032-
this.wants_buffers = new Array(this.queue.length);
1066+
this._client.pipeline = len;
1067+
this.wants_buffers = new Array(len);
10331068
// drain queue, callback will catch 'QUEUED' or error
1034-
for (var index = 0; index < this.queue.length; index++) {
1069+
for (var index = 0; index < len; index++) {
10351070
var args = this.queue[index].slice(0);
10361071
var command = args.shift();
1037-
var cb;
10381072
if (typeof args[args.length - 1] === 'function') {
10391073
cb = args.pop();
1074+
} else {
1075+
cb = undefined;
10401076
}
10411077
// Keep track of who wants buffer responses:
1042-
this.wants_buffers[index] = false;
1043-
for (var i = 0; i < args.length; i += 1) {
1044-
if (Buffer.isBuffer(args[i])) {
1045-
this.wants_buffers[index] = true;
1046-
break;
1078+
if (this._client.options.return_buffers) {
1079+
this.wants_buffers[index] = true;
1080+
} else if (!this._client.options.detect_buffers) {
1081+
this.wants_buffers[index] = false;
1082+
} else {
1083+
this.wants_buffers[index] = false;
1084+
for (var i = 0; i < args.length; i += 1) {
1085+
if (Buffer.isBuffer(args[i])) {
1086+
this.wants_buffers[index] = true;
1087+
break;
1088+
}
10471089
}
10481090
}
10491091
this.send_command(command, args, index, cb);
@@ -1107,6 +1149,58 @@ Multi.prototype.execute_callback = function (err, replies) {
11071149
}
11081150
};
11091151

1152+
Multi.prototype.callback = function (cb, command, i) {
1153+
var self = this;
1154+
return function (err, res) {
1155+
if (err) {
1156+
self.results[i] = err;
1157+
} else {
1158+
self.results[i] = res;
1159+
}
1160+
if (cb) {
1161+
cb(err, res);
1162+
}
1163+
// Do not emit an error here. Otherwise each error would result in one emit.
1164+
// The errors will be returned in the result anyway
1165+
};
1166+
};
1167+
1168+
Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
1169+
var len = this.queue.length;
1170+
var self = this;
1171+
var index = 0;
1172+
var args;
1173+
if (len === 0) {
1174+
if (callback) {
1175+
callback(null, []);
1176+
}
1177+
return false;
1178+
}
1179+
this.results = new Array(len);
1180+
this._client.pipeline = len;
1181+
var lastCallback = function (cb) {
1182+
return function (err, res) {
1183+
cb(err, res);
1184+
callback(null, self.results);
1185+
};
1186+
};
1187+
while (args = this.queue.shift()) {
1188+
var command = args.shift();
1189+
var cb;
1190+
if (typeof args[args.length - 1] === 'function') {
1191+
cb = this.callback(args.pop(), command, index);
1192+
} else {
1193+
cb = this.callback(undefined, command, index);
1194+
}
1195+
if (callback && index === len - 1) {
1196+
cb = lastCallback(cb);
1197+
}
1198+
this._client.send_command(command, args, cb);
1199+
index++;
1200+
}
1201+
return this._client.should_buffer;
1202+
};
1203+
11101204
var createClient_unix = function (path, options){
11111205
var cnxOptions = {
11121206
path: path

test/commands/multi.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ describe("The 'multi' method", function () {
216216
.mset('some', '10', 'keys', '20')
217217
.incr('some')
218218
.incr('keys')
219-
.mget('some', 'keys')
219+
.mget('some', ['keys'])
220220
.exec(function (err, replies) {
221221
assert.strictEqual(null, err);
222222
assert.equal('OK', replies[0]);

test/detect_buffers.spec.js

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,27 @@ describe("detect_buffers", function () {
7979
});
8080
});
8181

82+
describe('batch.hget', function () {
83+
it('can interleave string and buffer results', function (done) {
84+
client.batch()
85+
.hget("hash key 2", "key 1")
86+
.hget(new Buffer("hash key 2"), "key 1")
87+
.hget("hash key 2", new Buffer("key 2"))
88+
.hget("hash key 2", "key 2")
89+
.exec(function (err, reply) {
90+
assert.strictEqual(true, Array.isArray(reply));
91+
assert.strictEqual(4, reply.length);
92+
assert.strictEqual("val 1", reply[0]);
93+
assert.strictEqual(true, Buffer.isBuffer(reply[1]));
94+
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[1].inspect());
95+
assert.strictEqual(true, Buffer.isBuffer(reply[2]));
96+
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[2].inspect());
97+
assert.strictEqual("val 2", reply[3]);
98+
return done(err);
99+
});
100+
});
101+
});
102+
82103
describe('hmget', function () {
83104
describe('first argument is a string', function () {
84105
it('returns strings for keys requested', function (done) {
@@ -149,6 +170,19 @@ describe("detect_buffers", function () {
149170
return done(err);
150171
});
151172
});
173+
174+
it("returns buffers for keys requested in .batch", function (done) {
175+
client.batch().hmget(new Buffer("hash key 2"), "key 1", "key 2").exec(function (err, reply) {
176+
assert.strictEqual(true, Array.isArray(reply));
177+
assert.strictEqual(1, reply.length);
178+
assert.strictEqual(2, reply[0].length);
179+
assert.strictEqual(true, Buffer.isBuffer(reply[0][0]));
180+
assert.strictEqual(true, Buffer.isBuffer(reply[0][1]));
181+
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0][0].inspect());
182+
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0][1].inspect());
183+
return done(err);
184+
});
185+
});
152186
});
153187
});
154188

@@ -174,6 +208,17 @@ describe("detect_buffers", function () {
174208
return done(err);
175209
});
176210
});
211+
212+
it('returns string values when executed in .batch', function (done) {
213+
client.batch().hgetall("hash key 2").exec(function (err, reply) {
214+
assert.strictEqual(1, reply.length);
215+
assert.strictEqual("object", typeof reply[0]);
216+
assert.strictEqual(2, Object.keys(reply[0]).length);
217+
assert.strictEqual("val 1", reply[0]["key 1"]);
218+
assert.strictEqual("val 2", reply[0]["key 2"]);
219+
return done(err);
220+
});
221+
});
177222
});
178223

179224
describe('first argument is a buffer', function () {
@@ -193,7 +238,20 @@ describe("detect_buffers", function () {
193238
it('returns buffer values when executed in transaction', function (done) {
194239
client.multi().hgetall(new Buffer("hash key 2")).exec(function (err, reply) {
195240
assert.strictEqual(1, reply.length);
196-
assert.strictEqual("object", typeof reply);
241+
assert.strictEqual("object", typeof reply[0]);
242+
assert.strictEqual(2, Object.keys(reply[0]).length);
243+
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"]));
244+
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));
245+
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0]["key 1"].inspect());
246+
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0]["key 2"].inspect());
247+
return done(err);
248+
});
249+
});
250+
251+
it('returns buffer values when executed in .batch', function (done) {
252+
client.batch().hgetall(new Buffer("hash key 2")).exec(function (err, reply) {
253+
assert.strictEqual(1, reply.length);
254+
assert.strictEqual("object", typeof reply[0]);
197255
assert.strictEqual(2, Object.keys(reply[0]).length);
198256
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"]));
199257
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));

0 commit comments

Comments
 (0)