Skip to content

Commit 0ec2c43

Browse files
author
Ruben Bridgewater
committed
Fix fired but not yet returned commands not being rejected after a connection loss
1 parent ebea087 commit 0ec2c43

File tree

4 files changed

+154
-47
lines changed

4 files changed

+154
-47
lines changed

index.js

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,23 +162,19 @@ RedisClient.prototype.unref = function () {
162162
}
163163
};
164164

165-
// flush offline_queue and command_queue, erroring any items with a callback first
166-
RedisClient.prototype.flush_and_error = function (error) {
165+
// flush provided queues, erroring any items with a callback first
166+
RedisClient.prototype.flush_and_error = function (error, queue_names) {
167167
var command_obj;
168-
while (command_obj = this.offline_queue.shift()) {
169-
if (typeof command_obj.callback === 'function') {
170-
error.command = command_obj.command.toUpperCase();
171-
command_obj.callback(error);
172-
}
173-
}
174-
while (command_obj = this.command_queue.shift()) {
175-
if (typeof command_obj.callback === 'function') {
176-
error.command = command_obj.command.toUpperCase();
177-
command_obj.callback(error);
168+
queue_names = queue_names || ['offline_queue', 'command_queue'];
169+
for (var i = 0; i < queue_names.length; i++) {
170+
while (command_obj = this[queue_names[i]].shift()) {
171+
if (typeof command_obj.callback === 'function') {
172+
error.command = command_obj.command.toUpperCase();
173+
command_obj.callback(error);
174+
}
178175
}
176+
this[queue_names[i]] = new Queue();
179177
}
180-
this.offline_queue = new Queue();
181-
this.command_queue = new Queue();
182178
};
183179

184180
RedisClient.prototype.on_error = function (err) {
@@ -477,6 +473,7 @@ var retry_connection = function (self) {
477473
};
478474

479475
RedisClient.prototype.connection_gone = function (why) {
476+
var error;
480477
// If a retry is already in progress, just let that happen
481478
if (this.retry_timer) {
482479
return;
@@ -515,14 +512,26 @@ RedisClient.prototype.connection_gone = function (why) {
515512
var message = this.retry_totaltime >= this.connect_timeout ?
516513
'connection timeout exceeded.' :
517514
'maximum connection attempts exceeded.';
518-
var error = new Error('Redis connection in broken state: ' + message);
515+
error = new Error('Redis connection in broken state: ' + message);
519516
error.code = 'CONNECTION_BROKEN';
520517
this.flush_and_error(error);
521518
this.emit('error', error);
522519
this.end();
523520
return;
524521
}
525522

523+
// Flush all commands that have not yet returned. We can't handle them appropriatly
524+
if (this.command_queue.length !== 0) {
525+
error = new Error('Redis connection lost and command aborted in uncertain state. It might have been processed.');
526+
error.code = 'UNCERTAIN_STATE';
527+
// TODO: Evaluate to add this
528+
// if (this.options.retry_commands) {
529+
// this.offline_queue.unshift(this.command_queue.toArray());
530+
// error.message = 'Command aborted in uncertain state and queued for next connection.';
531+
// }
532+
this.flush_and_error(error, ['command_queue']);
533+
}
534+
526535
if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) {
527536
this.retry_delay = this.retry_max_delay;
528537
} else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) {

test/connection.spec.js

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,63 @@ describe("connection tests", function () {
236236
});
237237
}
238238

239+
it("redis still loading <= 1000ms", function (done) {
240+
client = redis.createClient.apply(redis.createClient, args);
241+
var tmp = client.info.bind(client);
242+
var end = helper.callFuncAfter(done, 3);
243+
var delayed = false;
244+
var time;
245+
// Mock original function and pretent redis is still loading
246+
client.info = function (cb) {
247+
tmp(function(err, res) {
248+
if (!delayed) {
249+
assert(!err);
250+
res = res.toString().replace(/loading:0/, 'loading:1\r\nloading_eta_seconds:0.5');
251+
delayed = true;
252+
time = Date.now();
253+
}
254+
end();
255+
cb(err, res);
256+
});
257+
};
258+
client.on("ready", function () {
259+
var rest = Date.now() - time;
260+
// Be on the safe side and accept 100ms above the original value
261+
assert(rest - 100 < 500 && rest >= 500);
262+
assert(delayed);
263+
end();
264+
});
265+
});
266+
267+
it("redis still loading > 1000ms", function (done) {
268+
client = redis.createClient.apply(redis.createClient, args);
269+
var tmp = client.info.bind(client);
270+
var end = helper.callFuncAfter(done, 3);
271+
var delayed = false;
272+
var time;
273+
// Mock original function and pretent redis is still loading
274+
client.info = function (cb) {
275+
tmp(function(err, res) {
276+
if (!delayed) {
277+
assert(!err);
278+
// Try reconnecting after one second even if redis tells us the time needed is above one second
279+
res = res.toString().replace(/loading:0/, 'loading:1\r\nloading_eta_seconds:2.5');
280+
delayed = true;
281+
time = Date.now();
282+
}
283+
end();
284+
cb(err, res);
285+
});
286+
};
287+
client.on("ready", function () {
288+
var rest = Date.now() - time;
289+
// Be on the safe side and accept 100ms above the original value
290+
assert(rest - 100 < 1000 && rest >= 1000);
291+
assert(delayed);
292+
end();
293+
});
294+
});
295+
239296
});
240297

241298
});

