Skip to content

Commit 3dec4e8

Browse files
authored
Amqp reconnection (#18)
1 parent deb0857 commit 3dec4e8

File tree

14 files changed

+2875
-1592
lines changed

14 files changed

+2875
-1592
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ jobs:
77
- checkout
88
- restore_cache:
99
key: dependency-cache-{{ checksum "package.json" }}
10-
- run:
11-
name: Audit Dependencies
12-
command: npm audit --audit-level=high
1310
- run:
1411
name: Installing Dependencies
1512
command: npm install
13+
- run:
14+
name: Audit Dependencies
15+
command: npm run audit
1616
- save_cache:
1717
key: dependency-cache-{{ checksum "package.json" }}
1818
paths:

.eslintrc.js

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
module.exports = {
2-
'extends': 'airbnb-base',
3-
'env': {
4-
'mocha': true,
5-
'node': true,
6-
}
2+
extends: 'airbnb-base',
3+
env: {
4+
mocha: true,
5+
node: true,
6+
},
7+
rules: {
8+
'no-await-in-loop': 0,
9+
'max-len': ['error', { code: 180 }],
10+
'no-restricted-syntax': 0,
11+
},
712
};

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
## 1.4.0 (March 25, 2023)
2+
* Implemented retry mechanism on connection errors
3+
* Added configuration fields to set retry options
4+
* Added `Don't encrypt payload` and `Content-Type` configuration fields to `Publish` action
5+
* Added `Don't decrypt payload` configuration field to `Consume` trigger
6+
* Upgrade to sailor 2.7.1
7+
* Upgrade amqplib to 0.10.3
8+
19
## 1.3.3 (March 25, 2021)
210

311
* Upgrade to sailor 2.6.24

README.md

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
A component designed to talk to Advanced Message Queuing Protocol,
88
(**AMQP**) APIs. AMQP is an open standard for passing business messages
9-
between applications or organisations (see [amqp.org](https://www.amqp.org) for more).
9+
between applications or organizations (see [amqp.org](https://www.amqp.org) for more).
1010

1111
AMQP component establishes an asynchronous communications with queues and topics
1212
to publish or consume records.
@@ -23,8 +23,7 @@ keys that are specified in one string separated by commas.
2323

2424
### Environment variables
2525

26-
This component will automatically encrypt data that is sent to the queue when following
27-
environment variables are set:
26+
This component will automatically encrypt data that is sent to the queue when following environment variables are set and `Don't encrypt payload` unchecked
2827

2928
* `ELASTICIO_MESSAGE_CRYPTO_IV` - vector for symmetric encryption
3029
* `ELASTICIO_MESSAGE_CRYPTO_PASSWORD` - password for symmetric encryption
@@ -47,23 +46,34 @@ also use URL syntax to provide further parameters and any other options
4746
Will consume the incoming message object that contains `body` with the payload.
4847
If the exchange doesn't exist it will be created on start.
4948

50-
Optionally you can use `#` or `*` to wildcard. For more information check the
51-
tutorial provided at the [RabbitMQ site](http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html).
49+
#### Configuration Fields
50+
* **Exchange** - (string, required): Exchange name where you want to get messages
51+
* **Binding Keys** - (string, optional): Optionally you can use `#` or `*` to wildcard. For more information check the tutorial provided at the [RabbitMQ site](http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html).
52+
* **Don't decrypt payload** - (checkbox, optional): If checked payload will be not decrypted
53+
* **Reconnect Timeout** - (string, optional, 5 by default, maximum 1000): In case of errors how long to wait until retry is seconds
54+
* **Reconnect Attempts** - (string, optional, 12 by default, maximum 1000): How many times try to reconnect before throw error
55+
5256

5357
## Actions
5458

5559
### Publish
56-
5760
Will publish the messages into an exchange. This exchange will be created on
5861
start if it doesn't exists.
5962

63+
#### Configuration Fields
64+
* **Exchange** - (string, required): Exchange name where you want to send message to
65+
* **Don't encrypt payload** - (checkbox, optional): If checked payload will be not encrypted
66+
* **Content-Type** - (string, optional): Content-Type of pushed payload, default is `application/octet-stream`
67+
* **Reconnect Timeout** - (string, optional, 5 by default, maximum 1000): In case of errors how long to wait until retry is seconds
68+
* **Reconnect Attempts** - (string, optional, 12 by default, maximum 1000): How many times try to reconnect before throw error. 12 by default
69+
70+
6071
## Known limitations
6172

6273
Following limitations of the component are known:
6374
* You can not publish to the default exchange.
6475
* All published exchanges are `topic` exchanges by default. However, with the `topic` exchanges one can emulate `direct` and `fanout` exchanges.
6576

66-
6777
## License
6878

6979
Apache-2.0 © [elastic.io GmbH](https://elastic.io)

component.json

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"title": "AMQP",
33
"buildType": "docker",
44
"description": "Pub/Sub Component for async communication with queues and topics",
5+
"version": "1.4.0",
56
"credentials": {
67
"fields": {
78
"amqpURI": {
@@ -23,6 +24,34 @@
2324
"required": true,
2425
"placeholder": "up_to_200_symbols",
2526
"note": "This exchange will be created on start if not exists."
27+
},
28+
"doNotEncrypt": {
29+
"label": "Don't encrypt payload",
30+
"viewClass": "CheckBoxView",
31+
"help": {
32+
"description": "If checked, payload will be not encrypted"
33+
}
34+
},
35+
"contentType": {
36+
"label": "Content-Type",
37+
"viewClass": "TextFieldWithNoteView",
38+
"note": "Content-Type of pushed payload, default is 'application/octet-stream'"
39+
},
40+
"reconnectTimeOut": {
41+
"label": "Reconnect Time Out",
42+
"required": false,
43+
"viewClass": "TextFieldView",
44+
"help": {
45+
"description": "In case of errors how long to wait until retry in seconds. 5 by default"
46+
}
47+
},
48+
"reconnectAttempts": {
49+
"label": "Reconnect Attempts",
50+
"required": false,
51+
"viewClass": "TextFieldView",
52+
"help": {
53+
"description": "How many times try to reconnect before throw error. 12 by default"
54+
}
2655
}
2756
},
2857
"metadata": {
@@ -49,6 +78,29 @@
4978
"required": false,
5079
"placeholder": "this.key,that.key",
5180
"note": "Optional. You can use <b>#</b> or <b>*</b> to wildcard, more info <a href=\"http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html\" target=\"_top\">here</a>"
81+
},
82+
"doNotDecrypt": {
83+
"label": "Don't decrypt payload",
84+
"viewClass": "CheckBoxView",
85+
"help": {
86+
"description": "If checked, payload will be not decrypted"
87+
}
88+
},
89+
"reconnectTimeOut": {
90+
"label": "Reconnect Timeout",
91+
"required": false,
92+
"viewClass": "TextFieldView",
93+
"help": {
94+
"description": "In case of errors how long to wait until retry is seconds. 5 by default"
95+
}
96+
},
97+
"reconnectAttempts": {
98+
"label": "Reconnect Attempts",
99+
"required": false,
100+
"viewClass": "TextFieldView",
101+
"help": {
102+
"description": "How many times try to reconnect before throw error. 12 by default"
103+
}
52104
}
53105
},
54106
"metadata": { }

lib/actions/publish.js

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,29 @@
1-
const amqp = require('amqplib');
2-
const logger = require('@elastic.io/component-logger')();
3-
41
const encryptor = require('../encryptor.js');
2+
const { AMQPClient } = require('../amqp.js');
53

6-
let channel;
7-
8-
/**
9-
* This method will be called from elastic.io platform on initialization
10-
*
11-
* @param cfg
12-
*/
13-
async function init(cfg) {
14-
logger.info('Starting initialization');
15-
const { amqpURI } = cfg;
16-
const amqpExchange = cfg.topic;
17-
logger.debug('Connecting to amqp...');
18-
const conn = await amqp.connect(amqpURI);
19-
logger.debug('Creating a confirm channel');
20-
channel = await conn.createConfirmChannel();
21-
logger.debug('Asserting topic exchange exchange...');
22-
await channel.assertExchange(amqpExchange, 'topic');
23-
}
4+
let amqpClient;
245

25-
/**
26-
* This method will be called from elastic.io platform providing following data
27-
*
28-
* @param msg incoming message object that contains ``body`` with payload
29-
* @param cfg configuration that is account information and configuration field values
30-
*/
316
async function processAction(msg, cfg) {
32-
const self = this;
33-
const amqpExchange = cfg.topic;
7+
if (!amqpClient || !amqpClient.connection) { amqpClient = new AMQPClient(cfg, this); }
8+
amqpClient.setLogger(this.logger);
349

35-
self.logger.info('Publishing message...');
36-
const encryptedData = encryptor.encryptMessageContent(self, {
37-
body: msg.body.payload || msg.body,
38-
attachments: msg.attachments,
39-
});
40-
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
41-
contentType: 'application/octet-stream',
10+
let data;
11+
if (cfg.doNotEncrypt) {
12+
data = msg.body.payload || msg.body;
13+
data = Buffer.from(JSON.stringify(data));
14+
} else {
15+
data = encryptor.encryptMessageContent(this, {
16+
body: msg.body.payload || msg.body,
17+
attachments: msg.attachments,
18+
});
19+
}
20+
this.logger.info('Publishing message...');
21+
22+
await amqpClient.publish(msg.body.routingKey || '', data, {
23+
contentType: cfg.contentType || 'application/octet-stream',
4224
messageId: msg.id,
4325
});
44-
self.logger.info('Message published');
45-
await channel.waitForConfirms();
46-
self.logger.info('Message publishing confirmed');
4726
return msg;
4827
}
4928

5029
module.exports.process = processAction;
51-
module.exports.init = init;

lib/amqp.js

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
const amqp = require('amqp-connection-manager');
2+
const platformLogger = require('@elastic.io/component-logger')();
3+
4+
const RECONNECT_TIMEOUT = 5;
5+
const RECONNECT_ATTEMPTS = 12;
6+
7+
const validateAttemptsAndTimeout = (input, def) => {
8+
if (!input) return def;
9+
if (input && Number(input).toString() !== 'NaN' && Number(input) <= 1000 && Number(input) >= 0) {
10+
return Number(input);
11+
}
12+
throw new Error('"Reconnect Timeout" and "Reconnect Attempts" should be valid number between 0 and 1000');
13+
};
14+
15+
class AMQPClient {
16+
constructor(cfg, context) {
17+
this.connection = null;
18+
this.channel = null;
19+
this.logger = context.logger || platformLogger;
20+
this.cfg = cfg;
21+
this.queueName = `eio_consumer_${process.env.ELASTICIO_FLOW_ID}_${process.env.ELASTICIO_USER_ID}`;
22+
this.retry = 0;
23+
this.context = context;
24+
this.reconnectTimeOut = validateAttemptsAndTimeout(cfg.reconnectTimeOut, RECONNECT_TIMEOUT);
25+
this.reconnectAttempts = validateAttemptsAndTimeout(cfg.reconnectAttempts, RECONNECT_ATTEMPTS);
26+
}
27+
28+
setLogger(logger) {
29+
this.logger = logger;
30+
}
31+
32+
init(confirmChannel = true, assertExchangeOptions, deleteQueue) {
33+
return new Promise((resolve, reject) => {
34+
this.connection = amqp.connect([this.cfg.amqpURI], {
35+
reconnectTimeInSeconds: this.reconnectTimeOut,
36+
});
37+
this.connection.on('connect', () => {
38+
this.retry = 0;
39+
this.logger.info('Successfully connected to RabbitMQ');
40+
resolve();
41+
});
42+
this.connection.on('connectFailed', ({ err }) => {
43+
this.retry += 1;
44+
if (this.retry >= this.reconnectAttempts) {
45+
const errMsg = new Error(`Connection failed after ${this.reconnectAttempts} attempts`);
46+
this.connection.emit('error', { err: errMsg });
47+
this.context.emit('error', errMsg);
48+
delete this.connection;
49+
} else {
50+
this.logger.error(`Connection failed due to: ${err}, ${this.retry} of ${this.reconnectAttempts} retry after ${this.reconnectTimeOut}sec`);
51+
}
52+
});
53+
this.connection.on('disconnect', ({ err }) => { this.logger.error(`Connection disconnected due to: ${err}`); });
54+
this.connection.on('error', ({ err }) => {
55+
this.logger.error(`Connection encountered an error: ${err}`);
56+
reject(err);
57+
});
58+
this.connection.on('blocked', ({ reason }) => { this.logger.error(`Connection blocked due to: ${reason}`); });
59+
this.connection.on('unblocked', () => { this.logger.info('Connection unblocked'); });
60+
this.channel = this.connection.createChannel({
61+
confirm: confirmChannel,
62+
setup: async (channel) => {
63+
try {
64+
this.logger.debug('Asserting topic exchange...');
65+
await channel.assertExchange(this.cfg.topic, 'topic', assertExchangeOptions);
66+
if (!confirmChannel) {
67+
this.logger.debug('Asserting queue');
68+
await channel.assertQueue(this.queueName, { exclusive: false, durable: false });
69+
const keys = (this.cfg.bindingKeys || '#').split(',').map((s) => s.trim());
70+
for (const key of keys) {
71+
this.logger.debug('Binding queue to exchange...');
72+
await channel.bindQueue(this.queueName, this.cfg.topic, key);
73+
}
74+
}
75+
if (deleteQueue) {
76+
this.logger.info(`Deleting queue ${this.queueName}`);
77+
await channel.deleteQueue(this.queueName);
78+
}
79+
this.logger.info('Successfully finished channel setup');
80+
return channel;
81+
} catch (err) {
82+
this.logger.error(`Error on Channel setup: ${err}`);
83+
return channel;
84+
}
85+
},
86+
});
87+
});
88+
}
89+
90+
async publish(routingKey, content, options) {
91+
await this.init();
92+
this.logger.info('Going to publish message');
93+
await this.channel.publish(this.cfg.topic, routingKey, content, options);
94+
this.logger.info('Message published');
95+
}
96+
97+
async consume(onMessage, options) {
98+
this.logger.info(`Starting consuming from ${this.queueName}`);
99+
return this.channel.consume(this.queueName, onMessage, options);
100+
}
101+
102+
async waitForConnect() {
103+
return this.channel.waitForConnect();
104+
}
105+
}
106+
107+
module.exports.AMQPClient = AMQPClient;

0 commit comments

Comments
 (0)