Skip to content

Commit d1e986b

Browse files
author
David Gómez Matarrodona
committed
machine status info and input
1 parent 3ee406e commit d1e986b

File tree

10 files changed

+301
-4
lines changed

10 files changed

+301
-4
lines changed

examples/config/data/replaceProblem.txt

Lines changed: 25 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"config" : {
3+
"datadir" : "/tmp/nsyslog",
4+
"input" : {"buffer" : 100},
5+
"collector" : {
6+
"key" : "ctx",
7+
"ival" : 1000
8+
}
9+
},
10+
11+
"inputs" : {
12+
"stats" : {
13+
"type" : "machine",
14+
"config" : {
15+
}
16+
}
17+
},
18+
19+
"processors" : {
20+
},
21+
22+
"transporters" : {
23+
"console" : {
24+
"type" : "console",
25+
"attach" : ["flow1"],
26+
"config" : {
27+
"format" : "${JSON:originalMessage.memory}",
28+
"json" : {
29+
"format" : true,
30+
"spaces" : 2,
31+
"color" : true
32+
}
33+
}
34+
},
35+
"null" : {
36+
"type" : "null"
37+
}
38+
},
39+
40+
"flows" : [
41+
{"id":"flow1", "from":"stats", "fork":true, "processors":[], "transporters":"console"}
42+
]
43+
}

examples/config/test052_regex.json

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
{
2+
"config" : {
3+
"datadir" : "/tmp/nsyslog",
4+
"input" : {"buffer" : 100}
5+
},
6+
7+
"inputs" : {
8+
"lines" : {
9+
"type" : "file",
10+
"config" : {
11+
"path" : "/opt/nsyslog/examples/config/data/replaceProblem.txt",
12+
"readmode" : "offset",
13+
"offset" : "start"
14+
}
15+
}
16+
},
17+
18+
"processors" : {
19+
"replace" : {
20+
"type" : "properties",
21+
"config" : {
22+
"set" : {
23+
"parsed" : "${originalMessage.replace(/(\\\\\\\\)/g,'\\\\').replace(/(\\\\)r(\\\\)n/g,' ')}"
24+
}
25+
}
26+
}
27+
},
28+
29+
"transporters" : {
30+
"console" : {
31+
"type" : "console",
32+
"config" : {
33+
"format" : "${parsed}"
34+
}
35+
},
36+
"null" : {
37+
"type" : "null"
38+
}
39+
},
40+
41+
"flows" : [
42+
{"id":"flow1", "from":"lines", "fork":false, "processors":["replace"], "transporters":"console"}
43+
]
44+
}

lib/cli/basic.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const Watermark = require('../watermark');
22

