Skip to content

Commit 0dc45bd

Browse files
author
Ruben Bridgewater
committed
Improve pub sub mode further
1 parent 5fac595 commit 0dc45bd

File tree

3 files changed

+80
-60
lines changed

3 files changed

+80
-60
lines changed

index.js

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ function RedisClient (options, stream) {
148148
this.old_state = null;
149149
this.fire_strings = true; // Determine if strings or buffers should be written to the stream
150150
this.pipeline = false;
151+
this.sub_commands_left = 0;
151152
this.times_connected = 0;
152153
this.options = options;
153154
this.buffers = options.return_buffers || options.detect_buffers;
@@ -645,6 +646,11 @@ RedisClient.prototype.return_error = function (err) {
645646
}
646647
}
647648

649+
// Count down pub sub mode if in entering modus
650+
if (this.pub_sub_mode > 1) {
651+
this.pub_sub_mode--;
652+
}
653+
648654
var match = err.message.match(utils.err_code);
649655
// LUA script could return user errors that don't behave like all other errors!
650656
if (match) {
@@ -677,59 +683,53 @@ function normal_reply (self, reply) {
677683
}
678684
}
679685

680-
function set_subscribe (self, type, subscribe, channel) {
681-
// Every channel has to be saved / removed one after the other and the type has to be the same too,
682-
// to make sure partly subscribe / unsubscribe works well together
683-
if (subscribe) {
684-
self.subscription_set[type + '_' + channel] = channel;
685-
} else {
686-
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
687-
delete self.subscription_set[type + '_' + channel];
688-
}
689-
}
690-
691-
function subscribe_unsubscribe (self, reply, type, subscribe) {
686+
function subscribe_unsubscribe (self, reply, type) {
692687
// Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
693688
// The pub sub commands return each argument in a separate return value and have to be handled that way
694689
var command_obj = self.command_queue.get(0);
695690
var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj.buffer_args;
696691
var channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString();
697692
var count = +reply[2]; // Return the channel counter as number no matter if `string_numbers` is activated or not
698-
debug('Subscribe / unsubscribe command');
693+
debug(type, channel);
699694

700695
// Emit first, then return the callback
701696
if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
702697
self.emit(type, channel, count);
703-
set_subscribe(self, type, subscribe, channel);
704-
}
705-
if (command_obj.sub_commands_left <= 1) {
706-
if (count !== 0) {
707-
if (!subscribe && command_obj.args.length === 0) { // Unsubscribe from all channels
708-
command_obj.sub_commands_left = count;
709-
return;
710-
}
698+
if (type === 'subscribe' || type === 'psubscribe') {
699+
self.subscription_set[type + '_' + channel] = channel;
711700
} else {
701+
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
702+
delete self.subscription_set[type + '_' + channel];
703+
}
704+
}
705+
706+
if (command_obj.args.length === 1 || self.sub_commands_left === 1 || command_obj.args.length === 0 && (count === 0 || channel === null)) {
707+
if (count === 0) { // unsubscribed from all channels
712708
var running_command;
713709
var i = 1;
710+
self.pub_sub_mode = 0; // Deactivating pub sub mode
714711
// This should be a rare case and therefore handling it this way should be good performance wise for the general case
715712
while (running_command = self.command_queue.get(i)) {
716713
if (SUBSCRIBE_COMMANDS[running_command.command]) {
717-
self.command_queue.shift();
718-
self.pub_sub_mode = i;
719-
return;
714+
self.pub_sub_mode = i; // Entering pub sub mode again
715+
break;
720716
}
721717
i++;
722718
}
723-
self.pub_sub_mode = 0;
724719
}
725720
self.command_queue.shift();
726721
if (typeof command_obj.callback === 'function') {
727722
// TODO: The current return value is pretty useless.
728723
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
729724
command_obj.callback(null, channel);
730725
}
726+
self.sub_commands_left = 0;
731727
} else {
732-
command_obj.sub_commands_left--;
728+
if (self.sub_commands_left !== 0) {
729+
self.sub_commands_left--;
730+
} else {
731+
self.sub_commands_left = command_obj.args.length ? command_obj.args.length - 1 : count;
732+
}
733733
}
734734
}
735735

@@ -751,12 +751,8 @@ function return_pub_sub (self, reply) {
751751
} else {
752752
self.emit('pmessage', reply[1], reply[2], reply[3]);
753753
}
754-
} else if (type === 'subscribe' || type === 'psubscribe') {
755-
subscribe_unsubscribe(self, reply, type, true);
756-
} else if (type === 'unsubscribe' || type === 'punsubscribe') {
757-
subscribe_unsubscribe(self, reply, type, false);
758754
} else {
759-
normal_reply(self, reply);
755+
subscribe_unsubscribe(self, reply, type);
760756
}
761757
}
762758

