Skip to content

Commit 3038c90

Browse files
author
Ruben Bridgewater
committed
Make sure all individual handled command work in multi context the same
Fix quit possibly resulting in reconnections
1 parent 0424cb0 commit 3038c90

File tree

7 files changed

+334
-99
lines changed

7 files changed

+334
-99
lines changed

index.js

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -735,12 +735,35 @@ function return_pub_sub (self, reply) {
735735
}
736736

737737
RedisClient.prototype.return_reply = function (reply) {
738-
if (this.pub_sub_mode === 1 && reply instanceof Array && reply.length !== 0 && reply[0]) {
738+
// If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
739+
// As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
740+
// the average performance of all other commands in case of no monitor mode
741+
if (this.monitoring) {
742+
var replyStr;
743+
if (this.buffers && Buffer.isBuffer(reply)) {
744+
replyStr = reply.toString();
745+
} else {
746+
replyStr = reply;
747+
}
748+
// While reconnecting the redis server does not recognize the client as in monitor mode anymore
749+
// Therefore the monitor command has to finish before it catches further commands
750+
if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) {
751+
var timestamp = replyStr.slice(0, replyStr.indexOf(' '));
752+
var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) {
753+
return elem.replace(/\\"/g, '"');
754+
});
755+
this.emit('monitor', timestamp, args, replyStr);
756+
return;
757+
}
758+
}
759+
if (this.pub_sub_mode === 0) {
760+
normal_reply(this, reply);
761+
} else if (this.pub_sub_mode !== 1) {
762+
this.pub_sub_mode--;
763+
normal_reply(this, reply);
764+
} else if (reply instanceof Array && reply.length > 2 && reply[0]) {
739765
return_pub_sub(this, reply);
740766
} else {
741-
if (this.pub_sub_mode !== 0 && this.pub_sub_mode !== 1) {
742-
this.pub_sub_mode--;
743-
}
744767
normal_reply(this, reply);
745768
}
746769
};

lib/individualCommands.js

Lines changed: 157 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -31,88 +31,101 @@ RedisClient.prototype.batch = RedisClient.prototype.BATCH = function batch (args
3131
return new Multi(this, args);
3232
};
3333

34-
// Store db in this.select_db to restore it on reconnect
35-
RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) {
36-
var self = this;
37-
return this.internal_send_command('select', [db], function (err, res) {
34+
function select_callback (self, db, callback) {
35+
return function (err, res) {
3836
if (err === null) {
37+
// Store db in this.select_db to restore it on reconnect
3938
self.selected_db = db;
4039
}
4140
utils.callback_or_emit(self, callback, err, res);
42-
});
41+
};
42+
}
43+
44+
RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) {
45+
return this.internal_send_command('select', [db], select_callback(this, db, callback));
4346
};
4447

45-
RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function (callback) {
46-
// Use a individual command, as this is a special case that does not has to be checked for any other command
47-
var self = this;
48-
return this.internal_send_command('monitor', [], function (err, res) {
48+
Multi.prototype.select = Multi.prototype.SELECT = function select (db, callback) {
49+
this.queue.push(['select', [db], select_callback(this._client, db, callback)]);
50+
return this;
51+
};
52+
53+
function monitor_callback (self, callback) {
54+
return function (err, res) {
4955
if (err === null) {
50-
self.reply_parser.returnReply = function (reply) {
51-
// If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
52-
// As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
53-
// the average performance of all other commands in case of no monitor mode
54-
if (self.monitoring) {
55-
var replyStr;
56-
if (self.buffers && Buffer.isBuffer(reply)) {
57-
replyStr = reply.toString();
58-
} else {
59-
replyStr = reply;
60-
}
61-
// While reconnecting the redis server does not recognize the client as in monitor mode anymore
62-
// Therefor the monitor command has to finish before it catches further commands
63-
if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) {
64-
var timestamp = replyStr.slice(0, replyStr.indexOf(' '));
65-
var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) {
66-
return elem.replace(/\\"/g, '"');
67-
});
68-
self.emit('monitor', timestamp, args, replyStr);
69-
return;
70-
}
71-
}
72-
self.return_reply(reply);
73-
};
7456
self.monitoring = true;
7557
}
7658
utils.callback_or_emit(self, callback, err, res);
77-
});
59+
};
60+
}
61+
62+
RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor (callback) {
63+
// Use a individual command, as this is a special case that does not has to be checked for any other command
64+
return this.internal_send_command('monitor', [], monitor_callback(this, callback));
7865
};
7966

