Skip to content

Commit 683815d

Browse files
author
Ruben Bridgewater
committed
Refactor pipelining
1 parent 5d12659 commit 683815d

File tree

4 files changed

+47
-54
lines changed

4 files changed

+47
-54
lines changed

index.js

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ function RedisClient (options, stream) {
122122
}
123123
this.command_queue = new Queue(); // Holds sent commands to de-pipeline them
124124
this.offline_queue = new Queue(); // Holds commands issued but not able to be sent
125+
this.pipeline_queue = new Queue(); // Holds all pipelined commands
125126
// ATTENTION: connect_timeout should change in v.3.0 so it does not count towards ending reconnection attempts after x seconds
126127
// This should be done by the retry_strategy. Instead it should only be the timeout for connecting to redis
127128
this.connect_timeout = +options.connect_timeout || 3600000; // 60 * 60 * 1000 ms
@@ -144,8 +145,8 @@ function RedisClient (options, stream) {
144145
this.auth_pass = options.auth_pass || options.password;
145146
this.selected_db = options.db; // Save the selected db here, used when reconnecting
146147
this.old_state = null;
147-
this.send_anyway = false;
148-
this.pipeline = 0;
148+
this.fire_strings = true; // Determine if strings or buffers should be written to the stream
149+
this.pipeline = false;
149150
this.times_connected = 0;
150151
this.options = options;
151152
this.buffers = options.return_buffers || options.detect_buffers;
@@ -374,23 +375,25 @@ RedisClient.prototype.on_ready = function () {
374375
debug('on_ready called ' + this.address + ' id ' + this.connection_id);
375376
this.ready = true;
376377

377-
var cork;
378-
if (!this.stream.cork) {
379-
cork = function (len) {
380-
self.pipeline = len;
381-
self.pipeline_queue = new Queue(len);
382-
};
383-
} else {
384-
cork = function (len) {
385-
self.pipeline = len;
386-
self.pipeline_queue = new Queue(len);
378+
this.cork = function () {
379+
self.pipeline = true;
380+
if (self.stream.cork) {
387381
self.stream.cork();
388-
};
389-
this.uncork = function () {
382+
}
383+
};
384+
this.uncork = function () {
385+
if (self.fire_strings) {
386+
self.write_strings();
387+
} else {
388+
self.write_buffers();
389+
}
390+
self.pipeline = false;
391+
self.fire_strings = true;
392+
if (self.stream.uncork) {
393+
// TODO: Consider using next tick here. See https://github.com/NodeRedis/node_redis/issues/1033
390394
self.stream.uncork();
391-
};
392-
}
393-
this.cork = cork;
395+
}
396+
};
394397

395398
// Restore modal commands from previous connection. The order of the commands is important
396399
if (this.selected_db !== undefined) {
@@ -523,7 +526,8 @@ RedisClient.prototype.connection_gone = function (why, error) {
523526
this.ready = false;
524527
// Deactivate cork to work with the offline queue
525528
this.cork = noop;
526-
this.pipeline = 0;
529+
this.uncork = noop;
530+
this.pipeline = false;
527531

528532
var state = {
529533
monitoring: this.monitoring,
@@ -792,10 +796,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
792796
if (args[i].length > 30000) {
793797
big_data = true;
794798
args_copy[i] = new Buffer(args[i], 'utf8');
795-
if (this.pipeline !== 0) {
796-
this.pipeline += 2;
797-
this.writeDefault = this.writeBuffers;
798-
}
799799
} else {
800800
args_copy[i] = args[i];
801801
}
@@ -813,10 +813,6 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
813813
args_copy[i] = args[i];
814814
buffer_args = true;
815815
big_data = true;
816-
if (this.pipeline !== 0) {
817-
this.pipeline += 2;
818-
this.writeDefault = this.writeBuffers;
819-
}
820816
} else {
821817
this.warn(
822818
'Deprecated: The ' + command.toUpperCase() + ' command contains a argument of type ' + args[i].constructor.name + '.\n' +
@@ -870,6 +866,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
870866
this.write(command_str);
871867
} else {
872868
debug('Send command (' + command_str + ') has Buffer arguments');
869+
this.fire_strings = false;
873870
this.write(command_str);
874871

875872
for (i = 0; i < len; i += 1) {
@@ -887,40 +884,33 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
887884
return !this.should_buffer;
888885
};
889886

890-
RedisClient.prototype.writeDefault = RedisClient.prototype.writeStrings = function (data) {
887+
RedisClient.prototype.write_strings = function () {
891888
var str = '';
892889
for (var command = this.pipeline_queue.shift(); command; command = this.pipeline_queue.shift()) {
893890
// Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long
894891
if (str.length + command.length > 4 * 1024 * 1024) {
895-
this.stream.write(str);
892+
this.should_buffer = !this.stream.write(str);
896893
str = '';
897894
}
898895
str += command;
899896
}
900-
this.should_buffer = !this.stream.write(str + data);
897+
if (str !== '') {
898+
this.should_buffer = !this.stream.write(str);
899+
}
901900
};
902901

903-
RedisClient.prototype.writeBuffers = function (data) {
902+
RedisClient.prototype.write_buffers = function () {
904903
for (var command = this.pipeline_queue.shift(); command; command = this.pipeline_queue.shift()) {
905-
this.stream.write(command);
904+
this.should_buffer = !this.stream.write(command);
906905
}
907-
this.should_buffer = !this.stream.write(data);
908906
};
909907

910908
RedisClient.prototype.write = function (data) {
911-
if (this.pipeline === 0) {
909+
if (this.pipeline === false) {
912910
this.should_buffer = !this.stream.write(data);
913911
return;
914912
}
915-
916-
this.pipeline--;
917-
if (this.pipeline === 0) {
918-
this.writeDefault(data);
919-
return;
920-
}
921-
922913
this.pipeline_queue.push(data);
923-
return;
924914
};
925915

926916
Object.defineProperty(exports, 'debugMode', {

lib/multi.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) {
126126
var len = self.queue.length;
127127
self.errors = [];
128128
self.callback = callback;
129-
self._client.cork(len + 2);
129+
self._client.cork();
130130
self.wants_buffers = new Array(len);
131131
pipeline_transaction_command(self, 'multi', []);
132132
// Drain queue, callback will catch 'QUEUED' or error
@@ -151,7 +151,6 @@ Multi.prototype.exec_transaction = function exec_transaction (callback) {
151151
multi_callback(self, err, replies);
152152
});
153153
self._client.uncork();
154-
self._client.writeDefault = self._client.writeStrings;
155154
return !self._client.should_buffer;
156155
};
157156

@@ -198,7 +197,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
198197
return true;
199198
}
200199
self.results = [];
201-
self._client.cork(len);
200+
self._client.cork();
202201
while (args = self.queue.shift()) {
203202
var command = args[0];
204203
var cb;
@@ -213,9 +212,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
213212
self._client.internal_send_command(command, args[1], cb);
214213
index++;
215214
}
216-
self.queue = new Queue();
217215
self._client.uncork();
218-
self._client.writeDefault = self._client.writeStrings;
219216
return !self._client.should_buffer;
220217
};
221218

test/multi.spec.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,16 +125,16 @@ describe("The 'multi' method", function () {
125125

126126
describe('when connected', function () {
127127

128-
beforeEach(function (done) {
128+
beforeEach(function () {
129129
client = redis.createClient.apply(null, args);
130-
client.once('connect', done);
131130
});
132131

133132
it('executes a pipelined multi properly in combination with the offline queue', function (done) {
134133
var multi1 = client.multi();
135134
multi1.set('m1', '123');
136135
multi1.get('m1');
137136
multi1.exec(done);
137+
assert.strictEqual(client.offline_queue.length, 4);
138138
});
139139

140140
it('executes a pipelined multi properly after a reconnect in combination with the offline queue', function (done) {
@@ -612,11 +612,17 @@ describe("The 'multi' method", function () {
612612
});
613613

614614
it('emits error once if reconnecting after multi has been executed but not yet returned without callback', function (done) {
615+
// NOTE: If uncork is called async by postponing it to the next tick, this behavior is going to change.
616+
// The command won't be processed anymore two errors are returned instead of one
615617
client.on('error', function (err) {
616618
assert.strictEqual(err.code, 'UNCERTAIN_STATE');
617-
done();
619+
client.get('foo', function (err, res) {
620+
assert.strictEqual(res, 'bar');
621+
done();
622+
});
618623
});
619624

625+
// The commands should still be fired, no matter that the socket is destroyed on the same tick
620626
client.multi().set('foo', 'bar').get('foo').exec();
621627
// Abort connection before the value returned
622628
client.stream.destroy();

test/node_redis.spec.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ describe('The node_redis client', function () {
121121
str += str;
122122
}
123123
var called = false;
124-
var temp = client.writeBuffers.bind(client);
125-
assert(String(client.writeBuffers) !== String(client.writeDefault));
126-
client.writeBuffers = function (data) {
124+
var temp = client.write_buffers.bind(client);
125+
assert(client.fire_strings);
126+
client.write_buffers = function (data) {
127127
called = true;
128128
// To increase write performance for strings the value is converted to a buffer
129-
assert(String(client.writeBuffers) === String(client.writeDefault));
129+
assert(!client.fire_strings);
130130
temp(data);
131131
};
132132
client.multi().set('foo', str).get('foo', function (err, res) {
@@ -136,7 +136,7 @@ describe('The node_redis client', function () {
136136
assert.strictEqual(res[1], str);
137137
done();
138138
});
139-
assert(String(client.writeBuffers) !== String(client.writeDefault));
139+
assert(client.fire_strings);
140140
});
141141
});
142142

0 commit comments

Comments
 (0)