Skip to content

Commit c70e0a6

Browse files
committed
Improved consumer
1 parent 9755431 commit c70e0a6

File tree

2 files changed

+83
-73
lines changed

2 files changed

+83
-73
lines changed

lib/actions/publish.js

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,17 @@ module.exports.init = init;
1414
* @param cfg
1515
*/
1616
function init(cfg) {
17-
console.log('Starting initialization, cfg=%j', cfg);
18-
const amqpURI = cfg.amqpURI;
19-
const amqpExchange = cfg.topic;
20-
return co(function* gen() {
21-
console.log('Connecting to amqpURI=%s', amqpURI);
22-
const conn = yield amqp.connect(amqpURI);
23-
console.log('Creating a confirm channel');
24-
channel = yield conn.createConfirmChannel();
25-
console.log('Asserting topic exchange exchange=%s', amqpExchange);
26-
yield channel.assertExchange(amqpExchange, 'topic');
27-
});
17+
console.log('Starting initialization, cfg=%j', cfg);
18+
const amqpURI = cfg.amqpURI;
19+
const amqpExchange = cfg.topic;
20+
return co(function* gen() {
21+
console.log('Connecting to amqpURI=%s', amqpURI);
22+
const conn = yield amqp.connect(amqpURI);
23+
console.log('Creating a confirm channel');
24+
channel = yield conn.createConfirmChannel();
25+
console.log('Asserting topic exchange exchange=%s', amqpExchange);
26+
yield channel.assertExchange(amqpExchange, 'topic');
27+
});
2828
}
2929

3030
/**
@@ -34,20 +34,20 @@ function init(cfg) {
3434
* @param cfg configuration that is account information and configuration field values
3535
*/
3636
function processAction(msg, cfg) {
37-
const amqpExchange = cfg.topic;
38-
return co(function* sendMessage() {
39-
console.log('Publishing message id=%s', msg.id);
40-
let encryptedData = encryptor.encryptMessageContent({
41-
body: msg.body.payload || msg.body,
42-
attachments: msg.attachments
43-
});
44-
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
45-
contentType: "application/octet-stream",
46-
messageId: msg.id
47-
});
48-
console.log('Message published id=%s', msg.id);
49-
yield channel.waitForConfirms();
50-
console.log('Message publishing confirmed id=%s', msg.id);
51-
this.emit('data', msg);
52-
}.bind(this));
37+
const amqpExchange = cfg.topic;
38+
return co(function* sendMessage() {
39+
console.log('Publishing message id=%s', msg.id);
40+
let encryptedData = encryptor.encryptMessageContent({
41+
body: msg.body.payload || msg.body,
42+
attachments: msg.attachments
43+
});
44+
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
45+
contentType: "application/octet-stream",
46+
messageId: msg.id
47+
});
48+
console.log('Message published id=%s', msg.id);
49+
yield channel.waitForConfirms();
50+
console.log('Message publishing confirmed id=%s', msg.id);
51+
this.emit('data', msg);
52+
}.bind(this));
5353
}

lib/triggers/consume.js

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,60 +5,70 @@ const amqp = require('amqplib');
55
const encryptor = require('../encryptor.js');
66
const debug = require('debug')('consumer');
77

8-
module.exports.process = processAction;
8+
let channel;
9+
const queueName = `eio_consumer_${process.env.ELASTICIO_FLOW_ID}_${process.env.ELASTICIO_USER_ID}`;
910

