Skip to content

Commit e7730f2

Browse files
author
kkuzmin
authored
Log (#36)
* Add processLog Add default event handlers. Check if collector_id exists Bump package version Pass customer id if exists Bump package version * Address comments.
1 parent 25963a4 commit e7730f2

File tree

4 files changed

+253
-98
lines changed

4 files changed

+253
-98
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ compile: deps
1212

1313
test: compile
1414
npm run test
15+
16+
publish:
17+
npm run publish
1518

1619
clean:
1720
rm -rf node_modules

al_aws_collector.js

Lines changed: 167 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ class AlAwsCollector {
6868
static get IngestTypes() {
6969
return {
7070
SECMSGS : 'secmsgs',
71-
VPCFLOW : 'vpcflow'
71+
VPCFLOW : 'vpcflow',
72+
LOGMSGS : 'logmsgs'
7273
}
7374
};
7475

@@ -84,7 +85,8 @@ class AlAwsCollector {
8485
})
8586
}
8687

87-
constructor(context, collectorType, ingestType, version, aimsCreds, formatFun, healthCheckFuns, statsFuns) {
88+
constructor(context, collectorType, ingestType, version, aimsCreds,
89+
formatFun, healthCheckFuns, statsFuns) {
8890
this._invokeContext = context;
8991
this._arn = context.invokedFunctionArn;
9092
this._collectorType = collectorType;
@@ -97,16 +99,33 @@ class AlAwsCollector {
9799
process.env.al_data_residency :
98100
'default';
99101
this._alAzcollectEndpoint = process.env.azollect_api;
100-
this._aimsc = new m_alCollector.AimsC(process.env.al_api, aimsCreds);
102+
this._aimsc = new m_alCollector.AimsC(process.env.al_api, aimsCreds, null, null, process.env.customer_id);
101103
this._endpointsc = new m_alCollector.EndpointsC(process.env.al_api, this._aimsc);
102104
this._azcollectc = new m_alCollector.AzcollectC(process.env.azollect_api, this._aimsc, collectorType);
103105
this._ingestc = new m_alCollector.IngestC(process.env.ingest_api, this._aimsc, 'lambda_function');
104106
this._formatFun = formatFun;
105107
this._customHealthChecks = healthCheckFuns;
106108
this._customStatsFuns = statsFuns;
109+
this._collectorId = process.env.collector_id;
107110
}
108111

109-
_getAttrs() {
112+
set context (context) {
113+
this._invokeContext = context;
114+
}
115+
get context () {
116+
return this._invokeContext;
117+
}
118+
119+
done(error) {
120+
let context = this._invokeContext;
121+
if (error) {
122+
return context.fail(error);
123+
} else {
124+
return context.succeed();
125+
}
126+
}
127+
128+
getProperties() {
110129
return {
111130
awsAccountId : m_alAws.arnToAccId(this._arn),
112131
region : this._region,
@@ -144,7 +163,7 @@ class AlAwsCollector {
144163

145164
register(event, custom) {
146165
const context = this._invokeContext;
147-
const regValues = Object.assign(this._getAttrs(), custom);
166+
const regValues = Object.assign(this.getProperties(), custom);
148167

149168
async.waterfall([
150169
(asyncCallback) => {
@@ -154,8 +173,8 @@ class AlAwsCollector {
154173
} = process.env;
155174

156175
if(!azcollect_api || !ingest_api){
157-
// handling errors like this because the other unit tests seem to indicat that the collectorshould
158-
// register even if there is an error in getting the endpoints.
176+
// handling errors like this because the other unit tests seem to indicate that
177+
// the collector should register even if there is an error in getting the endpoints.
159178
this.updateEndpoints((err, newConfig) => {
160179
if(err){
161180
console.warn('AWSC0002 Error updating endpoints', err);
@@ -179,13 +198,18 @@ class AlAwsCollector {
179198
}
180199
},
181200
(asyncCallback) => {
182-
this._azcollectc.register(regValues)
183-
.then(resp => {
184-
asyncCallback(null);
185-
})
186-
.catch(exception => {
187-
asyncCallback('AWSC0003 registration error: ' + exception);
188-
});
201+
if (!process.env.collector_id || process.env.collector_id === 'none') {
202+
this._azcollectc.register(regValues)
203+
.then(resp => {
204+
const newCollectorId = resp.collector ? resp.collector.id : 'none';
205+
return m_alAws.setEnv({ collector_id: newCollectorId }, asyncCallback);
206+
})
207+
.catch(exception => {
208+
return asyncCallback('AWSC0003 registration error: ' + exception);
209+
});
210+
} else {
211+
return asyncCallback(null);
212+
}
189213
}
190214
],
191215
(err)=> {
@@ -197,6 +221,13 @@ class AlAwsCollector {
197221
});
198222
}
199223

224+
handleCheckin() {
225+
var collector = this;
226+
collector.checkin(function(err) {
227+
return collector.done(err);
228+
});
229+
}
230+
200231
checkin(callback) {
201232
var collector = this;
202233
const context = this._invokeContext;
@@ -218,7 +249,7 @@ class AlAwsCollector {
218249
],
219250
function(err, checkinParts) {
220251
const checkin = Object.assign(
221-
collector._getAttrs(), checkinParts[0], checkinParts[1]
252+
collector.getProperties(), checkinParts[0], checkinParts[1]
222253
);
223254
collector._azcollectc.checkin(checkin)
224255
.then(resp => {
@@ -238,53 +269,72 @@ class AlAwsCollector {
238269

239270
deregister(event, custom){
240271
const context = this._invokeContext;
241-
const regValues = Object.assign(this._getAttrs(), custom);
272+
const regValues = Object.assign(this.getProperties(), custom);
242273

243274
this._azcollectc.deregister(regValues)
244275
.then(resp => {
245276
return response.send(event, context, response.SUCCESS);
246277
})
247278
.catch(exception => {
248-
return response.send(event, context, response.FAILED, {Error: exception});
279+
console.warn('AWSC0011 Collector deregistration failed. ', exception);
280+
// Respond with SUCCESS in order to delete CF stack with no issues.
281+
return response.send(event, context, response.SUCCESS);
249282
});
250283
}
251284

252-
send(data, callback){
285+
send(data, compress = true, callback) {
253286
var collector = this;
254-
var ingestType = collector._ingestType;
255-
287+
256288
if(!data){
257289
return callback(null);
258290
}
259-
260-
zlib.deflate(data, function(compressionErr, compressed) {
261-
if (compressionErr) {
262-
return callback(compressionErr);
263-
} else {
264-
switch (ingestType) {
265-
case AlAwsCollector.IngestTypes.SECMSGS:
266-
collector._ingestc.sendSecmsgs(compressed)
267-
.then(resp => {
268-
return callback(null, resp);
269-
})
270-
.catch(exception => {
271-
return callback(exception);
272-
});
273-
break;
274-
case AlAwsCollector.IngestTypes.VPCFLOW:
275-
collector._ingestc.sendVpcFlow(compressed)
276-
.then(resp => {
277-
return callback(null, resp);
278-
})
279-
.catch(exception => {
280-
return callback(exception);
281-
});
282-
break;
283-
default:
284-
return callback(`AWSC0005 Unknown Alertlogic ingestion type: ${ingestType}`);
291+
if (compress) {
292+
zlib.deflate(data, function(compressionErr, compressed) {
293+
if (compressionErr) {
294+
return callback(compressionErr);
295+
} else {
296+
return collector._send(compressed, callback);
285297
}
286-
}
287-
});
298+
});
299+
} else {
300+
return collector._send(data, callback);
301+
}
302+
}
303+
304+
_send(data, callback) {
305+
var collector = this;
306+
var ingestType = collector._ingestType;
307+
switch (ingestType) {
308+
case AlAwsCollector.IngestTypes.SECMSGS:
309+
collector._ingestc.sendSecmsgs(data)
310+
.then(resp => {
311+
return callback(null, resp);
312+
})
313+
.catch(exception => {
314+
return callback(exception);
315+
});
316+
break;
317+
case AlAwsCollector.IngestTypes.VPCFLOW:
318+
collector._ingestc.sendVpcFlow(data)
319+
.then(resp => {
320+
return callback(null, resp);
321+
})
322+
.catch(exception => {
323+
return callback(exception);
324+
});
325+
break;
326+
case AlAwsCollector.IngestTypes.LOGMSGS:
327+
collector._ingestc.sendLogmsgs(data)
328+
.then(resp => {
329+
return callback(null, resp);
330+
})
331+
.catch(exception => {
332+
return callback(exception);
333+
});
334+
break;
335+
default:
336+
return callback(`AWSC0005 Unknown Alertlogic ingestion type: ${ingestType}`);
337+
}
288338
}
289339

290340
process(event, callback) {
@@ -294,13 +344,46 @@ class AlAwsCollector {
294344
function(asyncCallback) {
295345
collector._formatFun(event, context, asyncCallback);
296346
},
297-
function(formatedData, asyncCallback) {
298-
collector.send(formatedData, asyncCallback);
347+
function(formattedData, compress, asyncCallback) {
348+
if(arguments.length === 2 && typeof compress === "function"){
349+
asyncCallback = compress;
350+
compress = true;
351+
}
352+
collector.send(formattedData, compress, asyncCallback);
299353
}
300354
],
301355
callback);
302356
}
303357

358+
processLog(messages, formatFun, hostmetaElems, callback) {
359+
if(arguments.length === 3 && typeof hostmetaElems === "function"){
360+
callback = hostmetaElems;
361+
hostmetaElems = this._defaultHostmetaElems();
362+
}
363+
var collector = this;
364+
365+
if (messages && messages.length > 0) {
366+
m_alCollector.AlLog.buildPayload(
367+
collector._collectorId, collector._collectorId, hostmetaElems, messages, formatFun, function(err, payload){
368+
if (err) {
369+
return callback(err);
370+
} else {
371+
return collector.send(payload, false, callback);
372+
}
373+
});
374+
} else {
375+
return callback(null, {});
376+
}
377+
378+
}
379+
380+
handleUpdate() {
381+
var collector = this;
382+
collector.update(function(err) {
383+
return collector.done(err);
384+
});
385+
}
386+
304387
update(callback) {
305388
let collector = this;
306389

@@ -363,21 +446,30 @@ class AlAwsCollector {
363446
});
364447
}
365448

366-
handleDefaultEvents(scheduledEvent, callback) {
449+
handleEvent(event) {
367450
let collector = this;
368-
369-
switch (scheduledEvent.Type) {
370-
case 'SelfUpdate':
371-
return collector.update(callback);
372-
break;
373-
case 'Checkin':
374-
return collector.checkin(callback);
375-
break;
376-
default:
377-
return callback('AWSC0009 Unknown scheduled event detail type: ' + scheduledEvent.Type);
451+
let context = this._invokeContext;
452+
switch (event.RequestType) {
453+
case 'ScheduledEvent':
454+
switch (event.Type) {
455+
case 'SelfUpdate':
456+
return collector.handleUpdate();
457+
break;
458+
case 'Checkin':
459+
return collector.handleCheckin();
460+
break;
461+
default:
462+
return context.fail('AWSC0009 Unknown scheduled event detail type: ' + event.Type);
463+
}
464+
case 'Create':
465+
return collector.register(event, {});
466+
case 'Delete':
467+
return collector.deregister(event, {});
468+
default:
469+
return context.fail('AWSC0012 Unknown event:' + event);
378470
}
379471
}
380-
472+
381473
_applyConfigChanges(newValues, config, callback) {
382474
var jsonConfig = JSON.stringify(config);
383475
var newConfig = JSON.parse(jsonConfig);
@@ -425,6 +517,19 @@ class AlAwsCollector {
425517
delete(newConfig.MasterArn);
426518
return newConfig;
427519
}
520+
521+
_defaultHostmetaElems() {
522+
return [
523+
{
524+
key: 'host_type',
525+
value: {str: 'lambda'}
526+
},
527+
{
528+
key: 'local_hostname',
529+
value: {str: process.env.AWS_LAMBDA_FUNCTION_NAME}
530+
}
531+
];
532+
}
428533
}
429534

430535
module.exports = AlAwsCollector;

0 commit comments

Comments
 (0)