Skip to content

Commit 4c4e1d7

Browse files
committed
Added consumer. Fixed typo
1 parent 626d31b commit 4c4e1d7

File tree

4 files changed

+70
-1
lines changed

4 files changed

+70
-1
lines changed

component.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,16 @@
2323
"placeholder": "up_to_200_symbols"
2424
}
2525
},
26+
"metadata": {}
27+
}
28+
},
29+
"triggers": {
30+
"consume": {
31+
"title": "Consume",
32+
"main": "./lib/actions/consume.js",
2633
"metadata": {
34+
"in": {},
35+
"out": "./lib/schemas/consume.out.json"
2736
}
2837
}
2938
}

lib/actions/publish.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ module.exports.process = processAction;
1616
function processAction(msg, cfg) {
1717
console.log('Action started');
1818
const amqpURI = cfg.amqpURI;
19-
const amqpExchange = cfg.topc;
19+
const amqpExchange = cfg.topic;
2020
co(function*() {
2121
if (!conn) {
2222
console.log('Connecting to amqpURI=%s', amqpURI);

lib/schemas/consume.out.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"type": "object",
3+
"properties": {
4+
"outValue": {
5+
"type": "string",
6+
"required": true,
7+
"title": "Output Value"
8+
}
9+
}
10+
}

lib/triggers/consume.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
'use strict';
2+
const eioUtils = require('elasticio-node').messages;
3+
const co = require('co');
4+
const amqp = require('amqplib');
5+
const encryptor = require('../encryptor.js');
6+
7+
var conn, channel;
8+
9+
module.exports.process = processAction;
10+
11+
/**
12+
* This method will be called from elastic.io platform providing following data
13+
*
14+
* @param msg incoming message object that contains ``body`` with payload
15+
* @param cfg configuration that is account information and configuration field values
16+
*/
17+
function processAction(msg, cfg) {
18+
console.log('Trigger started, cfg=%j', cfg);
19+
const amqpURI = cfg.amqpURI;
20+
const amqpExchange = cfg.topc;
21+
22+
co(function*() {
23+
if (!conn) {
24+
console.log('Connecting to amqpURI=%s', amqpURI);
25+
conn = yield amqp.connect(amqpURI);
26+
}
27+
if (!channel) {
28+
console.log('Creating a confirm channel');
29+
channel = yield conn.createConfirmChannel();
30+
console.log('Asserting topic exchange exchange=%s', amqpExchange);
31+
yield channel.assertExchange(amqpExchange, 'topic');
32+
}
33+
console.log('Publishing message id=%s', msg.id);
34+
let encryptedData = encryptor.encryptMessageContent({
35+
id: msg.id,
36+
body: msg.body,
37+
attachments: msg.attachments
38+
});
39+
channel.publish(amqpExchange, 'foo', encryptedData);
40+
console.log('Message published id=%s', msg.id);
41+
yield channel.waitForConfirms();
42+
console.log('Message publishing confirmed id=%s', msg.id);
43+
this.emit('data', msg);
44+
this.emit('end');
45+
}.bind(this)).catch(err => {
46+
console.log('Error occurred', err.stack || err);
47+
this.emit('error', err);
48+
this.emit('end');
49+
});
50+
}

0 commit comments

Comments
 (0)