33
const
44
colorize = require('json-colorizer'),
5+
jsexpr = require('jsexpr'),
56
{inspect} = require('util'),
67
Stats = require('../stats'),
78
cluster = require('../cluster'),
@@ -63,10 +64,16 @@ else {
6364
action(fnstats);
6465

6566
vorpal.
66-
command('global').
67+
command('global [expression]').
6768
description('Shows global memory').
6869
action(function(args,callback){
69-
vorpal.log(global._);
70+
if(!args.expression) {
71+
vorpal.log(global._);
72+
}
73+
else {
74+
let xpr = jsexpr.expr("${"+args.expression+"}");
75+
vorpal.log(xpr(global._));
76+
}
7077
callback();
7178
});
7279

lib/config/inputs.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ const INPUTS = {
2424
sqlserver : srequire("../input/sqlserver"),
2525
zmq : srequire("../input/zmq"),
2626
ws : srequire("../input/ws"),
27-
elastic : srequire("../input/elastic")
27+
elastic : srequire("../input/elastic"),
28+
machine : srequire("../input/machine")
2829
};
2930

3031
Object.keys(INPUTS).forEach(p=>{

lib/input/machine.js

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
const
2+
Input = require('.'),
3+
MachineCollector = require('../machine');
4+
5+
class MachineInput extends Input {
6+
constructor(id,type) {
7+
super(id,type);
8+
this.collector = MachineCollector.default;
9+
this.cb = null;
10+
}
11+
12+
get mode() {
13+
return Input.MODE.push;
14+
}
15+
16+
async configure(config,callback) {
17+
callback();
18+
}
19+
20+
start(callback) {
21+
this.cb = (data)=>callback(null,{
22+
id : this.id,
23+
type : this.type,
24+
originalMessage : data
25+
});
26+
this.collector.on(MachineCollector.Event.status,this.cb);
27+
}
28+
29+
stop(callback) {
30+
if(this.cb) {
31+
this.collector.removeListener(MachineCollector.Event.status,this.cb);
32+
this.cb = null;
33+
}
34+
callback();
35+
}
36+
37+
pause(callback) {
38+
if(this.cb) {
39+
this.collector.removeListener(MachineCollector.Event.status,this.cb);
40+
}
41+
callback();
42+
}
43+
44+
resume(callback) {
45+
if(this.cb) {
46+
this.collector.removeListener(MachineCollector.Event.status,this.cb);
47+
this.collector.on(MachineCollector.Event.status,this.cb);
48+
}
49+
callback();
50+
}
51+
52+
key(entry) {
53+
return `${entry.input}:${entry.type}`;
54+
}
55+
}
56+
57+
module.exports = MachineInput;

lib/machine.js

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
const
2+
extend = require('extend'),
3+
os = require('os'),
4+
oss = require('os-utils'),
5+
nodeDiskInfo = require('node-disk-info'),
6+
EventEmitter = require('events'),
7+
Shm = require('./shm');
8+
9+
const DEF_CONF = {
10+
key : 'ctx',
11+
ival : 30000
12+
}
13+
14+
const EVENTS = {
15+
status : 'status'
16+
};
17+
18+
var INSTANCE = null;
19+
20+
class MachineCollector extends EventEmitter {
21+
constructor() {
22+
super();
23+
24+
this.key = DEF_CONF.key;
25+
this.ival = DEF_CONF.ival;
26+
this.to = null;
27+
}
28+
29+
static get Event() {
30+
return EVENTS;
31+
}
32+
33+
static get Defaults() {
34+
return DEF_CONF
35+
}
36+
37+
static get default() {
38+
if(!INSTANCE) {
39+
INSTANCE = new MachineCollector();
40+
}
41+
return INSTANCE;
42+
}
43+
44+
configure(options) {
45+
options = extend(true,{},DEF_CONF,options);
46+
47+
this.ival = options.ival;
48+
this.key = options.key;
49+
}
50+
51+
start() {
52+
this.loop();
53+
}
54+
55+
stop() {
56+
clearTimeout(this.to);
57+
this.to = null;
58+
}
59+
60+
async loop() {
61+
let res = await this.collectMachine();
62+
Shm.hset('global',this.key,res);
63+
this.emit('status',res);
64+
this.to = setTimeout(()=>this.loop(),this.ival);
65+
}
66+
67+
async collectMachine() {
68+
return {
69+
platform : process.platform,
70+
arch : process.arch,
71+
sysUpTime : oss.sysUptime(),
72+
processUpTime : oss.processUptime(),
73+
cpu : os.cpus(),
74+
env : process.env,
75+
ifaces : Object.entries(os.networkInterfaces()).map(i=>({name:i[0],data:i[1]})),
76+
disk : await nodeDiskInfo.getDiskInfo(),
77+
cpuLoad : {
78+
min1:oss.loadavg(1),
79+
min5:oss.loadavg(5),
80+
min15:oss.loadavg(15)
81+
},
82+
memory : {
83+
total : os.totalmem(),
84+
free : os.freemem()
85+
}
86+
}
87+
}
88+
}
89+
90+
module.exports = MachineCollector;

lib/nsyslog/index.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
const
22
logger = require("../logger"),
3+
extend = require('extend'),
34
Logger = require("../logger"),
45
mingo = require('../mingo'),
56
ClusterAware = require("./clusteraware"),
67
GlobalDB = require('./globaldb'),
78
FileQueue = require("fileq"),
89
Reemiter = require("../reemiter"),
10+
MachineCollector = require('../machine'),
911
Config = require("../config"),
1012
InputWrapper = require("../input/wrapper"),
1113
Transporters = Config.Transporters,
@@ -77,6 +79,8 @@ class NSyslog extends ClusterAware {
7779
this.flowmap = {};
7880
this.pubsub = null;
7981

82+
this.collector = MachineCollector.default;
83+
8084
Reemiter.configure(this);
8185
}
8286

@@ -315,6 +319,15 @@ class NSyslog extends ClusterAware {
315319
});
316320
}
317321

322+
async startCollector() {
323+
let conf = extend(true,{config:{collector:MachineCollector.Defaults}},this.config);
324+
if(conf.config.collector.enabled!==false) {
325+
logger.info('Starting machine collector...');
326+
this.collector.configure(conf.config.collector);
327+
this.collector.start();
328+
}
329+
}
330+
318331
/**
319332
* Pushes data manually to the nsyslog instance, writing them to the input stream,
320333
* in order to be sent to the flows
@@ -352,6 +365,7 @@ class NSyslog extends ClusterAware {
352365
logger.info(`\tProcessors`,Object.keys(modules.processors));
353366
logger.info(`\tTransporters`,Object.keys(modules.transporters));
354367

368+
await this.startCollector();
355369
await this.startPubSub();
356370
this.startChildren();
357371
this.startProcessorStream();
@@ -387,6 +401,8 @@ class NSyslog extends ClusterAware {
387401

388402
logger.info('Closing child processes');
389403
this.children.forEach(f=>f.fstream.closefork());
404+
logger.info('Stopping machine collector...');
405+
this.collector.stop();
390406
logger.info('Stopping inputs...');
391407
await Promise.all(Object.keys(inputs).map(key=>inputs[key]).filter(m=>m.own).map(m=>prfn(m,'stop')));
392408
logger.info('Stopping processors...');

package-lock.json

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@
6060
"minimatch": "^3.0.4",
6161
"mkdirp": "^0.5.1",
6262
"mongodb": "^3.3.3",
63+
"node-disk-info": "^1.3.0",
6364
"node-forge": "^0.10.0",
6465
"nools": "^0.4.4",
6566
"nsyslog-parser": "^0.9.6",
67+
"os-utils": "0.0.14",
6668
"pemtools": "^0.4.7",
6769
"promise-stream-queue": "^0.4.5",
6870
"request": "^2.87.0",

0 commit comments

Comments
 (0)