Skip to content

Commit 8074630

Browse files
committed
rewrite conat router scanning to use k8s api instead of dns
- less secure and bigger dep, but it seems to be the only way to do this right.
1 parent 520ac2c commit 8074630

File tree

4 files changed

+54
-15
lines changed

4 files changed

+54
-15
lines changed

src/packages/conat/core/cluster.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ class ClusterLink {
147147
// @ts-ignore
148148
delete this.streams;
149149
}
150+
this.client.close();
151+
// @ts-ignore
152+
delete this.client;
150153
};
151154

152155
hasInterest = (subject) => {

src/packages/conat/core/server.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,13 @@ const DEFAULT_AUTOSCAN_INTERVAL = 15_000;
8181
const DEFAULT_LONG_AUTOSCAN_INTERVAL = 60_000;
8282

8383
// If a cluster node has been disconnected for this long,
84-
// unjoin it, thus freeing the stream tracking its state
85-
// and also if it comes back it will have to explicitly
84+
// unjoin it, thus freeing the streams tracking its state (so memory),
85+
// If it comes back it will have to explicitly
8686
// join the cluster again. This is primarily to not leak RAM
87-
// when nodes are removed on purpose. Supercluster nodes
88-
// are never automatically forgetton.
89-
const DEFAULT_FORGET_CLUSTER_NODE_INTERVAL = 30 * 60_000; // 30 minutes
87+
// when nodes are removed on purpose, and also avoid a cluttered log
88+
// of constant reconnect errors. Supercluster nodes are never
89+
// automatically forgotten.
90+
const DEFAULT_FORGET_CLUSTER_NODE_INTERVAL = 30 * 60_000; // 30 minutes by default
9091

9192
const DEBUG = false;
9293

@@ -1469,7 +1470,9 @@ export class ConatServer extends EventEmitter {
14691470
return await f;
14701471
} catch (err) {
14711472
if (!done) {
1472-
logger.debug(`WARNING: waitForInterestInLinks -- ${err}`);
1473+
logger.debug(
1474+
`WARNING: waitForInterestInLinks ${subject} -- ${err}`,
1475+
);
14731476
}
14741477
}
14751478
return false;

src/packages/server/conat/socketio/dns-scan.ts

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import { lookup } from "dns/promises";
1010
import port from "@cocalc/backend/port";
1111
import { hostname } from "node:os";
1212
import { getLogger } from "@cocalc/backend/logger";
13+
import { executeCode } from "@cocalc/backend/execute-code";
14+
import { split } from "@cocalc/util/misc";
1315

14-
const SCAN_INTERVAL = 15_000;
16+
export const SCAN_INTERVAL = 15_000;
1517

1618
const logger = getLogger("conat:socketio:dns-scan");
1719

@@ -64,15 +66,41 @@ export async function localAddress(): Promise<string> {
6466
return address;
6567
}
6668

69+
/*
70+
71+
hub@hub-conat-router-5cbc9576f-44sl2:/tmp$ hostname
72+
hub-conat-router-5cbc9576f-44sl2
73+
74+
# figured this out by reading the docs at https://kubernetes.io/docs/reference/kubectl/jsonpath/
75+
76+
hub@hub-conat-router-5cbc9576f-44sl2:/tmp$ kubectl get pods -l run=hub-conat-router -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.podIP}{"\n"}{end}'
77+
hub-conat-router-5cbc9576f-44sl2 192.168.39.103
78+
hub-conat-router-5cbc9576f-n99x7 192.168.236.174
79+
*/
80+
6781
export async function getAddresses(): Promise<string[]> {
6882
const v: string[] = [];
69-
const self = await localAddress();
70-
for (const { address } of await lookup(
71-
process.env.COCALC_SERVICE ?? "hub-conat-router",
72-
{ all: true },
73-
)) {
74-
if (address == self) continue;
75-
v.push(`http://${address}:${port}`);
83+
const h = hostname();
84+
const i = h.lastIndexOf("-");
85+
const prefix = h.slice(0, i);
86+
const { stdout } = await executeCode({
87+
command: "kubectl",
88+
args: [
89+
"get",
90+
"pods",
91+
"-l",
92+
"run=hub-conat-router",
93+
"-o",
94+
`jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.podIP}{"\n"}{end}'`,
95+
],
96+
});
97+
for (const x of stdout.split("\n")) {
98+
const row = split(x);
99+
if (row.length == 2) {
100+
if (row[0] != h && row[0].startsWith(prefix)) {
101+
v.push(`http://${row[1]}:${port}`);
102+
}
103+
}
76104
}
77105
return v;
78106
}

src/packages/server/conat/socketio/server.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import port from "@cocalc/backend/port";
4848
import { join } from "path";
4949
import "@cocalc/backend/conat";
5050
import "@cocalc/backend/conat/persist"; // initializes context
51-
import { dnsScan, localAddress } from "./dns-scan";
51+
import { dnsScan, localAddress, SCAN_INTERVAL } from "./dns-scan";
5252
import { health } from "./health";
5353
import { hostname } from "node:os";
5454
import { getLogger } from "@cocalc/backend/logger";
@@ -88,6 +88,11 @@ export async function init(
8888
if (!opts.id) {
8989
opts.id = hostname().split("-").slice(-1)[0];
9090
}
91+
// make this very short in k8s because we use the k8s api to get
92+
// the exact nodes frequently, so even if there was a temporary split
93+
// brain and each side stopped trying to connect to the other side,
94+
// things would get fixed by k8s within SCAN_INTERVAL.
95+
opts.forgetClusterNodeInterval = 4 * SCAN_INTERVAL;
9196
const server = createConatServer(opts);
9297
// enable dns scanner
9398
dnsScan(server);

0 commit comments

Comments
 (0)