Skip to content

Commit 468bf4f

Browse files
committed
adding better state awareness to conat server
- motivation: I am worried there might be a subtle bug.
1 parent af874cc commit 468bf4f

File tree

4 files changed

+49
-35
lines changed

4 files changed

+49
-35
lines changed

src/packages/backend/conat/test/cluster/node-discovery.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ describe("test automatic node discovery (and forgetting)", () => {
2525
);
2626
};
2727

28-
it("create three distinct servers with cluster support enabled", async () => {
28+
it("create two servers with cluster support enabled", async () => {
2929
await create("node0");
3030
await create("node1");
3131
});

src/packages/backend/conat/test/server/limits.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ describe("test the per user subscription limit", () => {
1010
let server;
1111

1212
it("creates a server with a subscription limit of 3", async () => {
13-
server = await createServer({ maxSubscriptionsPerClient: 3 });
13+
server = await createServer({
14+
maxSubscriptionsPerClient: 3,
15+
clusterName: undefined, // since can't bootstrap with so few subscriptions!
16+
});
1417
});
1518

1619
let client;

src/packages/backend/conat/test/setup.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import { mkdtemp, rm } from "node:fs/promises";
1313
import { tmpdir } from "node:os";
1414
import { join } from "path";
1515
import { wait } from "@cocalc/backend/conat/test/util";
16-
export { wait };
1716
import { delay } from "awaiting";
18-
export { delay };
1917
export { setDefaultTimeouts } from "@cocalc/conat/core/client";
20-
export { once } from "@cocalc/util/async-utils";
18+
import { once } from "@cocalc/util/async-utils";
2119
import { until } from "@cocalc/util/async-utils";
2220
import { randomId } from "@cocalc/conat/names";
2321
import { isEqual } from "lodash";
2422

23+
export { wait, delay, once };
24+
2525
const logger = getLogger("conat:test:setup");
2626

2727
export const path = "/conat";
@@ -35,7 +35,11 @@ export async function initConatServer(
3535
options = { ...options, port };
3636
}
3737

38-
return createConatServer(options);
38+
const server = createConatServer(options);
39+
if (false && server.state != "ready") {
40+
await once(server, "ready");
41+
}
42+
return server;
3943
}
4044

4145
export let tempDir;
@@ -182,7 +186,7 @@ export async function waitForConsistentState(
182186
for (let j = 0; j < servers.length; j++) {
183187
if (i != j) {
184188
// @ts-ignore
185-
const link = servers[j].clusterLinks[clusterName][servers[i].id];
189+
const link = servers[j].clusterLinks[clusterName]?.[servers[i].id];
186190
if (link == null) {
187191
throw Error(`node ${j} is not connected to node ${i}`);
188192
}

src/packages/conat/core/server.ts

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
6969
import { type SysConatServer, sysApiSubject, sysApi } from "./sys";
7070
import { forkedConatServer } from "./start-server";
7171
import { stickyChoice } from "./sticky";
72+
import { EventEmitter } from "events";
7273

7374
const logger = getLogger("conat:core:server");
7475

@@ -151,9 +152,9 @@ export interface Options {
151152
localClusterSize?: number;
152153
}
153154

154-
type State = "ready" | "closed";
155+
type State = "init" | "ready" | "closed";
155156

156-
export class ConatServer {
157+
export class ConatServer extends EventEmitter {
157158
public readonly io;
158159
public readonly id: string;
159160

@@ -165,7 +166,7 @@ export class ConatServer {
165166
private sockets: { [id: string]: any } = {};
166167
private stats: { [id: string]: ConnectionStats } = {};
167168
private usage: UsageMonitor;
168-
public state: State = "ready";
169+
public state: State = "init";
169170

170171
private subscriptions: { [socketId: string]: Set<string> } = {};
171172
private interest: Patterns<{ [queue: string]: Set<string> }> = new Patterns();
@@ -183,6 +184,7 @@ export class ConatServer {
183184
private queuedClusterUpdates: Update[] = [];
184185

185186
constructor(options: Options) {
187+
super();
186188
const {
187189
httpServer,
188190
port = 3000,
@@ -266,13 +268,26 @@ export class ConatServer {
266268
}
267269
this.initUsage();
268270
this.io.on("connection", this.handleSocket);
271+
this.init();
272+
}
273+
274+
private setState = (state: State) => {
275+
if (this.state == state) return;
276+
this.emit(state);
277+
this.state = state;
278+
};
279+
280+
private isClosed = () => this.state == "closed";
281+
282+
private init = async () => {
269283
if (this.options.systemAccountPassword) {
270-
this.initSystemService();
284+
await this.initSystemService();
271285
}
272286
if (this.clusterName) {
273-
this.initCluster();
287+
await this.initCluster();
274288
}
275-
}
289+
this.setState("ready");
290+
};
276291

277292
private initUsage = () => {
278293
this.usage = new UsageMonitor({
@@ -287,17 +302,17 @@ export class ConatServer {
287302
// thought at all about what to do here, really.
288303
// Hopefully experience can teach us.
289304
isHealthy = () => {
290-
if (this.state == "closed") {
305+
if (this.isClosed()) {
291306
return false;
292307
}
293308
return true;
294309
};
295310

296311
close = async () => {
297-
if (this.state == "closed") {
312+
if (this.isClosed()) {
298313
return;
299314
}
300-
this.state = "closed";
315+
this.setState("closed");
301316

302317
if (this.clusterStreams != null) {
303318
for (const name in this.clusterStreams) {
@@ -414,7 +429,7 @@ export class ConatServer {
414429
///////////////////////////////////////
415430

416431
private updateInterest = async (interest: InterestUpdate) => {
417-
if (this.state != "ready") return;
432+
if (this.isClosed()) return;
418433
// publish to the stream
419434
this.updateClusterStream({ interest });
420435
// update our local state
@@ -812,7 +827,7 @@ export class ConatServer {
812827
const count = await this.publish({ subject, data, from: user });
813828
respond?.({ count });
814829
} catch (err) {
815-
console.log(this.id, "ERROR", err);
830+
// console.log(this.id, "ERROR", err);
816831
if (err.code == 403) {
817832
socket.emit("permission", {
818833
message: err.message,
@@ -1016,11 +1031,11 @@ export class ConatServer {
10161031
}
10171032
this.log(`Cluster autoscan interval ${this.options.autoscanInterval}ms`);
10181033
let lastCount = 1;
1019-
while (this.state != "closed") {
1034+
while (!this.isClosed()) {
10201035
let x;
10211036
try {
10221037
x = await this.scan();
1023-
if (this.state == ("closed" as any)) return;
1038+
if (this.isClosed()) return;
10241039
} catch (err) {
10251040
// this should never happen unless there is a serious bug (?).
10261041
this.log(`WARNING/BUG?: serious problem scanning -- ${err}`);
@@ -1048,7 +1063,7 @@ export class ConatServer {
10481063

10491064
private scanSoon = throttle(
10501065
async () => {
1051-
if (this.state == "closed" || !this.options.autoscanInterval) {
1066+
if (this.isClosed() || !this.options.autoscanInterval) {
10521067
return;
10531068
}
10541069
let x;
@@ -1278,7 +1293,7 @@ export class ConatServer {
12781293
// the number of links created (count), so if it returns 0 when called on all nodes, then
12791294
// we're done until new nodes are added.
12801295
scan = reuseInFlight(async (): Promise<{ count: number; errors: any[] }> => {
1281-
if (this.state == "closed") {
1296+
if (this.isClosed()) {
12821297
return { count: 0, errors: [] };
12831298
}
12841299
const knownByUs = new Set(this.clusterAddresses(this.clusterName));
@@ -1296,7 +1311,7 @@ export class ConatServer {
12961311
const knownByRemoteNode = new Set(
12971312
await sys.clusterAddresses(this.clusterName),
12981313
);
1299-
if (this.state == "closed") return;
1314+
if (this.isClosed()) return;
13001315
for (const address of knownByRemoteNode) {
13011316
if (!knownByUs.has(address)) {
13021317
unknownToUs.add(address);
@@ -1305,7 +1320,7 @@ export class ConatServer {
13051320
if (!knownByRemoteNode.has(this.address())) {
13061321
// we know about them, but they don't know about us, so ask them to link to us.
13071322
await sys.join(this.address());
1308-
if (this.state == ("closed" as any)) return;
1323+
if (this.isClosed()) return;
13091324
count += 1;
13101325
}
13111326
} catch (err) {
@@ -1338,7 +1353,7 @@ export class ConatServer {
13381353
})
13391354
.map((link) => f(link.client)),
13401355
);
1341-
if (unknownToUs.size == 0 || this.state == ("closed" as any)) {
1356+
if (unknownToUs.size == 0 || this.isClosed()) {
13421357
return { count, errors };
13431358
}
13441359

@@ -1471,11 +1486,7 @@ export class ConatServer {
14711486
timeout = MAX_INTEREST_TIMEOUT;
14721487
}
14731488
const start = Date.now();
1474-
while (
1475-
this.state != "closed" &&
1476-
this.sockets[socketId] &&
1477-
!signal?.aborted
1478-
) {
1489+
while (!this.isClosed() && this.sockets[socketId] && !signal?.aborted) {
14791490
if (Date.now() - start >= timeout) {
14801491
throw Error("timeout");
14811492
}
@@ -1485,11 +1496,7 @@ export class ConatServer {
14851496
} catch {
14861497
continue;
14871498
}
1488-
if (
1489-
(this.state as any) == "closed" ||
1490-
!this.sockets[socketId] ||
1491-
signal?.aborted
1492-
) {
1499+
if (this.isClosed() || !this.sockets[socketId] || signal?.aborted) {
14931500
return false;
14941501
}
14951502
const hasMatch = this.interest.hasMatch(subject);

0 commit comments

Comments
 (0)