From 948aeb39026e4861f5b163dda87b97b729a30e2b Mon Sep 17 00:00:00 2001 From: Hugo Cortes Date: Thu, 27 Dec 2018 13:03:45 -0800 Subject: [PATCH 1/2] fix: add error listeners --- index.js | 2 +- lib/rabbit.js | 43 +++++++++++++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/index.js b/index.js index 0b337c9..3279ccb 100644 --- a/index.js +++ b/index.js @@ -1 +1 @@ -module.exports = require('./lib/rabbit'); \ No newline at end of file +module.exports = require('./lib/rabbit'); diff --git a/lib/rabbit.js b/lib/rabbit.js index df89500..6f04006 100644 --- a/lib/rabbit.js +++ b/lib/rabbit.js @@ -1,7 +1,6 @@ -const events = require('events'), - util = require('util'), - Transport = require('@mydevices/transporter').Transport; +const util = require('util'); +const Transport = require('@mydevices/transporter').Transport; const amqp = require('amqplib'); // @@ -12,13 +11,15 @@ const amqp = require('amqplib'); // const Rabbit = function (options) { Transport.call(this, options); - this.options = options || { + + this.options = Object.assign({ uri: 'amqp://localhost', exchange: 'rabbit-transporter-exchange', - type: 'fanout' - }; + type: 'fanout' + }, options); this.channel = null; + this.failed = false; const self = this; amqp.connect(this.options.uri, { rejectUnauthorized: false }) @@ -27,11 +28,24 @@ const Rabbit = function (options) { }) .then((ch) => { self.channel = ch; + + ch.on('error', (err) => { + self.failed = true; + self.emit('error', err, self); + }); + ch.on('close', () => { + self.failed = true; + self.emit('error', new Error('AMQP channel closed'), self); + }); + return ch.assertExchange(self.options.exchange, self.options.type); }) - .then(console.log) - .catch(console.warn); - + .then(msg => self.emit('ready', msg)) + .catch(err => { + self.emit('error', err, self); + self.failed = true; + return; + }); }; // @@ -45,9 +59,14 @@ util.inherits(Rabbit, Transport); Rabbit.prototype.name = 'Rabbit'; Rabbit.prototype.publish = function(msg, callback) { - const self = this; - self.channel.publish(self.options.exchange, msg.bus, Buffer.from(JSON.stringify(msg))); + if (this.failed) { + this.emit('error', new Error('AMQP connection error'), this); + return callback(); + } else if (this.channel) { + this.channel.publish(this.options.exchange, msg.bus, Buffer.from(JSON.stringify(msg))); + } + return callback(null); } -exports.Rabbit = Rabbit; \ No newline at end of file +exports.Rabbit = Rabbit; From 029079361c4de2a6baf40f8e92eb597deb3de1b3 Mon Sep 17 00:00:00 2001 From: Hugo Cortes Date: Thu, 27 Dec 2018 13:45:33 -0800 Subject: [PATCH 2/2] fix: catch a closed channel error --- lib/rabbit.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/rabbit.js b/lib/rabbit.js index 6f04006..3a4f3d3 100644 --- a/lib/rabbit.js +++ b/lib/rabbit.js @@ -63,7 +63,10 @@ Rabbit.prototype.publish = function(msg, callback) { this.emit('error', new Error('AMQP connection error'), this); return callback(); } else if (this.channel) { - this.channel.publish(this.options.exchange, msg.bus, Buffer.from(JSON.stringify(msg))); + // catch a closed channel, let listener emit the event + try { + this.channel.publish(this.options.exchange, msg.bus, Buffer.from(JSON.stringify(msg))); + } catch (error) {} } return callback(null);