
Commit b340405

Merge remote-tracking branch 'origin/master' into conat-router-peer-discovery

2 parents: 962d24c + e12b8de


54 files changed (+739 / -207 lines)

src/packages/backend/conat/test/cluster/cluster-basics.test.ts

Lines changed: 0 additions & 2 deletions

@@ -90,7 +90,6 @@ describe("create a cluster enabled socketio server and test that the streams upd
     link = await clusterLink(
       server.address(),
       server.options.systemAccountPassword,
-      () => {},
     );
     await wait({
       until: () => {
@@ -156,7 +155,6 @@ describe("create a cluster enabled socketio server and test that the streams upd
     const link2 = await clusterLink(
       server.address(),
       server.options.systemAccountPassword,
-      () => {},
     );
     await wait({
       until: () => {
Lines changed: 188 additions & 0 deletions (new file)

@@ -0,0 +1,188 @@
+import {
+  before,
+  after,
+  defaultCluster as servers,
+  waitForConsistentState,
+  wait,
+  addNodeToDefaultCluster,
+} from "@cocalc/backend/conat/test/setup";
+import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
+import { randomId } from "@cocalc/conat/names";
+
+beforeAll(before);
+
+describe("ensure sticky state sync and use is working properly", () => {
+  let clients: any;
+  it("2-node cluster", async () => {
+    await addNodeToDefaultCluster();
+    expect(servers.length).toBe(2);
+    await waitForConsistentState(servers);
+    clients = servers.map((x) => x.client());
+  });
+
+  const count = 25;
+  const subs0: any[] = [];
+  const subs1: any[] = [];
+  it(`create ${count} distinct sticky subscriptions and send one message to each to create sticky routing state on servers[0]`, async () => {
+    clients.push(servers[0].client());
+    clients.push(servers[1].client());
+    for (let i = 0; i < count; i++) {
+      subs0.push(
+        await clients[1].subscribe(`subject.${i}.*`, {
+          queue: STICKY_QUEUE_GROUP,
+        }),
+      );
+      // wait so the above subscription is known to *both* servers:
+      // @ts-ignore
+      await servers[0].waitForInterest(
+        `subject.${i}.0`,
+        5000,
+        clients[0].conn.id,
+      );
+      subs1.push(
+        await clients[0].subscribe(`subject.${i}.*`, {
+          queue: STICKY_QUEUE_GROUP,
+        }),
+      );
+      // publishing causes a choice to be made and saved on servers[0]
+      await clients[0].publish(`subject.${i}.foo`, "hello");
+      expect(servers[0].sticky[`subject.${i}.*`]).not.toBe(undefined);
+      // but no choice on servers[1]
+      expect(servers[1].sticky[`subject.${i}.*`]).toBe(undefined);
+    }
+  });
+
+  let chosen;
+  it("see which subscription got chosen for subject.0.* -- this is useful later", async () => {
+    const p0 = async () => {
+      await subs0[0].next();
+      return 0;
+    };
+    const p1 = async () => {
+      await subs1[0].next();
+      return 1;
+    };
+    chosen = await Promise.race([p0(), p1()]);
+  });
+
+  it(`sticky on servers[0] should have ${count} entries starting in "subject."`, async () => {
+    const v = Object.keys(servers[0].sticky).filter((s) =>
+      s.startsWith("subject."),
+    );
+    expect(v.length).toBe(count);
+  });
+
+  it(`sticky on servers[1] should have no entries starting in "subject."`, async () => {
+    const v = Object.keys(servers[1].sticky).filter((s) =>
+      s.startsWith("subject."),
+    );
+    expect(v.length).toBe(0);
+  });
+
+  it(`servers[1]'s link to servers[0] should *eventually* have ${count} entries starting in "subject."`, async () => {
+    // @ts-ignore
+    const link = servers[1].clusterLinksByAddress[servers[0].address()];
+    let v;
+    await wait({
+      until: () => {
+        v = Object.keys(link.sticky).filter((s) => s.startsWith("subject."));
+        return v.length == count;
+      },
+    });
+    expect(v.length).toBe(count);
+  });
+
+  it("send message from clients[1] to each subject", async () => {
+    for (let i = 0; i < count; i++) {
+      await clients[1].publish(`subject.${i}.foo`);
+    }
+  });
+
+  it(`sticky on servers[1] should STILL have no entries starting in "subject.", since no choices had to be made`, async () => {
+    const v = Object.keys(servers[1].sticky).filter((s) =>
+      s.startsWith("subject."),
+    );
+    expect(v.length).toBe(0);
+  });
+
+  async function deliveryTest() {
+    const sub = chosen == 0 ? subs0[0] : subs1[0];
+
+    // drain the subscription (we sent it stuff above)
+    const sentinel = randomId();
+    await clients[0].publish("subject.0.foo", sentinel);
+    while (true) {
+      const { value } = await sub.next();
+      if (value.data == sentinel) {
+        break;
+      }
+    }
+    for (const server of servers) {
+      // we randomize the last segment to verify that it is NOT used
+      // as input to the sticky routing choice.
+      const { count } = await server
+        .client()
+        .publish(`subject.0.${randomId()}`, "delivery-test");
+      expect(count).toBe(1);
+    }
+    const ids = new Set<string>();
+    for (let i = 0; i < servers.length; i++) {
+      // one of the subs will receive it and one will hang forever (which is fine)
+      const { value } = await sub.next();
+      expect(value.data).toBe("delivery-test");
+      ids.add(value.client.id);
+    }
+    // all messages must go to the SAME subscriber, since sticky
+    expect(ids.size).toBe(1);
+  }
+
+  it("publish from every node to subject.0.foo", deliveryTest);
+
+  const count2 = 5;
+  it(`adding ${count2} more nodes to the cluster should be reasonably fast and not blow up in a feedback loop`, async () => {
+    for (let i = 0; i < count2; i++) {
+      await addNodeToDefaultCluster();
+    }
+  });
+
+  it("wait until cluster is consistent", async () => {
+    await waitForConsistentState(servers);
+  });
+
+  it("double check that the links have the sticky state", () => {
+    for (const server of servers.slice(1)) {
+      // @ts-ignore
+      const link = server.clusterLinksByAddress[servers[0].address()];
+      const v = Object.keys(link.sticky).filter((s) =>
+        s.startsWith("subject."),
+      );
+      expect(v.length).toBe(count);
+    }
+  });
+
+  it(
+    "in the bigger cluster, publish from every node to subject.0.foo",
+    deliveryTest,
+  );
+
+  it("listen on > and note that it doesn't impact the count", async () => {
+    const sub = await clients[0].subscribe(">");
+    for (let i = 0; i < servers.length; i++) {
+      const { count } = await servers[i]
+        .client()
+        .publish("subject.0.foo", "hi");
+      expect(count).toBe(1);
+    }
+    sub.close();
+  });
+
+  it("unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]", async () => {
+    await servers[1].unjoin({ address: servers[0].address() });
+    const v = Object.keys(servers[1].sticky).filter((s) =>
+      s.startsWith("subject."),
+    );
+    expect(v.length).toBe(count);
+  });
+});
+
+afterAll(after);

src/packages/backend/conat/test/persist/multiple-servers.test.ts

Lines changed: 1 addition & 1 deletion (whitespace-only change)

@@ -4,7 +4,7 @@ Unit tests of multiple persist servers at once:
 - making numerous distinct clients and seeing that they are distributed across persist servers
 - stopping a persist server and seeing that failover happens without data loss
 
-pnpm test `pwd`/multiple-servers.test.ts
+pnpm test `pwd`/multiple-servers.test.ts
 
 */
 
src/packages/backend/conat/test/socket/basic.test.ts

Lines changed: 1 addition & 1 deletion (whitespace-only change)

@@ -1,6 +1,6 @@
 /*
 
-pnpm test `pwd`/basic.test.ts
+pnpm test `pwd`/basic.test.ts
 
 */
 

src/packages/backend/conat/test/socket/restarts.test.ts

Lines changed: 0 additions & 2 deletions

@@ -18,8 +18,6 @@ beforeAll(async () => {
   setDefaultTimeouts({ request: 500, publish: 500 });
 });
 
-//jest.setTimeout(25000);
-
 describe("create a client and server and socket, verify it works, restart conat server, then confirm that socket still works", () => {
   const SUBJECT = "reconnect.one";

src/packages/backend/package.json

Lines changed: 1 addition & 2 deletions

@@ -16,9 +16,8 @@
     "clean": "rm -rf dist node_modules",
     "build": "pnpm exec tsc --build",
     "tsc": "pnpm exec tsc --watch --pretty --preserveWatchOutput",
-    "test": "pnpm exec jest --forceExit --maxWorkers=25%",
+    "test": "pnpm exec jest --forceExit",
     "test-conat": " pnpm exec jest --forceExit conat",
-    "stress-test": " pnpm exec jest --forceExit",
     "testp": "pnpm exec jest --forceExit",
     "depcheck": "pnpx depcheck --ignores events",
     "prepublishOnly": "pnpm test",

src/packages/conat/core/client.ts

Lines changed: 37 additions & 10 deletions

@@ -22,6 +22,22 @@ much better in so many ways:
 - JsonCodec: uses JSON.stringify and TextEncoder. This does not work
   with Buffer or Date and is less compact, but can be very fast.
 
+- One BIG DIFFERENCE from Nats is that when a message is sent, the sender
+  can optionally find out how many clients received it. The sender can also
+  wait until there is interest and then send the message again. This is
+  automated so it's very easy to use, and it makes it much easier to write
+  distributed services that aren't temporarily broken by race conditions.
+  HOWEVER, there is one caveat -- if as an admin you create a "tap", i.e.,
+  you subscribe to all messages matching some pattern just to see what's
+  going on, then currently that tap counts toward the delivery count and
+  interest, which would reintroduce exactly these race conditions. E.g., a
+  user signs in, subscribes to their INBOX, sends a request, and expects a
+  response to that inbox -- but if this all happens quickly in a cluster,
+  the server may not see the inbox yet, so the request fails. As a
+  workaround, subscriptions to the subject pattern '>' are invisible, so
+  you can always tap into '>' for debugging purposes. TODO: implement a
+  general way of making an invisible subscriber that doesn't count.
+
 
 THE CORE API
 
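The delivery-count / wait-for-interest workflow described in the comment above, seen from the sender's side. This is a minimal sketch, not code from the commit: it assumes `client` is an already-connected Client from this module, that publish resolves with the number of recipients (as the cluster tests rely on), and that waitForInterest(subject, timeout) has the signature suggested by the hunk below.

// Sketch: `client` is assumed to be a connected Client instance.
async function publishWhenThereIsInterest(client: any) {
  // publish() resolves with how many subscribers received the message:
  let { count } = await client.publish("hub.api.request", "ping");
  if (count === 0) {
    // nobody was listening yet; wait for interest to appear (the 5000ms
    // timeout is illustrative), then resend:
    await client.waitForInterest("hub.api.request", 5000);
    ({ count } = await client.publish("hub.api.request", "ping"));
  }
  return count;
}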
@@ -604,13 +620,14 @@ export class Client extends EventEmitter {
       );
     }
     timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
-    const response = await this.conn
-      .timeout(timeout ? timeout : 10000)
-      .emitWithAck("wait-for-interest", { subject, timeout });
-    if (response.error) {
-      throw new ConatError(response.error, { code: response.code });
+    try {
+      const response = await this.conn
+        .timeout(timeout ? timeout : 10000)
+        .emitWithAck("wait-for-interest", { subject, timeout });
+      return response;
+    } catch (err) {
+      throw toConatError(err);
     }
-    return response;
   };
 
   recvStats = (bytes: number) => {
@@ -954,7 +971,7 @@ export class Client extends EventEmitter {
         });
       }
     } catch (err) {
-      throw new ConatError(`${err}`, { code: 408 });
+      throw toConatError(err);
     }
     if (response?.error) {
       throw new ConatError(response.error, { code: response.code });
@@ -1300,9 +1317,7 @@
         return response;
       }
     } catch (err) {
-      throw new ConatError(`timeout - ${subject} - ${err}`, {
-        code: 408,
-      });
+      throw toConatError(err);
     }
   } else {
     return await this.conn.emitWithAck("publish", v);
@@ -1910,3 +1925,15 @@ function isEmpty(obj: object): boolean {
   }
   return true;
 }
+
+function toConatError(socketIoError) {
+  // only errors are "disconnected" and a timeout
+  const e = `${socketIoError}`;
+  if (e.includes("disconnected")) {
+    return e;
+  } else {
+    return new ConatError(`timeout - ${e}`, {
+      code: 408,
+    });
+  }
+}
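With these changes a failed emit surfaces in one of two ways: a ConatError with code 408 when the socket.io ack times out, or a plain "...disconnected" string when the connection dropped. A sketch of handling both in a caller, assuming ConatError is exported from this module and `client` is a connected Client:

import { ConatError } from "@cocalc/conat/core/client";

// Sketch: distinguishing the two failure modes that toConatError produces.
async function publishWithRetryHint(client: any) {
  try {
    await client.publish("some.subject", "data");
  } catch (err) {
    if (err instanceof ConatError && (err as any).code === 408) {
      // the ack timed out -- retrying is reasonable, possibly after
      // waitForInterest (see above)
    } else {
      // the plain "disconnected" string -- the connection dropped mid-call
      throw err;
    }
  }
}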

src/packages/conat/core/cluster.ts

Lines changed: 2 additions & 11 deletions

@@ -18,7 +18,6 @@ const logger = getLogger("conat:core:cluster");
 export async function clusterLink(
   address: string,
   systemAccountPassword: string,
-  updateStickyLocal: (sticky: StickyUpdate) => void,
   timeout = CREATE_LINK_TIMEOUT,
 ) {
   const client = connect({ address, systemAccountPassword });
@@ -43,13 +42,7 @@
   if (!clusterName) {
     throw Error("clusterName must be specified");
   }
-  const link = new ClusterLink(
-    client,
-    id,
-    clusterName,
-    address,
-    updateStickyLocal,
-  );
+  const link = new ClusterLink(client, id, clusterName, address);
   await link.init();
   return link;
 }
@@ -61,7 +54,7 @@ export { type ClusterLink };
 
 class ClusterLink {
   public interest: Interest = new Patterns();
-  private sticky: Sticky = {};
+  public sticky: Sticky = {};
   private streams: ClusterStreams;
   private state: "init" | "ready" | "closed" = "init";
   private clientStateChanged = Date.now(); // when client status last changed
@@ -71,7 +64,6 @@ class ClusterLink {
     public readonly id: string,
     public readonly clusterName: string,
     public readonly address: string,
-    private readonly updateStickyLocal: (sticky: StickyUpdate) => void,
   ) {
     if (!client) {
       throw Error("client must be specified");
@@ -119,7 +111,6 @@
 
   handleStickyUpdate = (update: StickyUpdate) => {
     updateSticky(update, this.sticky);
-    this.updateStickyLocal(update);
   };
 
   private handleClientStateChanged = () => {

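The net effect of this file's changes: a ClusterLink no longer pushes sticky updates back to the owning server through a callback; instead `sticky` is public and the server (and the new tests) read the link's replicated sticky state directly. A minimal sketch of the new call shape, assuming clusterLink is imported from this file; `address` and `systemAccountPassword` are placeholders:

// Sketch: the updateStickyLocal callback argument is gone, and the
// replicated sticky routing choices are read straight off the link.
async function inspectLinkSticky(address: string, systemAccountPassword: string) {
  const link = await clusterLink(address, systemAccountPassword);
  return Object.keys(link.sticky);
}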