80-
RedisClient.prototype.quit = RedisClient.prototype.QUIT = function (callback) {
81-
var self = this;
82-
var callback_hook = function (err, res) {
83-
// TODO: Improve this by handling everything with coherend error codes and find out if there's anything missing
84-
if (err && (err.code === 'NR_OFFLINE' ||
85-
err.message === 'Redis connection gone from close event.' ||
86-
err.message === 'The command can\'t be processed. The connection has already been closed.'
87-
)) {
67+
// Only works with batch, not in a transaction
68+
Multi.prototype.monitor = Multi.prototype.MONITOR = function monitor (callback) {
69+
// Use a individual command, as this is a special case that does not has to be checked for any other command
70+
if (this.exec !== this.exec_transaction) {
71+
this.queue.push(['monitor', [], monitor_callback(this._client, callback)]);
72+
return this;
73+
}
74+
var err = new Error(
75+
'You used the monitor command in combination with a transaction. Due to faulty return values of ' +
76+
'Redis in this context, the monitor command is now executed without transaction instead and ignored ' +
77+
'in the multi statement.'
78+
);
79+
err.command = 'MONITOR';
80+
utils.reply_in_order(this._client, callback, err);
81+
this._client.monitor('monitor', callback);
82+
return this;
83+
};
84+
85+
function quit_callback (self, callback) {
86+
return function (err, res) {
87+
if (err && err.code === 'NR_OFFLINE') {
8888
// Pretent the quit command worked properly in this case.
8989
// Either the quit landed in the offline queue and was flushed at the reconnect
9090
// or the offline queue is deactivated and the command was rejected right away
9191
// or the stream is not writable
92-
// or while sending the quit, the connection dropped
92+
// or while sending the quit, the connection ended / closed
9393
err = null;
9494
res = 'OK';
9595
}
9696
utils.callback_or_emit(self, callback, err, res);
97+
if (self.stream.writable) {
98+
// If the socket is still alive, kill it. This could happen if quit got a NR_OFFLINE error code
99+
self.stream.destroy();
100+
}
97101
};
98-
var backpressure_indicator = this.internal_send_command('quit', [], callback_hook);
102+
}
103+
104+
RedisClient.prototype.QUIT = RedisClient.prototype.quit = function (callback) {
105+
// TODO: Consider this for v.3
106+
// Allow the quit command to be fired as soon as possible to prevent it landing in the offline queue.
107+
// this.ready = this.offline_queue.length === 0;
108+
var backpressure_indicator = this.internal_send_command('quit', [], quit_callback(this, callback));
99109
// Calling quit should always end the connection, no matter if there's a connection or not
100110
this.closing = true;
111+
this.ready = false;
101112
return backpressure_indicator;
102113
};
103114

104-
// Store info in this.server_info after each call
105-
RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) {
106-
var self = this;
107-
var ready = this.ready;
108-
var args = [];
109-
if (typeof section === 'function') {
110-
callback = section;
111-
} else if (section !== undefined) {
112-
args = Array.isArray(section) ? section : [section];
113-
}
114-
this.ready = ready || this.offline_queue.length === 0; // keep the execution order intakt
115-
var tmp = this.internal_send_command('info', args, function (err, res) {
115+
// Only works with batch, not in a transaction
116+
Multi.prototype.QUIT = Multi.prototype.quit = function (callback) {
117+
var self = this._client;
118+
var call_on_write = function () {
119+
// If called in a multi context, we expect redis is available
120+
self.closing = true;
121+
self.ready = false;
122+
};
123+
this.queue.push(['quit', [], quit_callback(self, callback), call_on_write]);
124+
return this;
125+
};
126+
127+
function info_callback (self, callback) {
128+
return function (err, res) {
116129
if (res) {
117130
var obj = {};
118131
var lines = res.toString().split('\r\n');
@@ -146,20 +159,33 @@ RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section
146159
self.server_info = {};
147160
}
148161
utils.callback_or_emit(self, callback, err, res);
149-
});
150-
this.ready = ready;
151-
return tmp;
162+
};
163+
}
164+
165+
// Store info in this.server_info after each call
166+
RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) {
167+
var args = [];
168+
if (typeof section === 'function') {
169+
callback = section;
170+
} else if (section !== undefined) {
171+
args = Array.isArray(section) ? section : [section];
172+
}
173+
return this.internal_send_command('info', args, info_callback(this, callback));
152174
};
153175

154-
RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) {
155-
var self = this;
156-
var ready = this.ready;
157-
debug('Sending auth to ' + self.address + ' id ' + self.connection_id);
176+
Multi.prototype.info = Multi.prototype.INFO = function info (section, callback) {
177+
var args = [];
178+
if (typeof section === 'function') {
179+
callback = section;
180+
} else if (section !== undefined) {
181+
args = Array.isArray(section) ? section : [section];
182+
}
183+
this.queue.push(['info', args, info_callback(this._client, callback)]);
184+
return this;
185+
};
158186

159-
// Stash auth for connect and reconnect.
160-
this.auth_pass = pass;
161-
this.ready = ready || this.offline_queue.length === 0; // keep the execution order intakt
162-
var tmp = this.internal_send_command('auth', [pass], function (err, res) {
187+
function auth_callback (self, pass, callback) {
188+
return function (err, res) {
163189
if (err) {
164190
if (no_password_is_set.test(err.message)) {
165191
self.warn('Warning: Redis server does not require a password, but a password was supplied.');
@@ -175,11 +201,31 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, c
175201
}
176202
}
177203
utils.callback_or_emit(self, callback, err, res);
178-
});
204+
};
205+
}
206+
207+
RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) {
208+
debug('Sending auth to ' + this.address + ' id ' + this.connection_id);
209+
210+
// Stash auth for connect and reconnect.
211+
this.auth_pass = pass;
212+
var ready = this.ready;
213+
this.ready = ready || this.offline_queue.length === 0;
214+
var tmp = this.internal_send_command('auth', [pass], auth_callback(this, pass, callback));
179215
this.ready = ready;
180216
return tmp;
181217
};
182218

219+
// Only works with batch, not in a transaction
220+
Multi.prototype.auth = Multi.prototype.AUTH = function auth (pass, callback) {
221+
debug('Sending auth to ' + this.address + ' id ' + this.connection_id);
222+
223+
// Stash auth for connect and reconnect.
224+
this.auth_pass = pass;
225+
this.queue.push(['auth', [pass], auth_callback(this._client, callback)]);
226+
return this;
227+
};
228+
183229
RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
184230
var arr,
185231
len = arguments.length,
@@ -198,7 +244,7 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
198244
for (; i < len; i += 1) {
199245
arr[i + 1] = arguments[1][i];
200246
}
201-
} else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) {
247+
} else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined'))) {
202248
arr = [arguments[0]];
203249
for (var field in arguments[1]) { // jshint ignore: line
204250
arr.push(field, arguments[1][field]);
@@ -219,6 +265,46 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
219265
return this.internal_send_command('hmset', arr, callback);
220266
};
221267

