1
1
const request = require ( 'request-promise' ) ;
2
2
const co = require ( 'co' ) ;
3
3
const csv = require ( 'csv' ) ;
4
+ const _ = require ( 'lodash' ) ;
4
5
const messages = require ( 'elasticio-node' ) . messages ;
5
6
const client = require ( 'elasticio-rest-node' ) ( ) ;
6
7
@@ -12,12 +13,22 @@ let rowCount = 0;
12
13
exports . init = init ;
13
14
exports . process = processAction ;
14
15
15
- function init ( ) {
16
+ function init ( cfg ) {
16
17
return co ( function * gen ( ) {
17
18
18
- stringifier = csv . stringify ( {
19
- header : true
20
- } ) ;
19
+ const delimiter = cfg . reader . delimiter || ',' ;
20
+ console . log ( 'Using delimiter: \'%s\'' , delimiter ) ;
21
+ const options = {
22
+ header : true ,
23
+ delimiter
24
+ } ;
25
+
26
+ if ( cfg . reader . columns ) {
27
+ console . log ( 'Configured columns:' , cfg . reader . columns ) ;
28
+ options . columns = cfg . reader . columns ;
29
+ }
30
+
31
+ stringifier = csv . stringify ( options ) ;
21
32
22
33
signedUrl = yield client . resources . storage . createSignedUrl ( ) ;
23
34
@@ -28,7 +39,7 @@ function init() {
28
39
} ) ;
29
40
}
30
41
31
- function processAction ( msg ) {
42
+ function processAction ( msg , cfg ) {
32
43
const self = this ;
33
44
34
45
if ( timeout ) {
@@ -59,7 +70,9 @@ function processAction(msg) {
59
70
} ) ;
60
71
} , 10000 ) ;
61
72
62
- stringifier . write ( msg . body ) ;
73
+ const columns = cfg . reader . columns ;
74
+ const row = columns ? _ . pick ( msg . body , columns ) : msg . body ;
75
+ stringifier . write ( row ) ;
63
76
rowCount ++ ;
64
77
this . emit ( 'end' ) ;
65
78
}
0 commit comments