1
- 'use strict' ;
2
-
3
1
const axios = require ( 'axios' ) ;
4
- const co = require ( 'co' ) ;
5
2
const csv = require ( 'csv' ) ;
6
3
const _ = require ( 'lodash' ) ;
7
- const messages = require ( 'elasticio-node' ) . messages ;
4
+ const { messages } = require ( 'elasticio-node' ) ;
8
5
const client = require ( 'elasticio-rest-node' ) ( ) ;
9
6
10
7
const util = require ( '../util/util' ) ;
@@ -21,79 +18,86 @@ let timeout;
21
18
let rowCount = 0 ;
22
19
let ax ;
23
20
let putUrl ;
21
+
22
+ let readyFlag = false ;
23
+
24
24
exports . init = async function init ( cfg ) {
25
- return co ( function * gen ( ) {
26
-
27
- const delimiter = cfg . writer . separator || ',' ;
28
- const header = cfg . includeHeaders !== 'No' ;
29
- console . log ( 'Using delimiter: \'%s\'' , delimiter ) ;
30
- const options = {
31
- header,
32
- delimiter
33
- } ;
34
-
35
- if ( cfg . writer . columns ) {
36
-
37
- const columns = Object . keys ( _ . keyBy ( cfg . writer . columns , 'property' ) ) ;
38
-
39
- console . log ( 'Configured column names:' , columns ) ;
40
- options . columns = columns ;
41
- }
42
-
43
- stringifier = csv . stringify ( options ) ;
44
- signedUrl = yield client . resources . storage . createSignedUrl ( ) ;
45
- putUrl = signedUrl . put_url ;
46
- console . log ( 'CSV file to be uploaded file to uri=%s' , putUrl ) ;
47
- ax = axios . create ( ) ;
48
- util . addRetryCountInterceptorToAxios ( ax ) ;
49
- } ) ;
25
+ const delimiter = cfg . writer . separator || ',' ;
26
+ const header = cfg . includeHeaders !== 'No' ;
27
+ console . log ( 'Using delimiter: \'%s\'' , delimiter ) ;
28
+ const options = {
29
+ header,
30
+ delimiter,
31
+ } ;
32
+
33
+ if ( cfg . writer . columns ) {
34
+ const columns = Object . keys ( _ . keyBy ( cfg . writer . columns , 'property' ) ) ;
35
+
36
+ console . log ( 'Configured column names:' , columns ) ;
37
+ options . columns = columns ;
38
+ }
39
+
40
+ stringifier = csv . stringify ( options ) ;
41
+ signedUrl = await client . resources . storage . createSignedUrl ( ) ;
42
+ putUrl = signedUrl . put_url ;
43
+ console . log ( 'CSV file to be uploaded file to uri=%s' , putUrl ) ;
44
+ ax = axios . create ( ) ;
45
+ util . addRetryCountInterceptorToAxios ( ax ) ;
46
+ readyFlag = true ;
50
47
} ;
51
48
52
49
exports . process = async function ProcessAction ( msg , cfg ) {
53
- // eslint-disable-next-line consistent-this
54
- const self = this ;
55
-
56
- if ( timeout ) {
57
- clearTimeout ( timeout ) ;
58
- }
59
-
60
-
61
- timeout = setTimeout ( ( ) => {
62
- console . log ( 'Closing the stream due to inactivity' ) ;
63
- co ( function * gen ( ) {
64
- const finalRowCount = rowCount ;
65
- console . log ( 'The resulting CSV file contains %s rows' , finalRowCount ) ;
66
- ax . put ( putUrl , stringifier , {
67
- method : 'PUT' ,
68
- timeout : REQUEST_TIMEOUT ,
69
- retry : REQUEST_MAX_RETRY ,
70
- delay : REQUEST_RETRY_DELAY ,
71
- maxContentLength : REQUEST_MAX_CONTENT_LENGTH
72
- } ) ;
73
- stringifier . end ( ) ;
74
- const messageToEmit = messages . newMessageWithBody ( {
75
- rowCount : finalRowCount
76
- } ) ;
77
- const fileName = messageToEmit . id + '.csv' ;
78
- messageToEmit . attachments [ fileName ] = {
79
- 'content-type' : 'text/csv' ,
80
- 'url' : signedUrl . get_url
81
- } ;
82
- signedUrl = null ;
83
- rowCount = 0 ;
84
- console . log ( 'Emitting message %j' , messageToEmit ) ;
85
- yield self . emit ( 'data' , messageToEmit ) ;
86
- yield exports . init ( cfg ) ;
87
- } ) ;
88
- } , TIMEOUT_BETWEEN_EVENTS ) ;
89
- let row = msg . body . writer ;
90
- console . log ( `Incoming data: ${ JSON . stringify ( row ) } ` ) ;
91
- if ( cfg . writer . columns ) {
92
- const columns = Object . keys ( _ . keyBy ( cfg . writer . columns , 'property' ) ) ;
93
- row = _ . pick ( row , columns ) ;
94
- }
95
- console . log ( `Writing Row: ${ JSON . stringify ( row ) } ` ) ;
96
- stringifier . write ( row ) ;
97
- rowCount ++ ;
98
- await this . emit ( 'end' ) ;
50
+ // eslint-disable-next-line consistent-this
51
+ const self = this ;
52
+
53
+ while ( ! readyFlag ) {
54
+ // eslint-disable-next-line no-loop-func,no-await-in-loop
55
+ await new Promise ( resolve => timeout ( resolve , 100 ) ) ;
56
+ }
57
+
58
+ if ( timeout ) {
59
+ clearTimeout ( timeout ) ;
60
+ }
61
+
62
+ timeout = setTimeout ( async ( ) => {
63
+ readyFlag = false ;
64
+
65
+ console . log ( 'Closing the stream due to inactivity' ) ;
66
+
67
+ const finalRowCount = rowCount ;
68
+ console . log ( 'The resulting CSV file contains %s rows' , finalRowCount ) ;
69
+ ax . put ( putUrl , stringifier , {
70
+ method : 'PUT' ,
71
+ timeout : REQUEST_TIMEOUT ,
72
+ retry : REQUEST_MAX_RETRY ,
73
+ delay : REQUEST_RETRY_DELAY ,
74
+ maxContentLength : REQUEST_MAX_CONTENT_LENGTH ,
75
+ } ) ;
76
+ stringifier . end ( ) ;
77
+
78
+ const messageToEmit = messages . newMessageWithBody ( {
79
+ rowCount : finalRowCount ,
80
+ } ) ;
81
+ const fileName = `${ messageToEmit . id } .csv` ;
82
+ messageToEmit . attachments [ fileName ] = {
83
+ 'content-type' : 'text/csv' ,
84
+ url : signedUrl . get_url ,
85
+ } ;
86
+ signedUrl = null ;
87
+ rowCount = 0 ;
88
+ console . log ( 'Emitting message %j' , messageToEmit ) ;
89
+ await self . emit ( 'data' , messageToEmit ) ;
90
+ } , TIMEOUT_BETWEEN_EVENTS ) ;
91
+
92
+ let row = msg . body . writer ;
93
+ console . log ( `Incoming data: ${ JSON . stringify ( row ) } ` ) ;
94
+ if ( cfg . writer . columns ) {
95
+ const columns = Object . keys ( _ . keyBy ( cfg . writer . columns , 'property' ) ) ;
96
+ row = _ . pick ( row , columns ) ;
97
+ }
98
+ console . log ( `Writing Row: ${ JSON . stringify ( row ) } ` ) ;
99
+ stringifier . write ( row ) ;
100
+ rowCount += 1 ;
101
+
102
+ await self . emit ( 'end' ) ;
99
103
} ;
0 commit comments