Skip to content

Commit 686cf85

Browse files
committed
MIgrated to new sailor, improved publishing
1 parent b040363 commit 686cf85

File tree

2 files changed

+25
-27
lines changed

2 files changed

+25
-27
lines changed

lib/actions/publish.js

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,29 @@ const co = require('co');
33
const amqp = require('amqplib');
44
const encryptor = require('../encryptor.js');
55

6-
var conn, channel;
6+
let channel;
77

88
module.exports.process = processAction;
9+
module.exports.init = init;
10+
11+
/**
12+
* This methdo will be called from elastic.io platform on initialization
13+
*
14+
* @param cfg
15+
*/
16+
function init(cfg) {
17+
console.log('Starting initialization, cfg=%j', cfg);
18+
const amqpURI = cfg.amqpURI;
19+
const amqpExchange = cfg.topic;
20+
return co(function* gen() {
21+
console.log('Connecting to amqpURI=%s', amqpURI);
22+
const conn = yield amqp.connect(amqpURI);
23+
console.log('Creating a confirm channel');
24+
channel = yield conn.createConfirmChannel();
25+
console.log('Asserting topic exchange exchange=%s', amqpExchange);
26+
yield channel.assertExchange(amqpExchange, 'topic');
27+
});
28+
}
929

1030
/**
1131
* This method will be called from elastic.io platform providing following data
@@ -14,42 +34,20 @@ module.exports.process = processAction;
1434
* @param cfg configuration that is account information and configuration field values
1535
*/
1636
function processAction(msg, cfg) {
17-
console.log('Action started');
18-
const amqpURI = cfg.amqpURI;
1937
const amqpExchange = cfg.topic;
20-
co(function*() {
21-
if (!conn) {
22-
console.log('Connecting to amqpURI=%s', amqpURI);
23-
conn = yield amqp.connect(amqpURI);
24-
}
25-
if (!channel) {
26-
console.log('Creating a confirm channel');
27-
channel = yield conn.createConfirmChannel();
28-
console.log('Asserting topic exchange exchange=%s', amqpExchange);
29-
yield channel.assertExchange(amqpExchange, 'topic');
30-
}
38+
return co(function* sendMessage() {
3139
console.log('Publishing message id=%s', msg.id);
3240
let encryptedData = encryptor.encryptMessageContent({
3341
body: msg.body.payload || msg.body,
3442
attachments: msg.attachments
3543
});
36-
console.log(encryptedData, encryptor.decryptMessageContent(encryptedData));
37-
channel.publish(amqpExchange, 'foo', encryptedData, {
44+
channel.publish(amqpExchange, msg.body.routingKey || '', encryptedData, {
3845
contentType: "application/octet-stream",
3946
messageId: msg.id
4047
});
4148
console.log('Message published id=%s', msg.id);
4249
yield channel.waitForConfirms();
4350
console.log('Message publishing confirmed id=%s', msg.id);
4451
this.emit('data', msg);
45-
this.emit('end');
46-
console.log('Closing the channel');
47-
yield channel.close();
48-
console.log('Closing the connnection');
49-
yield conn.close();
50-
}.bind(this)).catch(err => {
51-
console.log('Error occurred', err.stack || err);
52-
this.emit('error', err);
53-
this.emit('end');
54-
});
52+
}.bind(this));
5553
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
"co": "^4.6.0",
2828
"debug": "^2.4.4",
2929
"elasticio-node": "0.0.5",
30-
"elasticio-sailor-nodejs": "1.3.0",
30+
"elasticio-sailor-nodejs": "2.0.0",
3131
"request": "^2.75.0",
3232
"request-promise": "^4.1.1"
3333
},

0 commit comments

Comments
 (0)