Skip to content

Commit c726df4

Browse files
committed
conat cluster: timeout adding new node to cluster
- Before, it would just keep trying to connect to a nonexistent node forever. That didn't break anything, but it was annoying in the logs.
1 parent 3b8f81a commit c726df4

File tree

7 files changed

+107
-56
lines changed

7 files changed

+107
-56
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { before, after } from "@cocalc/backend/conat/test/setup";
2+
import { createClusterNode } from "./util";
3+
import getPort from "@cocalc/backend/get-port";
4+
5+
beforeAll(before);
6+
7+
// Verifies that ConatServer.join gives up with a timeout error when the target
// address has nothing listening, instead of retrying forever.
describe("adding a node times out if we can't connect to it, rather than trying forever", () => {
  it("tries to add a link to a node that doesn't exist", async () => {
    const { server } = await createClusterNode({
      clusterName: "cluster0",
      id: "1",
    });
    // presumably getPort() returns a currently unused port, so no node is
    // listening at this address -- confirm against get-port.
    const port = await getPort();
    // The assertion MUST be awaited: `.rejects` returns a promise, and without
    // `await` the test function returns before the matcher runs, so the test
    // would pass even if join() never timed out (or never rejected at all).
    await expect(
      server.join(`localhost:${port}`, { timeout: 500 }),
    ).rejects.toThrow("timeout");
  });
});
19+
20+
afterAll(after);

src/packages/backend/conat/test/sync/cluster.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ describe("using various sync data structures with a cluster", () => {
4141
});
4242

4343
it("wait until both servers in the cluster have the same state", async () => {
44-
await waitForConsistentState([server, server2], 5000);
44+
await waitForConsistentState([server, server2], 15000);
4545
});
4646

4747
let dstream2;

src/packages/backend/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"build": "pnpm exec tsc --build",
1818
"tsc": "pnpm exec tsc --watch --pretty --preserveWatchOutput",
1919
"test": "pnpm exec jest --forceExit --maxWorkers=25%",
20-
"test-conat": " pnpm exec jest --forceExit --maxWorkers=50% conat",
20+
"test-conat": " pnpm exec jest --forceExit conat",
2121
"stress-test": " pnpm exec jest --forceExit",
2222
"testp": "pnpm exec jest --forceExit",
2323
"depcheck": "pnpx depcheck --ignores events",

src/packages/conat/core/client.ts

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -546,27 +546,30 @@ export class Client extends EventEmitter {
546546
setTimeout(() => this.conn.io.disconnect(), 1);
547547
};
548548

549-
waitUntilSignedIn = reuseInFlight(async () => {
550-
// not "signed in" if --
551-
// - not connected, or
552-
// - no info at all (which gets sent on sign in)
553-
// - or the user is {error:....}, which is what happens when sign in fails
554-
// e.g., due to an expired cookie
555-
if (
556-
this.info == null ||
557-
this.state != "connected" ||
558-
this.info?.user?.error
559-
) {
560-
await once(this, "info");
561-
}
562-
if (
563-
this.info == null ||
564-
this.state != "connected" ||
565-
this.info?.user?.error
566-
) {
567-
throw Error("failed to sign in");
568-
}
569-
});
549+
// Resolves once this client is signed in, and throws otherwise.
// If the client does not currently look signed in, this waits for the next
// "info" event and then re-checks; if it still is not signed in, it throws
// Error("failed to sign in").
// - timeout: optional; passed straight through to once() as the limit on the
//   wait for "info" (presumably milliseconds -- confirm against once()).
// NOTE(review): wrapped in reuseInFlight, which presumably shares a single
// pending call among concurrent callers -- confirm.
// this has NO timeout by default
550+
waitUntilSignedIn = reuseInFlight(
551+
async ({ timeout }: { timeout?: number } = {}) => {
552+
// not "signed in" if --
553+
// - not connected, or
554+
// - no info at all (which gets sent on sign in)
555+
// - or the user is {error:....}, which is what happens when sign in fails
556+
// e.g., due to an expired cookie
557+
if (
558+
this.info == null ||
559+
this.state != "connected" ||
560+
this.info?.user?.error
561+
) {
562+
// if once() rejects (e.g., on timeout), that error propagates to the caller
563+
await once(this, "info", timeout);
564+
}
565+
// re-check after the wait: an "info" event alone does not mean sign in succeeded
566+
if (
567+
this.info == null ||
568+
this.state != "connected" ||
569+
this.info?.user?.error
570+
) {
571+
throw Error("failed to sign in");
572+
}
573+
},
574+
);
570573

