@@ -26,10 +26,20 @@ async function errHelper(text) {
26
26
await this . emit ( 'end' )
27
27
}
28
28
29
+ function sliceIntoChunks ( arr , chunkSize ) {
30
+ const res = [ ] ;
31
+ for ( let i = 0 ; i < arr . length ; i += chunkSize ) {
32
+ const chunk = arr . slice ( i , i + chunkSize ) ;
33
+ res . push ( chunk ) ;
34
+ }
35
+ return res ;
36
+ }
37
+
29
38
async function readCSV ( msg , cfg ) {
30
39
const that = this
31
- const emitAll = cfg . emitAll === true || cfg . emitAll === 'true'
32
- const { body } = msg
40
+ // const emitAll = cfg.emitAll === true || cfg.emitAll === 'true'
41
+ const { emitBehavior } = cfg ;
42
+ const { body } = msg ;
33
43
34
44
// check if url provided in msg
35
45
if ( body . url && body . url . length > 0 ) {
@@ -60,9 +70,7 @@ async function readCSV(msg, cfg) {
60
70
}
61
71
62
72
// if set "Fetch All" create object with results
63
- const outputMsg = {
64
- result : [ ] ,
65
- }
73
+ const result = [ ] ;
66
74
67
75
let dataStream
68
76
const parseStream = papa . parse ( papa . NODE_STREAM_INPUT , parseOptions )
@@ -85,12 +93,12 @@ async function readCSV(msg, cfg) {
85
93
} else {
86
94
data = arrayToObj ( chunk )
87
95
}
88
- if ( emitAll ) {
89
- outputMsg . result . push ( data )
90
- } else {
96
+ if ( emitBehavior === 'emitIndividually' ) {
91
97
parseStream . pause ( )
92
98
await that . emit ( 'data' , messages . newMessageWithBody ( data ) )
93
99
parseStream . resume ( )
100
+ } else {
101
+ result . push ( data )
94
102
}
95
103
}
96
104
}
@@ -111,10 +119,56 @@ async function readCSV(msg, cfg) {
111
119
return
112
120
}
113
121
114
- if ( emitAll ) {
115
- await this . emit ( 'data' , messages . newMessageWithBody ( outputMsg ) )
122
+ if ( emitBehavior === 'fetchAll' ) {
123
+ await this . emit ( 'data' , messages . newMessageWithBody ( { result } ) )
124
+ } else if ( emitBehavior === 'emitBatch' ) {
125
+ const chunks = sliceIntoChunks ( result , body . batchSize ) ;
126
+ // eslint-disable-next-line no-plusplus
127
+ for ( let i = 0 ; i < chunks . length ; i ++ ) {
128
+ // eslint-disable-next-line no-await-in-loop
129
+ await this . emit ( 'data' , messages . newMessageWithBody ( { result : chunks [ i ] } ) )
130
+ }
116
131
}
117
132
this . logger . info ( `Complete, memory used: ${ process . memoryUsage ( ) . heapUsed / 1024 / 1024 } Mb` )
118
133
}
119
134
120
- module . exports . process = readCSV
135
+ module . exports . process = readCSV ;
136
+ module . exports . getMetaModel = async function ( cfg ) {
137
+ const meta = {
138
+ in : {
139
+ type : 'object' ,
140
+ properties : {
141
+ url : {
142
+ type : 'string' ,
143
+ required : true ,
144
+ title : 'URL'
145
+ } ,
146
+ header : {
147
+ type : 'boolean' ,
148
+ required : false ,
149
+ title : 'Contains headers'
150
+ } ,
151
+ delimiter : {
152
+ type : 'string' ,
153
+ required : false ,
154
+ title : 'Delimiter'
155
+ } ,
156
+ dynamicTyping : {
157
+ type : 'boolean' ,
158
+ required : false ,
159
+ title : 'Convert Data types'
160
+ }
161
+ }
162
+ } ,
163
+ out : { }
164
+ } ;
165
+
166
+ if ( cfg . emitBehavior === 'emitBatch' ) {
167
+ meta . in . properties . batchSize = {
168
+ title : 'Batch Size' ,
169
+ type : 'number' ,
170
+ required : true
171
+ }
172
+ }
173
+ return meta ;
174
+ }
0 commit comments