test/node_redis.spec.js

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,43 @@ describe("The node_redis client", function () {
556556
assert.strictEqual(client.offline_queue.length, 0);
557557
});
558558
});
559+
560+
it("flushes the command queue if connection is lost", function (done) {
561+
client = redis.createClient({
562+
parser: parser
563+
});
564+
565+
client.once('ready', function() {
566+
var multi = client.multi();
567+
multi.config("bar");
568+
var cb = function(err, reply) {
569+
assert.equal(err.code, 'UNCERTAIN_STATE');
570+
};
571+
for (var i = 0; i < 12; i += 3) {
572+
client.set("foo" + i, "bar" + i);
573+
multi.set("foo" + (i + 1), "bar" + (i + 1), cb);
574+
multi.set("foo" + (i + 2), "bar" + (i + 2));
575+
}
576+
multi.exec();
577+
assert.equal(client.command_queue.length, 15);
578+
helper.killConnection(client);
579+
});
580+
581+
client.on("reconnecting", function (params) {
582+
assert.equal(client.command_queue.length, 15);
583+
});
584+
585+
client.on('error', function(err) {
586+
if (/uncertain state/.test(err.message)) {
587+
assert.equal(client.command_queue.length, 0);
588+
done();
589+
} else {
590+
assert.equal(err.code, 'ECONNREFUSED');
591+
assert.equal(err.errno, 'ECONNREFUSED');
592+
assert.equal(err.syscall, 'connect');
593+
}
594+
});
595+
});
559596
});
560597

561598
describe('false', function () {
@@ -599,7 +636,7 @@ describe("The node_redis client", function () {
599636
});
600637
});
601638

602-
it("flushes the command queue connection if in broken connection mode", function (done) {
639+
it("flushes the command queue if connection is lost", function (done) {
603640
client = redis.createClient({
604641
parser: parser,
605642
max_attempts: 2,
@@ -610,7 +647,7 @@ describe("The node_redis client", function () {
610647
var multi = client.multi();
611648
multi.config("bar");
612649
var cb = function(err, reply) {
613-
assert.equal(err.code, 'CONNECTION_BROKEN');
650+
assert.equal(err.code, 'UNCERTAIN_STATE');
614651
};
615652
for (var i = 0; i < 12; i += 3) {
616653
client.set("foo" + i, "bar" + i);
@@ -627,7 +664,7 @@ describe("The node_redis client", function () {
627664
});
628665

629666
client.on('error', function(err) {
630-
if (/Redis connection in broken state:/.test(err.message)) {
667+
if (err.code === 'UNCERTAIN_STATE') {
631668
assert.equal(client.command_queue.length, 0);
632669
done();
633670
} else {

test/pubsub.spec.js

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,39 @@ describe("publish/subscribe", function () {
317317
});
318318
});
319319

320+
it("should not publish a message multiple times per command", function (done) {
321+
var published = {};
322+
323+
function subscribe(message) {
324+
sub.removeAllListeners('subscribe');
325+
sub.removeAllListeners('message');
326+
sub.removeAllListeners('unsubscribe');
327+
sub.on('subscribe', function () {
328+
pub.publish('/foo', message);
329+
});
330+
sub.on('message', function (channel, message) {
331+
if (published[message]) {
332+
done(new Error('Message published more than once.'));
333+
}
334+
published[message] = true;
335+
});
336+
sub.on('unsubscribe', function (channel, count) {
337+
assert.strictEqual(count, 0);
338+
});
339+
sub.subscribe('/foo');
340+
}
341+
342+
subscribe('hello');
343+
344+
setTimeout(function () {
345+
sub.unsubscribe();
346+
setTimeout(function () {
347+
subscribe('world');
348+
setTimeout(done, 50);
349+
}, 40);
350+
}, 40);
351+
});
352+
320353
// TODO: Fix pub sub
321354
// And there's more than just those two issues
322355
describe.skip('FIXME: broken pub sub', function () {
@@ -331,35 +364,6 @@ describe("publish/subscribe", function () {
331364
});
332365
setTimeout(done, 200);
333366
});
334-
335-
it("should not publish a message multiple times per command", function (done) {
336-
var published = {};
337-
338-
function subscribe(message) {
339-
sub.on('subscribe', function () {
340-
pub.publish('/foo', message);
341-
});
342-
sub.on('message', function (channel, message) {
343-
if (published[message]) {
344-
done(new Error('Message published more than once.'));
345-
}
346-
published[message] = true;
347-
});
348-
sub.on('unsubscribe', function (channel, count) {
349-
assert.strictEqual(count, 0);
350-
});
351-
sub.subscribe('/foo');
352-
}
353-
354-
subscribe('hello');
355-
356-
setTimeout(function () {
357-
sub.unsubscribe();
358-
setTimeout(function () {
359-
subscribe('world');
360-
}, 40);
361-
}, 40);
362-
});
363367
});
364368

365369
afterEach(function () {

0 commit comments

Comments
 (0)