Skip to content

Commit 75c1665

Browse files
committed
Now do everything in init
1 parent c70e0a6 commit 75c1665

File tree

1 file changed

+28
-22
lines changed

1 file changed

+28
-22
lines changed

lib/triggers/consume.js

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ const amqp = require('amqplib');
55
const encryptor = require('../encryptor.js');
66
const debug = require('debug')('consumer');
77

8-
let channel;
9-
const queueName = `eio_consumer_${process.env.ELASTICIO_FLOW_ID}_${process.env.ELASTICIO_USER_ID}`;
10-
118
/**
129
* This function will be called on component before the first message will be processed
1310
*
@@ -17,13 +14,31 @@ function init(cfg) {
1714
console.log('Starting initialization, cfg=%j queueName=%s', cfg, queueName);
1815
const amqpURI = cfg.amqpURI;
1916
const amqpExchange = cfg.topic;
17+
const queueName = `eio_consumer_${process.env.ELASTICIO_FLOW_ID}_${process.env.ELASTICIO_USER_ID}`;
2018
const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim());
19+
20+
/**
21+
* Consumer function that will be called when new message
22+
* is received
23+
*
24+
* @param msg
25+
*/
26+
const consumer = (msg) => {
27+
debug('Have got message fields=%j properties=%j', msg.fields, msg.properties);
28+
const decrypted = encryptor.decryptMessageContent(msg.content);
29+
debug('Decrypted message=%j', decrypted);
30+
const newMsg = eioUtils.newMessageWithBody(decrypted.body || {});
31+
newMsg.id = msg.properties.messageId;
32+
newMsg.attachments = decrypted.attachments || {};
33+
this.emit('data', newMsg);
34+
};
35+
2136
return co(function* initialize() {
2237
debug('Connecting to amqpURI=%s', amqpURI);
2338
const conn = yield amqp.connect(amqpURI);
2439

2540
debug('Creating a receiver channel');
26-
channel = yield conn.createChannel();
41+
const channel = yield conn.createChannel();
2742

2843
debug('Asserting topic exchange exchange=%s', amqpExchange);
2944
yield channel.assertExchange(amqpExchange, 'topic');
@@ -40,6 +55,13 @@ function init(cfg) {
4055
yield channel.bindQueue(queueName, amqpExchange, key);
4156
}
4257
console.log('Initialization completed');
58+
59+
console.log('Starting consuming from queue=%s', queueName);
60+
yield channel.consume(queueName, consumer, {
61+
noAck: true, // We can't really assert if message was consumed if we emit it yet
62+
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`
63+
});
64+
console.log('Consumption started');
4365
});
4466
}
4567

@@ -50,24 +72,8 @@ function init(cfg) {
5072
* @param cfg configuration that is account information and configuration field values
5173
*/
5274
function processAction(msg, cfg) {
53-
console.log('Trigger started, cfg=%j', cfg);
54-
const consumer = (msg) => {
55-
debug('Have got message fields=%j properties=%j', msg.fields, msg.properties);
56-
const decrypted = encryptor.decryptMessageContent(msg.content);
57-
debug('Decrypted message=%j', decrypted);
58-
const newMsg = eioUtils.newMessageWithBody(decrypted.body || {});
59-
newMsg.id = msg.properties.messageId;
60-
newMsg.attachments = decrypted.attachments || {};
61-
this.emit('data', newMsg);
62-
};
63-
return co(function* consume() {
64-
console.log('Starting consuming from %s', queueName);
65-
yield channel.consume(queueName, consumer, {
66-
noAck: true, // We can't really assert if message was consumed if we emit it yet
67-
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`
68-
});
69-
console.log('Consumption started');
70-
});
75+
console.log('Trigger started but we don not need to do anything here');
76+
return Promise.resolve();
7177
}
7278

7379
module.exports.process = processAction;

0 commit comments

Comments
 (0)