Skip to content

Commit 6e0bba7

Browse files
committed
functional worker
1 parent 5332d3a commit 6e0bba7

File tree

6 files changed

+117
-45
lines changed

6 files changed

+117
-45
lines changed

src/config/config.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import * as os from 'os';
2+
3+
export const config = {
4+
redisHost: process.env.REDIS_HOST || '127.0.0.1',
5+
redisPort: Number(process.env.REDIS_PORT) || 6379,
6+
defaultQueue: process.env.DEFAULT_QUEUE || 'zmon:queue:default',
7+
}

src/index.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import { getQueueClient } from './redis/redisHandler';
22
import { startServer } from './server/server';
33
import ZmonWorker from './worker/zmon-worker';
4+
import { config } from './config/config';
45

56
// start health endpoint server
6-
startServer()
7+
const redisClient = getQueueClient(config.redisHost, config.redisPort);
78

8-
// connect to redis
9-
const redisClient = getQueueClient();
10-
const queueName = 'zmon:queue:default';
9+
const cb = () => {
10+
console.log('\nterminate Redis connection');
11+
redisClient.end();
12+
}
13+
14+
startServer(cb);
1115

1216
// start worker
13-
const w = new ZmonWorker(redisClient, queueName);
17+
const w = new ZmonWorker(redisClient, config.defaultQueue);
1418
w.startWorker();

src/server/server.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,29 @@ const app = express()
77

88
app.get('/', (req, res) => {
99
res.send('ok')
10-
})
10+
});
1111

1212
const server = createServer(app)
1313

