Skip to content

Commit 62321fd

Browse files
authored
fix: remove graceful timeout (#22)
1 parent df1c450 commit 62321fd

File tree

6 files changed

+227
-173
lines changed

6 files changed

+227
-173
lines changed

index.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ module.exports = function () {
2828
let serviceSubQueueRabbit = yield container.resolve("serviceSubQueueRabbit");
2929
service.addModule("pub-queue-rabbit", servicePubQueueRabbit);
3030
service.addModule("sub-queue-rabbit", serviceSubQueueRabbit);
31+
},
32+
33+
*onStop(container) {
34+
let service = yield container.resolve("amqp");
35+
yield service.stop();
3136
}
3237
};
33-
};
38+
};

lib/service_amqp.js

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ module.exports = class Amqp extends Component.mixin(AsyncEmitter) {
1515
this.serviceSub = null;
1616
this.serviceSubQueue = null;
1717
this.signals = ["SIGINT", "SIGTERM", "SIGQUIT"];
18-
this.isShuttingDown = false;
1918
}
2019

2120
initialize() {
@@ -28,10 +27,6 @@ module.exports = class Amqp extends Component.mixin(AsyncEmitter) {
2827

2928
*doConnect() {
3029
if (this._initializing) return;
31-
32-
// we don't need to reconnect if there's any shutting down process
33-
if (this.isShuttingDown) return;
34-
3530
this._initializing = true;
3631
let {
3732
secure,
@@ -68,7 +63,6 @@ module.exports = class Amqp extends Component.mixin(AsyncEmitter) {
6863

6964
this._connection.on("close", this.doConnect.bind(this));
7065
this._connection.on("error", this.doConnect.bind(this));
71-
this.handleShutdown();
7266

7367
this.emit("connected");
7468

@@ -86,33 +80,13 @@ module.exports = class Amqp extends Component.mixin(AsyncEmitter) {
8680
}
8781
}
8882

89-
handleShutdown() {
90-
this.signals.forEach((signal) =>
91-
process.addListener(signal, () => {
92-
console.log("Received signal", signal);
93-
this.cleanup(signal);
94-
95-
console.log("Exiting process in 3sec...");
96-
setTimeout(() => {
97-
process.exit(0);
98-
}, 3000);
99-
})
100-
);
101-
}
102-
103-
cleanup(signal) {
104-
if (!this.isShuttingDown) {
105-
try {
106-
console.log("Shutting down rabbitmq plugin...");
107-
108-
this.isShuttingDown = true;
109-
110-
this._connection.close();
111-
} catch (e) {
112-
console.error("Error while shutting down gracefully", e);
113-
114-
process.exit(1);
115-
}
83+
*stop() {
84+
try {
85+
this.logger.info("Shutting down rabbitmq plugin...");
86+
this._connection.close();
87+
} catch (e) {
88+
this.logger.info("Error while shutting down gracefully", e);
89+
process.exit(1);
11690
}
11791
}
11892
};

lib/service_sub_queue_rabbit.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class ServiceSubRabbit extends ServiceSubQueue {
6464

6565
yield channel.assertQueue(queueName, { durable: true });
6666

67+
const consumerTag = process.env.HOSTNAME || "";
6768
channel.consume(queueName, async(function* (message) {
6869
try {
6970
let payload = JSON.parse(message.content.toString());
@@ -74,7 +75,7 @@ class ServiceSubRabbit extends ServiceSubQueue {
7475
catch (e) {
7576
channel.nack(message);
7677
}
77-
}.bind(this)));
78+
}.bind(this)), { consumerTag });
7879

7980
}
8081
}
@@ -95,4 +96,4 @@ class ServiceSubRabbit extends ServiceSubQueue {
9596
*unmount() { }
9697
}
9798

98-
module.exports = ServiceSubRabbit;
99+
module.exports = ServiceSubRabbit;

lib/service_sub_rabbit.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ class ServiceSubRabbit extends ServiceSub {
107107

108108
yield channel.assertQueue(queueName, { durable: true });
109109

110+
const consumerTag = process.env.HOSTNAME || "";
110111
channel.consume(queueName, async(function* (message) {
111112
try {
112113
let payload = JSON.parse(message.content.toString());
@@ -120,7 +121,7 @@ class ServiceSubRabbit extends ServiceSub {
120121
catch (e) {
121122
channel.nack(message);
122123
}
123-
}));
124+
}), { consumerTag });
124125
}
125126

126127
*createQueue(publisherName, event, method) {
@@ -135,6 +136,7 @@ class ServiceSubRabbit extends ServiceSub {
135136
yield channel.bindQueue(queueName, exchangeName, "");
136137
this._queues.push(queueName);
137138

139+
const consumerTag = process.env.HOSTNAME || "";
138140
channel.consume(queueName, async(function* (message) {
139141
try {
140142
let payload = JSON.parse(message.content.toString());
@@ -145,7 +147,7 @@ class ServiceSubRabbit extends ServiceSub {
145147
catch (e) {
146148
channel.nack(message);
147149
}
148-
}.bind(this)));
150+
}.bind(this)), { consumerTag });
149151

150152
}
151153
}

0 commit comments

Comments
 (0)