Skip to content

Commit bae3816

Browse files
committed
implement computing hash of interest and sticky, so we can be sure conat cluster state doesn't "drift"
1 parent f8ec6d2 commit bae3816

File tree

3 files changed

+56
-14
lines changed

3 files changed

+56
-14
lines changed

src/packages/conat/core/cluster.ts

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import type { DStream } from "@cocalc/conat/sync/dstream";
1010
import { once } from "@cocalc/util/async-utils";
1111
import { server as createPersistServer } from "@cocalc/conat/persist/server";
1212
import { getLogger } from "@cocalc/conat/client";
13+
import { hash_string } from "@cocalc/util/misc";
1314

1415
const logger = getLogger("conat:core:cluster");
1516

@@ -41,11 +42,14 @@ export async function clusterLink(
4142
return link;
4243
}
4344

45+
export type Sticky = { [pattern: string]: { [subject: string]: string } };
46+
export type Interest = Patterns<{ [queue: string]: Set<string> }>;
47+
4448
export { type ClusterLink };
4549

4650
class ClusterLink {
47-
public interest: Patterns<{ [queue: string]: Set<string> }> = new Patterns();
48-
private sticky: { [pattern: string]: { [subject: string]: string } } = {};
51+
public interest: Interest = new Patterns();
52+
private sticky: Sticky = {};
4953
private streams: ClusterStreams;
5054
private state: "init" | "ready" | "closed" = "init";
5155
private clientStateChanged = Date.now(); // when client status last changed
@@ -322,3 +326,37 @@ export async function trimClusterStreams(
322326

323327
return { seqsInterest: seqs, seqsSticky: seqs2 };
324328
}
329+
330+
function hashSet(X: Set<string>): number {
331+
let h = 0;
332+
for (const a of X) {
333+
h += hash_string(a); // integers, and not too many, so should commute
334+
}
335+
return h;
336+
}
337+
338+
function hashInterestValue(X: { [queue: string]: Set<string> }): number {
339+
let h = 0;
340+
for (const queue in X) {
341+
h += hashSet(X[queue]); // integers, and not too many, so should commute
342+
}
343+
return h;
344+
}
345+
346+
export function hashInterest(
347+
interest: Patterns<{ [queue: string]: Set<string> }>,
348+
): number {
349+
return interest.hash(hashInterestValue);
350+
}
351+
352+
export function hashSticky(sticky: Sticky): number {
353+
let h = 0;
354+
for (const pattern in sticky) {
355+
h += hash_string(pattern);
356+
const x = sticky[pattern];
357+
for (const subject in x) {
358+
h += hash_string(x[subject]);
359+
}
360+
}
361+
return h;
362+
}

src/packages/conat/core/patterns.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { isEqual } from "lodash";
22
import { getLogger } from "@cocalc/conat/client";
33
import { EventEmitter } from "events";
4+
import { hash_string } from "@cocalc/util/misc";
45

56
type Index = { [pattern: string]: Index | string };
67

@@ -21,6 +22,14 @@ export class Patterns<T> extends EventEmitter {
2122
this.index = {};
2223
};
2324

25+
hash = (hashT: (x: T) => number): number => {
26+
let h = 0;
27+
for (const pattern in this.patterns) {
28+
h += hash_string(pattern) + hashT(this.patterns[pattern]);
29+
}
30+
return h;
31+
};
32+
2433
serialize = (fromT?: (x: T) => any) => {
2534
let patterns: { [pattern: string]: any };
2635
if (fromT != null) {

src/packages/conat/core/server.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ import {
6161
type ClusterStreams,
6262
trimClusterStreams,
6363
createClusterPersistServer,
64+
Sticky,
65+
Interest,
6466
} from "./cluster";
6567
import { type ConatSocketServer } from "@cocalc/conat/socket";
6668
import { throttle } from "lodash";
@@ -172,10 +174,10 @@ export class ConatServer extends EventEmitter {
172174
public state: State = "init";
173175

174176
private subscriptions: { [socketId: string]: Set<string> } = {};
175-
private interest: Patterns<{ [queue: string]: Set<string> }> = new Patterns();
177+
private interest: Interest = new Patterns();
176178
// the target string is JSON.stringify({ id: string; subject: string }),
177179
// which is the socket.io room to send the messages to.
178-
private sticky: { [pattern: string]: { [subject: string]: string } } = {};
180+
private sticky: Sticky = {};
179181

180182
private clusterStreams?: ClusterStreams;
181183
private clusterLinks: {
@@ -1585,10 +1587,8 @@ function getAddress(socket) {
15851587

15861588
export function updateInterest(
15871589
update: InterestUpdate,
1588-
interest: Patterns<{ [queue: string]: Set<string> }>,
1589-
sticky: {
1590-
[pattern: string]: { [subject: string]: string };
1591-
},
1590+
interest: Interest,
1591+
sticky: Sticky,
15921592
) {
15931593
const { op, subject, queue, room } = update;
15941594
const groups = interest.get(subject);
@@ -1626,12 +1626,7 @@ export function updateInterest(
16261626
}
16271627

16281628
// returns true if this update actually causes a change to sticky
1629-
export function updateSticky(
1630-
update: StickyUpdate,
1631-
sticky: {
1632-
[pattern: string]: { [subject: string]: string };
1633-
},
1634-
): boolean {
1629+
export function updateSticky(update: StickyUpdate, sticky: Sticky): boolean {
16351630
const { pattern, subject, target } = update;
16361631
if (sticky[pattern] === undefined) {
16371632
sticky[pattern] = {};

0 commit comments

Comments
 (0)