14-
function onSignal(): Promise<any> {
15-
console.log('server is starting cleanup')
16-
return Promise.all([
17-
null
18-
]);
14+
function onSignal(cb: () => any): () => Promise<any>{
15+
console.log('server is starting cleanup');
16+
return function (): Promise<any> {
17+
return Promise.all([
18+
cb()
19+
]);
20+
}
1921
}
2022

2123
async function onHealthCheck() {
2224
// checks if the system is healthy, like the db connection is live
2325
// resolves, if health, rejects if not
2426
}
2527

26-
export function startServer() {
28+
export function startServer(cb: () => any) {
2729
createTerminus(server, {
2830
signal: 'SIGINT',
2931
healthChecks: { '/healthcheck': onHealthCheck },
30-
onSignal,
32+
onSignal: onSignal(cb),
3133
});
3234

3335
server.listen(3000)

src/worker/exec.spec.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { execCheck } from './exec';
1+
import { execZmonScript } from './exec';
22

33
function mockHttp() {
44
return {response: true};
@@ -24,7 +24,7 @@ describe('Worker', () => {
2424
`;
2525
const ctx = {};
2626

27-
const res = execCheck(ctx, check, entity);
27+
const res = execZmonScript(ctx, check, {entity});
2828
expect(res).toEqual('kube_pod');
2929
});
3030

@@ -36,8 +36,9 @@ describe('Worker', () => {
3636
`;
3737
let res;
3838
let err;
39+
3940
try {
40-
res = execCheck({}, check, entity);
41+
res = execZmonScript({}, check, {entity});
4142
} catch (e) {
4243
err = e;
4344
}
@@ -61,13 +62,13 @@ describe('Worker', () => {
6162
`;
6263
const ctx = {};
6364

64-
const res = execCheck(ctx, check, entity);
65+
const res = execZmonScript(ctx, check, {entity});
6566
expect(res).toEqual({type: entity.type, team: entity.team});
6667
});
6768

6869
it('should make an http call', () => {
6970
const ctx = {http: mockHttp}
70-
const res = execCheck(ctx, check, entity);
71+
const res = execZmonScript(ctx, check, {entity});
7172
expect(res).toEqual({response: true});
7273
});
7374

@@ -82,7 +83,7 @@ describe('Worker', () => {
8283
let err;
8384

8485
try {
85-
res = execCheck(ctx, check, entity);
86+
res = execZmonScript(ctx, check, {entity});
8687
} catch (e) {
8788
err = e;
8889
}
@@ -95,7 +96,7 @@ describe('Worker', () => {
9596
const ctx = {}
9697
let res;
9798
try {
98-
res = execCheck(ctx, check, entity);
99+
res = execZmonScript(ctx, check, {entity});
99100
} catch (e) {}
100101

101102
expect(res).toBeUndefined();

src/worker/exec.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
11
import { Context, createContext, CreateContextOptions, runInNewContext, RunningScriptOptions } from 'vm';
2+
import { performance } from 'perf_hooks';
23

3-
export function execCheck(ctx: Context, checkDefinition: string, entity: any) {
4-
const script = `(${checkDefinition})();`
4+
export function execZmonScript(ctx: Context, script: string, additionalCtx: {value: any} | {entity: Object}) {
5+
const scriptToExec = `(${script})();`
56
const opt: CreateContextOptions = {
67
name: 'checkId: xxx',
78
codeGeneration: {
89
strings: false
910
},
1011
};
1112

12-
const localCtx = {...ctx, entity};
13+
const localCtx = {...ctx, ...additionalCtx};
1314
const newCtx = createContext(localCtx, opt);
1415

15-
return runInNewContext(script, newCtx);
16+
const t0 = Date.now();
17+
const result = runInNewContext(scriptToExec, newCtx);
18+
const t1 = Date.now();
19+
20+
return {
21+
td: t1 - t0,
22+
result,
23+
};
1624
}
1725

1826

src/worker/zmon-worker.ts

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,102 @@
11
import { RedisClient } from 'redis';
2-
import { execCheck } from './exec';
3-
import execCtx from '../exec-ctx/ctx';
42
import { Context } from 'vm';
3+
import { promisify } from 'util';
4+
5+
import execCtx from '../exec-ctx/ctx';
6+
import { execZmonScript } from './exec';
7+
58

69
export default class ZmonWorker {
7-
private queueName: string
810
private queueConn: RedisClient;
911
private execCtx: Context;
12+
private brpopAsync: Function;
13+
private queueName: string;
14+
1015
private tasks = {
1116
'check_and_notify': this.checkAndNotify,
1217
// 'trial_run': trialRun,
1318
// 'cleanup': cleanup,
14-
}
19+
};
1520

1621
constructor(client: RedisClient, queueName: string) {
1722
this.queueConn = client;
18-
this.queueName = queueName;
1923
this.execCtx = execCtx;
24+
this.brpopAsync = promisify(client.brpop).bind(client);
25+
this.queueName = queueName;
2026
}
2127

22-
startWorker() {
23-
console.log("worker started")
24-
this.consumeQueue();
25-
// while (true) {
26-
// this.consumeQueue();
27-
// }
28+
async startWorker() {
29+
console.log('worker started');
30+
let i = 0;
31+
while (true) {
32+
const res = await this.consumeQueue();
33+
const [queueName, msg] = res;
34+
const parsedMsg = JSON.parse(msg);
35+
const {check_id, entity, command} = parsedMsg.body.args[0];
36+
const alertList = parsedMsg.body.args[1];
37+
const checkResult = this.executeZmonTask(command, check_id, entity);
38+
this.storeCheckResult(check_id, entity.id, JSON.stringify(checkResult));
39+
alertList.forEach((alert: any) => {
40+
const startTime = Date.now();
41+
const {td, result: alertRes} = execZmonScript(this.execCtx, alert.condition, {value: checkResult});
42+
const alertObj = {
43+
exc: 1,
44+
downtimes: [],
45+
captures: {},
46+
start_time: startTime,
47+
worker: 'nodejs-worker',
48+
ts: Date.now(),
49+
value: alertRes,
50+
td,
51+
};
52+
if (!!alertRes) {
53+
this.storeAlertResult(alert.id, entity.id, JSON.stringify(alertObj));
54+
console.log(`alert ${alert.id} triggered on entity ${entity.id}`);
55+
} else {
56+
this.deleteAlertResult(alert.id, entity.id);
57+
}
58+
});
59+
}
2860
}
2961

30-
consumeQueue() {
31-
this.queueConn.brpop(this.queueName, 0, this.executeZmonTask);
62+
private async consumeQueue() {
63+
const checkId = '';
64+
const entityId = '';
65+
return await this.brpopAsync(this.queueName, 0);
3266
}
3367

34-
executeZmonTask(err: Error | null, res: [string, string]) {
35-
if (err !== null) {
36-
return err;
37-
}
68+
private executeZmonTask(checkScript: string, checkId: string, entity: any): any {
69+
70+
const {td, result} = execZmonScript(execCtx, checkScript, {entity});
71+
return result;
3872

39-
const [queueName, msg] = res;
40-
console.log(msg);
4173
}
4274

43-
checkAndNotify(req: any) {
75+
private checkAndNotify(req: any) {
4476
console.log(req);
4577
// const script = '';
4678
// const entity = {hello: 'world'};
4779
// const checkId = req['check_id'];
4880
// const entityId = req['entity']['id'];
4981
//
50-
// execCheck(this.execCtx, script, entity)
82+
// execZmonScript(this.execCtx, script, entity)
83+
}
84+
85+
private storeCheckResult(checkId: number, entityId: string, res: string) {
86+
const key = `zmon:checks:${checkId}:${entityId}`;
87+
this.queueConn.lpush(key, res, console.log);
88+
this.queueConn.ltrim(key, 0, 20 - 1);
89+
}
90+
91+
private storeAlertResult(alertID: number, entityID: string, value: string) {
92+
const storageName = `zmon:alerts:${alertID}:${entityID}`;
93+
94+
this.queueConn.set(storageName, value, console.log);
95+
}
96+
97+
private deleteAlertResult(alertID: number, entityID: string) {
98+
const storageName = `zmon:alerts:${alertID}:${entityID}`;
99+
100+
this.queueConn.del(storageName, console.log);
51101
}
52102
}

0 commit comments

Comments
 (0)