1
1
const { messages } = require ( 'elasticio-node' ) ;
2
- const co = require ( 'co' ) ;
3
2
const amqp = require ( 'amqplib' ) ;
4
3
const logger = require ( '@elastic.io/component-logger' ) ( ) ;
4
+
5
5
const encryptor = require ( '../encryptor.js' ) ;
6
6
7
7
let channel ;
@@ -13,35 +13,34 @@ let listening;
13
13
*
14
14
* @param cfg
15
15
*/
16
- function init ( cfg ) {
17
- logger . info ( 'Starting initialization, queueName=%s' , queueName ) ;
16
+ async function init ( cfg ) {
17
+ logger . info ( 'Starting initialization...' ) ;
18
18
const { amqpURI } = cfg ;
19
19
const amqpExchange = cfg . topic ;
20
20
const keys = ( cfg . bindingKeys || '#' ) . split ( ',' ) . map ( ( s ) => s . trim ( ) ) ;
21
- return co ( function * initialize ( ) {
22
- logger . debug ( 'Connecting to amqpURI=%s' , amqpURI ) ;
23
- const conn = yield amqp . connect ( amqpURI ) ;
24
-
25
- logger . debug ( 'Creating a receiver channel' ) ;
26
- channel = yield conn . createChannel ( ) ;
21
+ logger . debug ( 'Connecting to amqpURI...' ) ;
22
+ const conn = await amqp . connect ( amqpURI ) ;
27
23
28
- logger . debug ( 'Asserting topic exchange exchange=%s' , amqpExchange ) ;
29
- yield channel . assertExchange ( amqpExchange , 'topic' ) ;
24
+ logger . debug ( 'Creating a receiver channel' ) ;
25
+ channel = await conn . createChannel ( ) ;
30
26
31
- logger . debug ( 'Asserting queue' ) ;
32
- yield channel . assertQueue ( queueName , {
33
- exclusive : false ,
34
- durable : false ,
35
- autoDelete : true ,
36
- } ) ;
27
+ logger . debug ( 'Asserting topic exchange exchange...' ) ;
28
+ await channel . assertExchange ( amqpExchange , 'topic' ) ;
37
29
38
- // eslint-disable-next-line no-restricted-syntax
39
- for ( const key of keys ) {
40
- logger . debug ( `Binding queue to exchange queue=${ queueName } exchange=${ amqpExchange } bindingKey=${ key } ` ) ;
41
- yield channel . bindQueue ( queueName , amqpExchange , key ) ;
42
- }
43
- logger . info ( 'Initialization completed' ) ;
30
+ logger . debug ( 'Asserting queue' ) ;
31
+ await channel . assertQueue ( queueName , {
32
+ exclusive : false ,
33
+ durable : false ,
34
+ autoDelete : true ,
44
35
} ) ;
36
+
37
+ // eslint-disable-next-line no-restricted-syntax
38
+ for ( const key of keys ) {
39
+ logger . debug ( 'Binding queue to exchange...' ) ;
40
+ // eslint-disable-next-line no-await-in-loop
41
+ await channel . bindQueue ( queueName , amqpExchange , key ) ;
42
+ }
43
+ logger . info ( 'Initialization completed' ) ;
45
44
}
46
45
47
46
/**
@@ -50,8 +49,8 @@ function init(cfg) {
50
49
* @param msg incoming message object that contains ``body`` with payload
51
50
* @param cfg configuration that is account information and configuration field values
52
51
*/
53
- // eslint-disable-next-line no-unused-vars
54
- function processAction ( msg , cfg ) {
52
+ // eslint-disable-next-line no-unused-vars,consistent-return
53
+ async function processAction ( msg , cfg ) {
55
54
const self = this ;
56
55
self . logger . info ( 'Trigger started' ) ;
57
56
if ( listening ) {
@@ -60,23 +59,21 @@ function processAction(msg, cfg) {
60
59
}
61
60
// eslint-disable-next-line no-shadow
62
61
const consumer = ( msg ) => {
63
- self . logger . debug ( 'Have got message fields=%j properties=%j' , msg . fields , msg . properties ) ;
62
+ self . logger . debug ( 'New message got' ) ;
64
63
const decrypted = encryptor . decryptMessageContent ( self , msg . content ) ;
65
- self . logger . debug ( 'Decrypted message=%j' , decrypted ) ;
64
+ self . logger . debug ( 'Message decrypted' ) ;
66
65
const newMsg = messages . newMessageWithBody ( decrypted . body || { } ) ;
67
66
newMsg . id = msg . properties . messageId ;
68
67
newMsg . attachments = decrypted . attachments || { } ;
69
68
self . emit ( 'data' , newMsg ) ;
70
69
} ;
71
- return co ( function * consume ( ) {
72
- self . logger . info ( 'Starting consuming from %s' , queueName ) ;
73
- yield channel . consume ( queueName , consumer , {
74
- noAck : true , // We can't really assert if message was consumed if we emit it yet
75
- consumerTag : `consumer_${ process . env . ELASTICIO_EXEC_ID } _${ process . env . ELASTICIO_FLOW_ID } ` ,
76
- } ) ;
77
- self . logger . info ( 'Consumption started' ) ;
78
- listening = true ;
70
+ self . logger . info ( 'Starting consuming from %s' , queueName ) ;
71
+ await channel . consume ( queueName , consumer , {
72
+ noAck : true , // We can't really assert if message was consumed if we emit it yet
73
+ consumerTag : `consumer_${ process . env . ELASTICIO_EXEC_ID } _${ process . env . ELASTICIO_FLOW_ID } ` ,
79
74
} ) ;
75
+ self . logger . info ( 'Consumption started' ) ;
76
+ listening = true ;
80
77
}
81
78
82
79
module . exports . process = processAction ;
0 commit comments