Skip to content

Commit cd58e1f

Browse files
author
Ruben Bridgewater
committed
Implement message_buffer and pmessage_buffer events
1 parent a9d565b commit cd58e1f

File tree

3 files changed

+67
-27
lines changed

3 files changed

+67
-27
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,16 @@ Client will emit `pmessage` for every message received that matches an active su
410410
Listeners are passed the original pattern used with `PSUBSCRIBE` as `pattern`, the sending channel
411411
name as `channel`, and the message as `message`.
412412

413+
### "message_buffer" (channel, message)
414+
415+
This is the same as the `message` event with the exception, that it is always going to emit a buffer.
416+
If you listen to the `message` event at the same time as the `message_buffer`, it is always going to emit a string.
417+
418+
### "pmessage_buffer" (pattern, channel, message)
419+
420+
This is the same as the `pmessage` event with the exception, that it is always going to emit a buffer.
421+
If you listen to the `pmessage` event at the same time as the `pmessage_buffer`, it is always going to emit a string.
422+
413423
### "subscribe" (channel, count)
414424

415425
Client will emit `subscribe` in response to a `SUBSCRIBE` command. Listeners are passed the

index.js

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ if (typeof EventEmitter !== 'function') {
2727
function noop () {}
2828

2929
function handle_detect_buffers_reply (reply, command, buffer_args) {
30-
if (buffer_args === false) {
30+
if (buffer_args === false || this.message_buffers) {
3131
// If detect_buffers option was specified, then the reply from the parser will be a buffer.
3232
// If this command did not use Buffer arguments, then convert the reply to Strings here.
3333
reply = utils.reply_to_strings(reply);
@@ -138,6 +138,7 @@ function RedisClient (options, stream) {
138138
this.pub_sub_mode = 0;
139139
this.subscription_set = {};
140140
this.monitoring = false;
141+
this.message_buffers = false;
141142
this.closing = false;
142143
this.server_info = {};
143144
this.auth_pass = options.auth_pass || options.password;
@@ -149,23 +150,7 @@ function RedisClient (options, stream) {
149150
this.options = options;
150151
this.buffers = options.return_buffers || options.detect_buffers;
151152
// Init parser
152-
this.reply_parser = Parser({
153-
returnReply: function (data) {
154-
self.return_reply(data);
155-
},
156-
returnError: function (err) {
157-
self.return_error(err);
158-
},
159-
returnFatalError: function (err) {
160-
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
161-
self.flush_and_error(err, ['command_queue']);
162-
self.stream.destroy();
163-
self.return_error(err);
164-
},
165-
returnBuffers: this.buffers,
166-
name: options.parser,
167-
stringNumbers: options.string_numbers
168-
});
153+
this.reply_parser = create_parser(this, options);
169154
this.create_stream();
170155
// The listeners will not be attached right away, so let's print the deprecation message while the listener is attached
171156
this.on('newListener', function (event) {
@@ -179,13 +164,37 @@ function RedisClient (options, stream) {
179164
'The drain event listener is deprecated and will be removed in v.3.0.0.\n' +
180165
'If you want to keep on listening to this event please listen to the stream drain event directly.'
181166
);
167+
} else if (event === 'message_buffer' || event === 'pmessage_buffer' || event === 'messageBuffer' || event === 'pmessageBuffer' && !this.buffers) {
168+
this.message_buffers = true;
169+
this.handle_reply = handle_detect_buffers_reply;
170+
this.reply_parser = create_parser(this);
182171
}
183172
});
184173
}
185174
util.inherits(RedisClient, EventEmitter);
186175

187176
RedisClient.connection_id = 0;
188177

178+
function create_parser (self) {
179+
return Parser({
180+
returnReply: function (data) {
181+
self.return_reply(data);
182+
},
183+
returnError: function (err) {
184+
self.return_error(err);
185+
},
186+
returnFatalError: function (err) {
187+
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
188+
self.flush_and_error(err, ['command_queue']);
189+
self.stream.destroy();
190+
self.return_error(err);
191+
},
192+
returnBuffers: self.buffers || self.message_buffers,
193+
name: self.options.parser,
194+
stringNumbers: self.options.string_numbers
195+
});
196+
}
197+
189198
/******************************************************************************
190199
191200
All functions in here are internal besides the RedisClient constructor
@@ -696,21 +705,18 @@ function subscribe_unsubscribe (self, reply, type, subscribe) {
696705
function return_pub_sub (self, reply) {
697706
var type = reply[0].toString();
698707
if (type === 'message') { // channel, message
699-
// TODO: Implement message_buffer
700-
// if (self.buffers) {
701-
// self.emit('message_buffer', reply[1], reply[2]);
702-
// }
703-
if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
708+
if (!self.options.return_buffers || self.message_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
704709
self.emit('message', reply[1].toString(), reply[2].toString());
710+
self.emit('message_buffer', reply[1], reply[2]);
711+
self.emit('messageBuffer', reply[1], reply[2]);
705712
} else {
706713
self.emit('message', reply[1], reply[2]);
707714
}
708715
} else if (type === 'pmessage') { // pattern, channel, message
709-
// if (self.buffers) {
710-
// self.emit('pmessage_buffer', reply[1], reply[2], reply[3]);
711-
// }
712-
if (!self.options.return_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
716+
if (!self.options.return_buffers || self.message_buffers) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
713717
self.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString());
718+
self.emit('pmessage_buffer', reply[1], reply[2], reply[3]);
719+
self.emit('pmessageBuffer', reply[1], reply[2], reply[3]);
714720
} else {
715721
self.emit('pmessage', reply[1], reply[2], reply[3]);
716722
}

test/pubsub.spec.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,30 @@ describe('publish/subscribe', function () {
504504
pub.publish('/foo', 'hello world', helper.isNumber(3));
505505
});
506506
});
507+
508+
it('allows to listen to pmessageBuffer and pmessage', function (done) {
509+
var batch = sub.batch();
510+
batch.psubscribe('*');
511+
batch.subscribe('/foo');
512+
batch.unsubscribe('/foo');
513+
batch.unsubscribe();
514+
batch.subscribe(['/foo']);
515+
batch.exec();
516+
assert.strictEqual(sub.shouldBuffer, false);
517+
sub.on('pmessageBuffer', function (pattern, channel, message) {
518+
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
519+
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
520+
sub.quit(done);
521+
});
522+
sub.on('pmessage', function (pattern, channel, message) {
523+
assert.strictEqual(pattern, '*');
524+
assert.strictEqual(channel, '/foo');
525+
});
526+
pub.pubsub('numsub', '/foo', function (err, res) {
527+
assert.deepEqual(res, ['/foo', 1]);
528+
});
529+
pub.publish('/foo', 'hello world', helper.isNumber(2));
530+
});
507531
});
508532

509533
describe('punsubscribe', function () {

0 commit comments

Comments
 (0)