Skip to content

Commit df1c450

Browse files
authored
feat: add graceful shutdown (#21)
1 parent 20c3801 commit df1c450

File tree

1 file changed

+100
-52
lines changed

1 file changed

+100
-52
lines changed

lib/service_amqp.js

Lines changed: 100 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,118 @@
1+
//@ts-check
12
"use strict";
23

34
const amqp = require("amqplib");
45
const sleep = require("then-sleep");
56
const { Component, AsyncEmitter } = require("merapi");
67

78
module.exports = class Amqp extends Component.mixin(AsyncEmitter) {
8-
9-
constructor(config, logger) {
10-
super();
11-
this.config = config;
12-
this.logger = logger;
13-
this._connection;
14-
this._initializing = false;
15-
this.serviceSub = null;
16-
this.serviceSubQueue = null;
17-
}
18-
19-
initialize() {
20-
return this.doConnect();
21-
}
22-
23-
getConnection() {
24-
return this._connection;
9+
constructor(config, logger) {
10+
super();
11+
this.config = config;
12+
this.logger = logger;
13+
this._connection;
14+
this._initializing = false;
15+
this.serviceSub = null;
16+
this.serviceSubQueue = null;
17+
this.signals = ["SIGINT", "SIGTERM", "SIGQUIT"];
18+
this.isShuttingDown = false;
19+
}
20+
21+
initialize() {
22+
return this.doConnect();
23+
}
24+
25+
getConnection() {
26+
return this._connection;
27+
}
28+
29+
*doConnect() {
30+
if (this._initializing) return;
31+
32+
// we don't need to reconnect if there's any shutting down process
33+
if (this.isShuttingDown) return;
34+
35+
this._initializing = true;
36+
let {
37+
secure,
38+
user,
39+
password,
40+
host,
41+
port,
42+
connectionString,
43+
heartbeat = 20,
44+
} = this.config.default("service.rabbit", {
45+
host: "localhost",
46+
port: 5672,
47+
heartbeat: 20,
48+
});
49+
50+
let protocol = typeof secure === "boolean" && secure ? "amqps" : "amqp";
51+
52+
port = protocol === "amqps" ? 5671 : 5672;
53+
54+
if (!connectionString) {
55+
if (user && password) {
56+
connectionString = `${protocol}://${user}:${password}@${host}:${port}`;
57+
} else {
58+
connectionString = `${protocol}://${host}:${port}`;
59+
}
2560
}
2661

27-
*doConnect() {
28-
if (this._initializing) return;
29-
this._initializing = true;
30-
let { secure, user, password, host, port, connectionString, heartbeat } = this.config.default("service.rabbit", { host: "localhost", port: 5672, heartbeat: 20 });
62+
let connectionStringOpts = `${connectionString}?heartbeat=${heartbeat}`;
3163

32-
let protocol = (typeof secure === "boolean" && secure) ? "amqps" : "amqp";
64+
try {
65+
this._connection = yield amqp.connect(connectionStringOpts);
3366

34-
port = (protocol === "amqps") ? 5671 : 5672;
67+
this.logger.info("Connected to rmq.");
3568

36-
if (!connectionString) {
37-
if (user && password) {
38-
connectionString = `${protocol}://${user}:${password}@${host}:${port}`;
39-
}
40-
else {
41-
connectionString = `${protocol}://${host}:${port}`;
42-
}
43-
}
44-
45-
let connectionStringOpts = `${connectionString}?heartbeat=${heartbeat}`;
69+
this._connection.on("close", this.doConnect.bind(this));
70+
this._connection.on("error", this.doConnect.bind(this));
71+
this.handleShutdown();
4672

47-
try {
48-
this._connection = yield amqp.connect(connectionStringOpts);
73+
this.emit("connected");
4974

50-
this.logger.info("Connected to rmq.");
75+
this._initializing = false;
76+
} catch (e) {
77+
this._initializing = false;
5178

52-
this._connection.on("close", this.doConnect.bind(this));
53-
this._connection.on("error", this.doConnect.bind(this));
79+
this.logger.warn("Failed to connect to rmq.", e);
80+
yield sleep(3000);
81+
this.logger.info("Attempting to reconnect to rmq.");
82+
this.serviceSub._initializing = false;
83+
this.serviceSubQueue._initializing = false;
5484

55-
this.emit("connected");
56-
57-
this._initializing = false;
58-
} catch (e) {
59-
this._initializing = false;
60-
61-
this.logger.warn("Failed to connect to rmq.", e);
62-
yield sleep(3000);
63-
this.logger.info("Attempting to reconnect to rmq.");
64-
this.serviceSub._initializing = false;
65-
this.serviceSubQueue._initializing = false;
66-
67-
yield this.doConnect();
68-
}
85+
yield this.doConnect();
86+
}
87+
}
88+
89+
handleShutdown() {
90+
this.signals.forEach((signal) =>
91+
process.addListener(signal, () => {
92+
console.log("Received signal", signal);
93+
this.cleanup(signal);
94+
95+
console.log("Exiting process in 3sec...");
96+
setTimeout(() => {
97+
process.exit(0);
98+
}, 3000);
99+
})
100+
);
101+
}
102+
103+
cleanup(signal) {
104+
if (!this.isShuttingDown) {
105+
try {
106+
console.log("Shutting down rabbitmq plugin...");
107+
108+
this.isShuttingDown = true;
109+
110+
this._connection.close();
111+
} catch (e) {
112+
console.error("Error while shutting down gracefully", e);
113+
114+
process.exit(1);
115+
}
69116
}
117+
}
70118
};

0 commit comments

Comments
 (0)