Skip to content

Commit e6c2305

Browse files
committed
First version of the triger
1 parent 2477266 commit e6c2305

File tree

3 files changed

+41
-27
lines changed

3 files changed

+41
-27
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ Now we are ready to push it:
3333
$ git push elasticio master
3434
```
3535

36+
## How consumer works
37+
38+
Consumer will register a non-exclusive non-durable queue with autodelete=true without
39+
any dead-letter. Name of the queue will be dynamically
40+
generated based on the user ID, TASK ID prefixed with ``eio_consumer_``.
41+
This queue will be bound to the exchange with specified bound key or multiple
42+
bound keys that are specified in one string separated by commas.
43+
3644
## Authentication
3745

3846
This component exects user to provide a AMQP URI, username and password should be embedded

component.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@
3939
"placeholder": "up_to_200_symbols",
4040
"note": "This exchange will be created on start if not exists."
4141
},
42-
"bindingKey": {
43-
"label": "Binding Key",
42+
"bindingKeys": {
43+
"label": "Binding Keys",
4444
"viewClass": "TextFieldWithNoteView",
4545
"required": false,
46-
"placeholder": "dot.delimited.words",
47-
"note": "You can use <b>#</b> or <b>*</b> to wildcard, more info <a href=\"http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html\">here</a>"
46+
"placeholder": "this.key,that.key",
47+
"note": "Optional. You can use <b>#</b> or <b>*</b> to wildcard, more info <a href=\"http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html\">here</a>"
4848
}
4949
},
5050
"metadata": {

lib/triggers/consume.js

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ const co = require('co');
44
const amqp = require('amqplib');
55
const encryptor = require('../encryptor.js');
66

7-
var conn, channel;
8-
97
module.exports.process = processAction;
108

119
/**
@@ -18,30 +16,38 @@ function processAction(msg, cfg) {
1816
console.log('Trigger started, cfg=%j', cfg);
1917
const amqpURI = cfg.amqpURI;
2018
const amqpExchange = cfg.topic;
21-
19+
const queueName = `eio_consumer_${process.env.ELASTICIO_TASK_ID}_${process.env.ELASTICIO_USER_ID}`;
20+
const keys = cfg.bindingKeys.split(',').map((s)=>s.trim());
21+
const consumer = (msg) => {
22+
console.log('consuming message %s in generator', JSON.stringify(msg.content.toString()));
23+
};
2224
co(function*() {
23-
if (!conn) {
24-
console.log('Connecting to amqpURI=%s', amqpURI);
25-
conn = yield amqp.connect(amqpURI);
26-
}
27-
if (!channel) {
28-
console.log('Creating a confirm channel');
29-
channel = yield conn.createConfirmChannel();
30-
console.log('Asserting topic exchange exchange=%s', amqpExchange);
31-
yield channel.assertExchange(amqpExchange, 'topic');
25+
console.log('Connecting to amqpURI=%s', amqpURI);
26+
const conn = yield amqp.connect(amqpURI);
27+
28+
console.log('Creating a receiver channel');
29+
const channel = yield conn.createChannel();
30+
31+
console.log('Asserting topic exchange exchange=%s', amqpExchange);
32+
yield channel.assertExchange(amqpExchange, 'topic');
33+
34+
console.log('Asserting queue');
35+
yield channel.assertQueue(queueName, {
36+
exclusive: false,
37+
durable: false,
38+
autoDelete: true
39+
});
40+
41+
for(key of keys) {
42+
console.log(`Binding queue to exchange queue=${queueName} exchange=${amqpExchange} bindingKey=${key}`);
43+
yield channel.bindQueue(queueName, amqpExchange, key);
3244
}
33-
console.log('Publishing message id=%s', msg.id);
34-
let encryptedData = encryptor.encryptMessageContent({
35-
id: msg.id,
36-
body: msg.body,
37-
attachments: msg.attachments
45+
46+
console.log('Start consuming');
47+
yield channel.consume(queueName, consumer, {
48+
noAck: true, // We can't really assert if message was consumed if we emit it yet
49+
consumerTag: `consumer_${process.env.ELASTICIO_EXEC_ID}_${process.env.ELASTICIO_TASK_ID}`
3850
});
39-
channel.publish(amqpExchange, 'foo', encryptedData);
40-
console.log('Message published id=%s', msg.id);
41-
yield channel.waitForConfirms();
42-
console.log('Message publishing confirmed id=%s', msg.id);
43-
this.emit('data', msg);
44-
this.emit('end');
4551
}.bind(this)).catch(err => {
4652
console.log('Error occurred', err.stack || err);
4753
this.emit('error', err);

0 commit comments

Comments
 (0)