1
1
var _ = require ( 'underscore' ) ;
2
2
var util = require ( 'util' ) ;
3
3
var csv = require ( 'csv' ) ;
4
- var messages = require ( '../messages.js' ) ;
4
+ var messages = require ( 'elasticio-node' ) . messages ;
5
5
var moment = require ( 'moment' ) ;
6
- var s3client = require ( 's3' ) . getClient ( process . env . S3_BUCKET ) ;
7
- var HeartBeatStream = require ( '../heartbeat-stream.js' ) . HeartBeatStream ;
6
+ var debug = require ( 'debug' ) ( 'csv' ) ;
7
+ var request = require ( 'request' ) ;
8
8
9
- exports . process = function ( msg , cfg ) {
10
- var attachments = msg . attachments , that = this ;
9
+ function processRead ( msg , cfg ) {
10
+ var csvURL = cfg . url ;
11
+ var that = this ;
12
+ var index = 0 ;
13
+ var separator = cfg . reader ? cfg . reader . separator || "," : "," ;
14
+ var startRow = cfg . reader ? cfg . reader . startRow || 0 : 0 ;
11
15
12
- if ( ! attachments || Object . keys ( attachments ) . length == 0 ) {
13
- this . info ( 'No attachments found in incoming message, CSV Read component will terminate this flow' ) ;
14
- return this . emit ( 'end' ) ;
15
- }
16
- var csvs = Object . keys ( attachments ) . filter ( function ( name ) {
17
- var att = attachments [ name ] ;
18
- var type = att [ "content-type" ] ;
19
16
20
- if ( ! att . s3 ) {
21
- console . log ( "Only S3 attachments supported yet" ) ;
22
- return false ;
17
+ if ( ! csvURL || csvURL . length == 0 ) {
18
+ console . error ( 'URL of the CSV is missing' ) ;
19
+ that . emit ( 'error' , 'URL of the CSV is missing' ) ;
20
+ return that . emit ( 'end' ) ;
21
+ }
22
+ var parser = csv . parse ( { delimiter : separator } ) ;
23
+
24
+ parser . on ( 'readable' , function ( ) {
25
+ while ( record = parser . read ( ) ) {
26
+ debug ( 'Have got a row #%s' , index ) ;
27
+ if ( index >= startRow ) {
28
+ var msg = createRowMessage ( record , cfg . reader . columns ) ;
29
+ that . emit ( 'data' , msg ) ;
30
+ } else {
31
+ debug ( 'Row #%s is skipped based on configuration' , index ) ;
32
+ }
33
+ index ++ ;
23
34
}
24
-
25
- return true ;
26
35
} ) ;
27
36
28
- if ( csvs . length === 0 ) {
29
- this . info ( 'Can not find suitable attachments to parse' ) ;
30
- return this . emit ( 'end' ) ;
31
- }
32
-
33
- this . debug ( 'Will parse following CSV files [%s]' , csvs . join ( ',' ) ) ;
34
-
35
- csvs . forEach ( function ( name ) {
36
- parseAttachment ( that , s3client . getEncrypted ( attachments [ name ] . s3 ) , cfg ) ;
37
+ parser . on ( 'finish' , function fireEnd ( ) {
38
+ debug ( 'Number of lines: ' + index ) ;
39
+ that . emit ( 'end' ) ;
37
40
} ) ;
38
- } ;
39
-
40
- var parseAttachment = function ( that , attachmentPromise , cfg ) {
41
-
42
- var separator = cfg . reader . separator || "," ;
43
41
44
- var startRow = cfg . reader . startRow || 0 ;
45
-
46
- attachmentPromise
47
- . then ( function ( res ) {
48
-
49
- new HeartBeatStream ( )
50
- . on ( 'heartbeat' , function ( ) {
51
- that . emit ( 'heartbeat' ) ;
52
- } ) . start ( res ) ;
53
-
54
- csv ( ) . from ( res , { delimiter : separator } )
55
- . on ( 'record' , function ( row , index ) {
56
-
57
- if ( index >= startRow ) {
58
-
59
- var msg = createRowMessage ( row , cfg . reader . columns ) ;
60
-
61
- that . emit ( 'data' , msg ) ;
62
- }
63
- } )
64
- . on ( 'end' , function ( count ) {
65
- that . debug ( 'Number of lines: ' + count ) ;
42
+ parser . on ( 'error' , function emitError ( error ) {
43
+ debug ( 'Error reported by CSV read' , error ) ;
44
+ that . emit ( 'error' , error ) ;
45
+ } ) ;
66
46
67
- that . emit ( 'end' ) ;
68
- } )
69
- . on ( 'error' , function ( error ) {
70
- console . log ( error . stack ) ;
71
- } ) ;
47
+ debug ( 'Sending GET request to url=%s' , csvURL ) ;
48
+ request . get ( csvURL )
49
+ . on ( 'response' , function ( response ) {
50
+ debug ( 'Have got response status=%s headers=%j' , response . statusCode , response . headers ) ;
51
+ if ( response . statusCode != 200 ) {
52
+ that . emit ( 'error' , 'Unexpected response code code=' + response . statusCode ) ;
53
+ throw Error ( 'Unexpected response code code=' + response . statusCode ) ;
54
+ }
72
55
} )
73
- . fail ( function ( e ) {
74
- console . log ( e ) ;
75
- } )
76
- . done ( ) ;
77
- } ;
56
+ . pipe ( parser ) ;
57
+ }
78
58
79
59
var createRowMessage = function ( row , columns ) {
80
60
81
61
var body = { } ;
82
62
83
- _ . each ( row , function ( value , index ) {
63
+ _ . each ( row , function ( value , index ) {
84
64
var col = columns [ index ] ;
85
65
86
66
if ( col ) {
@@ -96,21 +76,21 @@ var createRowMessage = function (row, columns) {
96
76
var formatValue = function ( value , col ) {
97
77
var type = col . type || "string" ;
98
78
99
- var formatter = formatters [ type ] || function ( value , col ) {
100
- return value ;
101
- } ;
79
+ var formatter = formatters [ type ] || function ( value , col ) {
80
+ return value ;
81
+ } ;
102
82
103
83
return formatter ( value , col ) ;
104
84
} ;
105
85
106
86
var formatters = { } ;
107
87
108
- formatters [ "date" ] = function ( value , col ) {
88
+ formatters [ "date" ] = function ( value , col ) {
109
89
110
90
return moment ( value , col . format ) . toDate ( ) ;
111
91
} ;
112
92
113
- formatters [ "number" ] = function ( value , col ) {
93
+ formatters [ "number" ] = function ( value , col ) {
114
94
115
95
if ( col . format === "dec_comma" ) {
116
96
//123.456.78,9 => 12345678.9
@@ -125,3 +105,4 @@ formatters["number"] = function(value, col) {
125
105
return parseFloat ( value ) ;
126
106
} ;
127
107
108
+ exports . process = processRead ;
0 commit comments