Skip to content

Commit 8246f7c

Browse files
authored
logs and messages in the last flow execution (#25)
1 parent bd0ad31 commit 8246f7c

File tree

8 files changed

+186
-711
lines changed

8 files changed

+186
-711
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 1.4.3 (June 17, 2024)
2+
* Now logs and messages for `Consume` trigger will be located in the last flow execution
3+
* Upgrade to sailor 2.7.2
4+
* Upgrade amqplib to 0.10.4
5+
16
## 1.4.2 (March 27, 2023)
27
Fixed issue with shutdown hook in `Consume` trigger
38

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ If the exchange doesn't exist it will be created on start.
4848

4949
#### Please note: The flow must be set as real-time! Otherwise, errors may appear.
5050

51-
We recommend you set the lowest flow schedule (cron expression) frequency possible. E.g. once a day (0 0 * * *). And start the flow with the button ‘Run Now’ manually. Even though it does not affect the logic directly, each scheduled flow execution will create a record in the Executions list with no messages and no logs inside. All the logs and emitted messages will be appearing in the first execution.
51+
We recommend you set the lowest flow schedule (cron expression) frequency possible. E.g. once a day (0 0 * * *). And start the flow with the button ‘Run Now’ manually. Even though it does not affect the logic directly, each scheduled flow execution will create a record in the Executions list and can make debugging difficult. All the logs and emitted messages will be appearing in the last execution.
5252

5353
#### Configuration Fields
5454
* **Exchange** - (string, required): Exchange name where you want to get messages

component.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"title": "AMQP component",
33
"description": "AMQP Component for async communication with queues and topics",
4-
"version": "1.4.2",
4+
"version": "1.4.3",
55
"credentials": {
66
"fields": {
77
"amqpURI": {
@@ -60,7 +60,7 @@
6060
},
6161
"triggers": {
6262
"consume": {
63-
"title": "Consume",
63+
"title": "Consume (Real-time flows only)",
6464
"main": "./lib/triggers/consume.js",
6565
"type": "polling",
6666
"fields": {

lib/actions/publish.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async function processAction(msg, cfg) {
2121
attachments: msg.attachments,
2222
});
2323
}
24-
this.logger.info({ routingKey: msg.body.routingKey }, `Publishing message with routingKey ${msg.body.routingKey}...`);
24+
this.logger.info(`Publishing message with routingKey ${msg.body.routingKey}...`);
2525

2626
await amqpClient.publish(msg.body.routingKey || '', data, {
2727
contentType: cfg.contentType || 'application/octet-stream',

lib/amqp.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ class AMQPClient {
7272
confirm: confirmChannel,
7373
setup: async (channel) => {
7474
try {
75-
this.logger.debug({ exchange: this.cfg.topic }, 'Asserting exchange...');
75+
this.logger.debug(`Asserting exchange "${this.cfg.topic}"...`);
7676
await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions);
7777
if (!confirmChannel) {
78-
this.logger.debug({ queueName: this.queueName }, 'Asserting queue');
78+
this.logger.debug(`Asserting queue "${this.queueName}"`);
7979
await channel.assertQueue(this.queueName, { exclusive: false, durable: false });
8080
const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim());
8181
for (const key of keys) {
82-
this.logger.debug({ key, queueName: this.queueName, exchange: this.cfg.topic }, 'Binding queue to exchange...');
82+
this.logger.debug(`Key: "${key}". Binding queue "${this.queueName}" to exchange "${this.cfg.topic}"...`);
8383
await channel.bindQueue(this.queueName, this.cfg.topic, key);
8484
}
8585
}

lib/triggers/consume.js

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,57 @@ const encryptor = require('../encryptor.js');
44
const { AMQPClient } = require('../amqp.js');
55

66
let amqpClient;
7+
let context;
78

89
// eslint-disable-next-line consistent-return
910
async function processAction(msg, cfg) {
10-
this.logger.info('Trigger started');
11+
context = this;
1112
if (!amqpClient || !amqpClient.connection) {
12-
amqpClient = new AMQPClient(cfg, this);
13+
context.logger.info('Trigger started');
14+
amqpClient = new AMQPClient(cfg, context);
1315
await amqpClient.init(false);
1416
} else {
15-
this.logger.info('Trigger was called again, we will ignore this run');
17+
context.logger.info('Trigger is running, waiting for new messages');
18+
amqpClient.setLogger(context.logger);
1619
return;
1720
}
1821

1922
// eslint-disable-next-line no-shadow
2023
const consumer = (msg) => {
21-
this.logger.debug('New message got');
24+
context.logger.debug('Got a new message');
2225
let data;
26+
2327
if (cfg.doNotDecrypt) {
24-
data = JSON.parse(msg.content);
28+
try {
29+
data = JSON.parse(msg.content);
30+
} catch (err) {
31+
const errMsg = 'Failed to parse message, if it is encrypted you need to uncheck "Don\'t decrypt payload"';
32+
context.logger.error(errMsg);
33+
context.emit('error', errMsg);
34+
return;
35+
}
2536
} else {
26-
data = encryptor.decryptMessageContent(this, msg.content);
27-
this.logger.debug('Message decrypted');
37+
try {
38+
data = encryptor.decryptMessageContent(context, msg.content);
39+
context.logger.debug('Message decrypted');
40+
} catch (err) {
41+
const errMsg = `${err.message}`;
42+
context.logger.error(errMsg);
43+
context.emit('error', errMsg);
44+
return;
45+
}
2846
}
47+
2948
const newMsg = messages.newMessageWithBody(data || {});
3049
newMsg.id = msg.properties.messageId;
3150
newMsg.attachments = data.attachments || {};
32-
this.emit('data', newMsg);
51+
context.emit('data', newMsg);
3352
};
3453
await amqpClient.consume(consumer, {
3554
noAck: true,
3655
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`,
3756
});
38-
this.logger.info('Consumption started');
57+
context.logger.info('Consumption started');
3958
}
4059

4160
// eslint-disable-next-line no-unused-vars

0 commit comments

Comments
 (0)