@@ -787,10 +783,12 @@ RedisClient.prototype.return_reply = function (reply) {
787783
} else if (this.pub_sub_mode !== 1) {
788784
this.pub_sub_mode--;
789785
normal_reply(this, reply);
790-
} else if (reply instanceof Array && reply.length > 2 && reply[0]) {
791-
return_pub_sub(this, reply);
792-
} else {
786+
} else if (!(reply instanceof Array) || reply.length <= 2) {
787+
// Only PING and QUIT are allowed in this context besides the pub sub commands
788+
// Ping replies with ['pong', null|value] and quit with 'OK'
793789
normal_reply(this, reply);
790+
} else {
791+
return_pub_sub(this, reply);
794792
}
795793
};
796794

lib/command.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
// a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
55
function Command (command, args, buffer_args, callback) {
66
this.command = command;
7-
this.args = args; // We only need the args for the offline commands => move them into another class. We need the number of args though for pub sub
7+
this.args = args;
88
this.buffer_args = buffer_args;
99
this.callback = callback;
10-
this.sub_commands_left = args.length;
1110
}
1211

1312
function OfflineCommand (command, args, callback, call_on_write) {

test/pubsub.spec.js

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ describe('publish/subscribe', function () {
7979

8080
it('does not fire subscribe events after reconnecting', function (done) {
8181
var i = 0;
82+
var end = helper.callFuncAfter(done, 2);
8283
sub.on('subscribe', function (chnl, count) {
8384
assert.strictEqual(typeof count, 'number');
8485
assert.strictEqual(++i, count);
@@ -91,9 +92,10 @@ describe('publish/subscribe', function () {
9192
sub.unsubscribe(function (err, res) { // Do not pass a channel here!
9293
assert.strictEqual(sub.pub_sub_mode, 2);
9394
assert.deepEqual(sub.subscription_set, {});
95+
end();
9496
});
9597
sub.set('foo', 'bar', helper.isString('OK'));
96-
sub.subscribe(channel2, done);
98+
sub.subscribe(channel2, end);
9799
});
98100
});
99101

@@ -181,25 +183,19 @@ describe('publish/subscribe', function () {
181183
sub.subscribe('chan9');
182184
sub.unsubscribe('chan9');
183185
pub.publish('chan8', 'something');
184-
sub.subscribe('chan9', function () {
185-
return done();
186-
});
186+
sub.subscribe('chan9', done);
187187
});
188188

189189
it('handles SUB_UNSUB_MSG_SUB 2', function (done) {
190-
sub.psubscribe('abc*');
190+
sub.psubscribe('abc*', helper.isString('abc*'));
191191
sub.subscribe('xyz');
192192
sub.unsubscribe('xyz');
193193
pub.publish('abcd', 'something');
194-
sub.subscribe('xyz', function () {
195-
return done();
196-
});
194+
sub.subscribe('xyz', done);
197195
});
198196

199197
it('emits end event if quit is called from within subscribe', function (done) {
200-
sub.on('end', function () {
201-
return done();
202-
});
198+
sub.on('end', done);
203199
sub.on('subscribe', function (chnl, count) {
204200
sub.quit();
205201
});
@@ -236,6 +232,10 @@ describe('publish/subscribe', function () {
236232
var end = helper.callFuncAfter(done, 2);
237233
sub.select(3);
238234
sub.set('foo', 'bar');
235+
sub.set('failure', helper.isError()); // Triggering a warning while subscribing should work
236+
sub.mget('foo', 'bar', 'baz', 'hello', 'world', function (err, res) {
237+
assert.deepEqual(res, ['bar', null, null, null, null]);
238+
});
239239
sub.subscribe('somechannel', 'another channel', function (err, res) {
240240
end();
241241
sub.stream.destroy();
@@ -280,7 +280,7 @@ describe('publish/subscribe', function () {
280280

281281
it('should only resubscribe to channels not unsubscribed earlier on a reconnect', function (done) {
282282
sub.subscribe('/foo', '/bar');
283-
sub.unsubscribe('/bar', function () {
283+
sub.batch().unsubscribe(['/bar'], function () {
284284
pub.pubsub('channels', function (err, res) {
285285
assert.deepEqual(res, ['/foo']);
286286
sub.stream.destroy();
@@ -291,7 +291,7 @@ describe('publish/subscribe', function () {
291291
});
292292
});
293293
});
294-
});
294+
}).exec();
295295
});
296296

297297
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Withouth callbacks', function (done) {
@@ -490,7 +490,7 @@ describe('publish/subscribe', function () {
490490
return_buffers: true
491491
});
492492
sub2.on('ready', function () {
493-
sub2.psubscribe('*');
493+
sub2.batch().psubscribe('*', helper.isString('*')).exec();
494494
sub2.subscribe('/foo');
495495
sub2.on('pmessage', function (pattern, channel, message) {
496496
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
@@ -501,32 +501,58 @@ describe('publish/subscribe', function () {
501501
pub.pubsub('numsub', '/foo', function (err, res) {
502502
assert.deepEqual(res, ['/foo', 2]);
503503
});
504+
// sub2 is counted twice as it subscribed with psubscribe and subscribe
504505
pub.publish('/foo', 'hello world', helper.isNumber(3));
505506
});
506507
});
507508

508509
it('allows to listen to pmessageBuffer and pmessage', function (done) {
509510
var batch = sub.batch();
511+
var end = helper.callFuncAfter(done, 6);
512+
assert.strictEqual(sub.message_buffers, false);
510513
batch.psubscribe('*');
511514
batch.subscribe('/foo');
512515
batch.unsubscribe('/foo');
513-
batch.unsubscribe();
514-
batch.subscribe(['/foo']);
516+
batch.unsubscribe(helper.isNull());
517+
batch.subscribe(['/foo'], helper.isString('/foo'));
515518
batch.exec();
516519
assert.strictEqual(sub.shouldBuffer, false);
517520
sub.on('pmessageBuffer', function (pattern, channel, message) {
518521
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
519522
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
520-
sub.quit(done);
523+
sub.quit(end);
521524
});
525+
// Either message_buffers or buffers has to be true, but not both at the same time
526+
assert.notStrictEqual(sub.message_buffers, sub.buffers);
522527
sub.on('pmessage', function (pattern, channel, message) {
523528
assert.strictEqual(pattern, '*');
524529
assert.strictEqual(channel, '/foo');
530+
assert.strictEqual(message, 'hello world');
531+
end();
525532
});
526-
pub.pubsub('numsub', '/foo', function (err, res) {
527-
assert.deepEqual(res, ['/foo', 1]);
533+
sub.on('message', function (channel, message) {
534+
assert.strictEqual(channel, '/foo');
535+
assert.strictEqual(message, 'hello world');
536+
end();
528537
});
529-
pub.publish('/foo', 'hello world', helper.isNumber(2));
538+
setTimeout(function () {
539+
pub.pubsub('numsub', '/foo', function (err, res) {
540+
// There's one subscriber to this channel
541+
assert.deepEqual(res, ['/foo', 1]);
542+
end();
543+
});
544+
pub.pubsub('channels', function (err, res) {
545+
// There's exactly one channel that is listened too
546+
assert.deepEqual(res, ['/foo']);
547+
end();
548+
});
549+
pub.pubsub('numpat', function (err, res) {
550+
// One pattern is active
551+
assert.strictEqual(res, 1);
552+
end();
553+
});
554+
pub.publish('/foo', 'hello world', helper.isNumber(2));
555+
}, 50);
530556
});
531557
});
532558

@@ -536,10 +562,7 @@ describe('publish/subscribe', function () {
536562
});
537563

538564
it('executes callback when punsubscribe is called and there are no subscriptions', function (done) {
539-
pub.punsubscribe(function (err, results) {
540-
assert.strictEqual(null, results);
541-
done(err);
542-
});
565+
pub.batch().punsubscribe(helper.isNull()).exec(done);
543566
});
544567
});
545568

0 commit comments

Comments
 (0)