Skip to content

Commit bbba0dd

Browse files
author
David Gómez Matarrodona
committed
GlobalDB
1 parent e417fd7 commit bbba0dd

File tree

8 files changed

+67
-14
lines changed

8 files changed

+67
-14
lines changed

examples/config/test047_global.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
"console" : {
3939
"type" : "console",
4040
"config" : {
41-
"format" : "Message from flow1 : ${global.test}"
41+
"format" : "Message from flow1 : ${_.test} / ${_.itop.asamblea.clientId}"
4242
}
4343
},
4444
"global" : {

lib/cli/basic.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ else {
6666
command('global').
6767
description('Shows global memory').
6868
action(function(args,callback){
69-
vorpal.log(global);
69+
vorpal.log(global._);
7070
callback();
7171
});
7272

lib/config/index.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,11 @@ async function read(file,callback,options) {
175175

176176
// Set defaults for TLS
177177
TLS.defaults({},json.$path);
178-
sanitizeModules(json); // Extend modules config
178+
sanitizeModules(json); // Extend modules config
179179
let sflowErrors = sanitizeFlows(json); // Sanitize flow configs
180180

181181
let inputErrors = await processInputs(json); // Get Inputs
182-
let filterErrors = processFilters(json); // Get Filters
182+
let filterErrors = processFilters(json); // Get Filters
183183
let flowErrors = await processFlows(json); // Get Flows
184184

185185
let errors = flatten([].concat(

lib/nsyslog/globaldb.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
const
2+
fs = require('fs-extra'),
3+
path = require('path'),
4+
Shm = require('../shm'),
5+
logger = require('../logger'),
6+
{timer} = require('../util'),
7+
chokidar = require('chokidar');
8+
9+
class GlobalDB {
10+
constructor(datadir) {
11+
this.file = path.resolve(datadir,'global.json');
12+
this.watcher = chokidar.watch(path.resolve(datadir,'global.json'));
13+
this.watcher.on('all',(event, path)=>{
14+
logger.warn('GlobalDB file ',this.file, 'has changed');
15+
this.reloadDatabase();
16+
});
17+
}
18+
19+
async reloadDatabase() {
20+
try {
21+
await timer(500);
22+
let raw = await fs.readFile(this.file, 'utf-8');
23+
let json = JSON.parse(raw);
24+
25+
Object.keys(json).forEach(k=>{
26+
Shm.hset('global',k,json[k]);
27+
});
28+
}catch(err) {
29+
logger.error(err);
30+
}
31+
}
32+
}
33+
34+
module.exports = GlobalDB;

lib/nsyslog/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const
33
Logger = require("../logger"),
44
mingo = require('../mingo'),
55
ClusterAware = require("./clusteraware"),
6+
GlobalDB = require('./globaldb'),
67
FileQueue = require("fileq"),
78
Reemiter = require("../reemiter"),
89
Config = require("../config"),
@@ -52,6 +53,7 @@ class NSyslog extends ClusterAware {
5253
this.allconfigs = this.config.split();
5354
this.config = this.allconfigs[Config.MASTER_PROC];
5455
this.modules = this.config.modules;
56+
this.globaldb = new GlobalDB(this.datadir);
5557
}
5658

5759
this.mingo = mingo;

lib/processor/parser/sm.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ class StateMachine {
5858
// A la hora de parsear, __ almacena el fragmento de linea a parsear por la regla
5959
let obj = {_:line,__:line};
6060
let results = [];
61+
62+
this.state = this.multi? this.state : START;
6163

6264
while(true) {
6365
let state = this.rules[this.state] || this.rules[START];

lib/shm.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ else {
153153
switch(msg.cmd) {
154154
case 'subscribe' :
155155
Object.assign(SHM,msg.res);
156+
this.shm.emit('init',msg.res);
156157
break;
157158
default :
158159
this[msg.cmd].apply(this,msg.args.concat(true));

lib/transporter/global.js

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,28 @@ const DEF_CONF = {
1010
"output" : "nsyslog"
1111
};
1212

13-
const OUTCACHE = {};
14-
Shm.shm.on('hset',(parent,key,value)=>{
15-
if(parent!='global') return;
16-
if(!OUTCACHE[key]) {
17-
OUTCACHE[key] = jsexpr.assign(key);
18-
}
19-
OUTCACHE[key](global,value);
20-
logger.silly("HSET",{parent,key,value});
21-
});
13+
function initialize() {
14+
const OUTCACHE = {};
15+
global._ = global._ || {};
16+
17+
Shm.shm.on('hset',(parent,key,value)=>{
18+
if(parent!='global') return;
19+
if(!OUTCACHE[key]) {
20+
OUTCACHE[key] = jsexpr.assign(key);
21+
}
22+
OUTCACHE[key](global._, value);
23+
logger.silly("SHM HSET",{parent,key,value});
24+
});
25+
26+
Shm.shm.on('init',data=>{
27+
if(data['global']) {
28+
global._ = data.global;
29+
logger.silly('SHM INIT',global._);
30+
}
31+
});
32+
33+
34+
}
2235

2336
class GlobalTransporter extends Transporter {
2437
constructor(id,type) {
@@ -38,10 +51,11 @@ class GlobalTransporter extends Transporter {
3851

3952
transport(entry, callback) {
4053
var msg = this.input(entry);
41-
this.output(global, msg);
54+
this.output(global._, msg);
4255
Shm.hset('global',this.outkey,msg);
4356
callback(null,entry);
4457
}
4558
}
4659

60+
initialize();
4761
module.exports = GlobalTransporter;

0 commit comments

Comments
 (0)