Skip to content

Commit 1671cda

Browse files
committed
clean up structure
1 parent 8f8635e commit 1671cda

File tree

9 files changed

+185
-100
lines changed

9 files changed

+185
-100
lines changed

src/app/app.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { RedisService } from '../services/redis-service';
2+
import ZmonWorker from '../worker/zmon-worker';
3+
import { config } from '../config/config';
4+
import ApiServer from '../server/server';
5+
6+
export default class App {
7+
redisService: RedisService;
8+
zmonWorker: ZmonWorker;
9+
server: ApiServer;
10+
11+
constructor() {
12+
this.redisService = new RedisService();
13+
this.server = new ApiServer(this.redisService);
14+
this.zmonWorker = new ZmonWorker(this.redisService, config.defaultQueue);
15+
}
16+
17+
run() {
18+
this.server.startServer();
19+
this.zmonWorker.startWorker();
20+
}
21+
}

src/index.ts

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,8 @@
1-
import { getQueueClient } from './redis/redisHandler';
2-
import { startServer } from './server/server';
3-
import ZmonWorker from './worker/zmon-worker';
4-
import { config } from './config/config';
1+
import App from './app/app';
52

6-
// start health endpoint server
7-
const redisClient = getQueueClient(config.redisHost, config.redisPort);
8-
9-
const cb = () => {
10-
console.log('\nterminate Redis connection');
11-
redisClient.end();
3+
function main() {
4+
const app = new App();
5+
app.run();
126
}
137

14-
startServer(cb);
15-
16-
// start worker
17-
const w = new ZmonWorker(redisClient, config.defaultQueue);
18-
w.startWorker();
8+
main();

src/plugins/http.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import fetch from 'node-fetch';
22

3-
const rp = require('request-promise');
4-
import { get } from 'http';
5-
63
export async function http(url: string, Oauth?: boolean) {
7-
return await fetch(url).then((res: any) => res.json())
4+
return await fetch(url).then((res: any) => res.json());
85
}

src/redis/redisHandler.ts

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/server/server.ts

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,47 @@
1-
import {createServer} from 'http';
2-
import express = require('express')
3-
import terminus = require('@godaddy/terminus')
4-
import { createTerminus } from '@godaddy/terminus';
1+
import { createServer } from 'http';
2+
import { createTerminus, TerminusOptions } from '@godaddy/terminus';
3+
import { Express } from 'express';
4+
import express = require('express');
5+
import { RedisService } from '../services/redis-service';
6+
7+
export default class ApiServer {
8+
app: Express;
9+
10+
constructor(private redisService: RedisService) {
11+
this.app = express();
12+
this.setupRoutes();
13+
}
14+
15+
startServer() {
16+
const server = createServer(this.app);
17+
const opt: TerminusOptions = {
18+
signal: 'SIGINT',
19+
healthChecks: {'/healthcheck': this.onHealthCheck},
20+
onSignal: this.onSignal,
21+
};
522

6-
const app = express()
23+
createTerminus(server, opt);
24+
server.listen(3000);
25+
}
726

8-
app.get('/', (req, res) => {
9-
res.send('ok')
10-
});
1127

12-
const server = createServer(app)
28+
private setupRoutes() {
29+
this.app.get('/', (req, res) => {
30+
res.send('ok');
31+
});
32+
}
1333

14-
function onSignal(cb: () => any): () => Promise<any>{
15-
console.log('server is starting cleanup');
16-
return function (): Promise<any> {
34+
private onSignal() {
35+
console.log('server is starting cleanup');
1736
return Promise.all([
18-
cb()
37+
this.redisService.terminate()
1938
]);
2039
}
21-
}
22-
23-
async function onHealthCheck() {
24-
// checks if the system is healthy, like the db connection is live
25-
// resolves, if health, rejects if not
26-
}
2740

28-
export function startServer(cb: () => any) {
29-
createTerminus(server, {
30-
signal: 'SIGINT',
31-
healthChecks: { '/healthcheck': onHealthCheck },
32-
onSignal: onSignal(cb),
33-
});
34-
35-
server.listen(3000)
41+
private onHealthCheck(): Promise<any> {
42+
console.log('checking worker health');
43+
return Promise.all([
44+
this.redisService.checkHealth()
45+
]);
46+
}
3647
}
37-
38-

src/services/redis-service.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { createClient, RedisClient } from 'redis';
2+
import { config } from '../config/config';
3+
import { promisify } from 'util';
4+
5+
6+
export class RedisService {
7+
private redisClient: RedisClient;
8+
9+
// promisified redis client commands
10+
private brpopAsync: Function;
11+
private pingAsync: Function;
12+
13+
constructor() {
14+
this.redisClient = this.getRedisClient();
15+
this.brpopAsync = promisify(this.redisClient.brpop).bind(this.redisClient);
16+
this.pingAsync = promisify(this.redisClient.ping).bind(this.redisClient);
17+
18+
}
19+
20+
async checkRedisConnection(): Promise<boolean> {
21+
return await this.pingAsync();
22+
}
23+
24+
private getRedisClient() {
25+
let client: RedisClient;
26+
const {redisHost, redisPort} = config;
27+
28+
if (redisHost && redisPort) {
29+
client = createClient(redisPort, redisHost);
30+
} else {
31+
client = createClient();
32+
}
33+
34+
client.on('error', err => {
35+
console.log("ERR");
36+
console.error(err);
37+
});
38+
39+
return client;
40+
}
41+
42+
async terminate(): Promise<boolean> {
43+
return await this.redisClient.quit();
44+
}
45+
46+
async checkHealth(): Promise<boolean> {
47+
return await this.redisClient.ping();
48+
}
49+
50+
registerWorker(workerName: string) {
51+
this.redisClient.sadd('zmon:metrics', workerName);
52+
}
53+
54+
async getTask(queueName: string) {
55+
return await this.brpopAsync(queueName, 0);
56+
}
57+
58+
deleteAlert(alertKey: string) {
59+
this.redisClient.del(alertKey)
60+
}
61+
62+
setAlert(alertKey: string, value: any) {
63+
this.redisClient.set(alertKey, value);
64+
}
65+
66+
storeCheckResult(key: string, value: string) {
67+
this.redisClient.lpush(key, value)
68+
}
69+
}
File renamed without changes.

src/worker/exec.spec.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ describe('Worker', () => {
88
const entity: any = {type: 'kube_pod'};
99
const check: string = `
1010
function check() {
11-
return http("http://cool.website.com")
11+
return plugin("http://cool.website.com")
1212
}
1313
`;
1414

@@ -24,8 +24,8 @@ describe('Worker', () => {
2424
`;
2525
const ctx = {};
2626

27-
const res = execZmonScript(ctx, check, {entity});
28-
expect(res).toEqual('kube_pod');
27+
const { result } = execZmonScript(ctx, check, {entity});
28+
expect(result).toEqual('kube_pod');
2929
});
3030

3131
it('should fail executing the check because it is trying to access fields from an undefined object', () => {
@@ -62,14 +62,14 @@ describe('Worker', () => {
6262
`;
6363
const ctx = {};
6464

65-
const res = execZmonScript(ctx, check, {entity});
66-
expect(res).toEqual({type: entity.type, team: entity.team});
65+
const { result } = execZmonScript(ctx, check, {entity});
66+
expect(result).toEqual({type: entity.type, team: entity.team});
6767
});
6868

6969
it('should make an http call', () => {
70-
const ctx = {http: mockHttp}
71-
const res = execZmonScript(ctx, check, {entity});
72-
expect(res).toEqual({response: true});
70+
const ctx = {plugin: mockHttp}
71+
const { result } = execZmonScript(ctx, check, {entity});
72+
expect(result).toEqual({response: true});
7373
});
7474

7575
it('should fail executing the check because eval should not be used', () => {

src/worker/zmon-worker.ts

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,75 @@ import { promisify } from 'util';
44

55
import execCtx from '../exec-ctx/ctx';
66
import { execZmonScript } from './exec';
7+
import { hostname } from "os";
8+
import { RedisService } from '../services/redis-service';
79

810

911
export default class ZmonWorker {
10-
private queueConn: RedisClient;
1112
private execCtx: Context;
12-
private brpopAsync: Function;
13-
private queueName: string;
13+
private workerName: string;
1414

1515
private tasks = {
1616
'check_and_notify': this.checkAndNotify,
1717
// 'trial_run': trialRun,
1818
// 'cleanup': cleanup,
1919
};
2020

21-
constructor(client: RedisClient, queueName: string) {
22-
this.queueConn = client;
21+
constructor(private redisService: RedisService, private queueName: string) {
2322
this.execCtx = execCtx;
24-
this.brpopAsync = promisify(client.brpop).bind(client);
23+
2524
this.queueName = queueName;
25+
this.workerName = `plocal.${hostname()}`;
26+
this.registerWorker();
27+
}
28+
29+
private registerWorker() {
30+
this.redisService.registerWorker(this.workerName)
2631
}
2732

2833
async startWorker() {
2934
console.log('worker started');
3035
let i = 0;
3136
while (true) {
3237
const res = await this.consumeQueue();
33-
const [queueName, msg] = res;
38+
const [ _, msg ] = res;
3439
const parsedMsg = JSON.parse(msg);
3540
const {check_id, entity, command} = parsedMsg.body.args[0];
3641
const alertList = parsedMsg.body.args[1];
42+
3743
const checkResult = this.executeZmonTask(command, check_id, entity);
44+
3845
this.storeCheckResult(check_id, entity.id, JSON.stringify(checkResult));
46+
3947
alertList.forEach((alert: any) => {
40-
const startTime = Date.now();
41-
const {td, result: alertRes} = execZmonScript(this.execCtx, alert.condition, {value: checkResult});
42-
const alertObj = {
43-
exc: 1,
44-
downtimes: [],
45-
captures: {},
46-
start_time: startTime,
47-
worker: 'nodejs-worker',
48-
ts: Date.now(),
49-
value: alertRes,
50-
td,
51-
};
52-
console.log(alertObj);
53-
if (!!alertRes) {
54-
this.storeAlertResult(alert.id, entity.id, JSON.stringify(alertObj));
55-
console.log(`alert ${alert.id} triggered on entity ${entity.id}`);
56-
} else {
57-
this.deleteAlertResult(alert.id, entity.id);
58-
}
48+
this.handleAlerts(alert, checkResult, entity);
5949
});
6050
}
6151
}
6252

53+
private handleAlerts(alert: any, checkResult: any, entity: any) {
54+
const startTime = Date.now();
55+
const {td, result: alertRes} = execZmonScript(this.execCtx, alert.condition, {value: checkResult});
56+
const alertObj = {
57+
exc: 1,
58+
downtimes: [],
59+
captures: {},
60+
start_time: startTime,
61+
worker: 'nodejs-worker',
62+
ts: Date.now(),
63+
value: alertRes,
64+
td,
65+
};
66+
if (!!alertRes) {
67+
this.storeAlertResult(alert.id, entity.id, JSON.stringify(alertObj));
68+
console.log(`alert ${alert.id} triggered on entity ${entity.id}`);
69+
} else {
70+
this.deleteAlertResult(alert.id, entity.id);
71+
}
72+
}
73+
6374
private async consumeQueue() {
64-
const checkId = '';
65-
const entityId = '';
66-
return await this.brpopAsync(this.queueName, 0);
75+
return await this.redisService.getTask(this.queueName);
6776
}
6877

6978
private executeZmonTask(checkScript: string, checkId: string, entity: any): any {
@@ -82,21 +91,21 @@ export default class ZmonWorker {
8291
// execZmonScript(this.execCtx, script, entity)
8392
}
8493

85-
private storeCheckResult(checkId: number, entityId: string, res: string) {
94+
private storeCheckResult(checkId: number, entityId: string, value: string) {
8695
const key = `zmon:checks:${checkId}:${entityId}`;
87-
this.queueConn.lpush(key, res, console.log);
88-
this.queueConn.ltrim(key, 0, 20 - 1);
96+
97+
this.redisService.storeCheckResult(key, value);
8998
}
9099

91100
private storeAlertResult(alertID: number, entityID: string, value: string) {
92101
const storageName = `zmon:alerts:${alertID}:${entityID}`;
93102

94-
this.queueConn.set(storageName, value, console.log);
103+
this.redisService.setAlert(storageName, value);
95104
}
96105

97106
private deleteAlertResult(alertID: number, entityID: string) {
98107
const storageName = `zmon:alerts:${alertID}:${entityID}`;
99108

100-
this.queueConn.del(storageName, console.log);
109+
this.redisService.deleteAlert(storageName);
101110
}
102111
}

0 commit comments

Comments
 (0)