Skip to content

Commit 0424cb0

Browse files
author
Ruben Bridgewater
committed
Move pub sub command into individual commands and use call_on_write
1 parent 683815d commit 0424cb0

File tree

5 files changed

+280
-79
lines changed

5 files changed

+280
-79
lines changed

index.js

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ RedisClient.prototype.ready_check = function () {
484484
RedisClient.prototype.send_offline_queue = function () {
485485
for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) {
486486
debug('Sending offline command: ' + command_obj.command);
487-
this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback);
487+
this.internal_send_command(command_obj.command, command_obj.args, command_obj.callback, command_obj.call_on_write);
488488
}
489489
this.drain();
490490
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
@@ -771,8 +771,10 @@ function handle_offline_command (self, command_obj) {
771771
self.should_buffer = true;
772772
}
773773

774-
RedisClient.prototype.internal_send_command = function (command, args, callback) {
775-
var arg, prefix_keys;
774+
// Do not call internal_send_command directly, if you are not absolutly certain it handles everything properly
775+
// e.g. monitor / info does not work with internal_send_command only
776+
RedisClient.prototype.internal_send_command = function (command, args, callback, call_on_write) {
777+
var arg, prefix_keys, command_obj;
776778
var i = 0;
777779
var command_str = '';
778780
var len = args.length;
@@ -786,7 +788,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
786788

787789
if (this.ready === false || this.stream.writable === false) {
788790
// Handle offline commands right away
789-
handle_offline_command(this, new OfflineCommand(command, args, callback));
791+
handle_offline_command(this, new OfflineCommand(command, args, callback, call_on_write));
790792
return false; // Indicate buffering
791793
}
792794

@@ -834,15 +836,7 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
834836
}
835837
}
836838
args = null;
837-
var command_obj = new Command(command, args_copy, callback);
838-
command_obj.buffer_args = buffer_args;
839-
840-
if (SUBSCRIBE_COMMANDS[command] && this.pub_sub_mode === 0) {
841-
// If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates
842-
// Deactivation of the pub sub mode happens in the result handler
843-
this.pub_sub_mode = this.command_queue.length + 1;
844-
}
845-
this.command_queue.push(command_obj);
839+
command_obj = new Command(command, args_copy, buffer_args, callback);
846840

847841
if (this.options.prefix) {
848842
prefix_keys = commands.getKeyIndexes(command, args_copy);
@@ -881,6 +875,9 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
881875
debug('send_command: buffer send ' + arg.length + ' bytes');
882876
}
883877
}
878+
if (call_on_write) {
879+
call_on_write();
880+
}
884881
return !this.should_buffer;
885882
};
886883

lib/command.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,19 @@
22

