Skip to content

Commit 194aa2d

Browse files
authored
Merge pull request #8451 from sagemathinc/conat-router-metrics
conat/server: add prometheus metrics
2 parents 048b6a1 + 39258f4 commit 194aa2d

File tree

6 files changed

+179
-55
lines changed

6 files changed

+179
-55
lines changed

src/packages/conat/core/server.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import { type SysConatServer, sysApiSubject, sysApi } from "./sys";
7474
import { forkedConatServer } from "./start-server";
7575
import { stickyChoice } from "./sticky";
7676
import { EventEmitter } from "events";
77+
import { Metrics } from "../types";
7778

7879
const logger = getLogger("conat:core:server");
7980

@@ -310,6 +311,10 @@ export class ConatServer extends EventEmitter {
310311
});
311312
};
312313

314+
public getUsage = (): Metrics => {
315+
return this.usage.getMetrics();
316+
};
317+
313318
// this is for the Kubernetes health check -- I haven't
314319
// thought at all about what to do here, really.
315320
// Hopefully experience can teach us.
@@ -602,7 +607,9 @@ export class ConatServer extends EventEmitter {
602607
return;
603608
}
604609
if (!(await this.isAllowed({ user, subject, type: "sub" }))) {
605-
const message = `permission denied subscribing to '${subject}' from ${JSON.stringify(user)}`;
610+
const message = `permission denied subscribing to '${subject}' from ${JSON.stringify(
611+
user,
612+
)}`;
606613
this.log(message);
607614
throw new ConatError(message, {
608615
code: 403,
@@ -706,7 +713,9 @@ export class ConatServer extends EventEmitter {
706713
}
707714

708715
if (!(await this.isAllowed({ user: from, subject, type: "pub" }))) {
709-
const message = `permission denied publishing to '${subject}' from ${JSON.stringify(from)}`;
716+
const message = `permission denied publishing to '${subject}' from ${JSON.stringify(
717+
from,
718+
)}`;
710719
this.log(message);
711720
throw new ConatError(message, {
712721
// this is the http code for permission denied, and having this
@@ -950,7 +959,9 @@ export class ConatServer extends EventEmitter {
950959
return;
951960
}
952961
if (!(await this.isAllowed({ user, subject, type: "pub" }))) {
953-
const message = `permission denied waiting for interest in '${subject}' from ${JSON.stringify(user)}`;
962+
const message = `permission denied waiting for interest in '${subject}' from ${JSON.stringify(
963+
user,
964+
)}`;
954965
this.log(message);
955966
respond({ error: message, code: 403 });
956967
}
@@ -1791,7 +1802,9 @@ export function updateSticky(update: StickyUpdate, sticky: Sticky): boolean {
17911802
function getServerAddress(options: Options) {
17921803
const port = options.port;
17931804
const path = options.path?.slice(0, -"/conat".length) ?? "";
1794-
return `http${options.ssl || port == 443 ? "s" : ""}://${options.clusterIpAddress ?? "localhost"}:${port}${path}`;
1805+
return `http${options.ssl || port == 443 ? "s" : ""}://${
1806+
options.clusterIpAddress ?? "localhost"
1807+
}:${port}${path}`;
17951808
}
17961809

17971810
/*

src/packages/conat/monitor/usage.ts

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import json from "json-stable-stringify";
21
import { EventEmitter } from "events";
3-
import type { JSONValue } from "@cocalc/util/types";
4-
import { ConatError } from "@cocalc/conat/core/client";
2+
import json from "json-stable-stringify";
3+
54
import { getLogger } from "@cocalc/conat/client";
5+
import { ConatError } from "@cocalc/conat/core/client";
6+
import type { JSONValue } from "@cocalc/util/types";
7+
import { Metrics } from "../types";
68

79
const logger = getLogger("monitor:usage");
810

@@ -17,6 +19,9 @@ export class UsageMonitor extends EventEmitter {
1719
private options: Options;
1820
private total = 0;
1921
private perUser: { [user: string]: number } = {};
22+
// metrics will be picked up periodically and exposed via e.g. prometheus
23+
private countDeny = 0;
24+
private metrics: Metrics = {};
2025

2126
constructor(options: Options) {
2227
super();
@@ -38,27 +43,53 @@ export class UsageMonitor extends EventEmitter {
3843

3944
private initLogging = () => {
4045
const { log } = this.options;
41-
if (log == null) {
42-
return;
43-
}
46+
47+
// Record metrics for all events (even if logging is disabled)
4448
this.on("total", (total, limit) => {
45-
log("usage", this.options.resource, { total, limit });
49+
this.metrics["total:count"] = total;
50+
this.metrics["total:limit"] = limit;
51+
if (log) {
52+
log("usage", this.options.resource, { total, limit });
53+
}
4654
});
4755
this.on("add", (user, count, limit) => {
48-
log("usage", this.options.resource, "add", { user, count, limit });
56+
// this.metrics["add:count"] = count;
57+
// this.metrics["add:limit"] = limit;
58+
if (log) {
59+
log("usage", this.options.resource, "add", { user, count, limit });
60+
}
4961
});
5062
this.on("delete", (user, count, limit) => {
51-
log("usage", this.options.resource, "delete", { user, count, limit });
63+
// this.metrics["delete:count"] = count;
64+
// this.metrics["delete:limit"] = limit;
65+
if (log) {
66+
log("usage", this.options.resource, "delete", { user, count, limit });
67+
}
5268
});
5369
this.on("deny", (user, limit, type) => {
54-
log("usage", this.options.resource, "not allowed due to hitting limit", {
55-
type,
56-
user,
57-
limit,
58-
});
70+
this.countDeny += 1;
71+
this.metrics["deny:count"] = this.countDeny;
72+
this.metrics["deny:limit"] = limit;
73+
if (log) {
74+
log(
75+
"usage",
76+
this.options.resource,
77+
"not allowed due to hitting limit",
78+
{
79+
type,
80+
user,
81+
limit,
82+
},
83+
);
84+
}
5985
});
6086
};
6187

88+
// we return a copy
89+
getMetrics = () => {
90+
return { ...this.metrics };
91+
};
92+
6293
add = (user: JSONValue) => {
6394
const u = this.toJson(user);
6495
let count = this.perUser[u] ?? 0;

src/packages/conat/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,8 @@ export interface Location {
99

1010
path?: string;
1111
}
12+
13+
type EventType = "total" | "add" | "delete" | "deny";
14+
type ValueType = "count" | "limit";
15+
type MetricKey = `${EventType}:${ValueType}`;
16+
export type Metrics = { [K in MetricKey]?: number };

src/packages/server/conat/socketio/health.ts

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,23 @@ Health check for use in Kubernetes.
33
*/
44

55
import type { ConatServer } from "@cocalc/conat/core/server";
6-
import { conatClusterHealthPort } from "@cocalc/backend/data";
7-
import { createServer } from "http";
6+
import type { IncomingMessage, ServerResponse } from "http";
87
import { getLogger } from "@cocalc/backend/logger";
98

109
const logger = getLogger("conat:socketio:health");
1110

12-
export async function health(server: ConatServer) {
13-
logger.debug(
14-
`starting /health socketio server on port ${conatClusterHealthPort}`,
15-
);
16-
const healthServer = createServer();
17-
healthServer.listen(conatClusterHealthPort);
18-
healthServer.on("request", (req, res) => {
19-
if (req.method === "GET") {
20-
if (server.isHealthy()) {
21-
res.statusCode = 200;
22-
res.end("healthy");
23-
} else {
24-
res.statusCode = 500;
25-
res.end("Unhealthy");
26-
}
27-
}
28-
});
11+
export function handleHealth(
12+
server: ConatServer,
13+
_req: IncomingMessage,
14+
res: ServerResponse,
15+
) {
16+
const healthy = server.isHealthy();
17+
logger.debug("/health reporting conat is healthy=${healthy}");
18+
if (healthy) {
19+
res.statusCode = 200;
20+
res.end("healthy");
21+
} else {
22+
res.statusCode = 500;
23+
res.end("Unhealthy");
24+
}
2925
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Metrics endpoint for use in Kubernetes.
3+
*/
4+
5+
import { delay } from "awaiting";
6+
import type { IncomingMessage, ServerResponse } from "http";
7+
import { Gauge, register } from "prom-client";
8+
9+
import { getLogger } from "@cocalc/backend/logger";
10+
import type { ConatServer } from "@cocalc/conat/core/server";
11+
import { Metrics } from "@cocalc/conat/types";
12+
13+
const logger = getLogger("conat:socketio:metrics");
14+
15+
const DELAY_MS = 10_000;
16+
17+
const usageMetric = new Gauge({
18+
name: "cocalc_conat_usage",
19+
help: "Conat server usage metrics",
20+
labelNames: ["event", "value"],
21+
});
22+
23+
// periodically grab the metrics and set the Gauge, avoids an event emitter callback memory leak
24+
export async function initMetrics(server: ConatServer) {
25+
logger.debug("metrics endpoint initialized");
26+
27+
await delay(DELAY_MS);
28+
while (server.state != "closed") {
29+
try {
30+
const usage: Metrics = server.getUsage();
31+
if (usage) {
32+
for (const [key, val] of Object.entries(usage)) {
33+
const [event, value] = key.split(":");
34+
usageMetric.set({ event, value }, val);
35+
}
36+
}
37+
} catch (err) {
38+
logger.debug(`WARNING: error retrieving metrics -- ${err}`);
39+
}
40+
await delay(DELAY_MS);
41+
}
42+
}
43+
44+
export async function handleMetrics(
45+
_req: IncomingMessage,
46+
res: ServerResponse,
47+
) {
48+
try {
49+
const metricsData = await register.metrics();
50+
res.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
51+
res.statusCode = 200;
52+
res.end(metricsData);
53+
} catch (error) {
54+
logger.error("Error getting metrics:", error);
55+
res.statusCode = 500;
56+
res.end("Internal server error");
57+
}
58+
}

src/packages/server/conat/socketio/server.ts

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,31 @@ Then in another terminal, make a client connected to each:
3131
3232
*/
3333

34+
import { hostname } from "node:os";
35+
import { join } from "node:path";
36+
37+
import basePath from "@cocalc/backend/base-path";
38+
import "@cocalc/backend/conat";
39+
import "@cocalc/backend/conat/persist"; // initializes context
3440
import {
35-
init as createConatServer,
36-
type Options,
37-
} from "@cocalc/conat/core/server";
38-
import { getUser, isAllowed } from "./auth";
39-
import { secureRandomString } from "@cocalc/backend/misc";
40-
import {
41+
conatClusterName as clusterName,
42+
conatClusterHealthPort,
43+
conatClusterPort,
4144
conatPassword,
4245
conatSocketioCount,
43-
conatClusterPort,
44-
conatClusterName as clusterName,
4546
} from "@cocalc/backend/data";
46-
import basePath from "@cocalc/backend/base-path";
47+
import { getLogger } from "@cocalc/backend/logger";
48+
import { secureRandomString } from "@cocalc/backend/misc";
4749
import port from "@cocalc/backend/port";
48-
import { join } from "path";
49-
import "@cocalc/backend/conat";
50-
import "@cocalc/backend/conat/persist"; // initializes context
50+
import type { ConatServer } from "@cocalc/conat/core/server";
51+
import {
52+
init as createConatServer,
53+
type Options,
54+
} from "@cocalc/conat/core/server";
55+
import { getUser, isAllowed } from "./auth";
5156
import { dnsScan, localAddress, SCAN_INTERVAL } from "./dns-scan";
52-
import { health } from "./health";
53-
import { hostname } from "node:os";
54-
import { getLogger } from "@cocalc/backend/logger";
57+
import { handleHealth } from "./health";
58+
import { initMetrics, handleMetrics } from "./metrics";
5559

5660
const logger = getLogger("conat-server");
5761

@@ -95,9 +99,8 @@ export async function init(
9599
opts.forgetClusterNodeInterval = 4 * SCAN_INTERVAL;
96100
const server = createConatServer(opts);
97101
// enable dns scanner
98-
dnsScan(server);
99-
// enable health checks
100-
health(server);
102+
dnsScan(server); // we don't await it, it runs forever
103+
await startVitalsServer(server);
101104
return server;
102105
}
103106

@@ -115,3 +118,21 @@ export async function init(
115118
});
116119
}
117120
}
121+
122+
async function startVitalsServer(server: ConatServer) {
123+
// create shared HTTP server for health and metrics endpoints
124+
const { createServer } = await import("http");
125+
const vitalsServer = createServer((req, res) => {
126+
if (req.method === "GET" && req.url === "/health") {
127+
handleHealth(server, req, res);
128+
} else if (req.method === "GET" && req.url === "/metrics") {
129+
handleMetrics(req, res);
130+
} else {
131+
res.statusCode = 404;
132+
res.end("Not Found");
133+
}
134+
});
135+
vitalsServer.listen(conatClusterHealthPort);
136+
logger.debug(`starting vitals server on port ${conatClusterHealthPort}`);
137+
initMetrics(server); // start prometheus metrics collection
138+
}

0 commit comments

Comments
 (0)