Skip to content

Commit 66a4e8e

Browse files
committed
Add event listeners synchronously, use legacy Logstash mapping types with ES <= 5.x
1 parent e3e0b72 commit 66a4e8e

File tree

1 file changed

+79
-72
lines changed

1 file changed

+79
-72
lines changed

lib/appmetrics-elk.js

Lines changed: 79 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -32,52 +32,42 @@ module.exports = function connector(emitter, opts) {
3232

3333
let bulkarray = [];
3434

35-
const startMonitoring = () => {
36-
const preConfigured = ['memory'];
35+
const preconfiguredEvents = ['memory'];
3736

38-
/*
39-
* Memory is a special case as we change the structure of the data
40-
*/
41-
emitter.on('memory', memory => {
42-
const data = {
43-
process: {
44-
private: memory.private,
45-
physical: memory.physical,
46-
virtual: memory.virtual,
47-
},
48-
system: {
49-
physical: memory.physical_used,
50-
total: memory.physical_total,
51-
},
52-
};
53-
publishData('memory', memory.time, data);
54-
});
55-
56-
const registerCallback = (value, type) => {
57-
if (preConfigured.indexOf(type) === -1) {
58-
emitter.on(type, eventdata => {
59-
const data = {};
60-
const { properties } = value.body.properties[type];
61-
for (const prop of Object.keys(properties)) {
62-
data[prop] = eventdata[prop];
63-
}
64-
publishData(type, eventdata.time, data);
65-
});
66-
}
37+
/*
38+
* Memory is a special case as we change the structure of the data
39+
*/
40+
emitter.on('memory', memory => {
41+
const data = {
42+
process: {
43+
private: memory.private,
44+
physical: memory.physical,
45+
virtual: memory.virtual,
46+
},
47+
system: {
48+
physical: memory.physical_used,
49+
total: memory.physical_total,
50+
},
6751
};
52+
publishData('memory', memory.time, data);
53+
});
6854

69-
/*
70-
* Register a callback for every event type we have a mapping for
71-
*/
72-
getJSONfromDir('mappings', registerCallback);
73-
74-
setInterval(() => {
75-
if (bulkarray.length > 0) {
76-
publishBulk(bulkarray);
77-
bulkarray = [];
55+
/*
56+
* Register a callback for every event type we have a mapping for
57+
*/
58+
mapJsonInDir('mappings', (value, type) => {
59+
if (preconfiguredEvents.indexOf(type) !== -1) {
60+
return;
61+
}
62+
emitter.on(type, eventData => {
63+
const data = {};
64+
const { properties } = value.body.properties[type];
65+
for (const prop of Object.keys(properties)) {
66+
data[prop] = eventData[prop];
7867
}
79-
}, 5000);
80-
};
68+
publishData(type, eventData.time, data);
69+
});
70+
});
8171

8272
/*
8373
* Publishing data to Elasticsearch (ES)
@@ -97,17 +87,6 @@ module.exports = function connector(emitter, opts) {
9787
bulkarray.push(action, doc);
9888
};
9989

100-
const publishBulk = () => {
101-
esClient
102-
.bulk({
103-
index: opts.index,
104-
type: 'doc',
105-
body: bulkarray,
106-
})
107-
// .then(res => console.log('Published bulk update ' + res)
108-
.catch(err => console.log('Error doing bulk update ' + err));
109-
};
110-
11190
esClient
11291
.search({
11392
index: '.kibana',
@@ -140,47 +119,55 @@ module.exports = function connector(emitter, opts) {
140119
}
141120

142121
/*
143-
* Check to see if the appmetrics index exists in ElasticSearch. if
122+
* Check to see if the appmetrics index exists in Elasticsearch. if
144123
* is doesn't, then create the index to store the data into and
145124
* upload the data type mappings
146125
*/
147126
return esClient.indices
148127
.exists({ index: opts.index })
149128
.then(exists => !exists && esClient.indices.create({ index: opts.index }))
150-
.then(() => putMappings(esClient, esVersion, opts.index));
129+
.then(() => putMappings(esClient, esVersion, opts.index))
130+
.then(() => esVersion);
151131
})
152132
.catch(err => console.error('Failed to create index', err.stack))
153133
)
154-
.then(startMonitoring);
134+
.then(esVersion => {
135+
publishBulk(esClient, esVersion, opts.index, bulkarray);
136+
setInterval(publishBulk, 5000, esClient, esVersion, opts.index, bulkarray);
137+
});
155138