268+
Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () {
269+
var arr,
270+
len = arguments.length,
271+
callback,
272+
i = 0;
273+
if (Array.isArray(arguments[0])) {
274+
arr = arguments[0];
275+
callback = arguments[1];
276+
} else if (Array.isArray(arguments[1])) {
277+
if (len === 3) {
278+
callback = arguments[2];
279+
}
280+
len = arguments[1].length;
281+
arr = new Array(len + 1);
282+
arr[0] = arguments[0];
283+
for (; i < len; i += 1) {
284+
arr[i + 1] = arguments[1][i];
285+
}
286+
} else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined'))) {
287+
arr = [arguments[0]];
288+
for (var field in arguments[1]) { // jshint ignore: line
289+
arr.push(field, arguments[1][field]);
290+
}
291+
callback = arguments[2];
292+
} else {
293+
len = arguments.length;
294+
// The later should not be the average use case
295+
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
296+
len--;
297+
callback = arguments[len];
298+
}
299+
arr = new Array(len);
300+
for (; i < len; i += 1) {
301+
arr[i] = arguments[i];
302+
}
303+
}
304+
this.queue.push(['hmset', arr, callback]);
305+
return this;
306+
};
307+
222308
RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function subscribe () {
223309
var arr,
224310
len = arguments.length,
@@ -378,7 +464,7 @@ Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe ()
378464
arr[i] = arguments[i];
379465
}
380466
}
381-
var self = this;
467+
var self = this._client;
382468
var call_on_write = function () {
383469
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;
384470
};
@@ -434,7 +520,7 @@ Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscr
434520
arr[i] = arguments[i];
435521
}
436522
}
437-
var self = this;
523+
var self = this._client;
438524
var call_on_write = function () {
439525
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
440526
self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1;

test/auth.spec.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,48 @@ describe('client authentication', function () {
291291
});
292292
});
293293
});
294+
295+
it('indivdual commands work properly with batch', function (done) {
296+
// quit => might return an error instead of "OK" in the exec callback... (if not connected)
297+
// auth => might return an error instead of "OK" in the exec callback... (if no password is required / still loading on Redis <= 2.4)
298+
// This could be fixed by checking the return value of the callback in the exec callback and
299+
// returning the manipulated [error, result] from the callback.
300+
// There should be a better solution though
301+
302+
var args = config.configureClient(parser, 'localhost', {
303+
noReadyCheck: true
304+
});
305+
client = redis.createClient.apply(redis.createClient, args);
306+
assert.strictEqual(client.selected_db, undefined);
307+
var end = helper.callFuncAfter(done, 8);
308+
client.on('monitor', function () {
309+
end(); // Should be called for each command after monitor
310+
});
311+
client.batch()
312+
.auth(auth)
313+
.SELECT(5, function (err, res) {
314+
assert.strictEqual(client.selected_db, 5);
315+
assert.strictEqual(res, 'OK');
316+
assert.notDeepEqual(client.serverInfo.db5, { avg_ttl: 0, expires: 0, keys: 1 });
317+
})
318+
.monitor()
319+
.set('foo', 'bar', helper.isString('OK'))
320+
.INFO(function (err, res) {
321+
assert.strictEqual(res.indexOf('# Server\r\nredis_version:'), 0);
322+
assert.deepEqual(client.serverInfo.db5, { avg_ttl: 0, expires: 0, keys: 1 });
323+
})
324+
.get('foo', helper.isString('bar'))
325+
.subscribe(['foo', 'bar'])
326+
.unsubscribe('foo')
327+
.SUBSCRIBE('/foo', helper.isString('/foo'))
328+
.psubscribe('*')
329+
.quit(helper.isString('OK')) // this might be interesting
330+
.exec(function (err, res) {
331+
res[4] = res[4].substr(0, 10);
332+
assert.deepEqual(res, ['OK', 'OK', 'OK', 'OK', '# Server\r\n', 'bar', 'bar', 'foo', '/foo', '*', 'OK']);
333+
end();
334+
});
335+
});
294336
});
295337
});
296338

0 commit comments

Comments
 (0)