Skip to content

Commit 4c8dfab

Browse files
committed
wip: implementing queue length metric server
1 parent e76452e commit 4c8dfab

File tree

4 files changed

+127
-101
lines changed

4 files changed

+127
-101
lines changed

package-lock.json

Lines changed: 42 additions & 95 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"author": "imqueue.com <[email protected]> (https://imqueue.com)",
3838
"license": "GPL-3.0-only",
3939
"dependencies": {
40-
"@imqueue/core": "^2.0.11",
40+
"@imqueue/core": "file:../core",
4141
"@types/node": "^24.3.0",
4242
"acorn": "^8.15.0",
4343
"farmhash": "^4.0.2",

src/IMQRPCOptions.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,16 @@ export interface IMQAfterCall<T> {
3535
(req: IMQRPCRequest, res?: IMQRPCResponse): Promise<void>;
3636
}
3737

38+
export interface IMQMetricsServerOptions {
39+
enabled?: boolean;
40+
port?: number;
41+
queueLengthFormatter?: (length: number, metricName: string) => string,
42+
}
43+
3844
export interface IMQServiceOptions extends IMQOptions {
3945
multiProcess: boolean;
4046
childrenPerCore: number;
47+
metricsServer?: IMQMetricsServerOptions;
4148
beforeCall?: IMQBeforeCall<IMQService>;
4249
afterCall?: IMQAfterCall<IMQService>;
4350
}
@@ -65,6 +72,20 @@ export const DEFAULT_IMQ_SERVICE_OPTIONS: IMQServiceOptions = {
6572
childrenPerCore: 1,
6673
};
6774

75+
/**
76+
* Default metric server options
77+
*
78+
* @type {NonNullable<IMQMetricsServerOptions>}
79+
*/
80+
export const DEFAULT_IMQ_METRICS_SERVER_OPTIONS: NonNullable<
81+
IMQMetricsServerOptions
82+
> = {
83+
enabled: false,
84+
port: 9090,
85+
queueLengthFormatter: (length, metricName) =>
86+
`${ metricName } {} ${ length }`,
87+
};
88+
6889
/**
6990
* Default client options
7091
*

src/IMQService.ts

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ import {
4242
DEFAULT_IMQ_SERVICE_OPTIONS,
4343
AFTER_HOOK_ERROR,
4444
BEFORE_HOOK_ERROR,
45-
SIGNALS,
45+
SIGNALS, DEFAULT_IMQ_METRICS_SERVER_OPTIONS,
4646
} from '.';
4747
import * as os from 'os';
4848
import { ArgDescription } from './IMQRPCDescription';
4949
import { IMQBeforeCall, IMQAfterCall } from './IMQRPCOptions';
50+
import * as http from 'node:http';
5051

5152
const cluster: any = require('cluster');
5253

@@ -85,7 +86,7 @@ function getClassMethods(className: string): MethodsCollectionDescription {
8586
}
8687

8788
/**
88-
* Checks if given args match given args description at least by args count
89+
* Checks if given args match the given args description at least by args count
8990
*
9091
* @param {ArgDescription[]} argsInfo
9192
* @param {any[]} args
@@ -110,6 +111,7 @@ export abstract class IMQService {
110111
protected imq: IMessageQueue;
111112
protected logger: ILogger;
112113
protected cache: ICache;
114+
protected metricsServer?: http.Server<any, any>;
113115

114116
public name: string;
115117
public options: IMQServiceOptions;
@@ -130,14 +132,26 @@ export abstract class IMQService {
130132
'be instantiated directly!');
131133
}
132134

133-
this.options = { ...DEFAULT_IMQ_SERVICE_OPTIONS, ...options };
135+
this.options = {
136+
...DEFAULT_IMQ_SERVICE_OPTIONS,
137+
...options,
138+
metricsServer: {
139+
...DEFAULT_IMQ_METRICS_SERVER_OPTIONS,
140+
...(options?.metricsServer || {}),
141+
},
142+
};
134143
this.logger = this.options.logger || /* istanbul ignore next */ console;
135144
this.imq = IMQ.create(this.name, this.options);
136145

137146
this.handleRequest = this.handleRequest.bind(this);
138147

139148
SIGNALS.forEach((signal: any) => process.on(signal, async () => {
140149
this.destroy().catch(this.logger.error);
150+
151+
if (this.metricsServer) {
152+
this.metricsServer.close();
153+
}
154+
141155
// istanbul ignore next
142156
setTimeout(() => process.exit(0), IMQ_SHUTDOWN_TIMEOUT);
143157
}));
@@ -254,7 +268,7 @@ export abstract class IMQService {
254268

255269
this.describe();
256270

257-
return this.imq.start();
271+
return this.startWithMetricsServer();
258272
}
259273

260274
if (cluster.isMaster) {
@@ -287,8 +301,10 @@ export abstract class IMQService {
287301

288302
this.describe();
289303

290-
return this.imq.start();
304+
return this.startWithMetricsServer();
291305
}
306+
307+
return this.startWithMetricsServer();
292308
}
293309

294310
// noinspection JSUnusedGlobalSymbols
@@ -346,6 +362,48 @@ export abstract class IMQService {
346362
return description;
347363
}
348364

365+
private async startWithMetricsServer(): Promise<IMessageQueue | undefined> {
366+
const service = this.imq.start();
367+
const metricServerOptions = this.options.metricsServer;
368+
369+
if (!(metricServerOptions && metricServerOptions.enabled)) {
370+
return service;
371+
}
372+
373+
this.logger.log('Starting metrics server...');
374+
375+
this.metricsServer = http.createServer(async (req, res) => {
376+
if (req.url === '/metrics') {
377+
const length = await this.imq.queueLength();
378+
const content = metricServerOptions.queueLengthFormatter?.(
379+
length, 'queue_length',
380+
) || String(length);
381+
382+
res.setHeader('Content-Type', 'plain/text');
383+
res.setHeader('Content-Length', Buffer.byteLength(content));
384+
res.writeHead(200);
385+
res.end(content);
386+
387+
return;
388+
}
389+
390+
res.writeHead(404);
391+
res.end();
392+
});
393+
this.metricsServer.listen(
394+
metricServerOptions.port,
395+
'0.0.0.0',
396+
() => {
397+
this.logger.info(
398+
'%s: metrics server started on port %s',
399+
this.name,
400+
metricServerOptions.port,
401+
);
402+
},
403+
);
404+
405+
return service;
406+
}
349407
}
350408

351409
/**

0 commit comments

Comments
 (0)