33
// This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
44
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
5-
function Command (command, args, callback) {
5+
function Command (command, args, buffer_args, callback) {
66
this.command = command;
77
this.args = args; // We only need the args for the offline commands => move them into another class. We need the number of args though for pub sub
8-
this.buffer_args = false;
8+
this.buffer_args = buffer_args;
99
this.callback = callback;
1010
this.sub_commands_left = args.length;
1111
}
1212

13-
function OfflineCommand (command, args, callback) {
13+
function OfflineCommand (command, args, callback, call_on_write) {
1414
this.command = command;
1515
this.args = args;
1616
this.callback = callback;
17+
this.call_on_write = call_on_write;
1718
}
1819

1920
module.exports = {

lib/individualCommands.js

Lines changed: 236 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,18 @@ var no_password_is_set = /no password is set/;
77
var loading = /LOADING/;
88
var RedisClient = require('../').RedisClient;
99

10-
/********************************
11-
Replace built-in redis functions
12-
********************************/
10+
/********************************************************************************************
11+
Replace built-in redis functions
12+
13+
The callback may be hooked as needed. The same does not apply to the rest of the function.
14+
State should not be set outside of the callback if not absolutly necessary.
15+
This is important to make sure it works the same as single command or in a multi context.
16+
To make sure everything works with the offline queue use the "call_on_write" function.
17+
This is going to be executed while writing to the stream.
18+
19+
TODO: Implement individal command generation as soon as possible to prevent divergent code
20+
on single and multi calls!
21+
********************************************************************************************/
1322

1423
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function multi (args) {
1524
var multi = new Multi(this, args);
@@ -209,3 +218,227 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
209218
}
210219
return this.internal_send_command('hmset', arr, callback);
211220
};
221+
222+
RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function subscribe () {
223+
var arr,
224+
len = arguments.length,
225+
callback,
226+
i = 0;
227+
if (Array.isArray(arguments[0])) {
228+
arr = arguments[0];
229+
callback = arguments[1];
230+
} else {
231+
len = arguments.length;
232+
// The later should not be the average use case
233+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
234+
len--;
235+
callback = arguments[len];
236+
}
237+
arr = new Array(len);
238+
for (; i < len; i += 1) {
239+
arr[i] = arguments[i];
240+
}
241+
}
242+
var self = this;
243+
var call_on_write = function () {
244+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
245+
};
246+
return this.internal_send_command('subscribe', arr, callback, call_on_write);
247+
};
248+
249+
Multi.prototype.subscribe = Multi.prototype.SUBSCRIBE = function subscribe () {
250+
var arr,
251+
len = arguments.length,
252+
callback,
253+
i = 0;
254+
if (Array.isArray(arguments[0])) {
255+
arr = arguments[0];
256+
callback = arguments[1];
257+
} else {
258+
len = arguments.length;
259+
// The later should not be the average use case
260+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
261+
len--;
262+
callback = arguments[len];
263+
}
264+
arr = new Array(len);
265+
for (; i < len; i += 1) {
266+
arr[i] = arguments[i];
267+
}
268+
}
269+
var self = this._client;
270+
var call_on_write = function () {
271+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
272+
};
273+
this.queue.push(['subscribe', arr, callback, call_on_write]);
274+
return this;
275+
};
276+
277+
RedisClient.prototype.unsubscribe = RedisClient.prototype.UNSUBSCRIBE = function unsubscribe () {
278+
var arr,
279+
len = arguments.length,
280+
callback,
281+
i = 0;
282+
if (Array.isArray(arguments[0])) {
283+
arr = arguments[0];
284+
callback = arguments[1];
285+
} else {
286+
len = arguments.length;
287+
// The later should not be the average use case
288+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
289+
len--;
290+
callback = arguments[len];
291+
}
292+
arr = new Array(len);
293+
for (; i < len; i += 1) {
294+
arr[i] = arguments[i];
295+
}
296+
}
297+
var self = this;
298+
var call_on_write = function () {
299+
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
300+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
301+
};
302+
return this.internal_send_command('unsubscribe', arr, callback, call_on_write);
303+
};
304+
305+
Multi.prototype.unsubscribe = Multi.prototype.UNSUBSCRIBE = function unsubscribe () {
306+
var arr,
307+
len = arguments.length,
308+
callback,
309+
i = 0;
310+
if (Array.isArray(arguments[0])) {
311+
arr = arguments[0];
312+
callback = arguments[1];
313+
} else {
314+
len = arguments.length;
315+
// The later should not be the average use case
316+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
317+
len--;
318+
callback = arguments[len];
319+
}
320+
arr = new Array(len);
321+
for (; i < len; i += 1) {
322+
arr[i] = arguments[i];
323+
}
324+
}
325+
var self = this._client;
326+
var call_on_write = function () {
327+
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
328+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
329+
};
330+
this.queue.push(['unsubscribe', arr, callback, call_on_write]);
331+
return this;
332+
};
333+
334+
RedisClient.prototype.psubscribe = RedisClient.prototype.PSUBSCRIBE = function psubscribe () {
335+
var arr,
336+
len = arguments.length,
337+
callback,
338+
i = 0;
339+
if (Array.isArray(arguments[0])) {
340+
arr = arguments[0];
341+
callback = arguments[1];
342+
} else {
343+
len = arguments.length;
344+
// The later should not be the average use case
345+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
346+
len--;
347+
callback = arguments[len];
348+
}
349+
arr = new Array(len);
350+
for (; i < len; i += 1) {
351+
arr[i] = arguments[i];
352+
}
353+
}
354+
var self = this;
355+
var call_on_write = function () {
356+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
357+
};
358+
return this.internal_send_command('psubscribe', arr, callback, call_on_write);
359+
};
360+
361+
Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () {
362+
var arr,
363+
len = arguments.length,
364+
callback,
365+
i = 0;
366+
if (Array.isArray(arguments[0])) {
367+
arr = arguments[0];
368+
callback = arguments[1];
369+
} else {
370+
len = arguments.length;
371+
// The later should not be the average use case
372+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
373+
len--;
374+
callback = arguments[len];
375+
}
376+
arr = new Array(len);
377+
for (; i < len; i += 1) {
378+
arr[i] = arguments[i];
379+
}
380+
}
381+
var self = this;
382+
var call_on_write = function () {
383+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
384+
};
385+
this.queue.push(['psubscribe', arr, callback, call_on_write]);
386+
return this;
387+
};
388+
389+
RedisClient.prototype.punsubscribe = RedisClient.prototype.PUNSUBSCRIBE = function punsubscribe () {
390+
var arr,
391+
len = arguments.length,
392+
callback,
393+
i = 0;
394+
if (Array.isArray(arguments[0])) {
395+
arr = arguments[0];
396+
callback = arguments[1];
397+
} else {
398+
len = arguments.length;
399+
// The later should not be the average use case
400+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
401+
len--;
402+
callback = arguments[len];
403+
}
404+
arr = new Array(len);
405+
for (; i < len; i += 1) {
406+
arr[i] = arguments[i];
407+
}
408+
}
409+
var self = this;
410+
var call_on_write = function () {
411+
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
412+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
413+
};
414+
return this.internal_send_command('punsubscribe', arr, callback, call_on_write);
415+
};
416+
417+
Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscribe () {
418+
var arr,
419+
len = arguments.length,
420+
callback,
421+
i = 0;
422+
if (Array.isArray(arguments[0])) {
423+
arr = arguments[0];
424+
callback = arguments[1];
425+
} else {
426+
len = arguments.length;
427+
// The later should not be the average use case
428+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
429+
len--;
430+
callback = arguments[len];
431+
}
432+
arr = new Array(len);
433+
for (; i < len; i += 1) {
434+
arr[i] = arguments[i];
435+
}
436+
}
437+
var self = this;
438+
var call_on_write = function () {
439+
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
440+
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
441+
};
442+
this.queue.push(['punsubscribe', arr, callback, call_on_write]);
443+
return this;
444+
};

0 commit comments

Comments
 (0)