Skip to content

Commit 9de93bb

Browse files
committed
Initial implementation
1 parent 20896d9 commit 9de93bb

File tree

5 files changed

+112
-22
lines changed

5 files changed

+112
-22
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ $ git push elasticio master
3838
This component is using the internal RabbitMQ instance that is available as part of elastic.io
3939
infrastructure, it expects following environment variables to be present when started:
4040
* ``ELASTICIO_AMQP_URI`` something like ``amqp://foo:bar@server``
41-
* ``ELASTICIO_MESSAGE_CRYPTO_IV`` key for symmetric encryption
41+
* ``ELASTICIO_MESSAGE_CRYPTO_IV`` vector for symmetric encryption
4242
* ``ELASTICIO_MESSAGE_CRYPTO_PASSWORD`` password for symmetric encryption
43-
* ``ELASTICIO_PUBLISH_MESSAGES_TO`` AMQP exchange name, like ``56a63248661ec20500000014_user``
43+
* ``ELASTICIO_TASK_ID`` ID of the currently running task, used to construct name for exchange
44+
* ``ELASTICIO_USER_ID`` ID of the current user, used to construct name of the exchange
4445

4546

4647
## Known issues

lib/actions/publish.js

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
'use strict';
22
const eioUtils = require('elasticio-node').messages;
33
const co = require('co');
4-
const rp = require('request-promise');
4+
const amqp = require('amqplib');
5+
const encryptor = require('../encryptor.js');
6+
7+
var connection, channel;
58

69
module.exports.process = processAction;
710

@@ -13,23 +16,31 @@ module.exports.process = processAction;
1316
* @param snapshot - current values from the snapshot
1417
*/
1518
function processAction(msg, cfg, snapshot) {
16-
console.log('Action started, snapshot=%j', snapshot);
17-
19+
console.log('Action started');
20+
const amqpURI = process.env.ELASTICIO_AMQP_URI;
21+
const amqpExchange = `pubsub_${process.env.ELASTICIO_TASK_ID}_${process.env.ELASTICIO_USER_ID}`;
1822
co(function*() {
19-
console.log('Creating new request bin');
20-
21-
const bin = yield rp({
22-
method: 'POST',
23-
uri: 'http://requestb.in/api/v1/bins',
24-
json: true
23+
if (!connection) {
24+
console.log('Connecting to amqpURI=%s', amqpURI);
25+
connection = yield amqp.connect(amqpURI);
26+
}
27+
if (!channel) {
28+
console.log('Creating a confirm channel');
29+
channel = yield conn.createConfirmChannel();
30+
console.log('Asserting topic exchange exchange=%s', amqpExchange);
31+
yield channel.assertExchange(amqpExchange, 'topic');
32+
}
33+
console.log('Publishing message id=%s', msg.id);
34+
let encryptedData = encryptor.encryptMessageContent({
35+
id: msg.id,
36+
body: msg.body,
37+
attachments: msg.attachments
2538
});
26-
27-
console.log('New request bin created bin=%j', bin);
28-
29-
this.emit('data', eioUtils.newMessageWithBody(bin));
30-
31-
console.log('Processing completed');
32-
39+
channel.publish(amqpExchange, 'foo', encryptedData);
40+
console.log('Message published id=%s', msg.id);
41+
yield channel.waitForConfirms();
42+
console.log('Message publishing confirmed id=%s', msg.id);
43+
this.emit('data', msg);
3344
this.emit('end');
3445
}.bind(this)).catch(err => {
3546
console.log('Error occurred', err.stack || err);

lib/cipher.js

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
var _ = require('lodash');
2+
var crypto = require('crypto');
3+
var debug = require('debug')('sailor:cipher');
4+
5+
var ALGORYTHM = 'aes-256-cbc';
6+
var PASSWORD = process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD;
7+
var VECTOR = process.env.ELASTICIO_MESSAGE_CRYPTO_IV;
8+
9+
exports.id = 1;
10+
exports.encrypt = encryptIV;
11+
exports.decrypt = decryptIV;
12+
13+
function encryptIV(rawData) {
14+
debug('About to encrypt:', rawData);
15+
16+
if (!_.isString(rawData)) {
17+
throw new Error('RabbitMQ message cipher.encryptIV() accepts only string as parameter.');
18+
}
19+
20+
if (!PASSWORD) {
21+
return rawData;
22+
}
23+
24+
if (!VECTOR) {
25+
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
26+
}
27+
28+
var encodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
29+
var cipher = crypto.createCipheriv(ALGORYTHM, encodeKey, VECTOR);
30+
return cipher.update(rawData, 'utf-8', 'base64') + cipher.final('base64');
31+
}
32+
33+
function decryptIV(encData, options) {
34+
debug('About to decrypt:', encData);
35+
36+
options = options || {};
37+
38+
if (!_.isString(encData)) {
39+
throw new Error('RabbitMQ message cipher.decryptIV() accepts only string as parameter.');
40+
}
41+
42+
if (!PASSWORD) {
43+
return encData;
44+
}
45+
46+
if (!VECTOR) {
47+
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
48+
}
49+
50+
var decodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
51+
var cipher = crypto.createDecipheriv(ALGORYTHM, decodeKey, VECTOR);
52+
53+
var result = cipher.update(encData, 'base64', 'utf-8') + cipher.final('utf-8');
54+
55+
return result;
56+
}

lib/encryptor.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
var cipher = require('./cipher.js');
2+
3+
exports.encryptMessageContent = encryptMessageContent;
4+
exports.decryptMessageContent = decryptMessageContent;
5+
6+
function encryptMessageContent(messagePayload) {
7+
return cipher.encrypt(JSON.stringify(messagePayload));
8+
}
9+
10+
function decryptMessageContent(messagePayload, messageHeaders) {
11+
if (!messagePayload || messagePayload.toString().length === 0) {
12+
return null;
13+
}
14+
try {
15+
return JSON.parse(cipher.decrypt(messagePayload.toString(), messageHeaders));
16+
} catch (err) {
17+
console.error(err.stack);
18+
throw Error('Failed to decrypt message: ' + err.message);
19+
}
20+
}

package.json

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
"elasticio-component"
2323
],
2424
"dependencies": {
25-
"co": "^4.6.0",
25+
"amqplib": "^0.5.1",
2626
"bluebird": "^3.4.6",
27-
"request": "^2.75.0",
28-
"request-promise": "^4.1.1",
27+
"co": "^4.6.0",
28+
"debug": "^2.4.4",
29+
"elasticio-node": "0.0.5",
2930
"elasticio-sailor-nodejs": "1.3.0",
30-
"elasticio-node": "0.0.5"
31+
"request": "^2.75.0",
32+
"request-promise": "^4.1.1"
3133
},
3234
"devDependencies": {
3335
"eslint": "^2.1.0",

0 commit comments

Comments
 (0)