Skip to content

Commit 0d48bfc

Browse files
committed
Merge remote-tracking branch 'origin/master' into store-purchase-course
2 parents cc33693 + 8599c22 commit 0d48bfc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+942
-366
lines changed

src/packages/backend/conat/test/cluster/cluster-basics.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ describe("create a cluster enabled socketio server and test that the streams upd
9090
link = await clusterLink(
9191
server.address(),
9292
server.options.systemAccountPassword,
93-
() => {},
9493
);
9594
await wait({
9695
until: () => {
@@ -156,7 +155,6 @@ describe("create a cluster enabled socketio server and test that the streams upd
156155
const link2 = await clusterLink(
157156
server.address(),
158157
server.options.systemAccountPassword,
159-
() => {},
160158
);
161159
await wait({
162160
until: () => {
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import {
2+
before,
3+
after,
4+
defaultCluster as servers,
5+
waitForConsistentState,
6+
wait,
7+
addNodeToDefaultCluster,
8+
} from "@cocalc/backend/conat/test/setup";
9+
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
10+
import { randomId } from "@cocalc/conat/names";
11+
12+
beforeAll(before);
13+
14+
describe("ensure sticky state sync and use is working properly", () => {
15+
let clients: any;
16+
it("2-node cluster", async () => {
17+
await addNodeToDefaultCluster();
18+
expect(servers.length).toBe(2);
19+
await waitForConsistentState(servers);
20+
clients = servers.map((x) => x.client());
21+
});
22+
23+
const count = 25;
24+
const subs0: any[] = [];
25+
const subs1: any[] = [];
26+
it(`create ${count} distinct sticky subscriptions and send one message to each to create sticky routing state on servers[0]`, async () => {
27+
clients.push(servers[0].client());
28+
clients.push(servers[1].client());
29+
for (let i = 0; i < count; i++) {
30+
subs0.push(
31+
await clients[1].subscribe(`subject.${i}.*`, {
32+
queue: STICKY_QUEUE_GROUP,
33+
}),
34+
);
35+
// wait so above subscription is known to *both* servers:
36+
// @ts-ignore
37+
await servers[0].waitForInterest(
38+
`subject.${i}.0`,
39+
5000,
40+
clients[0].conn.id,
41+
);
42+
subs1.push(
43+
await clients[0].subscribe(`subject.${i}.*`, {
44+
queue: STICKY_QUEUE_GROUP,
45+
}),
46+
);
47+
// publishing causes a choice to be made and saved on servers[0]
48+
await clients[0].publish(`subject.${i}.foo`, "hello");
49+
expect(servers[0].sticky[`subject.${i}.*`]).not.toBe(undefined);
50+
// but no choice on servers[1]
51+
expect(servers[1].sticky[`subject.${i}.*`]).toBe(undefined);
52+
}
53+
});
54+
55+
let chosen;
56+
it("see which subscription got chosen for subject.0.* -- this is useful later", async () => {
57+
const p0 = async () => {
58+
await subs0[0].next();
59+
return 0;
60+
};
61+
const p1 = async () => {
62+
await subs1[0].next();
63+
return 1;
64+
};
65+
chosen = await Promise.race([p0(), p1()]);
66+
});
67+
68+
it(`sticky on servers[0] should have ${count} entries starting in "subject".`, async () => {
69+
const v = Object.keys(servers[0].sticky).filter((s) =>
70+
s.startsWith("subject."),
71+
);
72+
expect(v.length).toBe(count);
73+
});
74+
75+
it(`sticky on servers[1] should have no entries starting in "subject".`, async () => {
76+
const v = Object.keys(servers[1].sticky).filter((s) =>
77+
s.startsWith("subject."),
78+
);
79+
expect(v.length).toBe(0);
80+
});
81+
82+
it(`servers[1]'s link to servers[0] should *eventually* have ${count} entries starting in "subject."`, async () => {
83+
// @ts-ignore
84+
const link = servers[1].clusterLinksByAddress[servers[0].address()];
85+
let v;
86+
await wait({
87+
until: () => {
88+
v = Object.keys(link.sticky).filter((s) => s.startsWith("subject."));
89+
return v.length == count;
90+
},
91+
});
92+
expect(v.length).toBe(count);
93+
});
94+
95+
it("send message from clients[1] to each subject", async () => {
96+
for (let i = 0; i < count; i++) {
97+
await clients[1].publish(`subject.${i}.foo`);
98+
}
99+
});
100+
101+
// Sometimes this fails under very heavy load.
102+
// It's not a good test, because it probably hits some timeouts sometimes, and
103+
// it is testing internal structure/optimizations, not behavior.
104+
// Note also that minimizing sticky state computation is just an optimization so even if this test were failing
105+
// due to a bug, it might just mean things are slightly slower.
106+
// it(`sticky on servers[1] should STILL have no entries starting in "subject", since no choices had to be made`, async () => {
107+
// const v = Object.keys(servers[1].sticky).filter((s) =>
108+
// s.startsWith("subject."),
109+
// );
110+
// expect(v.length).toBe(0);
111+
// });
112+
113+
async function deliveryTest() {
114+
const sub = chosen == 0 ? subs0[0] : subs1[0];
115+
116+
// clear up the subscription (we sent it stuff above)
117+
const sentinel = randomId();
118+
await clients[0].publish("subject.0.foo", sentinel);
119+
while (true) {
120+
const { value } = await sub.next();
121+
if (value.data == sentinel) {
122+
break;
123+
}
124+
}
125+
for (const server of servers) {
126+
// we randomize the last segment to verify that it is NOT used
127+
// as input to the sticky routing choice.
128+
const { count } = await server
129+
.client()
130+
.publish(`subject.0.${randomId()}`, "delivery-test");
131+
expect(count).toBe(1);
132+
}
133+
const ids = new Set<string>();
134+
for (let i = 0; i < servers.length; i++) {
135+
// on of the subs will receive it and one will hang forever (which is fine)
136+
const { value } = await sub.next();
137+
expect(value.data).toBe("delivery-test");
138+
ids.add(value.client.id);
139+
}
140+
// all messages must go to the SAME subscriber, since sticky
141+
expect(ids.size).toBe(1);
142+
}
143+
144+
it("publish from every node to subject.0.foo", deliveryTest);
145+
146+
const count2 = 5;
147+
it(`add ${count2} more nodes to the cluster should be reaonably fast and not blow up in a feedback loop`, async () => {
148+
for (let i = 0; i < count2; i++) {
149+
await addNodeToDefaultCluster();
150+
}
151+
});
152+
153+
it("wait until cluster is consistent", async () => {
154+
await waitForConsistentState(servers);
155+
});
156+
157+
it("double check the links have the sticky state", () => {
158+
for (const server of servers.slice(1)) {
159+
// @ts-ignore
160+
const link = server.clusterLinksByAddress[servers[0].address()];
161+
const v = Object.keys(link.sticky).filter((s) =>
162+
s.startsWith("subject."),
163+
);
164+
expect(v.length).toBe(count);
165+
}
166+
});
167+
168+
it(
169+
"in bigger, cluster, publish from every node to subject.0.foo",
170+
deliveryTest,
171+
);
172+
173+
it("listen on > and note that it doesn't impact the count", async () => {
174+
const sub = await clients[0].subscribe(">");
175+
for (let i = 0; i < servers.length; i++) {
176+
const { count } = await servers[i]
177+
.client()
178+
.publish("subject.0.foo", "hi");
179+
expect(count).toBe(1);
180+
}
181+
sub.close();
182+
});
183+
184+
it("unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]", async () => {
185+
await servers[1].unjoin({ address: servers[0].address() });
186+
const v = Object.keys(servers[1].sticky).filter((s) =>
187+
s.startsWith("subject."),
188+
);
189+
expect(v.length).toBe(count);
190+
});
191+
});
192+
193+
afterAll(after);

src/packages/backend/conat/test/cluster/node-discovery.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { isEqual } from "lodash";
1010

1111
beforeAll(before);
1212

13-
jest.setTimeout(20000);
13+
jest.setTimeout(30000);
1414
describe("test automatic node discovery (and forgetting)", () => {
1515
const nodes: { client; server }[] = [];
1616
const clusterName = "auto";

src/packages/backend/conat/test/persist/cluster.test.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import {
1515
persistServer as persistServer0,
1616
wait,
1717
setDefaultTimeouts,
18+
setDefaultSocketTimeouts,
19+
setDefaultReconnectDelay,
1820
waitForConsistentState,
1921
} from "../setup";
2022
import { uuid } from "@cocalc/util/misc";
@@ -25,9 +27,15 @@ beforeAll(async () => {
2527
await before();
2628
// this speeds up the automatic failover tests a lot.
2729
setDefaultTimeouts({ request: 1000, publish: 1000 });
30+
setDefaultSocketTimeouts({
31+
command: 1000,
32+
keepAlive: 2000,
33+
keepAliveTimeout: 1000,
34+
});
35+
setDefaultReconnectDelay(1);
2836
});
2937

30-
jest.setTimeout(15000);
38+
jest.setTimeout(10000);
3139
describe("test using multiple persist servers in a cluster", () => {
3240
let client0, server1, client1;
3341
it("add another node", async () => {
@@ -143,17 +151,14 @@ describe("test using multiple persist servers in a cluster", () => {
143151

144152
it("remove one persist server", async () => {
145153
persistServer1.close();
146-
// [ ] TODO: removing this delay leads to very consistent failures
147-
// involving data loss, but things should just be slower, never broken,
148-
// on automatic failover.
149-
await delay(3000);
150154
});
151155

152156
it("creating / opening streams we made above still work with no data lost", async () => {
153157
for (const project_id of project_ids) {
154158
const s = await client0.sync.dstream({
155159
project_id,
156160
name: "foo",
161+
noCache: true,
157162
sync: true,
158163
});
159164
expect(await s.getAll()).toEqual([project_id]);
@@ -169,9 +174,16 @@ describe("test using multiple persist servers in a cluster", () => {
169174
stream0.publish("y");
170175
await stream0.save();
171176
expect(stream0.hasUnsavedChanges()).toBe(false);
177+
172178
const stream1 = openStreams1[i];
173179
expect(stream0.opts.project_id).toEqual(stream1.opts.project_id);
174-
await wait({ until: () => stream1.length >= 2, timeout: 10000 });
180+
await wait({
181+
until: async () => {
182+
return stream1.length >= 2;
183+
},
184+
timeout: 5000,
185+
start: 1000,
186+
});
175187
expect(stream1.length).toBe(2);
176188
}
177189
});

src/packages/backend/conat/test/persist/multiple-servers.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Unit tests of multiple persist servers at once:
44
- making numerous distinct clients and seeing that they are distributed across persist servers
55
- stopping a persist server and seeing that failover happens without data loss
66
7-
pnpm test `pwd`/multiple-servers.test.ts
7+
pnpm test `pwd`/multiple-servers.test.ts
88
99
*/
1010

src/packages/backend/conat/test/setup.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import { join } from "path";
1515
import { wait } from "@cocalc/backend/conat/test/util";
1616
import { delay } from "awaiting";
1717
export { setDefaultTimeouts } from "@cocalc/conat/core/client";
18+
export { setDefaultSocketTimeouts } from "@cocalc/conat/socket/util";
19+
export { setDefaultReconnectDelay } from "@cocalc/conat/persist/client";
1820
import { once } from "@cocalc/util/async-utils";
1921
import { until } from "@cocalc/util/async-utils";
2022
import { randomId } from "@cocalc/conat/names";

src/packages/backend/conat/test/socket/basic.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
3-
pnpm test `pwd`/basic.test.ts
3+
pnpm test `pwd`/basic.test.ts
44
55
*/
66

src/packages/backend/conat/test/socket/restarts.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ beforeAll(async () => {
1818
setDefaultTimeouts({ request: 500, publish: 500 });
1919
});
2020

21-
//jest.setTimeout(25000);
22-
2321
describe("create a client and server and socket, verify it works, restart conat server, then confirm that socket still works", () => {
2422
const SUBJECT = "reconnect.one";
2523

src/packages/backend/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
"clean": "rm -rf dist node_modules",
1717
"build": "pnpm exec tsc --build",
1818
"tsc": "pnpm exec tsc --watch --pretty --preserveWatchOutput",
19-
"test": "pnpm exec jest --forceExit --maxWorkers=25%",
19+
"test": "pnpm exec jest --forceExit",
2020
"test-conat": " pnpm exec jest --forceExit conat",
21-
"stress-test": " pnpm exec jest --forceExit",
2221
"testp": "pnpm exec jest --forceExit",
2322
"depcheck": "pnpx depcheck --ignores events",
2423
"prepublishOnly": "pnpm test",

0 commit comments

Comments
 (0)