Skip to content

Commit 6e40b08

Browse files
add different handling for on close and on error
1 parent 5dab0f5 commit 6e40b08

File tree

8 files changed

+80
-167
lines changed

8 files changed

+80
-167
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ coverage
55
.vscode
66
.nyc_output
77
test/test.env
8+
labs/

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@ export RABBIT_HOST=0.0.0.0
203203
export RABBIT_PORT=5672
204204
export RABBIT_USERNAME=root
205205
export RABBIT_PASSWORD=toor
206+
# these commands are for reconnecting test
207+
export START_RABBIT_COMMAND="docker start rabbitmq"
208+
export STOP_RABBIT_COMMAND="docker stop rabbitmq"
206209
```
207210

208211
and perform:

lib/service_amqp.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"use strict";
22

3+
const amqpConnectionManager = require('amqp-connection-manager');
34
const amqp = require("amqplib");
45
const sleep = require("then-sleep");
56
const { Component, AsyncEmitter } = require("merapi");
@@ -42,20 +43,32 @@ module.exports = class Amqp extends Component.mixin(AsyncEmitter) {
4243

4344
try {
4445
this._connection = yield amqp.connect(connectionString);
46+
// this._connection = yield amqpConnectionManager.connect([connectionString]);
4547

4648
this.logger.info("Connected to rmq.");
4749

48-
this._connection.on("close", this.doConnect.bind(this));
49-
this._connection.on("error", this.doConnect.bind(this));
50+
this._connection.on("close", (e) => {
51+
this.logger.error(e);
52+
this.emit("disconnected");
53+
this._connection.close();
54+
this.doConnect.bind(this)();
55+
});
56+
this._connection.on("error", (e) => {
57+
this.logger.error(e);
58+
this.emit("disconnected");
59+
this._connection = null;
60+
this.doConnect.bind(this)();
61+
});
5062

5163
this.emit("connected");
5264

5365
this._initializing = false;
5466
} catch (e) {
67+
this.logger.error(e);
5568
this._initializing = false;
5669

5770
this.logger.warn("Failed to connect to rmq.", e);
58-
yield sleep(3000);
71+
yield sleep(30000);
5972
this.logger.info("Attempting to reconnect to rmq.");
6073

6174
yield this.doConnect();

lib/service_sub_queue_rabbit.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,4 @@ class ServiceSubRabbit extends ServiceSubQueue {
9494
*unmount() { }
9595
}
9696

97-
module.exports = ServiceSubRabbit;
97+
module.exports = ServiceSubRabbit;

package-lock.json

Lines changed: 51 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
"license": "ISC",
2727
"homepage": "https://github.com/kata-ai/merapi-plugin-service-rabbit#readme",
2828
"dependencies": {
29-
"amqplib": "^0.4.2",
29+
"amqp-connection-manager": "^2.3.0",
30+
"amqplib": "^0.5.3",
3031
"merapi": "^0.16.0",
3132
"merapi-plugin-service": "^0.3.5",
3233
"requestretry": "^1.12.0",

test/configuration.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
const { exec } = require("child_process");
2+
13
const rabbitConnection = {
24
host: process.env.RABBIT_HOST || "localhost",
35
port: process.env.RABBIT_PORT || 5672,
@@ -9,8 +11,12 @@ const rabbitConnection = {
911
};
1012

1113
const rabbitUrl = `amqp://${rabbitConnection.user}:${rabbitConnection.password}@${rabbitConnection.host}:${rabbitConnection.port}`;
14+
const startRabbitCommand = process.env.START_RABBIT_COMMAND;
15+
const stopRabbitCommand = process.env.STOP_RABBIT_COMMAND;
1216

1317
module.exports = {
1418
rabbitConnection,
1519
rabbitUrl,
20+
startRabbitCommand,
21+
stopRabbitCommand,
1622
};

test/service_sub_queue_reconnect_rabbit_test.js

Lines changed: 0 additions & 142 deletions
This file was deleted.

0 commit comments

Comments
 (0)