@@ -32,52 +32,42 @@ module.exports = function connector(emitter, opts) {
3232
3333 let bulkarray = [ ] ;
3434
35- const startMonitoring = ( ) => {
36- const preConfigured = [ 'memory' ] ;
35+ const preconfiguredEvents = [ 'memory' ] ;
3736
38- /*
39- * Memory is a special case as we change the structure of the data
40- */
41- emitter . on ( 'memory' , memory => {
42- const data = {
43- process : {
44- private : memory . private ,
45- physical : memory . physical ,
46- virtual : memory . virtual ,
47- } ,
48- system : {
49- physical : memory . physical_used ,
50- total : memory . physical_total ,
51- } ,
52- } ;
53- publishData ( 'memory' , memory . time , data ) ;
54- } ) ;
55-
56- const registerCallback = ( value , type ) => {
57- if ( preConfigured . indexOf ( type ) === - 1 ) {
58- emitter . on ( type , eventdata => {
59- const data = { } ;
60- const { properties } = value . body . properties [ type ] ;
61- for ( const prop of Object . keys ( properties ) ) {
62- data [ prop ] = eventdata [ prop ] ;
63- }
64- publishData ( type , eventdata . time , data ) ;
65- } ) ;
66- }
37+ /*
38+ * Memory is a special case as we change the structure of the data
39+ */
40+ emitter . on ( 'memory' , memory => {
41+ const data = {
42+ process : {
43+ private : memory . private ,
44+ physical : memory . physical ,
45+ virtual : memory . virtual ,
46+ } ,
47+ system : {
48+ physical : memory . physical_used ,
49+ total : memory . physical_total ,
50+ } ,
6751 } ;
52+ publishData ( 'memory' , memory . time , data ) ;
53+ } ) ;
6854
69- /*
70- * Register a callback for every event type we have a mapping for
71- */
72- getJSONfromDir ( 'mappings' , registerCallback ) ;
73-
74- setInterval ( ( ) => {
75- if ( bulkarray . length > 0 ) {
76- publishBulk ( bulkarray ) ;
77- bulkarray = [ ] ;
55+ /*
56+ * Register a callback for every event type we have a mapping for
57+ */
58+ mapJsonInDir ( 'mappings' , ( value , type ) => {
59+ if ( preconfiguredEvents . indexOf ( type ) !== - 1 ) {
60+ return ;
61+ }
62+ emitter . on ( type , eventData => {
63+ const data = { } ;
64+ const { properties } = value . body . properties [ type ] ;
65+ for ( const prop of Object . keys ( properties ) ) {
66+ data [ prop ] = eventData [ prop ] ;
7867 }
79- } , 5000 ) ;
80- } ;
68+ publishData ( type , eventData . time , data ) ;
69+ } ) ;
70+ } ) ;
8171
8272 /*
8373 * Publishing data to Elasticsearch (ES)
@@ -97,17 +87,6 @@ module.exports = function connector(emitter, opts) {
9787 bulkarray . push ( action , doc ) ;
9888 } ;
9989
100- const publishBulk = ( ) => {
101- esClient
102- . bulk ( {
103- index : opts . index ,
104- type : 'doc' ,
105- body : bulkarray ,
106- } )
107- // .then(res => console.log('Published bulk update ' + res)
108- . catch ( err => console . log ( 'Error doing bulk update ' + err ) ) ;
109- } ;
110-
11190 esClient
11291 . search ( {
11392 index : '.kibana' ,
@@ -140,47 +119,55 @@ module.exports = function connector(emitter, opts) {
140119 }
141120
142121 /*
143- * Check to see if the appmetrics index exists in ElasticSearch . if
122+ * Check to see if the appmetrics index exists in Elasticsearch . if
144123 * is doesn't, then create the index to store the data into and
145124 * upload the data type mappings
146125 */
147126 return esClient . indices
148127 . exists ( { index : opts . index } )
149128 . then ( exists => ! exists && esClient . indices . create ( { index : opts . index } ) )
150- . then ( ( ) => putMappings ( esClient , esVersion , opts . index ) ) ;
129+ . then ( ( ) => putMappings ( esClient , esVersion , opts . index ) )
130+ . then ( ( ) => esVersion ) ;
151131 } )
152132 . catch ( err => console . error ( 'Failed to create index' , err . stack ) )
153133 )
154- . then ( startMonitoring ) ;
134+ . then ( esVersion => {
135+ publishBulk ( esClient , esVersion , opts . index , bulkarray ) ;
136+ setInterval ( publishBulk , 5000 , esClient , esVersion , opts . index , bulkarray ) ;
137+ } ) ;
155138
156139 return emitter ;
157140} ;
158141
159- function getJSONfromDir ( directory , callback ) {
142+ function mapJsonInDir ( directory , callback ) {
160143 const dirPath = path . join ( __dirname , '..' , directory ) ;
161- fs . readdir ( dirPath , ( err , files ) => {
162- if ( err ) {
163- console . log ( 'Failed to read from ' + dirPath ) ;
164- } else {
165- for ( const filename of files ) {
166- const file = path . join ( dirPath , filename ) ;
167- const basename = path . basename ( filename , '.json' ) ;
168- callback . call ( this , JSON . parse ( fs . readFileSync ( file , 'utf8' ) ) , basename ) ;
169- }
144+ try {
145+ const files = fs . readdirSync ( dirPath ) ;
146+ for ( const filename of files ) {
147+ const file = path . join ( dirPath , filename ) ;
148+ const basename = path . basename ( filename , '.json' ) ;
149+ callback ( JSON . parse ( fs . readFileSync ( file , 'utf8' ) ) , basename ) ;
170150 }
171- } ) ;
151+ } catch ( err ) {
152+ console . error ( 'Failed to read from ' + dirPath ) ;
153+ console . error ( err ) ;
154+ }
172155}
173156
174157/*
175158 * Put the mappings for the data we create into the index. It
176159 * shouldn't matter if we replace existing records as they should be the same...
177160 */
178161function putMappings ( esClient , esVersion , index ) {
179- getJSONfromDir ( 'mappings' , mapping => {
162+ mapJsonInDir ( 'mappings' , ( mapping , type ) => {
180163 mapping . index = index ;
181164 if ( esVersion <= 2 ) {
182165 backportFieldTypes ( mapping ) ;
183166 }
167+ if ( esVersion <= 5 ) {
168+ mapping . type = type ;
169+ delete mapping . body . properties . type ;
170+ }
184171 esClient . indices
185172 . putMapping ( mapping )
186173 // .then(res => console.log('Put mapping for ' + fileName))
@@ -189,7 +176,7 @@ function putMappings(esClient, esVersion, index) {
189176}
190177
191178function putIndexes ( esClient , esVersion , index ) {
192- getJSONfromDir ( 'indexes' , indexPattern => {
179+ mapJsonInDir ( 'indexes' , indexPattern => {
193180 indexPattern . id = 'index-pattern:' + index ;
194181 indexPattern . body [ indexPattern . body . type ] . title = index ;
195182 if ( esVersion <= 5 ) {
@@ -203,7 +190,7 @@ function putIndexes(esClient, esVersion, index) {
203190}
204191
205192function putDashboards ( esClient , esVersion ) {
206- getJSONfromDir ( 'dashboards' , dashboard => {
193+ mapJsonInDir ( 'dashboards' , dashboard => {
207194 if ( esVersion <= 5 ) {
208195 backportKibanaDoc ( dashboard ) ;
209196 }
@@ -215,10 +202,11 @@ function putDashboards(esClient, esVersion) {
215202}
216203
217204function putCharts ( esClient , esVersion , index ) {
218- getJSONfromDir ( 'charts' , chart => {
205+ mapJsonInDir ( 'charts' , chart => {
219206 const { kibanaSavedObjectMeta } = chart . body [ chart . body . type ] ;
220207 const searchSourceJSON = JSON . parse ( kibanaSavedObjectMeta . searchSourceJSON ) ;
221- kibanaSavedObjectMeta . searchSourceJSON = JSON . stringify ( Object . assign ( searchSourceJSON , { index } ) ) ;
208+ searchSourceJSON . index = index ;
209+ kibanaSavedObjectMeta . searchSourceJSON = JSON . stringify ( searchSourceJSON ) ;
222210 if ( esVersion <= 5 ) {
223211 backportKibanaDoc ( chart ) ;
224212 }
@@ -229,6 +217,25 @@ function putCharts(esClient, esVersion, index) {
229217 } ) ;
230218}
231219
220+ function publishBulk ( esClient , esVersion , index , actions ) {
221+ if ( ! actions . length ) return ;
222+ if ( esVersion <= 5 ) {
223+ for ( let i = 0 ; i < actions . length ; i += 2 ) {
224+ actions [ i ] . index . _type = actions [ i + 1 ] . type ;
225+ delete actions [ i + 1 ] . type ;
226+ }
227+ }
228+ esClient
229+ . bulk ( {
230+ index,
231+ type : 'doc' ,
232+ body : actions ,
233+ } )
234+ // .then(res => console.log('Published bulk update ' + res)
235+ . catch ( err => console . log ( 'Error doing bulk update ' + err ) ) ;
236+ actions . length = 0 ;
237+ }
238+
232239/*
233240 * Converts Kibana 6.x documents back to 2.x/5.x
234241 */
0 commit comments