Skip to content

Commit 0069f4d

Browse files
ShkarupaNickif0s
andauthored
add channel close in shutdown hook (#21)
* add channel close in shutdown hook * up --------- Co-authored-by: if0s <[email protected]>
1 parent 9c862f0 commit 0069f4d

File tree

5 files changed

+20
-5
lines changed

5 files changed

+20
-5
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.4.2 (March 27, 2023)
2+
Fixed issue with shutdown hook in `Consume` trigger
3+
14
## 1.4.1 (March 27, 2023)
25
Fixed connection cloning in `Publish` action
36

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"title": "AMQP component",
33
"description": "AMQP Component for async communication with queues and topics",
4-
"version": "1.4.1",
4+
"version": "1.4.2",
55
"credentials": {
66
"fields": {
77
"amqpURI": {

lib/actions/publish.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async function processAction(msg, cfg) {
2121
attachments: msg.attachments,
2222
});
2323
}
24-
this.logger.info('Publishing message...');
24+
this.logger.info({ routingKey: msg.body.routingKey }, `Publishing message with routingKey ${msg.body.routingKey}...`);
2525

2626
await amqpClient.publish(msg.body.routingKey || '', data, {
2727
contentType: cfg.contentType || 'application/octet-stream',

lib/amqp.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@ class AMQPClient {
2929
this.logger = logger;
3030
}
3131

32+
async shutdown() {
33+
if (this.channel) {
34+
await this.channel.close();
35+
this.logger.info('The channel closed successfully');
36+
}
37+
if (this.connection) {
38+
await this.connection.close();
39+
this.logger.info('The connection closed successfully');
40+
}
41+
}
42+
3243
init(confirmChannel = true, assertExchangeOptions, deleteQueue) {
3344
return new Promise((resolve, reject) => {
3445
this.connection = amqp.connect([this.cfg.amqpURI], {
@@ -61,14 +72,14 @@ class AMQPClient {
6172
confirm: confirmChannel,
6273
setup: async (channel) => {
6374
try {
64-
this.logger.debug('Asserting topic exchange...');
75+
this.logger.debug({ exchange: this.cfg.topic }, 'Asserting exchange...');
6576
await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions);
6677
if (!confirmChannel) {
67-
this.logger.debug('Asserting queue');
78+
this.logger.debug({ queueName: this.queueName }, 'Asserting queue');
6879
await channel.assertQueue(this.queueName, { exclusive: false, durable: false });
6980
const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim());
7081
for (const key of keys) {
71-
this.logger.debug('Binding queue to exchange...');
82+
this.logger.debug({ key, queueName: this.queueName, exchange: this.cfg.topic }, 'Binding queue to exchange...');
7283
await channel.bindQueue(this.queueName, this.cfg.topic, key);
7384
}
7485
}

lib/triggers/consume.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ async function processAction(msg, cfg) {
4242
async function shutdown(cfg) {
4343
amqpClient = new AMQPClient(cfg, this);
4444
await amqpClient.init(true, {}, true);
45+
await amqpClient.shutdown();
4546
}
4647

4748
module.exports.process = processAction;

0 commit comments

Comments
 (0)