571574
private statsLoop = async () => {
572575
await until(

src/packages/conat/core/cluster.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,30 @@ import { once } from "@cocalc/util/async-utils";
1111
import { server as createPersistServer } from "@cocalc/conat/persist/server";
1212
import { getLogger } from "@cocalc/conat/client";
1313
import { hash_string } from "@cocalc/util/misc";
14+
const CREATE_LINK_TIMEOUT = 45_000;
1415

1516
const logger = getLogger("conat:core:cluster");
1617

1718
export async function clusterLink(
1819
address: string,
1920
systemAccountPassword: string,
2021
updateStickyLocal: (sticky: StickyUpdate) => void,
22+
timeout = CREATE_LINK_TIMEOUT,
2123
) {
2224
const client = connect({ address, systemAccountPassword });
2325
if (client.info == null) {
24-
await client.waitUntilSignedIn();
25-
if (client.info == null) throw Error("bug -- failed to sign in");
26+
try {
27+
await client.waitUntilSignedIn({
28+
timeout: timeout ?? CREATE_LINK_TIMEOUT,
29+
});
30+
} catch (err) {
31+
client.close();
32+
throw err;
33+
}
34+
if (client.info == null) {
35+
// this is impossible
36+
throw Error("BUG -- failed to sign in");
37+
}
2638
}
2739
const { id, clusterName } = client.info;
2840
if (!id) {

src/packages/conat/core/server.ts

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,36 +1103,46 @@ export class ConatServer extends EventEmitter {
11031103
// - the address obviously must be reachable over the network
11041104
// - the systemAccountPassword of this node and the one with the given
11051105
// address must be the same.
1106-
join = reuseInFlight(async (address: string): Promise<ClusterLink> => {
1107-
if (!this.options.systemAccountPassword) {
1108-
throw Error("systemAccountPassword must be set");
1109-
}
1110-
logger.debug("join: connecting to ", address);
1111-
const link0 = this.clusterLinksByAddress[address];
1112-
if (link0 != null) {
1113-
logger.debug("join: already connected to ", address);
1114-
return link0;
1115-
}
1116-
try {
1117-
const link = await clusterLink(
1118-
address,
1119-
this.options.systemAccountPassword,
1120-
this.updateSticky,
1121-
);
1122-
const { clusterName, id } = link;
1123-
if (this.clusterLinks[clusterName] == null) {
1124-
this.clusterLinks[clusterName] = {};
1106+
join = reuseInFlight(
1107+
async (
1108+
address: string,
1109+
{ timeout }: { timeout?: number } = {},
1110+
): Promise<ClusterLink> => {
1111+
if (!this.options.systemAccountPassword) {
1112+
throw Error("systemAccountPassword must be set");
11251113
}
1126-
this.clusterLinks[clusterName][id] = link;
1127-
this.clusterLinksByAddress[address] = link;
1128-
this.scanSoon();
1129-
logger.debug("join: successfully created new connection to ", address);
1130-
return link;
1131-
} catch (err) {
1132-
logger.debug("join: FAILED creating a new connection to ", address, err);
1133-
throw err;
1134-
}
1135-
});
1114+
logger.debug("join: connecting to ", address);
1115+
const link0 = this.clusterLinksByAddress[address];
1116+
if (link0 != null) {
1117+
logger.debug("join: already connected to ", address);
1118+
return link0;
1119+
}
1120+
try {
1121+
const link = await clusterLink(
1122+
address,
1123+
this.options.systemAccountPassword,
1124+
this.updateSticky,
1125+
timeout,
1126+
);
1127+
const { clusterName, id } = link;
1128+
if (this.clusterLinks[clusterName] == null) {
1129+
this.clusterLinks[clusterName] = {};
1130+
}
1131+
this.clusterLinks[clusterName][id] = link;
1132+
this.clusterLinksByAddress[address] = link;
1133+
this.scanSoon();
1134+
logger.debug("join: successfully created new connection to ", address);
1135+
return link;
1136+
} catch (err) {
1137+
logger.debug(
1138+
"join: FAILED creating a new connection to ",
1139+
address,
1140+
err,
1141+
);
1142+
throw err;
1143+
}
1144+
},
1145+
);
11361146

11371147
unjoin = ({
11381148
id,

src/packages/util/async-utils.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,13 @@ export class TimeoutError extends Error {
181181
export async function once(
182182
obj: EventEmitter,
183183
event: string,
184-
timeout_ms: number = 0,
184+
timeout_ms: number | undefined = 0,
185185
): Promise<any> {
186186
if (obj == null) throw Error("once -- obj is undefined");
187+
if (timeout_ms == null) {
188+
// callers might explicitly pass in null (an explicit undefined already gets the default of 0), but below we expect 0 to mean "no timeout"
189+
timeout_ms = 0;
190+
}
187191
if (typeof obj.once != "function")
188192
throw Error("once -- obj.once must be a function");
189193

@@ -209,7 +213,9 @@ export async function once(
209213
function onTimeout() {
210214
cleanup();
211215
reject(
212-
new TimeoutError(`once: timeout of ${timeout_ms}ms waiting for "${event}"`),
216+
new TimeoutError(
217+
`once: timeout of ${timeout_ms}ms waiting for "${event}"`,
218+
),
213219
);
214220
}
215221

0 commit comments

Comments
 (0)