156139
return emitter;
157140
};
158141

159-
function getJSONfromDir(directory, callback) {
142+
function mapJsonInDir(directory, callback) {
160143
const dirPath = path.join(__dirname, '..', directory);
161-
fs.readdir(dirPath, (err, files) => {
162-
if (err) {
163-
console.log('Failed to read from ' + dirPath);
164-
} else {
165-
for (const filename of files) {
166-
const file = path.join(dirPath, filename);
167-
const basename = path.basename(filename, '.json');
168-
callback.call(this, JSON.parse(fs.readFileSync(file, 'utf8')), basename);
169-
}
144+
try {
145+
const files = fs.readdirSync(dirPath);
146+
for (const filename of files) {
147+
const file = path.join(dirPath, filename);
148+
const basename = path.basename(filename, '.json');
149+
callback(JSON.parse(fs.readFileSync(file, 'utf8')), basename);
170150
}
171-
});
151+
} catch (err) {
152+
console.error('Failed to read from ' + dirPath);
153+
console.error(err);
154+
}
172155
}
173156

174157
/*
175158
* Put the mappings for the data we create into the index. It
176159
* shouldn't matter if we replace existing records as they should be the same...
177160
*/
178161
function putMappings(esClient, esVersion, index) {
179-
getJSONfromDir('mappings', mapping => {
162+
mapJsonInDir('mappings', (mapping, type) => {
180163
mapping.index = index;
181164
if (esVersion <= 2) {
182165
backportFieldTypes(mapping);
183166
}
167+
if (esVersion <= 5) {
168+
mapping.type = type;
169+
delete mapping.body.properties.type;
170+
}
184171
esClient.indices
185172
.putMapping(mapping)
186173
// .then(res => console.log('Put mapping for ' + fileName))
@@ -189,7 +176,7 @@ function putMappings(esClient, esVersion, index) {
189176
}
190177

191178
function putIndexes(esClient, esVersion, index) {
192-
getJSONfromDir('indexes', indexPattern => {
179+
mapJsonInDir('indexes', indexPattern => {
193180
indexPattern.id = 'index-pattern:' + index;
194181
indexPattern.body[indexPattern.body.type].title = index;
195182
if (esVersion <= 5) {
@@ -203,7 +190,7 @@ function putIndexes(esClient, esVersion, index) {
203190
}
204191

205192
function putDashboards(esClient, esVersion) {
206-
getJSONfromDir('dashboards', dashboard => {
193+
mapJsonInDir('dashboards', dashboard => {
207194
if (esVersion <= 5) {
208195
backportKibanaDoc(dashboard);
209196
}
@@ -215,10 +202,11 @@ function putDashboards(esClient, esVersion) {
215202
}
216203

217204
function putCharts(esClient, esVersion, index) {
218-
getJSONfromDir('charts', chart => {
205+
mapJsonInDir('charts', chart => {
219206
const { kibanaSavedObjectMeta } = chart.body[chart.body.type];
220207
const searchSourceJSON = JSON.parse(kibanaSavedObjectMeta.searchSourceJSON);
221-
kibanaSavedObjectMeta.searchSourceJSON = JSON.stringify(Object.assign(searchSourceJSON, { index }));
208+
searchSourceJSON.index = index;
209+
kibanaSavedObjectMeta.searchSourceJSON = JSON.stringify(searchSourceJSON);
222210
if (esVersion <= 5) {
223211
backportKibanaDoc(chart);
224212
}
@@ -229,6 +217,25 @@ function putCharts(esClient, esVersion, index) {
229217
});
230218
}
231219

220+
function publishBulk(esClient, esVersion, index, actions) {
221+
if (!actions.length) return;
222+
if (esVersion <= 5) {
223+
for (let i = 0; i < actions.length; i += 2) {
224+
actions[i].index._type = actions[i + 1].type;
225+
delete actions[i + 1].type;
226+
}
227+
}
228+
esClient
229+
.bulk({
230+
index,
231+
type: 'doc',
232+
body: actions,
233+
})
234+
// .then(res => console.log('Published bulk update ' + res)
235+
.catch(err => console.log('Error doing bulk update ' + err));
236+
actions.length = 0;
237+
}
238+
232239
/*
233240
* Converts Kibana 6.x documents back to 2.x/5.x
234241
*/

0 commit comments

Comments
 (0)