1011
/**
11-
* This method will be called from elastic.io platform providing following data
12+
* This function will be called on component before the first message will be processed
1213
*
13-
* @param msg incoming message object that contains ``body`` with payload
14-
* @param cfg configuration that is account information and configuration field values
14+
* @param cfg
1515
*/
16-
function processAction(msg, cfg) {
17-
console.log('Trigger started, cfg=%j', cfg);
18-
const amqpURI = cfg.amqpURI;
19-
const amqpExchange = cfg.topic;
20-
const queueName = `eio_consumer_${process.env.ELASTICIO_TASK_ID}_${process.env.ELASTICIO_USER_ID}`;
21-
const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim());
22-
const consumer = (msg) => {
23-
console.log('Have got message fields=%j properties=%j', msg.fields, msg.properties);
24-
const decrypted = encryptor.decryptMessageContent(msg.content);
25-
debug('Decrypted message=%j', decrypted);
26-
const newMsg = eioUtils.newMessageWithBody(decrypted.body || {});
27-
newMsg.id = msg.properties.messageId;
28-
newMsg.attachments = decrypted.attachments || {};
29-
this.emit('data', newMsg);
30-
};
31-
co(function*() {
32-
debug('Connecting to amqpURI=%s', amqpURI);
33-
const conn = yield amqp.connect(amqpURI);
16+
function init(cfg) {
17+
console.log('Starting initialization, cfg=%j queueName=%s', cfg, queueName);
18+
const amqpURI = cfg.amqpURI;
19+
const amqpExchange = cfg.topic;
20+
const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim());
21+
return co(function* initialize() {
22+
debug('Connecting to amqpURI=%s', amqpURI);
23+
const conn = yield amqp.connect(amqpURI);
3424

35-
debug('Creating a receiver channel');
36-
const channel = yield conn.createChannel();
25+
debug('Creating a receiver channel');
26+
channel = yield conn.createChannel();
3727

38-
debug('Asserting topic exchange exchange=%s', amqpExchange);
39-
yield channel.assertExchange(amqpExchange, 'topic');
28+
debug('Asserting topic exchange exchange=%s', amqpExchange);
29+
yield channel.assertExchange(amqpExchange, 'topic');
4030

41-
debug('Asserting queue');
42-
yield channel.assertQueue(queueName, {
43-
exclusive: false,
44-
durable: false,
45-
autoDelete: true
46-
});
31+
debug('Asserting queue');
32+
yield channel.assertQueue(queueName, {
33+
exclusive: false,
34+
durable: false,
35+
autoDelete: true
36+
});
4737

48-
for (let key of keys) {
49-
debug(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
50-
yield channel.bindQueue(queueName, amqpExchange, key);
51-
}
38+
for (let key of keys) {
39+
debug(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
40+
yield channel.bindQueue(queueName, amqpExchange, key);
41+
}
42+
console.log('Initialization completed');
43+
});
44+
}
5245

53-
console.log('Start consuming');
54-
yield channel.consume(queueName, consumer, {
55-
noAck: true, // We can't really assert if message was consumed if we emit it yet
56-
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_TASK_ID}`
46+
/**
47+
* This method will be called from elastic.io platform providing following data
48+
*
49+
* @param msg incoming message object that contains ``body`` with payload
50+
* @param cfg configuration that is account information and configuration field values
51+
*/
52+
function processAction(msg, cfg) {
53+
console.log('Trigger started, cfg=%j', cfg);
54+
const consumer = (msg) => {
55+
debug('Have got message fields=%j properties=%j', msg.fields, msg.properties);
56+
const decrypted = encryptor.decryptMessageContent(msg.content);
57+
debug('Decrypted message=%j', decrypted);
58+
const newMsg = eioUtils.newMessageWithBody(decrypted.body || {});
59+
newMsg.id = msg.properties.messageId;
60+
newMsg.attachments = decrypted.attachments || {};
61+
this.emit('data', newMsg);
62+
};
63+
return co(function* consume() {
64+
console.log('Starting consuming from %s', queueName);
65+
yield channel.consume(queueName, consumer, {
66+
noAck: true, // We can't really assert if message was consumed if we emit it yet
67+
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_FLOW_ID}`
68+
});
69+
console.log('Consumption started');
5770
});
58-
console.log('After consume!');
59-
}.bind(this)).catch(err => {
60-
console.log('Error occurred', err.stack || err);
61-
this.emit('error', err);
62-
this.emit('end');
63-
});
6471
}
72+
73+
module.exports.process = processAction;
74+
module.exports.init = init;

0 commit comments

Comments
 (0)