@@ -19,6 +19,8 @@ let rowCount = 0;
19
19
let ax ;
20
20
let putUrl ;
21
21
22
+ let readyFlag = false ;
23
+
22
24
exports . init = async function init ( cfg ) {
23
25
const delimiter = cfg . writer . separator || ',' ;
24
26
const header = cfg . includeHeaders !== 'No' ;
@@ -45,17 +47,25 @@ exports.init = async function init(cfg) {
45
47
console . log ( 'CSV file to be uploaded file to uri=%s' , putUrl ) ;
46
48
ax = axios . create ( ) ;
47
49
util . addRetryCountInterceptorToAxios ( ax ) ;
50
+ readyFlag = true ;
48
51
} ;
49
52
50
53
exports . process = async function ProcessAction ( msg , cfg ) {
51
54
// eslint-disable-next-line consistent-this
52
55
const self = this ;
53
56
57
+ while ( ! readyFlag ) {
58
+ // eslint-disable-next-line no-loop-func,no-await-in-loop
59
+ await new Promise ( resolve => timeout ( resolve , 100 ) ) ;
60
+ }
61
+
54
62
if ( timeout ) {
55
63
clearTimeout ( timeout ) ;
56
64
}
57
65
58
66
timeout = setTimeout ( async ( ) => {
67
+ readyFlag = false ;
68
+
59
69
console . log ( 'Closing the stream due to inactivity' ) ;
60
70
61
71
const finalRowCount = rowCount ;
@@ -68,6 +78,7 @@ exports.process = async function ProcessAction(msg, cfg) {
68
78
maxContentLength : REQUEST_MAX_CONTENT_LENGTH ,
69
79
} ) ;
70
80
stringifier . end ( ) ;
81
+
71
82
const messageToEmit = messages . newMessageWithBody ( {
72
83
rowCount : finalRowCount ,
73
84
} ) ;
@@ -80,7 +91,6 @@ exports.process = async function ProcessAction(msg, cfg) {
80
91
rowCount = 0 ;
81
92
console . log ( 'Emitting message %j' , messageToEmit ) ;
82
93
await self . emit ( 'data' , messageToEmit ) ;
83
- await exports . init ( cfg ) ;
84
94
} , TIMEOUT_BETWEEN_EVENTS ) ;
85
95
86
96
let row = msg . body . writer ;
0 commit comments