diff --git a/index.js b/index.js index 0b337c9..3279ccb 100644 --- a/index.js +++ b/index.js @@ -1 +1 @@ -module.exports = require('./lib/rabbit'); \ No newline at end of file +module.exports = require('./lib/rabbit'); diff --git a/lib/rabbit.js b/lib/rabbit.js index df89500..3a4f3d3 100644 --- a/lib/rabbit.js +++ b/lib/rabbit.js @@ -1,7 +1,6 @@ -const events = require('events'), - util = require('util'), - Transport = require('@mydevices/transporter').Transport; +const util = require('util'); +const Transport = require('@mydevices/transporter').Transport; const amqp = require('amqplib'); // @@ -12,13 +11,15 @@ const amqp = require('amqplib'); // const Rabbit = function (options) { Transport.call(this, options); - this.options = options || { + + this.options = Object.assign({ uri: 'amqp://localhost', exchange: 'rabbit-transporter-exchange', - type: 'fanout' - }; + type: 'fanout' + }, options); this.channel = null; + this.failed = false; const self = this; amqp.connect(this.options.uri, { rejectUnauthorized: false }) @@ -27,11 +28,24 @@ const Rabbit = function (options) { }) .then((ch) => { self.channel = ch; + + ch.on('error', (err) => { + self.failed = true; + self.emit('error', err, self); + }); + ch.on('close', () => { + self.failed = true; + self.emit('error', new Error('AMQP channel closed'), self); + }); + return ch.assertExchange(self.options.exchange, self.options.type); }) - .then(console.log) - .catch(console.warn); - + .then(msg => self.emit('ready', msg)) + .catch(err => { + self.emit('error', err, self); + self.failed = true; + return; + }); }; // @@ -45,9 +59,17 @@ util.inherits(Rabbit, Transport); Rabbit.prototype.name = 'Rabbit'; Rabbit.prototype.publish = function(msg, callback) { - const self = this; - self.channel.publish(self.options.exchange, msg.bus, Buffer.from(JSON.stringify(msg))); + if (this.failed) { + this.emit('error', new Error('AMQP connection error'), this); + return callback(); + } else if (this.channel) { + // catch a closed channel, let listener emit the event + try { + this.channel.publish(this.options.exchange, msg.bus, Buffer.from(JSON.stringify(msg))); + } catch (error) {} + } + return callback(null); } -exports.Rabbit = Rabbit; \ No newline at end of file +exports.Rabbit = Rabbit;