From 53612305e665ccec039dc11814c3ce99e93e55c9 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Fri, 1 Mar 2019 17:09:28 +0700 Subject: [PATCH 01/11] pub & pubQueue unit-test passed, reconnect skenario work, sub & subQueue is not ported yet --- lib/Rabbit.js | 2 +- lib/service_pub_queue_rabbit.js | 54 ++++++--------------------------- lib/service_pub_rabbit.js | 51 ++++++++++--------------------- lib/service_sub_queue_rabbit.js | 7 ----- lib/service_sub_rabbit.js | 7 ----- 5 files changed, 26 insertions(+), 95 deletions(-) diff --git a/lib/Rabbit.js b/lib/Rabbit.js index eaf3efb..cf45cd0 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -19,7 +19,7 @@ const defaultConfig = { * }, * publisherName?: string, // pub * pub?: { - * [eventName]: tringger, + * [eventName]: trigger, * }, * subQueue?: { * [eventName]: (payload) => void, diff --git a/lib/service_pub_queue_rabbit.js b/lib/service_pub_queue_rabbit.js index abc4631..b75456c 100644 --- a/lib/service_pub_queue_rabbit.js +++ b/lib/service_pub_queue_rabbit.js @@ -13,68 +13,32 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { this.config = config; this.logger = logger; this.injector = injector; - this.amqp = amqp; this.servicePubQueue = servicePubQueue; this.SERVICE_NAME = config.default("name", "unnamed-service"); this.VERSION = pack.version; this._status = "ok"; - this._initializing = false; - this._connection = null; - this._channels = {}; - this._registry = {}; this._namespace = config.default("service.rabbit.namespace", "default"); + this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); this.servicePubQueue.on("triggerQueue", this.publishEvent.bind(this)); - this.amqp.on("connected", () => { - this.init(); - }); } *initialize() { - if (this.amqp.getConnection()) - this.init(); } - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - - let desc = this.config.default("service.queue.publish", {}); - - for (let service in desc) { - for (let event in desc[service]) { - this.createPublisher(service, event); - } - } - - Object.assign(this._registry, this.config.default("service.registry", {})); - - this._initializing = false; + *destroy() { } publishEvent(service, event, payload) { - let channel = this._channels[service] && this._channels[service][event]; - if (channel) { - let queueName = `${this._namespace}.queue.${service}.${event}`; - let content = JSON.stringify(payload); - channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); - } - } - - *createPublisher(service, event) { - let channel = yield this._connection.createChannel(); - let queueName = `${this._namespace}.queue.${service}.${event}`; - - if (!this._channels[service]) - this._channels[service] = {}; - - this._channels[service][event] = channel; - - yield channel.assertQueue(queueName, { durable: true }); + const rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + pubQueue: {[service]: [event]}, + pubPayload: payload, + }); } info() { diff --git a/lib/service_pub_rabbit.js b/lib/service_pub_rabbit.js index a0d5bc3..8dd26d1 100644 --- a/lib/service_pub_rabbit.js +++ b/lib/service_pub_rabbit.js @@ -13,7 +13,6 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this.config = config; this.logger = logger; this.injector = injector; - this.amqp = amqp; this.servicePub = servicePub; this.SERVICE_NAME = config.default("name", "unnamed-service"); @@ -23,50 +22,32 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this._initializing = false; this._connection = null; this._namespace = config.default("service.rabbit.namespace", "default"); + this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); + + this._eventList = []; + const publishedEvent = this.config.default("service.publish", {}); + for (const eventName in publishedEvent) { + this._eventList.push(eventName); + } + this._publishedEvent = publishedEvent; this.servicePub.on("trigger", this.publishEvent.bind(this)); - this.amqp.on("connected", () => { - this.init(); - }); } *initialize() { - if (this.amqp.getConnection()) - this.init(); } - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - this._eventList = []; - this._channels = {}; - - let desc = this.config.default("service.publish", {}); - for (let i in desc) { - this._eventList.push(i); - yield this.createPublisher(i, desc[i]); - } - - this._initializing = false; + *destroy() { } publishEvent(event, payload) { - let channel = this._channels[event]; - if (channel && this._eventList.includes(event)) { - let exchangeName = this._namespace + "." + this.SERVICE_NAME + "." + event; - let content = JSON.stringify(payload); - channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); - } - } - - *createPublisher(event) { - let channel = yield this._connection.createChannel(); - let exchangeName = this._namespace + "." + this.SERVICE_NAME + "." + event; - this._channels[event] = channel; - - yield channel.assertExchange(exchangeName, "fanout", { durable: true }); + const rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + pub: {[event]: event ? this._publishedEvent[event] : ""}, + pubPayload: payload, + }); } info() { diff --git a/lib/service_sub_queue_rabbit.js b/lib/service_sub_queue_rabbit.js index 6d0731b..ee97c90 100644 --- a/lib/service_sub_queue_rabbit.js +++ b/lib/service_sub_queue_rabbit.js @@ -14,7 +14,6 @@ class ServiceSubRabbit extends ServiceSubQueue { this.config = config; this.logger = logger; this.injector = injector; - this.amqp = amqp; this.SERVICE_NAME = config.default("name", "unnamed-service"); this.VERSION = pack.version; @@ -23,15 +22,9 @@ class ServiceSubRabbit extends ServiceSubQueue { this._initializing = false; this._connection = null; this._namespace = config.default("service.rabbit.namespace", "default"); - - this.amqp.on("connected", () => { - this.init(); - }); } *initialize() { - if (this.amqp.getConnection()) - this.init(); } *init() { diff --git a/lib/service_sub_rabbit.js b/lib/service_sub_rabbit.js index eae0a1d..4494cff 100644 --- a/lib/service_sub_rabbit.js +++ b/lib/service_sub_rabbit.js @@ -15,7 +15,6 @@ class ServiceSubRabbit extends ServiceSub { this.config = config; this.logger = logger; this.injector = injector; - this.amqp = amqp; this.servicePubRabbit = servicePubRabbit; this.SERVICE_NAME = config.default("name", "unnamed-service"); @@ -30,15 +29,9 @@ class ServiceSubRabbit extends ServiceSub { this._connection = null; this._secret = this.config.default("service.secret", null); this._namespace = config.default("service.rabbit.namespace", "default"); - - this.amqp.on("connected", () => { - this.init(); - }); } *initialize() { - if (this.amqp.getConnection()) - this.init(); } *init() { From 708ea89e6eb7146e2207e765193ffef66c228a5d Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Sat, 2 Mar 2019 17:22:25 +0700 Subject: [PATCH 02/11] works --- labs/rabbit-reconnect-test.js | 12 +- lib/Rabbit.js | 88 +++++---- lib/service_amqp.js | 64 ------- lib/service_sub_queue_rabbit.js | 87 ++++----- lib/service_sub_rabbit.js | 177 ++++++++---------- test/service_pub_queue_rabbit_test.js | 2 + test/service_pub_rabbit_test.js | 2 + test/service_sub_queue_rabbit_test.js | 7 +- ...service_sub_queue_reconnect_rabbit_test.js | 142 -------------- test/service_sub_rabbit_test.js | 7 +- 10 files changed, 193 insertions(+), 395 deletions(-) delete mode 100644 lib/service_amqp.js delete mode 100644 test/service_sub_queue_reconnect_rabbit_test.js diff --git a/labs/rabbit-reconnect-test.js b/labs/rabbit-reconnect-test.js index 16ebdfc..a68444f 100644 --- a/labs/rabbit-reconnect-test.js +++ b/labs/rabbit-reconnect-test.js @@ -21,8 +21,16 @@ async function main() { const rabbitSub = new Rabbit({ connection: { connectionString: rabbitUrl }, serviceName: "beta", - publisherName: "alpha", - sub: { pesanBakso: (payload) => console.log("Pesan Bakso", payload) }, + publisher: { + someRegistryName : { + pesanBakso: "alpha" + } + }, + sub: { + someRegistryName: { + pesanBakso: (payload) => console.log("Pesan Bakso", payload) + }, + }, }); // restart rabbit diff --git a/lib/Rabbit.js b/lib/Rabbit.js index cf45cd0..94152df 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -13,21 +13,27 @@ const defaultConfig = { namespace: "default", prefetch: 5, /* - * pubPayload?: {}, // pub + pubQueue - * pubQueue?: { - * [serviceName]: string[], // serviceName: array of events + * pubPayload?: {}, // Necessary for: pub + pubQueue + * pubQueue?: { // Necessary for: pubQueue + * [serviceName]: string[], // `eventName` * }, - * publisherName?: string, // pub - * pub?: { - * [eventName]: trigger, + * pub?: { // Necessary for: pub + * [eventName]: string, // `triggerName` * }, - * subQueue?: { + * subQueue?: { // Necessary for: subQueue * [eventName]: (payload) => void, * }, - * sub: { - * [eventName]: (payload) => void, + * publisher?: { // Necessary for: sub + * [registryName: string { + * [eventName]: string, + * } + * }, + * sub: { // Necessary for: sub + * [registryName: string { + * [eventName]: (payload) => void, + * } * }, - * subNotification: { + * subNotification: { // Necessary for: sub * [eventName]: (payload) => void, * }, */ @@ -61,14 +67,14 @@ async function sleep(delay) { class Rabbit { constructor(config = {}, logger = console) { - this.publishedQueueList = []; // queue name that has been published (to avoid redundant publication) + this._publishedQueueList = []; // queue name that has been published (to avoid redundant publication) this.config = getCompleteConfig(config); this.logger = logger; this.connectionString = getConnectionString(this.config); - this._initialize(); + this._run(); } - async _initialize() { + async _run() { try { // create connection const connection = await amqp.connect(this.connectionString); @@ -93,7 +99,7 @@ class Rabbit { this.logger.warn("Failed to connect to rmq.", error); await sleep(1000); this.logger.info("Attempting to reconnect to rmq."); - await this._initialize(); + await this._run(); } } @@ -108,7 +114,7 @@ class Rabbit { const triggerName = pub[eventName]; const exchangeName = `${namespace}.${serviceName}.${eventName}`; // if queue is already used for publication, then skip it - if (this.publishedQueueList.includes(exchangeName)) { + if (this._publishedQueueList.includes(exchangeName)) { continue; } const content = JSON.stringify(pubPayload); @@ -116,7 +122,7 @@ class Rabbit { await channel.assertExchange(exchangeName, "fanout", { durable: true }); channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); // note that the queue has already been used for publication - this.publishedQueueList.push(exchangeName); + this._publishedQueueList.push(exchangeName); } return true; } @@ -126,25 +132,28 @@ class Rabbit { return false; } // extract config - const { namespace, serviceName, sub, subNotification, publisherName } = this.config; + const { namespace, serviceName, sub, subNotification, publisher } = this.config; // create channel & listen to event - for (const eventName in sub) { - const callback = sub[eventName]; - const exchangeName = `${namespace}.${publisherName}.${eventName}`; - const queueName = `${namespace}.${publisherName}.${serviceName}.${eventName}`; - const channel = await connection.createChannel(); - await channel.assertQueue(queueName, {durable: true}); - await channel.assertExchange(exchangeName, "fanout", {durable: true}); - await channel.bindQueue(queueName, exchangeName, ""); - channel.consume(queueName, (message) => { - try { - const payload = JSON.parse(message.content.toString()); - callback(payload); - channel.ack(message); - } catch (error) { - channel.nack(message); - } - }); + for (const registryName in sub) { + for (const eventName in sub[registryName]) { + const publisherName = registryName in publisher && eventName in publisher[registryName] ? publisher[registryName][eventName] : "publisher"; + const callback = sub[registryName][eventName]; + const exchangeName = `${namespace}.${publisherName}.${eventName}`; + const queueName = `${namespace}.${publisherName}.${serviceName}.${eventName}`; + const channel = await connection.createChannel(); + await channel.assertQueue(queueName, {durable: true}); + await channel.assertExchange(exchangeName, "fanout", {durable: true}); + await channel.bindQueue(queueName, exchangeName, ""); + channel.consume(queueName, (message) => { + try { + const payload = JSON.parse(message.content.toString()); + callback(payload); + channel.ack(message); + } catch (error) { + channel.nack(message); + } + }); + } } // create notification channel & listen to event for (const eventName in subNotification) { @@ -170,22 +179,23 @@ class Rabbit { return false; } // extract config - const { namespace, pubQueue, pubPayload } = this.config; + const { namespace, prefetch, pubQueue, pubPayload } = this.config; // create channel & publish payload for (const serviceName in pubQueue) { const eventList = pubQueue[serviceName]; for (const eventName of eventList) { const queueName = `${namespace}.queue.${serviceName}.${eventName}`; // if queue is already used for publication, then skip it - if (this.publishedQueueList.includes(queueName)) { + if (this._publishedQueueList.includes(queueName)) { continue; } const content = JSON.stringify(pubPayload); const channel = await connection.createChannel(); + await channel.prefetch(prefetch); channel.assertQueue(queueName, {durable: true}); channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); // note that the queue has already been used for publication - this.publishedQueueList.push(queueName); + this._publishedQueueList.push(queueName); } } return true; @@ -196,16 +206,18 @@ class Rabbit { return false; } // extract config - const { namespace, serviceName, subQueue } = this.config; + const { namespace, prefetch, serviceName, subQueue } = this.config; // create channel & listen to event for (const eventName in subQueue) { const callback = subQueue[eventName]; const queueName = `${namespace}.queue.${serviceName}.${eventName}`; const channel = await connection.createChannel(); + await channel.prefetch(prefetch); channel.assertQueue(queueName, {durable: true}); channel.consume(queueName, (message) => { try { const payload = JSON.parse(message.content.toString()); + // console.error("RECEIVE", payload); callback(payload); channel.ack(message); } catch (error) { diff --git a/lib/service_amqp.js b/lib/service_amqp.js deleted file mode 100644 index cf110c2..0000000 --- a/lib/service_amqp.js +++ /dev/null @@ -1,64 +0,0 @@ -"use strict"; - -const amqp = require("amqplib"); -const sleep = require("then-sleep"); -const { Component, AsyncEmitter } = require("merapi"); - -module.exports = class Amqp extends Component.mixin(AsyncEmitter) { - - constructor(config, logger) { - super(); - this.config = config; - this.logger = logger; - this._connection; - this._initializing = false; - } - - initialize() { - return this.doConnect(); - } - - getConnection() { - return this._connection; - } - - *doConnect() { - if (this._initializing) return; - this._initializing = true; - let { secure, user, password, host, port, connectionString } = this.config.default("service.rabbit", { host: "localhost", port: 5672 }); - - let protocol = (typeof secure === "boolean" && secure) ? "amqps" : "amqp"; - - port = (protocol === "amqps") ? 5671 : 5672; - - if (!connectionString) { - if (user && password) { - connectionString = `${protocol}://${user}:${password}@${host}:${port}`; - } - else { - connectionString = `${protocol}://${host}:${port}`; - } - } - - try { - this._connection = yield amqp.connect(connectionString); - - this.logger.info("Connected to rmq."); - - this._connection.on("close", this.doConnect.bind(this)); - this._connection.on("error", this.doConnect.bind(this)); - - this.emit("connected"); - - this._initializing = false; - } catch (e) { - this._initializing = false; - - this.logger.warn("Failed to connect to rmq.", e); - yield sleep(3000); - this.logger.info("Attempting to reconnect to rmq."); - - yield this.doConnect(); - } - } -}; diff --git a/lib/service_sub_queue_rabbit.js b/lib/service_sub_queue_rabbit.js index ee97c90..07e7089 100644 --- a/lib/service_sub_queue_rabbit.js +++ b/lib/service_sub_queue_rabbit.js @@ -18,66 +18,53 @@ class ServiceSubRabbit extends ServiceSubQueue { this.SERVICE_NAME = config.default("name", "unnamed-service"); this.VERSION = pack.version; + this._initialized = false; this._status = "ok"; - this._initializing = false; - this._connection = null; this._namespace = config.default("service.rabbit.namespace", "default"); + this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); + this._prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); + + // rawSubscribeConfig's structure: {[eventName: string]: string} + const rawSubscribeConfig = this.config.default("service.queue.subscribe", {}); + // expected structure of `this._subscribeConfig: {[eventName: string]: Function} + this._subscribeConfig = {}; + + // fill `this._subscribeConfig` by resolving the methods. This operation is promise based. + let subscribeConfigPromise = Promise.resolve(true); + for (const event in rawSubscribeConfig) { + const methodName = rawSubscribeConfig[event]; + subscribeConfigPromise = subscribeConfigPromise + .then(() => { + return this.injector.resolveMethod(methodName); + }) + .then((callback) => { + this._subscribeConfig[event] = callback; + }); + } + + // resolve the promise and init + subscribeConfigPromise.then(() => { + this.init(); + }); } *initialize() { } - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - - this._subscribers = {}; - this._channels = {}; - this._queues = []; - - let desc = this.config.default("service.queue.subscribe", {}); - let prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); - - for (let event in desc) { - let channel = yield this._connection.createChannel(); - yield channel.prefetch(prefetch); - this._channels[event] = channel; - - let method = yield this.injector.resolveMethod(desc[event]); - this.createQueue(event, method); - } + *destroy() { } - *createQueue(event, method) { - let channel = this._channels[event]; - - if (channel) { - let queueName = `${this._namespace}.queue.${this.SERVICE_NAME}.${event}`; - - yield channel.assertQueue(queueName, { durable: true }); - - channel.consume(queueName, async(function* (message) { - try { - let payload = JSON.parse(message.content.toString()); - let ret = yield this.runMethod(method, payload); - - channel.ack(message); - } - catch (e) { - channel.nack(message); - } - }.bind(this))); - + init() { + if (this._initialized) { + return true; } - } - - *runMethod(method, payload) { - let ret = method(payload); - if (utils.isPromise(ret)) yield ret; - - return ret; + this._initialized = true; + const rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + subQueue: this._subscribeConfig, + }); } extension() { diff --git a/lib/service_sub_rabbit.js b/lib/service_sub_rabbit.js index 4494cff..7304bcd 100644 --- a/lib/service_sub_rabbit.js +++ b/lib/service_sub_rabbit.js @@ -25,121 +25,104 @@ class ServiceSubRabbit extends ServiceSub { this.remainingAttempts = this.maxAttempts - 1; this._status = "ok"; - this._initializing = false; + this._initialized = false; this._connection = null; this._secret = this.config.default("service.secret", null); this._namespace = config.default("service.rabbit.namespace", "default"); + + this._prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); + this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); + + // rawSubscribeConfig's structure: + // { + // [registryName: string]: { + // [eventName: string]: string + // } + // } + this._rawSubscribeConfig = this.config.default("service.subscribe", {}); + // rawNotificationConfig's structure: {[eventName: string]: string} + this._rawNotificationConfig = this.config.default("service.notification.subscribe", {}); + // expected structure of `this._subscribeConfig`: + // { + // [registryName: string]: { + // [eventName: string]: Function + // } + // } + this._subscribeConfig = {}; + // expected structure of `this._notificationConfig`: {[eventName: string]: Function} + this._notificationConfig = {}; + + this.init(); + } *initialize() { } - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); + *destroy() { + } - this._subscribers = {}; - this._channels = {}; - this._notificationChannels = {}; - this._queues = []; + *init() { + if (this._initialized) { + return true; + } + this._initialized = true; - let desc = this.config.default("service.subscribe", {}); - let prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); Object.assign(this._registry, this.config.default("service.registry", {})); - for (let i in desc) { - this._subscriptions[i] = {}; - - let info = yield this.getServiceInfo(i); - let rabbitAvailable = info ? Object.keys(info.modules).some(key => key == "pub-rabbit") : false; - - for (let event in desc[i]) { - let channel = yield this._connection.createChannel(); - yield channel.prefetch(prefetch); - this._channels[event] = channel; - - let hook = i + "." + event; - let method = yield this.injector.resolveMethod(desc[i][event]); - this._subscriptions[i][event] = { hook, method }; - this._hooks.push(hook); - - if (rabbitAvailable) { - let publisherName = info.name || "publisher"; - yield this.createQueue(publisherName, event, method); - } - else { - this.createHook(hook, method); - } + const publisher = {}; + this._queues = []; + this._subscriptions = {}; + this._hooks = []; + + // fill `this._subscribeConfig` by resolving the methods. This operation is promise based. + let subscribeAndNotificationConfigPromise = Promise.resolve(true); + for (const registryName in this._rawSubscribeConfig) { + const info = yield this.getServiceInfo(registryName); + const rabbitAvailable = info ? Object.keys(info.modules).some(key => key == "pub-rabbit") : false; + publisher[registryName] = {}; + this._subscriptions[registryName] = {}; + this._subscribeConfig[registryName] = {}; + for (const eventName in this._rawSubscribeConfig[registryName]) { + const methodName = this._rawSubscribeConfig[registryName][eventName]; + const hook = registryName + "." + eventName; + publisher[registryName][eventName] = info.name || "publisher"; + this._queues.push(`${this._namespace}.${publisher[registryName][eventName]}.${this.SERVICE_NAME}.${eventName}`); + subscribeAndNotificationConfigPromise = subscribeAndNotificationConfigPromise + .then(() => { + return this.injector.resolveMethod(methodName); + }) + .then((callback) => { + this._subscriptions[registryName][eventName] = { hook, callback }; + this._hooks.push(hook); + this._subscribeConfig[registryName][eventName] = callback; + }); } } - let notification = this.config.default("service.notification.subscribe", {}); - for (let event in notification) { - let channel = yield this._connection.createChannel(); - yield channel.prefetch(prefetch); - this._notificationChannels[event] = channel; - - let method = yield this.injector.resolveMethod(notification[event]); - yield this.createNotificationQueue(event, method); - } - } - - *createNotificationQueue(event, method) { - let channel = this._channels[event]; - let queueName = `${this._namespace}.${this.SERVICE_NAME}.${event}`; - - yield channel.assertQueue(queueName, { durable: true }); - - channel.consume(queueName, async(function* (message) { - try { - let payload = JSON.parse(message.content.toString()); - let ret = method(payload); - - if (utils.isPromise(ret)) - yield ret; - - channel.ack(message); - } - catch (e) { - channel.nack(message); - } - })); - } - - *createQueue(publisherName, event, method) { - let channel = this._channels[event]; - - if (channel) { - let queueName = this._namespace + "." + publisherName + "." + this.SERVICE_NAME + "." + event; - let exchangeName = this._namespace + "." + publisherName + "." + event; - - yield channel.assertQueue(queueName, { durable: true }); - yield channel.assertExchange(exchangeName, "fanout", { durable: true }); - yield channel.bindQueue(queueName, exchangeName, ""); - this._queues.push(queueName); - - channel.consume(queueName, async(function* (message) { - try { - let payload = JSON.parse(message.content.toString()); - let ret = yield this.runMethod(method, payload); - - channel.ack(message); - } - catch (e) { - channel.nack(message); - } - }.bind(this))); - + // fill `this._notificationConfig` by resolving the methods. This operation is promise based. + for (const eventName in this._rawNotificationConfig) { + const methodName = this._rawnotificationConfig[eventName]; + subscribeAndNotificationConfigPromise = subscribeAndNotificationConfigPromise + .then(() => { + return this.injector.resolveMethod(methodName); + }) + .then((callback) => { + this._notificationConfig[eventName] = callback; + }); } - } - *runMethod(method, payload) { - let ret = method(payload); - if (utils.isPromise(ret)) yield ret; + subscribeAndNotificationConfigPromise.then(() => { + const rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + sub: this._subscribeConfig, + subNotification: this._notificationConfig, + publisher + }); + }); - return ret; } retryStrategy(err, response) { diff --git a/test/service_pub_queue_rabbit_test.js b/test/service_pub_queue_rabbit_test.js index 95bcf37..f544e30 100644 --- a/test/service_pub_queue_rabbit_test.js +++ b/test/service_pub_queue_rabbit_test.js @@ -121,12 +121,14 @@ describe("Merapi Plugin Service: Queue Publisher", function() { expect(triggerB).to.not.be.null; })); + /* No need to create queue now, let's just make it when it is really needed it("should create queue", async(function*() { const inQueueResult = yield channel.checkQueue("default.queue.subscriber.in_queue_publisher_test"); const outQueueResult = yield channel.checkQueue("default.queue.subscriber.out_queue_publisher_test"); expect(inQueueResult.queue).to.equal("default.queue.subscriber.in_queue_publisher_test"); expect(outQueueResult.queue).to.equal("default.queue.subscriber.out_queue_publisher_test"); })); + */ }); describe("when publishing event", function() { diff --git a/test/service_pub_rabbit_test.js b/test/service_pub_rabbit_test.js index c82c37d..1ddace2 100644 --- a/test/service_pub_rabbit_test.js +++ b/test/service_pub_rabbit_test.js @@ -127,6 +127,7 @@ describe("Merapi Plugin Service: Publisher", function() { ); }); + /* No need to create exchange now, let's just make it when it is really needed it("should create exchanges", async(function*() { yield channel.checkExchange( "default.publisher.incoming_message_publisher_test" @@ -135,6 +136,7 @@ describe("Merapi Plugin Service: Publisher", function() { "default.publisher.outgoing_message_publisher_test" ); })); + */ }); describe("when publishing event", function() { diff --git a/test/service_sub_queue_rabbit_test.js b/test/service_sub_queue_rabbit_test.js index 5f2417e..54f1791 100644 --- a/test/service_sub_queue_rabbit_test.js +++ b/test/service_sub_queue_rabbit_test.js @@ -166,7 +166,8 @@ describe("Merapi Plugin Service: Queue Subscriber", function() { }); describe("when subscribing event", function() { - it("should distribute accross all subscribers using round robin method", async(function*() { + it("should distribute accross all subscribers", async(function*() { + // it("should distribute accross all subscribers using round robin method", async(function*() { this.timeout(5000); let trigger = yield publisherContainer.resolve("inQueuePublisherTest"); @@ -176,8 +177,12 @@ describe("Merapi Plugin Service: Queue Subscriber", function() { } yield sleep(3000); + const allMessage = messageA.concat(messageB).sort(); + expect(allMessage).to.deep.equal([0,1,2,3,4]); + /* expect(messageA).to.deep.equal([0, 2, 4]); expect(messageB).to.deep.equal([1, 3]); + */ })); }); }); diff --git a/test/service_sub_queue_reconnect_rabbit_test.js b/test/service_sub_queue_reconnect_rabbit_test.js deleted file mode 100644 index d928aba..0000000 --- a/test/service_sub_queue_reconnect_rabbit_test.js +++ /dev/null @@ -1,142 +0,0 @@ -"use strict"; - -const chai = require("chai"); -const expect = chai.expect; -const request = require("supertest"); -const sleep = require("then-sleep"); -const amqplib = require("amqplib"); - -const merapi = require("merapi"); -const { async, Component } = require("merapi"); - -const { rabbitConnection, rabbitUrl } = require("./configuration.js"); - -/* eslint-env mocha */ - -describe("Merapi Plugin Service: Queue Subscriber", function() { - let publisherContainer, subscriberAContainer, subscriberBContainer; - let service = {}; - let connection = {}; - let channel = {}; - let messageA = []; - - beforeEach(async(function*() { - this.timeout(5000); - - let publisherConfig = { - name: "publisher", - version: "1.0.0", - main: "mainCom", - plugins: ["service"], - service: { - rabbit: rabbitConnection, - queue: { - publish: { - subscriber: { - sub_queue_reconnect_publisher_test: "inQueuePublisherTest", - }, - }, - }, - port: 5135, - }, - }; - - let subscriberConfig = { - name: "subscriber", - version: "1.0.0", - main: "mainCom", - plugins: ["service"], - service: { - rabbit: rabbitConnection, - queue: { - subscribe: { - sub_queue_reconnect_publisher_test: "mainCom.handleIncomingMessage", - }, - }, - }, - }; - - publisherContainer = merapi({ - basepath: __dirname, - config: publisherConfig, - }); - - publisherContainer.registerPlugin( - "service-rabbit", - require("../index.js")(publisherContainer) - ); - publisherContainer.register( - "mainCom", - class MainCom extends Component { - start() {} - } - ); - yield publisherContainer.start(); - - subscriberConfig.service.port = 5212; - subscriberAContainer = merapi({ - basepath: __dirname, - config: subscriberConfig, - }); - - subscriberAContainer.registerPlugin( - "service-rabbit", - require("../index.js")(subscriberAContainer) - ); - subscriberAContainer.register( - "mainCom", - class MainCom extends Component { - start() {} - *handleIncomingMessage(payload) { - messageA.push(payload); - } - } - ); - yield subscriberAContainer.start(); - - service = yield subscriberAContainer.resolve("service"); - connection = yield amqplib.connect(rabbitUrl); - channel = yield connection.createChannel(); - - yield sleep(100); - })); - - afterEach(async(function*() { - yield subscriberAContainer.stop(); - yield channel.close(); - yield connection.close(); - })); - - describe("Subscriber service", function() { - - describe("when subscribing event", function() { - it("published event should be caught", async(function*() { - this.timeout(5000); - let trigger = yield publisherContainer.resolve("inQueuePublisherTest"); - - // send "0" - yield sleep(100); - yield trigger(0); - yield sleep(1000); - // messageA should be [0] - expect(messageA).to.deep.equal([0]); - - // emulate broken connection - yield channel.close(); - yield connection.close(); - // reconnect - connection = yield amqplib.connect(rabbitUrl); - channel = yield connection.createChannel(); - - // send "1" - yield sleep(100); - yield trigger(1); - yield sleep(1000); - // messageA should be [1] - expect(messageA).to.deep.equal([0, 1]); - - })); - }); - - }); -}); diff --git a/test/service_sub_rabbit_test.js b/test/service_sub_rabbit_test.js index 7ccbea7..e1dbc4f 100644 --- a/test/service_sub_rabbit_test.js +++ b/test/service_sub_rabbit_test.js @@ -174,7 +174,8 @@ describe("Merapi Plugin Service: Subscriber", function() { }); describe("when subscribing event", function() { - it("should distribute accross all subscribers using round robin method", async(function*() { + it("should distribute accross all subscribers", async(function*() { + // it("should distribute accross all subscribers using round robin method", async(function*() { this.timeout(5000); let trigger = yield publisherContainer.resolve( "triggerIncomingMessageSubscriberTest" @@ -186,8 +187,12 @@ describe("Merapi Plugin Service: Subscriber", function() { } yield sleep(3000); + const allMessage = messageA.concat(messageB).sort(); + expect(allMessage).to.deep.equal([0,1,2,3,4]); + /* expect(messageA).to.deep.equal([0, 2, 4]); expect(messageB).to.deep.equal([1, 3]); + */ })); }); }); From c9b7ba0d74af90d680d0acf7cf079b731d9e7692 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Mon, 11 Mar 2019 19:17:05 +0700 Subject: [PATCH 03/11] singleton model --- labs/amqplib-reconnect-test.js | 79 ------- lib/Rabbit.js | 89 +++++--- lib/service_pub_queue_rabbit.js | 19 +- lib/service_pub_rabbit.js | 19 +- lib/service_sub_queue_rabbit.js | 15 +- lib/service_sub_rabbit.js | 20 +- old-lib/service_amqp.js | 64 ------ old-lib/service_pub_queue_rabbit.js | 98 --------- old-lib/service_pub_rabbit.js | 100 --------- old-lib/service_sub_queue_rabbit.js | 97 --------- old-lib/service_sub_rabbit.js | 206 ------------------ .../rabbit-reconnect-test.js | 18 +- 12 files changed, 111 insertions(+), 713 deletions(-) delete mode 100644 labs/amqplib-reconnect-test.js delete mode 100644 old-lib/service_amqp.js delete mode 100644 old-lib/service_pub_queue_rabbit.js delete mode 100644 old-lib/service_pub_rabbit.js delete mode 100644 old-lib/service_sub_queue_rabbit.js delete mode 100644 old-lib/service_sub_rabbit.js rename {labs => reconnect-example}/rabbit-reconnect-test.js (80%) diff --git a/labs/amqplib-reconnect-test.js b/labs/amqplib-reconnect-test.js deleted file mode 100644 index ec64027..0000000 --- a/labs/amqplib-reconnect-test.js +++ /dev/null @@ -1,79 +0,0 @@ -const { execSync } = require("child_process"); -const amqp = require("amqplib"); - -async function sleep(delay) { - return new Promise((resolve, reject) => { - setTimeout(resolve, delay); - }); -} - -async function createChannel(config) { - const { url, publishers, listeners } = Object.assign({url: "", publishers: {}, listeners: {}}, config); - try { - // create connection - const connection = await amqp.connect(url); - let channel = null; - connection._channels = []; - connection.on("error", (error) => { - console.error("Connection error : ", config, error); - }); - connection.on("close", async (error) => { - if (channel) { - channel.close(); - } - console.error("Connection close : ", config, error); - await sleep(1000); - createChannel(config); - }); - // create channel - channel = await connection.createConfirmChannel(); - channel.on("error", (error) => { - console.error("Channel error : ", config, error); - }); - channel.on("close", (error) => { - console.error("Channel close : ", config, error); - }); - // register listeners - for (queue in listeners) { - const callback = listeners[queue]; - channel.assertQueue(queue, { durable: false }); - channel.consume(queue, callback); - } - // publish - for (queue in publishers) { - const message = publishers[queue]; - channel.assertQueue(queue, { durable: false }); - channel.sendToQueue(queue, message); - } - return channel; - } catch (error) { - console.error("Create connection error : ", error); - await sleep(1000); - createChannel(config); - } -} - -async function main() { - const channelPublish = await createChannel({ - url: "amqp://root:toor@0.0.0.0:5672", - publishers: { - "queue": Buffer.from("hello"), - } - }); - - execSync("docker stop rabbitmq"); - execSync("docker start rabbitmq"); - - const channelConsume = await createChannel({ - url: "amqp://root:toor@0.0.0.0:5672", - listeners: { - "queue": (message) => { - console.log("Receive message ", message.content.toString()); - }, - } - }); - - return true; -} - -main().catch((error) => console.error(error)); diff --git a/lib/Rabbit.js b/lib/Rabbit.js index 94152df..8e10326 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -68,47 +68,64 @@ class Rabbit { constructor(config = {}, logger = console) { this._publishedQueueList = []; // queue name that has been published (to avoid redundant publication) - this.config = getCompleteConfig(config); - this.logger = logger; - this.connectionString = getConnectionString(this.config); + this._connection = null; + this._config = getCompleteConfig(config); + this._logger = logger; + this._connectionString = getConnectionString(this._config); this._run(); } + async publish(payload) { + if (this._connection) { + this._config.pubPayload = payload; + this._publishedQueueList = []; + await this._pubQueue(this._connection); + await this._pub(this._connection); + } else { + await sleep(1000); + await this.publish(payload); + } + } + async _run() { try { // create connection - const connection = await amqp.connect(this.connectionString); - this.logger.info("Connected to rmq."); + const connection = await amqp.connect(this._connectionString); + this._logger.info("Connected to rmq."); connection.on("error", async (error) => { - this.logger.error("Connection error : ", error); + this._connection = null; + this._logger.error("Connection error : ", error); await sleep(1000); - await this._initialize(); + await this._run(); }); connection.on("close", async (error) => { - this.logger.error("Connection close : ", error); + this._connection = null; + this._logger.error("Connection close : ", error); await sleep(1000); - await this._initialize(); + await this._run(); }); + this._connection = connection; // process pubSub handler + await this._subQueue(this._connection); + await this._sub(this._connection); await this._pubQueue(connection); - await this._subQueue(connection); await this._pub(connection); - await this._sub(connection); // add to channels } catch (error) { - this.logger.warn("Failed to connect to rmq.", error); + this._connection = null; + this._logger.warn("Failed to connect to rmq.", error); await sleep(1000); - this.logger.info("Attempting to reconnect to rmq."); + this._logger.info("Attempting to reconnect to rmq."); await this._run(); } } async _pub(connection) { - if (!this.config.pub) { + if (!this._config.pub || !this._connection) { return false; } // extract config - const { namespace, pub, pubPayload, serviceName } = this.config; + const { namespace, pub, pubPayload, serviceName } = this._config; // create channel & publish payload for (const eventName in pub) { const triggerName = pub[eventName]; @@ -117,22 +134,24 @@ class Rabbit { if (this._publishedQueueList.includes(exchangeName)) { continue; } - const content = JSON.stringify(pubPayload); - const channel = await connection.createChannel(); - await channel.assertExchange(exchangeName, "fanout", { durable: true }); - channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); - // note that the queue has already been used for publication - this._publishedQueueList.push(exchangeName); + if ("pubPayload" in this._config) { + const content = JSON.stringify(pubPayload); + const channel = await connection.createChannel(); + await channel.assertExchange(exchangeName, "fanout", { durable: true }); + channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); + // note that the queue has already been used for publication + this._publishedQueueList.push(exchangeName); + } } return true; } async _sub(connection) { - if (!this.config.sub) { + if (!this._config.sub) { return false; } // extract config - const { namespace, serviceName, sub, subNotification, publisher } = this.config; + const { namespace, serviceName, sub, subNotification, publisher } = this._config; // create channel & listen to event for (const registryName in sub) { for (const eventName in sub[registryName]) { @@ -175,11 +194,11 @@ class Rabbit { } async _pubQueue(connection) { - if (!this.config.pubQueue) { + if (!this._config.pubQueue || !this._connection) { return false; } // extract config - const { namespace, prefetch, pubQueue, pubPayload } = this.config; + const { namespace, prefetch, pubQueue, pubPayload } = this._config; // create channel & publish payload for (const serviceName in pubQueue) { const eventList = pubQueue[serviceName]; @@ -189,24 +208,26 @@ class Rabbit { if (this._publishedQueueList.includes(queueName)) { continue; } - const content = JSON.stringify(pubPayload); - const channel = await connection.createChannel(); - await channel.prefetch(prefetch); - channel.assertQueue(queueName, {durable: true}); - channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); - // note that the queue has already been used for publication - this._publishedQueueList.push(queueName); + if ("pubPayload" in this._config) { + const content = JSON.stringify(pubPayload); + const channel = await connection.createChannel(); + await channel.prefetch(prefetch); + channel.assertQueue(queueName, {durable: true}); + channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); + // note that the queue has already been used for publication + this._publishedQueueList.push(queueName); + } } } return true; } async _subQueue(connection) { - if (!this.config.subQueue) { + if (!this._config.subQueue) { return false; } // extract config - const { namespace, prefetch, serviceName, subQueue } = this.config; + const { namespace, prefetch, serviceName, subQueue } = this._config; // create channel & listen to event for (const eventName in subQueue) { const callback = subQueue[eventName]; diff --git a/lib/service_pub_queue_rabbit.js b/lib/service_pub_queue_rabbit.js index b75456c..f8f10bd 100644 --- a/lib/service_pub_queue_rabbit.js +++ b/lib/service_pub_queue_rabbit.js @@ -18,6 +18,7 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { this.SERVICE_NAME = config.default("name", "unnamed-service"); this.VERSION = pack.version; + this._rabbit = null; this._status = "ok"; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); @@ -32,13 +33,17 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { } publishEvent(service, event, payload) { - const rabbit = new Rabbit({ - namespace: this._namespace, - connection: this._connectionConfig, - serviceName: this.SERVICE_NAME, - pubQueue: {[service]: [event]}, - pubPayload: payload, - }); + if (!this._rabbit) { + this._rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + pubQueue: {[service]: [event]}, + pubPayload: payload, + }); + } else { + this._rabbit.publish(payload); + } } info() { diff --git a/lib/service_pub_rabbit.js b/lib/service_pub_rabbit.js index 8dd26d1..e3dd445 100644 --- a/lib/service_pub_rabbit.js +++ b/lib/service_pub_rabbit.js @@ -19,6 +19,7 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this.VERSION = pack.version; this._status = "ok"; + this._rabbit = null; this._initializing = false; this._connection = null; this._namespace = config.default("service.rabbit.namespace", "default"); @@ -41,13 +42,17 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { } publishEvent(event, payload) { - const rabbit = new Rabbit({ - namespace: this._namespace, - connection: this._connectionConfig, - serviceName: this.SERVICE_NAME, - pub: {[event]: event ? this._publishedEvent[event] : ""}, - pubPayload: payload, - }); + if (!this._rabbit) { + this._rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + pub: {[event]: event ? this._publishedEvent[event] : ""}, + pubPayload: payload, + }); + } else { + this._rabbit.publish(payload); + } } info() { diff --git a/lib/service_sub_queue_rabbit.js b/lib/service_sub_queue_rabbit.js index 07e7089..c467216 100644 --- a/lib/service_sub_queue_rabbit.js +++ b/lib/service_sub_queue_rabbit.js @@ -19,6 +19,7 @@ class ServiceSubRabbit extends ServiceSubQueue { this.VERSION = pack.version; this._initialized = false; + this._rabbit = null; this._status = "ok"; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); @@ -59,12 +60,14 @@ class ServiceSubRabbit extends ServiceSubQueue { return true; } this._initialized = true; - const rabbit = new Rabbit({ - namespace: this._namespace, - connection: this._connectionConfig, - serviceName: this.SERVICE_NAME, - subQueue: this._subscribeConfig, - }); + if (!this._rabbit) { + this._rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + subQueue: this._subscribeConfig, + }); + } } extension() { diff --git a/lib/service_sub_rabbit.js b/lib/service_sub_rabbit.js index 7304bcd..fa08405 100644 --- a/lib/service_sub_rabbit.js +++ b/lib/service_sub_rabbit.js @@ -25,8 +25,8 @@ class ServiceSubRabbit extends ServiceSub { this.remainingAttempts = this.maxAttempts - 1; this._status = "ok"; + this._rabbit = null; this._initialized = false; - this._connection = null; this._secret = this.config.default("service.secret", null); this._namespace = config.default("service.rabbit.namespace", "default"); @@ -113,14 +113,16 @@ class ServiceSubRabbit extends ServiceSub { } subscribeAndNotificationConfigPromise.then(() => { - const rabbit = new Rabbit({ - namespace: this._namespace, - connection: this._connectionConfig, - serviceName: this.SERVICE_NAME, - sub: this._subscribeConfig, - subNotification: this._notificationConfig, - publisher - }); + if (!this._rabbit) { + this._rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + sub: this._subscribeConfig, + subNotification: this._notificationConfig, + publisher + }); + } }); } diff --git a/old-lib/service_amqp.js b/old-lib/service_amqp.js deleted file mode 100644 index cf110c2..0000000 --- a/old-lib/service_amqp.js +++ /dev/null @@ -1,64 +0,0 @@ -"use strict"; - -const amqp = require("amqplib"); -const sleep = require("then-sleep"); -const { Component, AsyncEmitter } = require("merapi"); - -module.exports = class Amqp extends Component.mixin(AsyncEmitter) { - - constructor(config, logger) { - super(); - this.config = config; - this.logger = logger; - this._connection; - this._initializing = false; - } - - initialize() { - return this.doConnect(); - } - - getConnection() { - return this._connection; - } - - *doConnect() { - if (this._initializing) return; - this._initializing = true; - let { secure, user, password, host, port, connectionString } = this.config.default("service.rabbit", { host: "localhost", port: 5672 }); - - let protocol = (typeof secure === "boolean" && secure) ? "amqps" : "amqp"; - - port = (protocol === "amqps") ? 5671 : 5672; - - if (!connectionString) { - if (user && password) { - connectionString = `${protocol}://${user}:${password}@${host}:${port}`; - } - else { - connectionString = `${protocol}://${host}:${port}`; - } - } - - try { - this._connection = yield amqp.connect(connectionString); - - this.logger.info("Connected to rmq."); - - this._connection.on("close", this.doConnect.bind(this)); - this._connection.on("error", this.doConnect.bind(this)); - - this.emit("connected"); - - this._initializing = false; - } catch (e) { - this._initializing = false; - - this.logger.warn("Failed to connect to rmq.", e); - yield sleep(3000); - this.logger.info("Attempting to reconnect to rmq."); - - yield this.doConnect(); - } - } -}; diff --git a/old-lib/service_pub_queue_rabbit.js b/old-lib/service_pub_queue_rabbit.js deleted file mode 100644 index 5edb415..0000000 --- a/old-lib/service_pub_queue_rabbit.js +++ /dev/null @@ -1,98 +0,0 @@ -"use strict"; - -const { Component, AsyncEmitter } = require("merapi"); -const pack = require("../package"); - -class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { - - constructor(config, logger, injector, amqp, servicePubQueue) { - super(); - - this.config = config; - this.logger = logger; - this.injector = injector; - this.amqp = amqp; - this.servicePubQueue = servicePubQueue; - - this.SERVICE_NAME = config.default("name", "unnamed-service"); - this.VERSION = pack.version; - - this._status = "ok"; - this._initializing = false; - this._connection = null; - this._channels = {}; - this._registry = {}; - this._namespace = config.default("service.rabbit.namespace", "default"); - - this.servicePubQueue.on("triggerQueue", this.publishEvent.bind(this)); - this.amqp.on("connected", () => { - this.init(); - }); - } - - *initialize() { - if (this.amqp.getConnection()) - this.init(); - } - - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - - let desc = this.config.default("service.queue.publish", {}); - - for (let service in desc) { - for (let event in desc[service]) { - this.createPublisher(service, event); - } - } - - Object.assign(this._registry, this.config.default("service.registry", {})); - - this._initializing = false; - } - - publishEvent(service, event, payload) { - let channel = this._channels[service] && this._channels[service][event]; - if (channel) { - let queueName = `${this._namespace}.queue.${service}.${event}`; - let content = JSON.stringify(payload); - channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); - } - } - - *createPublisher(service, event) { - let channel = yield this._connection.createChannel(); - let queueName = `${this._namespace}.queue.${service}.${event}`; - - if (!this._channels[service]) - this._channels[service] = {}; - - this._channels[service][event] = channel; - - yield channel.assertQueue(queueName, { durable: true }); - } - - info() { - return { - version: this.VERSION, - status: this._status - }; - } - - status() { - return this._status; - } - - extension() { - return {}; - } - - *mount() { } - - *unmount() { } -} - -module.exports = ServicePubQueueRabbit; diff --git a/old-lib/service_pub_rabbit.js b/old-lib/service_pub_rabbit.js deleted file mode 100644 index 4ebe5a6..0000000 --- a/old-lib/service_pub_rabbit.js +++ /dev/null @@ -1,100 +0,0 @@ -"use strict"; - -const { Component, AsyncEmitter } = require("merapi"); -const pack = require("../package"); - -class ServicePubRabbit extends Component.mixin(AsyncEmitter) { - - constructor(config, logger, injector, amqp, servicePub) { - super(); - - this.config = config; - this.logger = logger; - this.injector = injector; - this.amqp = amqp; - this.servicePub = servicePub; - - this.SERVICE_NAME = config.default("name", "unnamed-service"); - this.VERSION = pack.version; - - this._status = "ok"; - this._initializing = false; - this._connection = null; - this._namespace = config.default("service.rabbit.namespace", "default"); - - this.servicePub.on("trigger", this.publishEvent.bind(this)); - this.amqp.on("connected", () => { - this.init(); - }); - } - - *initialize() { - if (this.amqp.getConnection()) - this.init(); - } - - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - this._eventList = []; - this._channels = {}; - - let desc = this.config.default("service.publish", {}); - for (let i in desc) { - this._eventList.push(i); - yield this.createPublisher(i, desc[i]); - } - - this._initializing = false; - } - - publishEvent(event, payload) { - let channel = this._channels[event]; - if (channel && this._eventList.includes(event)) { - let exchangeName = this._namespace + "." + this.SERVICE_NAME + "." + event; - let content = JSON.stringify(payload); - channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); - } - } - - *createPublisher(event) { - let channel = yield this._connection.createChannel(); - let exchangeName = this._namespace + "." + this.SERVICE_NAME + "." + event; - this._channels[event] = channel; - - yield channel.assertExchange(exchangeName, "fanout", { durable: true }); - } - - info() { - return { - version: this.VERSION, - status: this._status - }; - } - - status() { - return this._status; - } - - extension() { - return { - exchanges: this._eventList.map((event) => this.SERVICE_NAME + "." + event) - }; - } - - *mount(service) { - service; - } - - *unmount(service) { - service; - } - - getEventList() { - return this._eventList; - } -} - -module.exports = ServicePubRabbit; diff --git a/old-lib/service_sub_queue_rabbit.js b/old-lib/service_sub_queue_rabbit.js deleted file mode 100644 index 5892808..0000000 --- a/old-lib/service_sub_queue_rabbit.js +++ /dev/null @@ -1,97 +0,0 @@ -"use strict"; - -const { async, utils } = require("merapi"); -const ServiceSubQueue = require("merapi-plugin-service/lib/service_sub_queue"); -const pack = require("../package"); - -class ServiceSubRabbit extends ServiceSubQueue { - - constructor(config, logger, injector, amqp) { - super(config, logger, injector); - - this.config = config; - this.logger = logger; - this.injector = injector; - this.amqp = amqp; - - this.SERVICE_NAME = config.default("name", "unnamed-service"); - this.VERSION = pack.version; - - this._status = "ok"; - this._initializing = false; - this._connection = null; - this._namespace = config.default("service.rabbit.namespace", "default"); - - this.amqp.on("connected", () => { - this.init(); - }); - } - - *initialize() { - if (this.amqp.getConnection()) - this.init(); - } - - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - - this._subscribers = {}; - this._channels = {}; - this._queues = []; - - let desc = this.config.default("service.queue.subscribe", {}); - let prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); - - for (let event in desc) { - let channel = yield this._connection.createChannel(); - yield channel.prefetch(prefetch); - this._channels[event] = channel; - - let method = yield this.injector.resolveMethod(desc[event]); - this.createQueue(event, method); - } - } - - *createQueue(event, method) { - let channel = this._channels[event]; - - if (channel) { - let queueName = `${this._namespace}.queue.${this.SERVICE_NAME}.${event}`; - - yield channel.assertQueue(queueName, { durable: true }); - - channel.consume(queueName, async(function* (message) { - try { - let payload = JSON.parse(message.content.toString()); - let ret = yield this.runMethod(method, payload); - - channel.ack(message); - } - catch (e) { - channel.nack(message); - } - }.bind(this))); - - } - } - - *runMethod(method, payload) { - let ret = method(payload); - if (utils.isPromise(ret)) yield ret; - - return ret; - } - - extension() { - return {}; - } - - *mount() { } - - *unmount() { } -} - -module.exports = ServiceSubRabbit; diff --git a/old-lib/service_sub_rabbit.js b/old-lib/service_sub_rabbit.js deleted file mode 100644 index 9dc8ea0..0000000 --- a/old-lib/service_sub_rabbit.js +++ /dev/null @@ -1,206 +0,0 @@ -"use strict"; - -const { async, utils } = require("merapi"); -const ServiceSub = require("merapi-plugin-service/lib/service_sub"); -const pack = require("../package"); -const request = require("requestretry"); - -class ServiceSubRabbit extends ServiceSub { - - constructor(config, logger, injector, amqp, servicePubRabbit) { - super(config, logger, injector); - - this.config = config; - this.logger = logger; - this.injector = injector; - this.amqp = amqp; - this.servicePubRabbit = servicePubRabbit; - - this.SERVICE_NAME = config.default("name", "unnamed-service"); - this.VERSION = pack.version; - - this.maxAttempts = this.config.default("service.rabbit.maxAttempts", 5); - this.retryDelay = this.config.default("service.rabbit.retryDelay", 5000); - this.remainingAttempts = this.maxAttempts - 1; - - this._status = "ok"; - this._initializing = false; - this._connection = null; - this._secret = this.config.default("service.secret", null); - this._namespace = config.default("service.rabbit.namespace", "default"); - - this.amqp.on("connected", () => { - this.init(); - }); - } - - *initialize() { - if (this.amqp.getConnection()) - this.init(); - } - - *init() { - if (this._initializing) return; - this._initializing = true; - - this._connection = this.amqp.getConnection(); - - this._subscribers = {}; - this._channels = {}; - this._notificationChannels = {}; - this._queues = []; - - let desc = this.config.default("service.subscribe", {}); - let prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); - Object.assign(this._registry, this.config.default("service.registry", {})); - - for (let i in desc) { - this._subscriptions[i] = {}; - - let info = yield this.getServiceInfo(i); - let rabbitAvailable = info ? Object.keys(info.modules).some(key => key == "pub-rabbit") : false; - - for (let event in desc[i]) { - let channel = yield this._connection.createChannel(); - yield channel.prefetch(prefetch); - this._channels[event] = channel; - - let hook = i + "." + event; - let method = yield this.injector.resolveMethod(desc[i][event]); - this._subscriptions[i][event] = { hook, method }; - this._hooks.push(hook); - - if (rabbitAvailable) { - let publisherName = info.name || "publisher"; - yield this.createQueue(publisherName, event, method); - } - else { - this.createHook(hook, method); - } - } - } - - let notification = this.config.default("service.notification.subscribe", {}); - for (let event in notification) { - let channel = yield this._connection.createChannel(); - yield channel.prefetch(prefetch); - this._notificationChannels[event] = channel; - - let method = yield this.injector.resolveMethod(notification[event]); - yield this.createNotificationQueue(event, method); - } - } - - *createNotificationQueue(event, method) { - let channel = this._channels[event]; - let queueName = `${this._namespace}.${this.SERVICE_NAME}.${event}`; - - yield channel.assertQueue(queueName, { durable: true }); - - channel.consume(queueName, async(function* (message) { - try { - let payload = JSON.parse(message.content.toString()); - let ret = method(payload); - - if (utils.isPromise(ret)) - yield ret; - - channel.ack(message); - } - catch (e) { - channel.nack(message); - } - })); - } - - *createQueue(publisherName, event, method) { - let channel = this._channels[event]; - - if (channel) { - let queueName = this._namespace + "." + publisherName + "." + this.SERVICE_NAME + "." + event; - let exchangeName = this._namespace + "." + publisherName + "." + event; - - yield channel.assertQueue(queueName, { durable: true }); - yield channel.assertExchange(exchangeName, "fanout", { durable: true }); - yield channel.bindQueue(queueName, exchangeName, ""); - this._queues.push(queueName); - - channel.consume(queueName, async(function* (message) { - try { - let payload = JSON.parse(message.content.toString()); - let ret = yield this.runMethod(method, payload); - - channel.ack(message); - } - catch (e) { - channel.nack(message); - } - }.bind(this))); - - } - } - - *runMethod(method, payload) { - let ret = method(payload); - if (utils.isPromise(ret)) yield ret; - - return ret; - } - - retryStrategy(err, response) { - if (this.remainingAttempts < this.maxAttempts - 1) { - this.logger.warn(`Retrying request to publisher's /info, will request ${this.remainingAttempts} more time(s) in ${this.retryDelay}ms`); - } - - this.remainingAttempts--; - - return request.RetryStrategies.HTTPOrNetworkError(err, response); - } - - *getServiceInfo(service) { - let response; - - try { - let jsonBody = { - url: this.resolve(service) + "/info", - method: "GET", - json: true, - maxAttempts: this.maxAttempts, - retryDelay: this.retryDelay, - retryStrategy: this.retryStrategy.bind(this), - fullResponse: false - } - - if (this._secret != null) { - jsonBody["headers"] = { - "Authorization": `Bearer ${this._secret}` - } - } - - response = yield request(jsonBody); - } - catch (e) { - this.logger.warn("Giving up..."); - this.logger.warn(e.stack); - return null; - } - - return response; - } - - extension() { - return Object.assign(super.extension(), { - queues: this._queues - }); - } - - *mount(service) { - service; - } - - *unmount(service) { - service; - } -} - -module.exports = ServiceSubRabbit; diff --git a/labs/rabbit-reconnect-test.js b/reconnect-example/rabbit-reconnect-test.js similarity index 80% rename from labs/rabbit-reconnect-test.js rename to reconnect-example/rabbit-reconnect-test.js index a68444f..7c8a089 100644 --- a/labs/rabbit-reconnect-test.js +++ b/reconnect-example/rabbit-reconnect-test.js @@ -2,12 +2,6 @@ const { execSync } = require("child_process"); const Rabbit = require("../lib/Rabbit.js"); const { rabbitConnection, rabbitUrl, startRabbitCommand, stopRabbitCommand } = require("../test/configuration.js"); -async function sleep(delay) { - return new Promise((resolve, reject) => { - setTimeout(resolve, delay); - }); -} - async function main() { // create subscribers @@ -53,6 +47,18 @@ async function main() { pubPayload: { qty: 3, saos: "tomat" } }); + // re use rabbitPub + rabbitPub.publish({qty: 2, saos: "sambal"}); + + // only create publisher without publish + const anotherRabbitPub = new Rabbit({ + connection: { connectionString: rabbitUrl }, + serviceName: "alpha", + pub: { pesanBakso: "pesanBaksoTrigger" }, + }); + + anotherRabbitPub.publish({qty: 1, saos: "kecap"}); + } main(); From c05556d409d9e1a9cbf78c03da1f4d12e8b7d097 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Fri, 22 Mar 2019 15:32:14 +0700 Subject: [PATCH 04/11] refactor --- lib/Rabbit.js | 63 ++++++++++++++-------- lib/service_pub_queue_rabbit.js | 28 +++++----- lib/service_pub_rabbit.js | 21 ++++---- reconnect-example/rabbit-reconnect-test.js | 4 +- test/service_pub_queue_rabbit_test.js | 3 +- test/service_pub_rabbit_test.js | 3 +- test/service_sub_queue_rabbit_test.js | 1 + test/service_sub_rabbit_test.js | 1 + 8 files changed, 71 insertions(+), 53 deletions(-) diff --git a/lib/Rabbit.js b/lib/Rabbit.js index 8e10326..489ad7f 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -1,5 +1,13 @@ const amqp = require("amqplib"); +/* + * This class support some action defined by configuration: + * - pubQueue --> publish to a queue + * - pub --> publish to exchange that will be fanned-out to multiple queue + * - subQueue --> subscribe to a queue + * - sub --> subscribe to exchange + */ + const defaultConfig = { connection: { host: "localhost", @@ -75,41 +83,49 @@ class Rabbit { this._run(); } - async publish(payload) { + async publish(config) { if (this._connection) { - this._config.pubPayload = payload; + delete this._config.pubPayload; + if ("pub" in config) { + this._config.pub = config.pub; + } + if ("pubQueue" in config) { + this._config.pubQueue = config.pubQueue; + } + if ("pubPayload" in config) { + this._config.pubPayload = config.pubPayload; + } this._publishedQueueList = []; await this._pubQueue(this._connection); await this._pub(this._connection); } else { await sleep(1000); - await this.publish(payload); + await this.publish(config); } } async _run() { try { // create connection - const connection = await amqp.connect(this._connectionString); + this._connection = await amqp.connect(this._connectionString); this._logger.info("Connected to rmq."); - connection.on("error", async (error) => { + this._connection.on("error", async (error) => { this._connection = null; this._logger.error("Connection error : ", error); await sleep(1000); await this._run(); }); - connection.on("close", async (error) => { + this._connection.on("close", async (error) => { this._connection = null; this._logger.error("Connection close : ", error); await sleep(1000); await this._run(); }); - this._connection = connection; // process pubSub handler await this._subQueue(this._connection); await this._sub(this._connection); - await this._pubQueue(connection); - await this._pub(connection); + await this._pubQueue(this._connection); + await this._pub(this._connection); // add to channels } catch (error) { this._connection = null; @@ -120,7 +136,8 @@ class Rabbit { } } - async _pub(connection) { + async _pub() { + // "pubPayLoad" might contains "false" or "null" if (!this._config.pub || !this._connection) { return false; } @@ -130,14 +147,14 @@ class Rabbit { for (const eventName in pub) { const triggerName = pub[eventName]; const exchangeName = `${namespace}.${serviceName}.${eventName}`; + const channel = await this._connection.createChannel(); + await channel.assertExchange(exchangeName, "fanout", { durable: true }); // if queue is already used for publication, then skip it if (this._publishedQueueList.includes(exchangeName)) { continue; } if ("pubPayload" in this._config) { const content = JSON.stringify(pubPayload); - const channel = await connection.createChannel(); - await channel.assertExchange(exchangeName, "fanout", { durable: true }); channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); // note that the queue has already been used for publication this._publishedQueueList.push(exchangeName); @@ -146,8 +163,8 @@ class Rabbit { return true; } - async _sub(connection) { - if (!this._config.sub) { + async _sub() { + if (!this._config.sub || !this._connection) { return false; } // extract config @@ -159,7 +176,7 @@ class Rabbit { const callback = sub[registryName][eventName]; const exchangeName = `${namespace}.${publisherName}.${eventName}`; const queueName = `${namespace}.${publisherName}.${serviceName}.${eventName}`; - const channel = await connection.createChannel(); + const channel = await this._connection.createChannel(); await channel.assertQueue(queueName, {durable: true}); await channel.assertExchange(exchangeName, "fanout", {durable: true}); await channel.bindQueue(queueName, exchangeName, ""); @@ -178,7 +195,7 @@ class Rabbit { for (const eventName in subNotification) { const callback = subNotification[eventName]; const queueName = `${namespace}.${serviceName}.${eventName}`; - const channel = await connection.createChannel(); + const channel = await this._connection.createChannel(); channel.assertQueue(queueName, {durable: true}); channel.consume(queueName, (message) => { try { @@ -193,7 +210,7 @@ class Rabbit { return true; } - async _pubQueue(connection) { + async _pubQueue() { if (!this._config.pubQueue || !this._connection) { return false; } @@ -204,15 +221,15 @@ class Rabbit { const eventList = pubQueue[serviceName]; for (const eventName of eventList) { const queueName = `${namespace}.queue.${serviceName}.${eventName}`; + const channel = await this._connection.createChannel(); + await channel.prefetch(prefetch); + channel.assertQueue(queueName, {durable: true}); // if queue is already used for publication, then skip it if (this._publishedQueueList.includes(queueName)) { continue; } if ("pubPayload" in this._config) { const content = JSON.stringify(pubPayload); - const channel = await connection.createChannel(); - await channel.prefetch(prefetch); - channel.assertQueue(queueName, {durable: true}); channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); // note that the queue has already been used for publication this._publishedQueueList.push(queueName); @@ -222,8 +239,8 @@ class Rabbit { return true; } - async _subQueue(connection) { - if (!this._config.subQueue) { + async _subQueue() { + if (!this._config.subQueue || !this._connection) { return false; } // extract config @@ -232,7 +249,7 @@ class Rabbit { for (const eventName in subQueue) { const callback = subQueue[eventName]; const queueName = `${namespace}.queue.${serviceName}.${eventName}`; - const channel = await connection.createChannel(); + const channel = await this._connection.createChannel(); await channel.prefetch(prefetch); channel.assertQueue(queueName, {durable: true}); channel.consume(queueName, (message) => { diff --git a/lib/service_pub_queue_rabbit.js b/lib/service_pub_queue_rabbit.js index f8f10bd..d4f24d1 100644 --- a/lib/service_pub_queue_rabbit.js +++ b/lib/service_pub_queue_rabbit.js @@ -18,11 +18,20 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { this.SERVICE_NAME = config.default("name", "unnamed-service"); this.VERSION = pack.version; - this._rabbit = null; this._status = "ok"; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); - + const desc = this.config.default("service.queue.publish", {}); + const pubQueue = {}; + for (const service in desc) { + pubQueue[service] = Object.keys(desc[service]); + } + this._rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + pubQueue, + }); this.servicePubQueue.on("triggerQueue", this.publishEvent.bind(this)); } @@ -33,17 +42,10 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { } publishEvent(service, event, payload) { - if (!this._rabbit) { - this._rabbit = new Rabbit({ - namespace: this._namespace, - connection: this._connectionConfig, - serviceName: this.SERVICE_NAME, - pubQueue: {[service]: [event]}, - pubPayload: payload, - }); - } else { - this._rabbit.publish(payload); - } + this._rabbit.publish({ + pubQueue: {[service]: [event]}, + pubPayload: payload + }); } info() { diff --git a/lib/service_pub_rabbit.js b/lib/service_pub_rabbit.js index e3dd445..4b549a1 100644 --- a/lib/service_pub_rabbit.js +++ b/lib/service_pub_rabbit.js @@ -31,6 +31,12 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this._eventList.push(eventName); } this._publishedEvent = publishedEvent; + this._rabbit = new Rabbit({ + namespace: this._namespace, + connection: this._connectionConfig, + serviceName: this.SERVICE_NAME, + pub: this._publishedEvent, + }); this.servicePub.on("trigger", this.publishEvent.bind(this)); } @@ -42,17 +48,10 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { } publishEvent(event, payload) { - if (!this._rabbit) { - this._rabbit = new Rabbit({ - namespace: this._namespace, - connection: this._connectionConfig, - serviceName: this.SERVICE_NAME, - pub: {[event]: event ? this._publishedEvent[event] : ""}, - pubPayload: payload, - }); - } else { - this._rabbit.publish(payload); - } + this._rabbit.publish({ + pub: {[event]: event ? this._publishedEvent[event] : ""}, + pubPayload: payload, + }); } info() { diff --git a/reconnect-example/rabbit-reconnect-test.js b/reconnect-example/rabbit-reconnect-test.js index 7c8a089..c2e86c3 100644 --- a/reconnect-example/rabbit-reconnect-test.js +++ b/reconnect-example/rabbit-reconnect-test.js @@ -48,7 +48,7 @@ async function main() { }); // re use rabbitPub - rabbitPub.publish({qty: 2, saos: "sambal"}); + rabbitPub.publish({ pubPayload: {qty: 2, saos: "sambal"}}); // only create publisher without publish const anotherRabbitPub = new Rabbit({ @@ -57,7 +57,7 @@ async function main() { pub: { pesanBakso: "pesanBaksoTrigger" }, }); - anotherRabbitPub.publish({qty: 1, saos: "kecap"}); + anotherRabbitPub.publish({pubPayload: {qty: 1, saos: "kecap"}}); } diff --git a/test/service_pub_queue_rabbit_test.js b/test/service_pub_queue_rabbit_test.js index f544e30..c1c3c75 100644 --- a/test/service_pub_queue_rabbit_test.js +++ b/test/service_pub_queue_rabbit_test.js @@ -86,6 +86,7 @@ describe("Merapi Plugin Service: Queue Publisher", function() { })); afterEach(async(function*() { + yield sleep(100); yield publisherAContainer.stop(); yield publisherBContainer.stop(); yield channel.close(); @@ -121,14 +122,12 @@ describe("Merapi Plugin Service: Queue Publisher", function() { expect(triggerB).to.not.be.null; })); - /* No need to create queue now, let's just make it when it is really needed it("should create queue", async(function*() { const inQueueResult = yield channel.checkQueue("default.queue.subscriber.in_queue_publisher_test"); const outQueueResult = yield channel.checkQueue("default.queue.subscriber.out_queue_publisher_test"); expect(inQueueResult.queue).to.equal("default.queue.subscriber.in_queue_publisher_test"); expect(outQueueResult.queue).to.equal("default.queue.subscriber.out_queue_publisher_test"); })); - */ }); describe("when publishing event", function() { diff --git a/test/service_pub_rabbit_test.js b/test/service_pub_rabbit_test.js index 1ddace2..28ce10c 100644 --- a/test/service_pub_rabbit_test.js +++ b/test/service_pub_rabbit_test.js @@ -87,6 +87,7 @@ describe("Merapi Plugin Service: Publisher", function() { })); afterEach(async(function*() { + yield sleep(100); yield publisherAContainer.stop(); yield publisherBContainer.stop(); yield channel.close(); @@ -127,7 +128,6 @@ describe("Merapi Plugin Service: Publisher", function() { ); }); - /* No need to create exchange now, let's just make it when it is really needed it("should create exchanges", async(function*() { yield channel.checkExchange( "default.publisher.incoming_message_publisher_test" @@ -136,7 +136,6 @@ describe("Merapi Plugin Service: Publisher", function() { "default.publisher.outgoing_message_publisher_test" ); })); - */ }); describe("when publishing event", function() { diff --git a/test/service_sub_queue_rabbit_test.js b/test/service_sub_queue_rabbit_test.js index 54f1791..5f45384 100644 --- a/test/service_sub_queue_rabbit_test.js +++ b/test/service_sub_queue_rabbit_test.js @@ -125,6 +125,7 @@ describe("Merapi Plugin Service: Queue Subscriber", function() { })); afterEach(async(function*() { + yield sleep(100); yield subscriberAContainer.stop(); yield subscriberBContainer.stop(); yield channel.close(); diff --git a/test/service_sub_rabbit_test.js b/test/service_sub_rabbit_test.js index e1dbc4f..87d2512 100644 --- a/test/service_sub_rabbit_test.js +++ b/test/service_sub_rabbit_test.js @@ -130,6 +130,7 @@ describe("Merapi Plugin Service: Subscriber", function() { })); afterEach(async(function*() { + yield sleep(100); yield subscriberAContainer.stop(); yield subscriberBContainer.stop(); yield channel.close(); From 64da51f351eec73814d5df67131723e69182e188 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Fri, 22 Mar 2019 16:21:54 +0700 Subject: [PATCH 05/11] add reconnectDelay setting --- README.md | 5 ++++- lib/Rabbit.js | 11 +++++++---- lib/service_pub_queue_rabbit.js | 4 ++++ lib/service_pub_rabbit.js | 3 +++ lib/service_sub_queue_rabbit.js | 2 ++ lib/service_sub_rabbit.js | 2 ++ 6 files changed, 22 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c21f9c2..7a3412c 100644 --- a/README.md +++ b/README.md @@ -210,4 +210,7 @@ export STOP_RABBIT_COMMAND="docker stop rabbitmq" and perform: -`source ./test/test.env && npm test` +``` +source ./test/test.env && npm test +source ./test/test.env && node ./reconnect-example/rabbit-reconnect-test.js +``` diff --git a/lib/Rabbit.js b/lib/Rabbit.js index 489ad7f..c5c9c72 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -20,6 +20,7 @@ const defaultConfig = { serviceName: "unnamed-service", namespace: "default", prefetch: 5, + reconnectDelay: 1000, /* * pubPayload?: {}, // Necessary for: pub + pubQueue * pubQueue?: { // Necessary for: pubQueue @@ -99,12 +100,14 @@ class Rabbit { await this._pubQueue(this._connection); await this._pub(this._connection); } else { - await sleep(1000); + const { reconnectDelay } = this._config; + await sleep(reconnectDelay); await this.publish(config); } } async _run() { + const { reconnectDelay } = this._config; try { // create connection this._connection = await amqp.connect(this._connectionString); @@ -112,13 +115,13 @@ class Rabbit { this._connection.on("error", async (error) => { this._connection = null; this._logger.error("Connection error : ", error); - await sleep(1000); + await sleep(reconnectDelay); await this._run(); }); this._connection.on("close", async (error) => { this._connection = null; this._logger.error("Connection close : ", error); - await sleep(1000); + await sleep(reconnectDelay); await this._run(); }); // process pubSub handler @@ -130,7 +133,7 @@ class Rabbit { } catch (error) { this._connection = null; this._logger.warn("Failed to connect to rmq.", error); - await sleep(1000); + await sleep(reconnectDelay); this._logger.info("Attempting to reconnect to rmq."); await this._run(); } diff --git a/lib/service_pub_queue_rabbit.js b/lib/service_pub_queue_rabbit.js index d4f24d1..f737f71 100644 --- a/lib/service_pub_queue_rabbit.js +++ b/lib/service_pub_queue_rabbit.js @@ -21,16 +21,20 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { this._status = "ok"; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); + const desc = this.config.default("service.queue.publish", {}); const pubQueue = {}; for (const service in desc) { pubQueue[service] = Object.keys(desc[service]); } + this._rabbit = new Rabbit({ namespace: this._namespace, connection: this._connectionConfig, serviceName: this.SERVICE_NAME, pubQueue, + reconnectDelay: this._reconnectDelay, }); this.servicePubQueue.on("triggerQueue", this.publishEvent.bind(this)); } diff --git a/lib/service_pub_rabbit.js b/lib/service_pub_rabbit.js index 4b549a1..200a557 100644 --- a/lib/service_pub_rabbit.js +++ b/lib/service_pub_rabbit.js @@ -24,6 +24,7 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this._connection = null; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); this._eventList = []; const publishedEvent = this.config.default("service.publish", {}); @@ -31,11 +32,13 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this._eventList.push(eventName); } this._publishedEvent = publishedEvent; + this._rabbit = new Rabbit({ namespace: this._namespace, connection: this._connectionConfig, serviceName: this.SERVICE_NAME, pub: this._publishedEvent, + reconnectDelay: this._reconnectDelay, }); this.servicePub.on("trigger", this.publishEvent.bind(this)); diff --git a/lib/service_sub_queue_rabbit.js b/lib/service_sub_queue_rabbit.js index c467216..ca03090 100644 --- a/lib/service_sub_queue_rabbit.js +++ b/lib/service_sub_queue_rabbit.js @@ -24,6 +24,7 @@ class ServiceSubRabbit extends ServiceSubQueue { this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); this._prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); // rawSubscribeConfig's structure: {[eventName: string]: string} const rawSubscribeConfig = this.config.default("service.queue.subscribe", {}); @@ -66,6 +67,7 @@ class ServiceSubRabbit extends ServiceSubQueue { connection: this._connectionConfig, serviceName: this.SERVICE_NAME, subQueue: this._subscribeConfig, + reconnectDelay: this._reconnectDelay, }); } } diff --git a/lib/service_sub_rabbit.js b/lib/service_sub_rabbit.js index fa08405..7ec3d36 100644 --- a/lib/service_sub_rabbit.js +++ b/lib/service_sub_rabbit.js @@ -31,6 +31,7 @@ class ServiceSubRabbit extends ServiceSub { this._namespace = config.default("service.rabbit.namespace", "default"); this._prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); // rawSubscribeConfig's structure: @@ -120,6 +121,7 @@ class ServiceSubRabbit extends ServiceSub { serviceName: this.SERVICE_NAME, sub: this._subscribeConfig, subNotification: this._notificationConfig, + reconnectDelay: this._reconnectDelay, publisher }); } From 5bc63516974bf34475d3ff03d8556ed6f5dd5240 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Fri, 22 Mar 2019 16:58:13 +0700 Subject: [PATCH 06/11] add try-catch, remove wrong signature call --- lib/Rabbit.js | 8 ++++---- test/service_pub_queue_rabbit_test.js | 15 +++++++++------ test/service_pub_rabbit_test.js | 15 +++++++++------ 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/lib/Rabbit.js b/lib/Rabbit.js index c5c9c72..1d0e501 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -125,10 +125,10 @@ class Rabbit { await this._run(); }); // process pubSub handler - await this._subQueue(this._connection); - await this._sub(this._connection); - await this._pubQueue(this._connection); - await this._pub(this._connection); + await this._subQueue(); + await this._sub(); + await this._pubQueue(); + await this._pub(); // add to channels } catch (error) { this._connection = null; diff --git a/test/service_pub_queue_rabbit_test.js b/test/service_pub_queue_rabbit_test.js index c1c3c75..c4065df 100644 --- a/test/service_pub_queue_rabbit_test.js +++ b/test/service_pub_queue_rabbit_test.js @@ -86,12 +86,15 @@ describe("Merapi Plugin Service: Queue Publisher", function() { })); afterEach(async(function*() { - yield sleep(100); - yield publisherAContainer.stop(); - yield publisherBContainer.stop(); - yield channel.close(); - yield connection.close(); - + try { + yield sleep(100); + yield publisherAContainer.stop(); + yield publisherBContainer.stop(); + yield channel.close(); + yield connection.close(); + } catch (error) { + // do nothing + } currentIteration++; })); diff --git a/test/service_pub_rabbit_test.js b/test/service_pub_rabbit_test.js index 28ce10c..8151c84 100644 --- a/test/service_pub_rabbit_test.js +++ b/test/service_pub_rabbit_test.js @@ -87,12 +87,15 @@ describe("Merapi Plugin Service: Publisher", function() { })); afterEach(async(function*() { - yield sleep(100); - yield publisherAContainer.stop(); - yield publisherBContainer.stop(); - yield channel.close(); - yield connection.close(); - + try { + yield sleep(100); + yield publisherAContainer.stop(); + yield publisherBContainer.stop(); + yield channel.close(); + yield connection.close(); + } catch (error) { + // do nothing + } currentIteration++; })); From 0c5b92dfcb104ff6be4789e17ef597954c2f36af Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Fri, 22 Mar 2019 17:04:07 +0700 Subject: [PATCH 07/11] bump new version --- package.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index f59475e..1f1752b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "merapi-plugin-service-rabbit", - "version": "0.5.2", + "version": "0.6.0", "description": "Provide RabbitMQ integration interface", "main": "index.js", "scripts": { @@ -21,7 +21,9 @@ "contributors": [ "Yoga Aliarham ", "Ricky Anders ", - "Reyhan Sofian " + "Reyhan Sofian ", + "Armanda Caesar ", + "Go Frendi Gunawan " ], "license": "ISC", "homepage": "https://github.com/kata-ai/merapi-plugin-service-rabbit#readme", From 24fe7a31f318c3c6431ba3055ad25c62e1939c0e Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Sun, 24 Mar 2019 10:26:08 +0700 Subject: [PATCH 08/11] refactor --- lib/Rabbit.js | 54 +++++++++++----------- lib/service_pub_queue_rabbit.js | 6 +-- lib/service_pub_rabbit.js | 2 +- lib/service_sub_queue_rabbit.js | 2 +- lib/service_sub_rabbit.js | 2 +- reconnect-example/rabbit-reconnect-test.js | 13 ++++-- test/service_pub_queue_rabbit_test.js | 13 ++---- test/service_pub_rabbit_test.js | 13 ++---- 8 files changed, 52 insertions(+), 53 deletions(-) diff --git a/lib/Rabbit.js b/lib/Rabbit.js index 1d0e501..a05e6cd 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -20,12 +20,13 @@ const defaultConfig = { serviceName: "unnamed-service", namespace: "default", prefetch: 5, - reconnectDelay: 1000, + reconnectDelay: 100, /* - * pubPayload?: {}, // Necessary for: pub + pubQueue + * pubQueuePayload?: {}, // Necessary for: pub + pubQueue * pubQueue?: { // Necessary for: pubQueue * [serviceName]: string[], // `eventName` * }, + * pubPayload?: {}, // Necessary for: pub + pubQueue * pub?: { // Necessary for: pub * [eventName]: string, // `triggerName` * }, @@ -76,7 +77,6 @@ async function sleep(delay) { class Rabbit { constructor(config = {}, logger = console) { - this._publishedQueueList = []; // queue name that has been published (to avoid redundant publication) this._connection = null; this._config = getCompleteConfig(config); this._logger = logger; @@ -85,20 +85,14 @@ class Rabbit { } async publish(config) { - if (this._connection) { - delete this._config.pubPayload; + if (this._connection && !("pubPayload" in this._config)) { if ("pub" in config) { this._config.pub = config.pub; } - if ("pubQueue" in config) { - this._config.pubQueue = config.pubQueue; - } if ("pubPayload" in config) { this._config.pubPayload = config.pubPayload; } - this._publishedQueueList = []; - await this._pubQueue(this._connection); - await this._pub(this._connection); + await this._pub(); } else { const { reconnectDelay } = this._config; await sleep(reconnectDelay); @@ -106,6 +100,22 @@ class Rabbit { } } + async publishQueue(config) { + if (this._connection && !("pubQueuePayload" in this._config)) { + if ("pubQueue" in config) { + this._config.pubQueue = config.pubQueue; + } + if ("pubQueuePayload" in config) { + this._config.pubQueuePayload = config.pubQueuePayload; + } + await this._pubQueue(); + } else { + const { reconnectDelay } = this._config; + await sleep(reconnectDelay); + await this.publishQueue(config); + } + } + async _run() { const { reconnectDelay } = this._config; try { @@ -152,17 +162,13 @@ class Rabbit { const exchangeName = `${namespace}.${serviceName}.${eventName}`; const channel = await this._connection.createChannel(); await channel.assertExchange(exchangeName, "fanout", { durable: true }); - // if queue is already used for publication, then skip it - if (this._publishedQueueList.includes(exchangeName)) { - continue; - } + // Only publish if payload exists if ("pubPayload" in this._config) { const content = JSON.stringify(pubPayload); channel.publish(exchangeName, "", Buffer.from(content), { persistent: true }); - // note that the queue has already been used for publication - this._publishedQueueList.push(exchangeName); } } + delete this._config.pubPayload; return true; } @@ -218,7 +224,7 @@ class Rabbit { return false; } // extract config - const { namespace, prefetch, pubQueue, pubPayload } = this._config; + const { namespace, prefetch, pubQueue, pubQueuePayload } = this._config; // create channel & publish payload for (const serviceName in pubQueue) { const eventList = pubQueue[serviceName]; @@ -227,18 +233,14 @@ class Rabbit { const channel = await this._connection.createChannel(); await channel.prefetch(prefetch); channel.assertQueue(queueName, {durable: true}); - // if queue is already used for publication, then skip it - if (this._publishedQueueList.includes(queueName)) { - continue; - } - if ("pubPayload" in this._config) { - const content = JSON.stringify(pubPayload); + // Only publish if payload exists + if ("pubQueuePayload" in this._config) { + const content = JSON.stringify(pubQueuePayload); channel.sendToQueue(queueName, Buffer.from(content), { persistent: true }); - // note that the queue has already been used for publication - this._publishedQueueList.push(queueName); } } } + delete this._config.pubQueuePayload; return true; } diff --git a/lib/service_pub_queue_rabbit.js b/lib/service_pub_queue_rabbit.js index f737f71..cc21bff 100644 --- a/lib/service_pub_queue_rabbit.js +++ b/lib/service_pub_queue_rabbit.js @@ -21,7 +21,7 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { this._status = "ok"; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); - this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 100); const desc = this.config.default("service.queue.publish", {}); const pubQueue = {}; @@ -46,9 +46,9 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) { } publishEvent(service, event, payload) { - this._rabbit.publish({ + this._rabbit.publishQueue({ pubQueue: {[service]: [event]}, - pubPayload: payload + pubQueuePayload: payload }); } diff --git a/lib/service_pub_rabbit.js b/lib/service_pub_rabbit.js index 200a557..6f476de 100644 --- a/lib/service_pub_rabbit.js +++ b/lib/service_pub_rabbit.js @@ -24,7 +24,7 @@ class ServicePubRabbit extends Component.mixin(AsyncEmitter) { this._connection = null; this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); - this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 100); this._eventList = []; const publishedEvent = this.config.default("service.publish", {}); diff --git a/lib/service_sub_queue_rabbit.js b/lib/service_sub_queue_rabbit.js index ca03090..33af8be 100644 --- a/lib/service_sub_queue_rabbit.js +++ b/lib/service_sub_queue_rabbit.js @@ -24,7 +24,7 @@ class ServiceSubRabbit extends ServiceSubQueue { this._namespace = config.default("service.rabbit.namespace", "default"); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); this._prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); - this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 100); // rawSubscribeConfig's structure: {[eventName: string]: string} const rawSubscribeConfig = this.config.default("service.queue.subscribe", {}); diff --git a/lib/service_sub_rabbit.js b/lib/service_sub_rabbit.js index 7ec3d36..c15f3e4 100644 --- a/lib/service_sub_rabbit.js +++ b/lib/service_sub_rabbit.js @@ -31,7 +31,7 @@ class ServiceSubRabbit extends ServiceSub { this._namespace = config.default("service.rabbit.namespace", "default"); this._prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5)); - this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 1000); + this._reconnectDelay = config.default("service.rabbit.reconnectDelay", 100); this._connectionConfig = config.default("service.rabbit", { host: "localhost", port: 5672 }); // rawSubscribeConfig's structure: diff --git a/reconnect-example/rabbit-reconnect-test.js b/reconnect-example/rabbit-reconnect-test.js index c2e86c3..b66f211 100644 --- a/reconnect-example/rabbit-reconnect-test.js +++ b/reconnect-example/rabbit-reconnect-test.js @@ -10,6 +10,7 @@ async function main() { connection: { connectionString: rabbitUrl }, serviceName: "beta", subQueue: { pesanPizza: (payload) => console.log("Pesan Pizza", payload) }, + reconnectDelay: 1000, }); const rabbitSub = new Rabbit({ @@ -25,6 +26,7 @@ async function main() { pesanBakso: (payload) => console.log("Pesan Bakso", payload) }, }, + reconnectDelay: 1000, }); // restart rabbit @@ -37,14 +39,16 @@ async function main() { connection: { connectionString: rabbitUrl }, serviceName: "alpha", pubQueue: { beta: ["pesanPizza"] }, - pubPayload: { qty: 2, topping: "cheese" } + pubQueuePayload: { qty: 2, topping: "cheese" }, + reconnectDelay: 1000, }); const rabbitPub = new Rabbit({ connection: { connectionString: rabbitUrl }, serviceName: "alpha", pub: { pesanBakso: "pesanBaksoTrigger" }, - pubPayload: { qty: 3, saos: "tomat" } + pubPayload: { qty: 3, saos: "tomat" }, + reconnectDelay: 1000, }); // re use rabbitPub @@ -57,7 +61,10 @@ async function main() { pub: { pesanBakso: "pesanBaksoTrigger" }, }); - anotherRabbitPub.publish({pubPayload: {qty: 1, saos: "kecap"}}); + anotherRabbitPub.publish({pubPayload: {qty: 1, saos: "kecap1"}}); + anotherRabbitPub.publish({pubPayload: {qty: 1, saos: "kecap2"}}); + anotherRabbitPub.publish({pubPayload: {qty: 1, saos: "kecap3"}}); + anotherRabbitPub.publish({pubPayload: {qty: 1, saos: "kecap4"}}); } diff --git a/test/service_pub_queue_rabbit_test.js b/test/service_pub_queue_rabbit_test.js index c4065df..93fcf5e 100644 --- a/test/service_pub_queue_rabbit_test.js +++ b/test/service_pub_queue_rabbit_test.js @@ -86,15 +86,10 @@ describe("Merapi Plugin Service: Queue Publisher", function() { })); afterEach(async(function*() { - try { - yield sleep(100); - yield publisherAContainer.stop(); - yield publisherBContainer.stop(); - yield channel.close(); - yield connection.close(); - } catch (error) { - // do nothing - } + yield publisherAContainer.stop(); + yield publisherBContainer.stop(); + yield channel.close(); + yield connection.close(); currentIteration++; })); diff --git a/test/service_pub_rabbit_test.js b/test/service_pub_rabbit_test.js index 8151c84..194fafa 100644 --- a/test/service_pub_rabbit_test.js +++ b/test/service_pub_rabbit_test.js @@ -87,15 +87,10 @@ describe("Merapi Plugin Service: Publisher", function() { })); afterEach(async(function*() { - try { - yield sleep(100); - yield publisherAContainer.stop(); - yield publisherBContainer.stop(); - yield channel.close(); - yield connection.close(); - } catch (error) { - // do nothing - } + yield publisherAContainer.stop(); + yield publisherBContainer.stop(); + yield channel.close(); + yield connection.close(); currentIteration++; })); From b4c55f820ac553a18c8e0951504efe63b0cafff3 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Sun, 24 Mar 2019 10:40:45 +0700 Subject: [PATCH 09/11] add timeout --- test/service_pub_queue_rabbit_test.js | 1 + test/service_pub_rabbit_test.js | 1 + test/service_sub_queue_rabbit_test.js | 1 + test/service_sub_rabbit_test.js | 1 + 4 files changed, 4 insertions(+) diff --git a/test/service_pub_queue_rabbit_test.js b/test/service_pub_queue_rabbit_test.js index 93fcf5e..9ffdaf8 100644 --- a/test/service_pub_queue_rabbit_test.js +++ b/test/service_pub_queue_rabbit_test.js @@ -22,6 +22,7 @@ describe("Merapi Plugin Service: Queue Publisher", function() { beforeEach(async(function*() { this.timeout(5000); + yield sleep(100); let publisherConfig = { name: "publisher", diff --git a/test/service_pub_rabbit_test.js b/test/service_pub_rabbit_test.js index 194fafa..7fd67c8 100644 --- a/test/service_pub_rabbit_test.js +++ b/test/service_pub_rabbit_test.js @@ -26,6 +26,7 @@ describe("Merapi Plugin Service: Publisher", function() { this.timeout(5000); beforeEach(async(function*() { + yield sleep(100); let publisherConfig = { name: "publisher", version: "1.0.0", diff --git a/test/service_sub_queue_rabbit_test.js b/test/service_sub_queue_rabbit_test.js index 5f45384..2d8a516 100644 --- a/test/service_sub_queue_rabbit_test.js +++ b/test/service_sub_queue_rabbit_test.js @@ -24,6 +24,7 @@ describe("Merapi Plugin Service: Queue Subscriber", function() { beforeEach(async(function*() { this.timeout(5000); + yield sleep(100); let publisherConfig = { name: "publisher", diff --git a/test/service_sub_rabbit_test.js b/test/service_sub_rabbit_test.js index 87d2512..814b611 100644 --- a/test/service_sub_rabbit_test.js +++ b/test/service_sub_rabbit_test.js @@ -28,6 +28,7 @@ describe("Merapi Plugin Service: Subscriber", function() { beforeEach(async(function*() { this.timeout(5000); + yield sleep(100); let publisherConfig = { name: "publisher", From d880fbaad97eabce25f94a3328bcc3ef48b4b341 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Sun, 24 Mar 2019 11:16:26 +0700 Subject: [PATCH 10/11] add try-catch for channel-close error on testing --- test/service_pub_rabbit_test.js | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/service_pub_rabbit_test.js b/test/service_pub_rabbit_test.js index 7fd67c8..8a9a4b8 100644 --- a/test/service_pub_rabbit_test.js +++ b/test/service_pub_rabbit_test.js @@ -84,13 +84,18 @@ describe("Merapi Plugin Service: Publisher", function() { connection = yield amqplib.connect(rabbitUrl); channel = yield connection.createChannel(); - yield sleep(100); + yield sleep(1000); })); afterEach(async(function*() { yield publisherAContainer.stop(); yield publisherBContainer.stop(); - yield channel.close(); + try { + // for unknown reason, sometime channel is already closed + yield channel.close(); + } catch(error) { + // do nothing + } yield connection.close(); currentIteration++; })); @@ -161,6 +166,7 @@ describe("Merapi Plugin Service: Publisher", function() { let message = []; q = yield channel.assertQueue("default.queue2"); + yield sleep(100); triggerA = yield publisherAContainer.resolve( "triggerOutgoingMessagePublisherTest" ); @@ -170,7 +176,7 @@ describe("Merapi Plugin Service: Publisher", function() { exchangeName = "default.publisher.outgoing_message_publisher_test"; for (let i = 0; i < 5; i++) { - yield sleep(150); + yield sleep(100); if (i % 2 == 0) { yield triggerA(i); } else { From ee50f42aee9b662523aa49096c5f33db56476708 Mon Sep 17 00:00:00 2001 From: goFrendiAsgard Date: Wed, 27 Mar 2019 20:24:46 +0700 Subject: [PATCH 11/11] add try-catch --- lib/Rabbit.js | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/Rabbit.js b/lib/Rabbit.js index a05e6cd..9b18131 100644 --- a/lib/Rabbit.js +++ b/lib/Rabbit.js @@ -92,7 +92,13 @@ class Rabbit { if ("pubPayload" in config) { this._config.pubPayload = config.pubPayload; } - await this._pub(); + try { + await this._pub(); + } catch (error) { + const { reconnectDelay } = this._config; + await sleep(reconnectDelay); + await this.publish(config); + } } else { const { reconnectDelay } = this._config; await sleep(reconnectDelay); @@ -108,7 +114,13 @@ class Rabbit { if ("pubQueuePayload" in config) { this._config.pubQueuePayload = config.pubQueuePayload; } - await this._pubQueue(); + try { + await this._pubQueue(); + } catch (error) { + const { reconnectDelay } = this._config; + await sleep(reconnectDelay); + await this.publishQueue(config); + } } else { const { reconnectDelay } = this._config; await sleep(reconnectDelay);