1
- 'use strict' ;
2
- const eioUtils = require ( 'elasticio-node' ) . messages ;
1
+ const { messages } = require ( 'elasticio-node' ) ;
3
2
const co = require ( 'co' ) ;
4
3
const amqp = require ( 'amqplib' ) ;
4
+ const logger = require ( '@elastic.io/component-logger' ) ( ) ;
5
5
const encryptor = require ( '../encryptor.js' ) ;
6
- const debug = require ( 'debug' ) ( 'consumer' ) ;
7
6
8
7
let channel ;
9
8
const queueName = `eio_consumer_${ process . env . ELASTICIO_FLOW_ID } _${ process . env . ELASTICIO_USER_ID } ` ;
@@ -15,33 +14,34 @@ let listening;
15
14
* @param cfg
16
15
*/
17
16
function init ( cfg ) {
18
- console . log ( 'Starting initialization, queueName=%s' , queueName ) ;
19
- const amqpURI = cfg . amqpURI ;
20
- const amqpExchange = cfg . topic ;
21
- const keys = ( cfg . bindingKeys || '#' ) . split ( ',' ) . map ( ( s ) => s . trim ( ) ) ;
22
- return co ( function * initialize ( ) {
23
- debug ( 'Connecting to amqpURI=%s' , amqpURI ) ;
24
- const conn = yield amqp . connect ( amqpURI ) ;
17
+ logger . info ( 'Starting initialization, queueName=%s' , queueName ) ;
18
+ const { amqpURI } = cfg ;
19
+ const amqpExchange = cfg . topic ;
20
+ const keys = ( cfg . bindingKeys || '#' ) . split ( ',' ) . map ( ( s ) => s . trim ( ) ) ;
21
+ return co ( function * initialize ( ) {
22
+ logger . debug ( 'Connecting to amqpURI=%s' , amqpURI ) ;
23
+ const conn = yield amqp . connect ( amqpURI ) ;
25
24
26
- debug ( 'Creating a receiver channel' ) ;
27
- channel = yield conn . createChannel ( ) ;
25
+ logger . debug ( 'Creating a receiver channel' ) ;
26
+ channel = yield conn . createChannel ( ) ;
28
27
29
- debug ( 'Asserting topic exchange exchange=%s' , amqpExchange ) ;
30
- yield channel . assertExchange ( amqpExchange , 'topic' ) ;
28
+ logger . debug ( 'Asserting topic exchange exchange=%s' , amqpExchange ) ;
29
+ yield channel . assertExchange ( amqpExchange , 'topic' ) ;
31
30
32
- debug ( 'Asserting queue' ) ;
33
- yield channel . assertQueue ( queueName , {
34
- exclusive : false ,
35
- durable : false ,
36
- autoDelete : true
37
- } ) ;
38
-
39
- for ( let key of keys ) {
40
- debug ( `Binding queue to exchange queue=${ queueName } exchange=${ amqpExchange } bindingKey=${ key } ` ) ;
41
- yield channel . bindQueue ( queueName , amqpExchange , key ) ;
42
- }
43
- console . log ( 'Initialization completed' ) ;
31
+ logger . debug ( 'Asserting queue' ) ;
32
+ yield channel . assertQueue ( queueName , {
33
+ exclusive : false ,
34
+ durable : false ,
35
+ autoDelete : true ,
44
36
} ) ;
37
+
38
+ // eslint-disable-next-line no-restricted-syntax
39
+ for ( const key of keys ) {
40
+ logger . debug ( `Binding queue to exchange queue=${ queueName } exchange=${ amqpExchange } bindingKey=${ key } ` ) ;
41
+ yield channel . bindQueue ( queueName , amqpExchange , key ) ;
42
+ }
43
+ logger . info ( 'Initialization completed' ) ;
44
+ } ) ;
45
45
}
46
46
47
47
/**
@@ -50,31 +50,34 @@ function init(cfg) {
50
50
* @param msg incoming message object that contains ``body`` with payload
51
51
* @param cfg configuration that is account information and configuration field values
52
52
*/
53
+ // eslint-disable-next-line no-unused-vars
53
54
function processAction ( msg , cfg ) {
54
- console . log ( 'Trigger started' ) ;
55
- if ( listening ) {
56
- console . log ( 'Trigger was called again, we will ignore this run' ) ;
57
- return Promise . resolve ( ) ;
58
- }
59
- const consumer = ( msg ) => {
60
- debug ( 'Have got message fields=%j properties=%j' , msg . fields , msg . properties ) ;
61
- const decrypted = encryptor . decryptMessageContent ( msg . content ) ;
62
- debug ( 'Decrypted message=%j' , decrypted ) ;
63
- const newMsg = eioUtils . newMessageWithBody ( decrypted . body || { } ) ;
64
- newMsg . id = msg . properties . messageId ;
65
- newMsg . attachments = decrypted . attachments || { } ;
66
- this . emit ( 'data' , newMsg ) ;
67
- } ;
68
- return co ( function * consume ( ) {
69
- console . log ( 'Starting consuming from %s' , queueName ) ;
70
- yield channel . consume ( queueName , consumer , {
71
- noAck : true , // We can't really assert if message was consumed if we emit it yet
72
- consumerTag : `consumer_${ process . env . ELASTICIO_EXEC_ID } _${ process . env . ELASTICIO_FLOW_ID } `
73
- } ) ;
74
- console . log ( 'Consumption started' ) ;
75
- listening = true ;
55
+ const self = this ;
56
+ self . logger . info ( 'Trigger started' ) ;
57
+ if ( listening ) {
58
+ self . logger . info ( 'Trigger was called again, we will ignore this run' ) ;
59
+ return Promise . resolve ( ) ;
60
+ }
61
+ // eslint-disable-next-line no-shadow
62
+ const consumer = ( msg ) => {
63
+ self . logger . debug ( 'Have got message fields=%j properties=%j' , msg . fields , msg . properties ) ;
64
+ const decrypted = encryptor . decryptMessageContent ( self , msg . content ) ;
65
+ self . logger . debug ( 'Decrypted message=%j' , decrypted ) ;
66
+ const newMsg = messages . newMessageWithBody ( decrypted . body || { } ) ;
67
+ newMsg . id = msg . properties . messageId ;
68
+ newMsg . attachments = decrypted . attachments || { } ;
69
+ self . emit ( 'data' , newMsg ) ;
70
+ } ;
71
+ return co ( function * consume ( ) {
72
+ self . logger . info ( 'Starting consuming from %s' , queueName ) ;
73
+ yield channel . consume ( queueName , consumer , {
74
+ noAck : true , // We can't really assert if message was consumed if we emit it yet
75
+ consumerTag : `consumer_${ process . env . ELASTICIO_EXEC_ID } _${ process . env . ELASTICIO_FLOW_ID } ` ,
76
76
} ) ;
77
+ self . logger . info ( 'Consumption started' ) ;
78
+ listening = true ;
79
+ } ) ;
77
80
}
78
81
79
82
module . exports . process = processAction ;
80
- module . exports . init = init ;
83
+ module . exports . init = init ;
0 commit comments