Skip to content

Commit 613864a

Browse files
committed
Updated consumer
1 parent 5a85209 commit 613864a

File tree

4 files changed

+20
-12
lines changed

4 files changed

+20
-12
lines changed

lib/actions/publish.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ function processAction(msg, cfg) {
3434
body: msg.body,
3535
attachments: msg.attachments
3636
});
37-
channel.publish(amqpExchange, 'foo', encryptedData);
37+
channel.publish(amqpExchange, 'foo', encryptedData, {
38+
contentType: "application/octet-stream",
39+
messageId: msg.id
40+
});
3841
console.log('Message published id=%s', msg.id);
3942
yield channel.waitForConfirms();
4043
console.log('Message publishing confirmed id=%s', msg.id);

lib/cipher.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,9 @@ function encryptIV(rawData) {
3131
return Buffer.concat([cipher.update(new Buffer.from(rawData)),cipher.final()]);
3232
}
3333

34-
function decryptIV(encData, options) {
34+
function decryptIV(encData) {
3535
debug('About to decrypt:', encData);
3636

37-
options = options || {};
38-
3937
if (!_.isString(encData)) {
4038
throw new Error('RabbitMQ message cipher.decryptIV() accepts only string as parameter.');
4139
}

lib/encryptor.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ function decryptMessageContent(messagePayload, messageHeaders) {
1212
return null;
1313
}
1414
try {
15-
return JSON.parse(cipher.decrypt(messagePayload.toString(), messageHeaders));
15+
return JSON.parse(cipher.decrypt(messagePayload.toString()));
1616
} catch (err) {
1717
console.error(err.stack);
1818
throw Error('Failed to decrypt message: ' + err.message);

lib/triggers/consume.js

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const eioUtils = require('elasticio-node').messages;
33
const co = require('co');
44
const amqp = require('amqplib');
55
const encryptor = require('../encryptor.js');
6+
const debug = require('debug')('consumer');
67

78
module.exports.process = processAction;
89

@@ -19,28 +20,33 @@ function processAction(msg, cfg) {
1920
const queueName = `eio_consumer_${process.env.ELASTICIO_TASK_ID}_${process.env.ELASTICIO_USER_ID}`;
2021
const keys = (cfg.bindingKeys || '#').split(',').map((s) => s.trim());
2122
const consumer = (msg) => {
22-
console.log('Have got message %s', JSON.stringify(msg.content.toString()));
23-
this.emit('data', eioUtils.newMessageWithBody({}));
23+
console.log('Have got message id=%s fields=%j', msg.id, msg.fields);
24+
const decrypted = encryptor.decryptMessageContent(msg.content);
25+
debug('Decrypted message=%j', decrypted);
26+
const newMsg = eioUtils.newMessageWithBody(decrypted.body || {});
27+
newMsg.id = msg.id;
28+
newMsg.attachments = decrypted.attachments || {};
29+
this.emit('data', newMsg);
2430
};
2531
co(function*() {
26-
console.log('Connecting to amqpURI=%s', amqpURI);
32+
debug('Connecting to amqpURI=%s', amqpURI);
2733
const conn = yield amqp.connect(amqpURI);
2834

29-
console.log('Creating a receiver channel');
35+
debug('Creating a receiver channel');
3036
const channel = yield conn.createChannel();
3137

32-
console.log('Asserting topic exchange exchange=%s', amqpExchange);
38+
debug('Asserting topic exchange exchange=%s', amqpExchange);
3339
yield channel.assertExchange(amqpExchange, 'topic');
3440

35-
console.log('Asserting queue');
41+
debug('Asserting queue');
3642
yield channel.assertQueue(queueName, {
3743
exclusive: false,
3844
durable: false,
3945
autoDelete: true
4046
});
4147

4248
for (let key of keys) {
43-
console.log(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
49+
debug(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
4450
yield channel.bindQueue(queueName, amqpExchange, key);
4551
}
4652

@@ -49,6 +55,7 @@ function processAction(msg, cfg) {
4955
noAck: true, // We can't really assert if message was consumed if we emit it yet
5056
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_TASK_ID}`
5157
});
58+
console.log('After consume!');
5259
}.bind(this)).catch(err => {
5360
console.log('Error occurred', err.stack || err);
5461
this.emit('error', err);

0 commit comments

Comments
 (0)