Skip to content

Commit 8bc231a

Browse files
committed
improver cluster unit testing; more testing of consistent state
1 parent 0f37d02 commit 8bc231a

File tree

4 files changed

+40
-11
lines changed

4 files changed

+40
-11
lines changed

src/packages/backend/conat/test/cluster/pubsub.test.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99
addNodeToDefaultCluster,
1010
wait,
1111
delay,
12+
waitForConsistentState,
13+
defaultCluster,
1214
} from "@cocalc/backend/conat/test/setup";
1315
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
1416

@@ -79,7 +81,7 @@ describe("same basic test, but in the other direction", () => {
7981
});
8082
});
8183

82-
describe("with three nodes and two subscribers with different queue group on distinct nodes", () => {
84+
describe("three nodes and two subscribers with different queue group on distinct nodes", () => {
8385
let client0, client1, client2, server2;
8486
it("get the clients", async () => {
8587
client0 = server.client();
@@ -116,7 +118,7 @@ describe("with three nodes and two subscribers with different queue group on dis
116118
expect(count).toBe(1);
117119
});
118120

119-
it("make a sticky sub from all nodes and see that all messages go to the same receiver, no matter which node we send from", async () => {
121+
it("make sticky subs from each node, and see that all messages go to the same receiver, no matter which node we send from", async () => {
120122
const sub0 = await client0.subscribe("sticky", {
121123
queue: STICKY_QUEUE_GROUP,
122124
});
@@ -150,20 +152,38 @@ describe("with three nodes and two subscribers with different queue group on dis
150152
await client2.waitForInterest("sticky");
151153

152154
for (let i = 0; i < 20; i++) {
153-
const { count } = await client2.publish("sticky");
155+
const { count } = await client2.publish("sticky", null);
154156
expect(count).toBe(1);
155157
}
156158
for (let i = 0; i < 20; i++) {
157-
const { count } = await client1.publish("sticky");
159+
const { count } = await client1.publish("sticky", null);
158160
expect(count).toBe(1);
159161
}
160162
for (let i = 0; i < 20; i++) {
161-
const { count } = await client0.publish("sticky");
163+
const { count } = await client0.publish("sticky", null);
162164
expect(count).toBe(1);
163165
}
166+
await wait({ until: () => n0 + n1 + n2 == 60 });
164167
expect(n0 * n1).toBe(0);
165168
expect(n0 * n2).toBe(0);
166169
expect(n1 * n2).toBe(0);
170+
expect(n0 + n1 + n2).toBe(60);
171+
172+
await waitForConsistentState(defaultCluster);
173+
174+
// add another node and test
175+
const server3 = await addNodeToDefaultCluster();
176+
const client3 = server3.client();
177+
await waitForConsistentState(defaultCluster);
178+
for (let i = 0; i < 20; i++) {
179+
const { count } = await client3.publish("sticky", null);
180+
expect(count).toBe(1);
181+
}
182+
await wait({ until: () => n0 + n1 + n2 == 80 });
183+
expect(n0 * n1).toBe(0);
184+
expect(n0 * n2).toBe(0);
185+
expect(n1 * n2).toBe(0);
186+
expect(n0 + n1 + n2).toBe(80);
167187
});
168188
});
169189

src/packages/backend/conat/test/persist/cluster.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,13 @@ describe("test using multiple persist servers in a cluster", () => {
138138
expect(t.getAll()).toEqual(["x"]);
139139
openStreams1.push(t);
140140
}
141-
expect(openStreams0.length).toBeGreaterThan(1);
141+
expect(openStreams0.length).toBeGreaterThan(0);
142142
});
143143

144144
it("remove one persist server", async () => {
145145
persistServer1.close();
146146
// [ ] TODO: removing this delay leads to very consistent failures
147-
// involving data loss, but things should just be slower, never broken,
147+
// involving data loss, but things should just be slower, never broken,
148148
// on automatic failover.
149149
await delay(3000);
150150
});

src/packages/backend/conat/test/setup.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ export async function initConatServer(
3636
}
3737

3838
const server = createConatServer(options);
39+
if (server.clusterName == "default") {
40+
defaultCluster.push(server);
41+
}
3942
if (server.state != "ready") {
4043
await once(server, "ready");
4144
}
@@ -64,6 +67,8 @@ export async function createServer(opts?) {
6467

6568
// add another node to the cluster -- this is still in the same process (not forked), which
6669
// is generally good since you can console.log from it, faster, etc.
70+
// this does connect the new node to all existing nodes.
71+
export const defaultCluster: ConatServer[] = [];
6772
export async function addNodeToDefaultCluster(): Promise<ConatServer> {
6873
const port = await getPort();
6974
const node = await initConatServer({
@@ -73,8 +78,10 @@ export async function addNodeToDefaultCluster(): Promise<ConatServer> {
7378
id: getNodeId(),
7479
systemAccountPassword: "secret",
7580
});
76-
await server.join(node.address());
77-
await node.join(server.address());
81+
for (const s of defaultCluster) {
82+
await s.join(node.address());
83+
await node.join(s.address());
84+
}
7885
return node;
7986
}
8087

@@ -171,7 +178,9 @@ export async function waitForConsistentState(
171178
}
172179

173180
if (ids.size != servers.length) {
174-
throw Error("all servers must have distinct ids");
181+
throw Error(
182+
`all servers must have distinct ids -- ${JSON.stringify(servers.map((x) => x.id))}`,
183+
);
175184
}
176185

177186
const start = Date.now();

src/packages/conat/core/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ export class ConatServer extends EventEmitter {
187187
} = {};
188188
private clusterLinksByAddress: { [address: string]: ClusterLink } = {};
189189
private clusterPersistServer?: ConatSocketServer;
190-
private clusterName?: string;
190+
public readonly clusterName?: string;
191191
private queuedClusterUpdates: Update[] = [];
192192

193193
constructor(options: Options) {

0 commit comments

Comments
 (0)