@@ -5,40 +5,26 @@ const amqp = require('amqplib');
5
5
const encryptor = require ( '../encryptor.js' ) ;
6
6
const debug = require ( 'debug' ) ( 'consumer' ) ;
7
7
8
+ let channel ;
9
+ const queueName = `eio_consumer_${ process . env . ELASTICIO_FLOW_ID } _${ process . env . ELASTICIO_USER_ID } ` ;
10
+ let listening ;
11
+
8
12
/**
9
13
* This function will be called on component before the first message will be processed
10
14
*
11
15
* @param cfg
12
16
*/
13
17
function init ( cfg ) {
14
- console . log ( 'Starting initialization, cfg=%j' , cfg ) ;
18
+ console . log ( 'Starting initialization, cfg=%j queueName=%s ' , cfg , queueName ) ;
15
19
const amqpURI = cfg . amqpURI ;
16
20
const amqpExchange = cfg . topic ;
17
- const queueName = `eio_consumer_${ process . env . ELASTICIO_FLOW_ID } _${ process . env . ELASTICIO_USER_ID } ` ;
18
21
const keys = ( cfg . bindingKeys || '#' ) . split ( ',' ) . map ( ( s ) => s . trim ( ) ) ;
19
-
20
- /**
21
- * Consumer function that will be called when new message
22
- * is received
23
- *
24
- * @param msg
25
- */
26
- const consumer = ( msg ) => {
27
- debug ( 'Have got message fields=%j properties=%j' , msg . fields , msg . properties ) ;
28
- const decrypted = encryptor . decryptMessageContent ( msg . content ) ;
29
- debug ( 'Decrypted message=%j' , decrypted ) ;
30
- const newMsg = eioUtils . newMessageWithBody ( decrypted . body || { } ) ;
31
- newMsg . id = msg . properties . messageId ;
32
- newMsg . attachments = decrypted . attachments || { } ;
33
- this . emit ( 'data' , newMsg ) ;
34
- } ;
35
-
36
22
return co ( function * initialize ( ) {
37
23
debug ( 'Connecting to amqpURI=%s' , amqpURI ) ;
38
24
const conn = yield amqp . connect ( amqpURI ) ;
39
25
40
26
debug ( 'Creating a receiver channel' ) ;
41
- const channel = yield conn . createChannel ( ) ;
27
+ channel = yield conn . createChannel ( ) ;
42
28
43
29
debug ( 'Asserting topic exchange exchange=%s' , amqpExchange ) ;
44
30
yield channel . assertExchange ( amqpExchange , 'topic' ) ;
@@ -55,13 +41,6 @@ function init(cfg) {
55
41
yield channel . bindQueue ( queueName , amqpExchange , key ) ;
56
42
}
57
43
console . log ( 'Initialization completed' ) ;
58
-
59
- console . log ( 'Starting consuming from queue=%s' , queueName ) ;
60
- yield channel . consume ( queueName , consumer , {
61
- noAck : true , // We can't really assert if message was consumed if we emit it yet
62
- consumerTag : `consumer_${ process . env . ELASTICIO_EXEC_ID } _${ process . env . ELASTICIO_FLOW_ID } `
63
- } ) ;
64
- console . log ( 'Consumption started' ) ;
65
44
} ) ;
66
45
}
67
46
@@ -72,8 +51,29 @@ function init(cfg) {
72
51
* @param cfg configuration that is account information and configuration field values
73
52
*/
74
53
function processAction ( msg , cfg ) {
75
- console . log ( 'Trigger started but we don not need to do anything here' ) ;
76
- return Promise . resolve ( ) ;
54
+ console . log ( 'Trigger started, cfg=%j' , cfg ) ;
55
+ if ( listening ) {
56
+ console . log ( 'Trigger was called again, we will ignore this run' ) ;
57
+ return Promise . resolve ( ) ;
58
+ }
59
+ const consumer = ( msg ) => {
60
+ debug ( 'Have got message fields=%j properties=%j' , msg . fields , msg . properties ) ;
61
+ const decrypted = encryptor . decryptMessageContent ( msg . content ) ;
62
+ debug ( 'Decrypted message=%j' , decrypted ) ;
63
+ const newMsg = eioUtils . newMessageWithBody ( decrypted . body || { } ) ;
64
+ newMsg . id = msg . properties . messageId ;
65
+ newMsg . attachments = decrypted . attachments || { } ;
66
+ this . emit ( 'data' , newMsg ) ;
67
+ } ;
68
+ return co ( function * consume ( ) {
69
+ console . log ( 'Starting consuming from %s' , queueName ) ;
70
+ yield channel . consume ( queueName , consumer , {
71
+ noAck : true , // We can't really assert if message was consumed if we emit it yet
72
+ consumerTag : `consumer_${ process . env . ELASTICIO_EXEC_ID } _${ process . env . ELASTICIO_FLOW_ID } `
73
+ } ) ;
74
+ console . log ( 'Consumption started' ) ;
75
+ listening = true ;
76
+ } ) ;
77
77
}
78
78
79
79
module . exports . process = processAction ;
0 commit comments