Skip to content

Commit b8278f9

Browse files
committed
conat core: changing sticky state to not be redundant
1 parent c9d52b1 commit b8278f9

File tree

6 files changed

+147
-21
lines changed

6 files changed

+147
-21
lines changed

src/packages/backend/conat/test/cluster/cluster-basics.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ describe("create a cluster enabled socketio server and test that the streams upd
9090
link = await clusterLink(
9191
server.address(),
9292
server.options.systemAccountPassword,
93-
() => {},
9493
);
9594
await wait({
9695
until: () => {
@@ -156,7 +155,6 @@ describe("create a cluster enabled socketio server and test that the streams upd
156155
const link2 = await clusterLink(
157156
server.address(),
158157
server.options.systemAccountPassword,
159-
() => {},
160158
);
161159
await wait({
162160
until: () => {
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import {
2+
before,
3+
after,
4+
defaultCluster as servers,
5+
waitForConsistentState,
6+
wait,
7+
addNodeToDefaultCluster,
8+
} from "@cocalc/backend/conat/test/setup";
9+
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
10+
11+
beforeAll(before);
12+
13+
describe("ensure sticky state sync and use is working properly", () => {
14+
let clients: any;
15+
it("2-node cluster", async () => {
16+
await addNodeToDefaultCluster();
17+
expect(servers.length).toBe(2);
18+
await waitForConsistentState(servers);
19+
clients = servers.map((x) => x.client());
20+
});
21+
22+
const count = 5;
23+
it(`create ${count} distinct sticky subscriptions and send one message to each to creat sticky routing state on servers[0]`, async () => {
24+
clients.push(servers[0].client());
25+
clients.push(servers[1].client());
26+
for (let i = 0; i < count; i++) {
27+
await clients[1].subscribe(`subject.${i}.*`, {
28+
queue: STICKY_QUEUE_GROUP,
29+
});
30+
// wait so above subscription is known to *both* servers:
31+
// @ts-ignore
32+
await servers[0].waitForInterest(
33+
`subject.${i}.0`,
34+
5000,
35+
clients[0].conn.id,
36+
);
37+
await clients[0].subscribe(`subject.${i}.*`, {
38+
queue: STICKY_QUEUE_GROUP,
39+
});
40+
// cause choice to be made and saved on servers[0]
41+
await clients[0].publish(`subject.${i}.foo`);
42+
}
43+
});
44+
45+
it(`sticky on servers[0] should have ${count} entries starting in "subject".`, async () => {
46+
const v = Object.keys(servers[0].sticky).filter((s) =>
47+
s.startsWith("subject."),
48+
);
49+
expect(v.length).toBe(count);
50+
});
51+
52+
it.skip(`sticky on servers[1] should have no entries starting in "subject".`, async () => {
53+
const v = Object.keys(servers[1].sticky).filter((s) =>
54+
s.startsWith("subject."),
55+
);
56+
expect(v.length).toBe(0);
57+
});
58+
59+
it(`servers[1]'s link to servers[0] should *eventually* have ${count} entries starting in "subject."`, async () => {
60+
// @ts-ignore
61+
const link = servers[1].clusterLinksByAddress[servers[0].address()];
62+
let v;
63+
await wait({
64+
until: () => {
65+
v = Object.keys(link.sticky).filter((s) => s.startsWith("subject."));
66+
return v.length == count;
67+
},
68+
});
69+
expect(v.length).toBe(count);
70+
});
71+
72+
it("send message from clients[1] to each subject", async () => {
73+
for (let i = 0; i < count; i++) {
74+
await clients[1].publish(`subject.${i}.foo`);
75+
}
76+
});
77+
78+
it.skip(`sticky on servers[1] should STILL have no entries starting in "subject", since no choices had to be made`, async () => {
79+
const v = Object.keys(servers[1].sticky).filter((s) =>
80+
s.startsWith("subject."),
81+
);
82+
expect(v.length).toBe(0);
83+
});
84+
});
85+
86+
afterAll(after);

src/packages/backend/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
"clean": "rm -rf dist node_modules",
1717
"build": "pnpm exec tsc --build",
1818
"tsc": "pnpm exec tsc --watch --pretty --preserveWatchOutput",
19-
"test": "pnpm exec jest --forceExit --maxWorkers=25%",
19+
"test": "pnpm exec jest --forceExit",
2020
"test-conat": " pnpm exec jest --forceExit conat",
21-
"stress-test": " pnpm exec jest --forceExit",
2221
"testp": "pnpm exec jest --forceExit",
2322
"depcheck": "pnpx depcheck --ignores events",
2423
"prepublishOnly": "pnpm test",

src/packages/conat/core/cluster.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ const logger = getLogger("conat:core:cluster");
1818
export async function clusterLink(
1919
address: string,
2020
systemAccountPassword: string,
21-
updateStickyLocal: (sticky: StickyUpdate) => void,
2221
timeout = CREATE_LINK_TIMEOUT,
2322
) {
2423
const client = connect({ address, systemAccountPassword });
@@ -43,13 +42,7 @@ export async function clusterLink(
4342
if (!clusterName) {
4443
throw Error("clusterName must be specified");
4544
}
46-
const link = new ClusterLink(
47-
client,
48-
id,
49-
clusterName,
50-
address,
51-
updateStickyLocal,
52-
);
45+
const link = new ClusterLink(client, id, clusterName, address);
5346
await link.init();
5447
return link;
5548
}
@@ -61,7 +54,7 @@ export { type ClusterLink };
6154

6255
class ClusterLink {
6356
public interest: Interest = new Patterns();
64-
private sticky: Sticky = {};
57+
public sticky: Sticky = {};
6558
private streams: ClusterStreams;
6659
private state: "init" | "ready" | "closed" = "init";
6760
private clientStateChanged = Date.now(); // when client status last changed
@@ -71,7 +64,6 @@ class ClusterLink {
7164
public readonly id: string,
7265
public readonly clusterName: string,
7366
public readonly address: string,
74-
private readonly updateStickyLocal: (sticky: StickyUpdate) => void,
7567
) {
7668
if (!client) {
7769
throw Error("client must be specified");
@@ -119,7 +111,6 @@ class ClusterLink {
119111

120112
handleStickyUpdate = (update: StickyUpdate) => {
121113
updateSticky(update, this.sticky);
122-
this.updateStickyLocal(update);
123114
};
124115

125116
private handleClientStateChanged = () => {

src/packages/conat/core/server.ts

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ export interface Options {
137137
// if true, use https when creating an internal client.
138138
ssl?: boolean;
139139

140+
// WARNING: **superclusters are NOT fully iplemented yet.**
141+
//
140142
// if clusterName is set, enable clustering. Each node
141143
// in the cluster must have a different name. systemAccountPassword
142144
// must also be set. This only has an impact when the id is '0'.
@@ -456,8 +458,54 @@ export class ConatServer extends EventEmitter {
456458
}
457459
};
458460

459-
private getStickyTarget = ({ pattern, subject }) => {
460-
return this.sticky[pattern]?.[subject];
461+
private getStickyTarget = ({
462+
pattern,
463+
subject,
464+
targets: targets0,
465+
}: {
466+
pattern: string;
467+
subject: string;
468+
targets: Set<string>; // the current valid choices as defined by subscribers known via interest graph
469+
}) => {
470+
if (!this.cluster || this.clusterName == null) {
471+
return this.sticky[pattern]?.[subject];
472+
}
473+
const targets = new Set<string>();
474+
const target = this.sticky[pattern]?.[subject];
475+
if (target !== undefined && targets0.has(target)) {
476+
targets.add(target);
477+
}
478+
// next check sticky state of other nodes in the cluster
479+
const cluster = this.clusterLinks[this.clusterName];
480+
for (const id in cluster) {
481+
const target = cluster[id].sticky[pattern]?.[subject];
482+
if (target !== undefined && targets0.has(target)) {
483+
targets.add(target);
484+
}
485+
}
486+
if (targets.size == 0) {
487+
return undefined;
488+
}
489+
if (targets.size == 1) {
490+
for (const target of targets) {
491+
return target;
492+
}
493+
}
494+
495+
// problem: there are distinct mutually incompatible
496+
// choices of targets. This can only happen if at least
497+
// two choices were made when the cluster was in an
498+
// inconsistent state.
499+
// We just take the first in alphabetical order.
500+
// Since the sticky maps being used to make
501+
// this list of targets is eventually consistent
502+
// across the cluster, the same choice of target from
503+
// those targets will be made by all nodes.
504+
// The main problem with doing this is its slightly more
505+
// effort. The main advantage is that no communication
506+
// or coordination between nodes is needed to "fix or agree
507+
// on something", and that's a huge advantage!!
508+
return Array.from(targets).sort()[0];
461509
};
462510

463511
///////////////////////////////////////
@@ -678,7 +726,7 @@ export class ConatServer extends EventEmitter {
678726
}
679727

680728
//
681-
// TODO: Supercluster routing.
729+
// TODO: Supercluster routing. NOT IMPLEMENTED YET
682730
//
683731
// // if no matches in local cluster, try the supercluster (if there is one)
684732
// if (count == 0) {
@@ -1122,7 +1170,6 @@ export class ConatServer extends EventEmitter {
11221170
const link = await clusterLink(
11231171
address,
11241172
this.options.systemAccountPassword,
1125-
this.updateSticky,
11261173
timeout,
11271174
);
11281175
const { clusterName, id } = link;

src/packages/conat/core/sticky.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import ConsistentHash from "consistent-hash";
2+
import { hash_string } from "@cocalc/util/misc";
23

34
export function consistentHashingChoice(
45
v: Set<string>,
@@ -18,7 +19,10 @@ export function consistentHashingChoice(
1819
for (const x of w) {
1920
hr.add(x);
2021
}
21-
return hr.get(resource);
22+
// we hash the resource so that the values are randomly distributed even
23+
// if the resources look very similar (e.g., subject.1, subject.2, etc.)
24+
// I thought that "consistent-hash" hashed the resource, but it doesn't really.
25+
return hr.get(hash_string(resource));
2226
}
2327

2428
export function stickyChoice({
@@ -35,11 +39,12 @@ export function stickyChoice({
3539
getStickyTarget: (opts: {
3640
pattern: string;
3741
subject: string;
42+
targets: Set<string>;
3843
}) => string | undefined;
3944
}) {
4045
const v = subject.split(".");
4146
subject = v.slice(0, v.length - 1).join(".");
42-
const currentTarget = getStickyTarget({ pattern, subject });
47+
const currentTarget = getStickyTarget({ pattern, subject, targets });
4348
if (currentTarget === undefined || !targets.has(currentTarget)) {
4449
// we use consistent hashing instead of random to make the choice, because if
4550
// choice is being made by two different socketio servers at the same time,

0 commit comments

Comments
 (0)