@@ -4,14 +4,23 @@ no-param-reassign */
4
4
const { AttachmentProcessor } = require ( '@elastic.io/component-commons-library' )
5
5
const { messages } = require ( 'elasticio-node' )
6
6
const papa = require ( 'papaparse' )
7
+ const { Readable } = require ( 'stream' ) ;
8
+ const { getUserAgent } = require ( '../util' ) ;
9
+
10
+ const formStream = ( data ) => {
11
+ const stream = new Readable ( ) ;
12
+ stream . push ( data ) ;
13
+ stream . push ( null ) ;
14
+ return stream ;
15
+ }
7
16
8
17
const TIMEOUT_BETWEEN_EVENTS = process . env . TIMEOUT_BETWEEN_EVENTS || 10000 ; // 10s;
9
18
10
19
const rawData = [ ]
11
20
let timeout
12
21
13
- async function proceedData ( data , cfg ) {
14
- let csvString
22
+ async function proceedData ( data , msg , cfg ) {
23
+ let csvString ;
15
24
const delimiter = cfg . separator ? cfg . separator : ','
16
25
17
26
const unparseOptions = {
@@ -29,30 +38,31 @@ async function proceedData(data, cfg) {
29
38
return filtered
30
39
} )
31
40
return result
32
- } )
41
+ } ) ;
33
42
csvString = papa . unparse ( {
34
43
fields,
35
44
data : orderedData
36
- } , unparseOptions )
45
+ } , unparseOptions ) ;
37
46
} else {
38
- csvString = papa . unparse ( data , unparseOptions )
47
+ csvString = papa . unparse ( data , unparseOptions ) ;
39
48
}
49
+ const getAttachment = ( ) => formStream ( csvString ) ;
40
50
41
51
if ( ! cfg . uploadToAttachment ) {
42
52
await this . emit ( 'data' , messages . newMessageWithBody ( { csvString } ) )
43
53
this . logger . info ( `Complete, memory used: ${ process . memoryUsage ( ) . heapUsed / 1024 / 1024 } Mb` )
44
54
return
45
55
}
46
56
47
- const attachmentProcessor = new AttachmentProcessor ( )
48
- let uploadResult
57
+ const attachmentProcessor = new AttachmentProcessor ( getUserAgent ( ) , msg . id )
58
+ let attachmentId ;
49
59
try {
50
- uploadResult = await attachmentProcessor . uploadAttachment ( csvString , 'stream' )
60
+ attachmentId = await attachmentProcessor . uploadAttachment ( getAttachment )
51
61
} catch ( err ) {
52
62
this . logger . error ( `Upload attachment failed: ${ err } ` )
53
63
this . emit ( 'error' , `Upload attachment failed: ${ err } ` )
54
64
}
55
- const attachmentUrl = ` ${ uploadResult . config . url } ${ uploadResult . data . objectId } ?storage_type=maester` ;
65
+ const attachmentUrl = attachmentProcessor . getMaesterAttachmentUrlById ( attachmentId )
56
66
const body = {
57
67
attachmentUrl,
58
68
type : '.csv' ,
@@ -91,13 +101,13 @@ async function writeCSV(msg, cfg) {
91
101
// if not array - create array from all fn calls and send data to proceedData
92
102
if ( Array . isArray ( body . items ) ) {
93
103
this . logger . info ( 'input metadata is array. Proceed with data ' )
94
- await proceedData . call ( this , body . items , cfg )
104
+ await proceedData . call ( this , body . items , msg , cfg )
95
105
} else {
96
106
rawData . push ( body . items )
97
107
if ( timeout ) clearTimeout ( timeout )
98
108
timeout = setTimeout ( ( ) => {
99
109
this . logger . info ( `input metadata is object. Array creation (wait up to ${ TIMEOUT_BETWEEN_EVENTS } ms for more records)` )
100
- proceedData . call ( this , rawData , cfg )
110
+ proceedData . call ( this , rawData , msg , cfg )
101
111
} , TIMEOUT_BETWEEN_EVENTS )
102
112
}
103
113
}
0 commit comments