Skip to content

Commit 78deaad

Browse files
committed
implement self-contained sticky routing in case of non-cluster
1 parent 2f37929 commit 78deaad

File tree

2 files changed

+142
-26
lines changed

2 files changed

+142
-26
lines changed

src/packages/conat/core/server.ts

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,18 @@ import {
6666
} from "./cluster";
6767
import { type ConatSocketServer } from "@cocalc/conat/socket";
6868
import { throttle } from "lodash";
69-
import { getLogger } from "@cocalc/conat/client";
7069
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
7170
import { type SysConatServer, sysApiSubject, sysApi } from "./sys";
7271
import { forkedConatServer } from "./start-server";
73-
import { stickyChoice } from "./sticky";
72+
import {
73+
stickyChoice,
74+
createStickyRouter,
75+
getStickyTarget,
76+
stickyKey,
77+
} from "./sticky";
7478
import { EventEmitter } from "events";
7579
import { Metrics } from "../types";
80+
import { getLogger } from "@cocalc/conat/client";
7681

7782
const logger = getLogger("conat:core:server");
7883

@@ -171,6 +176,8 @@ export class ConatServer extends EventEmitter {
171176
private getUser: UserFunction;
172177
private isAllowed: AllowFunction;
173178
public readonly options: Partial<Options>;
179+
180+
// cluster = true if and only if this is a cluster:
174181
private cluster?: boolean;
175182

176183
private sockets: { [id: string]: any } = {};
@@ -180,6 +187,7 @@ export class ConatServer extends EventEmitter {
180187

181188
private subscriptions: { [socketId: string]: Set<string> } = {};
182189
public interest: Interest = new Patterns();
190+
private stickyClient: Client;
183191

184192
private clusterStreams?: ClusterStreams;
185193
private clusterLinks: {
@@ -277,6 +285,7 @@ export class ConatServer extends EventEmitter {
277285
}
278286
this.initUsage();
279287
this.io.on("connection", this.handleSocket);
288+
this.initSticky();
280289
this.init();
281290
}
282291

@@ -443,15 +452,21 @@ export class ConatServer extends EventEmitter {
443452
// STICKY QUEUE GROUPS
444453
///////////////////////////////////////
445454

446-
private stickyKey = ({ pattern, subject }) => {
447-
return pattern + " " + subject;
455+
private initSticky = () => {
456+
this.stickyClient = this.client({
457+
systemAccountPassword: this.options.systemAccountPassword,
458+
});
459+
if (!this.cluster) {
460+
// not a cluster, so we can server as the sticky routing serivce
461+
createStickyRouter({ client: this.stickyClient });
462+
}
448463
};
449464

450465
private stickyCache: { [key: string]: { target: string; expire: number } } =
451466
{};
452467
private updateSticky = (sticky: StickyUpdate) => {
453468
// save in the cache
454-
this.stickyCache[this.stickyKey(sticky)] = {
469+
this.stickyCache[stickyKey(sticky)] = {
455470
target: sticky.target,
456471
expire: Date.now() + sticky.ttl,
457472
};
@@ -464,18 +479,7 @@ export class ConatServer extends EventEmitter {
464479
pattern: string;
465480
subject: string;
466481
}): string | undefined => {
467-
const key = this.stickyKey({ pattern, subject });
468-
const x = this.stickyCache[key];
469-
if (x != null) {
470-
if (Date.now() >= x.expire) {
471-
// it's in the cache
472-
return x.target;
473-
} else {
474-
delete this.stickyCache[key];
475-
}
476-
}
477-
// not in the cache or expired
478-
return undefined;
482+
return getStickyTarget({ stickyCache: this.stickyCache, pattern, subject });
479483
};
480484

481485
///////////////////////////////////////
@@ -763,13 +767,27 @@ export class ConatServer extends EventEmitter {
763767
if (targets.size == 0) {
764768
return undefined;
765769
}
766-
return await stickyChoice({
767-
pattern,
768-
subject,
769-
targets,
770-
updateSticky: this.updateSticky,
771-
getStickyTarget: this.getStickyTarget,
772-
});
770+
try {
771+
const target = await stickyChoice({
772+
client: this.stickyClient,
773+
pattern,
774+
subject,
775+
targets,
776+
updateSticky: this.updateSticky,
777+
getStickyTarget: this.getStickyTarget,
778+
});
779+
return target;
780+
} catch (err) {
781+
this.log("WARNING: sticky router is not working", err);
782+
// not possible to make assignment, e.g., not able
783+
// to connect to the sticky service. Can and should
784+
// happen in case of a split brain (say). This will
785+
// make it so messages stop being delivered and hopefully
786+
// client gets an error, and keeps retrying until the
787+
// sticky service is back. This is of course the tradeoff
788+
// of using a centralized algorithm for sticky routing.
789+
return undefined;
790+
}
773791
};
774792

775793
stickyClusterLoadBalance = async ({

src/packages/conat/core/sticky.ts

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import ConsistentHash from "consistent-hash";
22
import { hash_string } from "@cocalc/util/misc";
3+
import { type Client } from "./client";
4+
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
5+
import { getLogger } from "@cocalc/conat/client";
6+
7+
const logger = getLogger("conat:core:sticky");
38

49
export function consistentHashingChoice(
510
v: Set<string>,
@@ -25,13 +30,101 @@ export function consistentHashingChoice(
2530
return hr.get(hash_string(resource));
2631
}
2732

33+
const SUBJECT = "sticky.one";
34+
const DEFAULT_TTL = 15_000;
35+
36+
export function stickyKey({ pattern, subject }) {
37+
return pattern + " " + subject;
38+
}
39+
40+
export function getStickyTarget({
41+
stickyCache,
42+
pattern,
43+
subject,
44+
}: {
45+
stickyCache: { [key: string]: { target: string; expire: number } };
46+
pattern: string;
47+
subject: string;
48+
}): string | undefined {
49+
const key = stickyKey({ pattern, subject });
50+
const x = stickyCache[key];
51+
if (x != null) {
52+
if (Date.now() <= x.expire) {
53+
// it's in the cache
54+
return x.target;
55+
} else {
56+
delete stickyCache[key];
57+
}
58+
}
59+
// not in the cache or expired
60+
return undefined;
61+
}
62+
63+
export async function createStickyRouter({ client }: { client: Client }) {
64+
const sub = await client.subscribe(SUBJECT);
65+
const stickyCache: { [key: string]: { target: string; expire: number } } = {};
66+
67+
const handle = async (mesg) => {
68+
try {
69+
const { pattern, subject, targets } = mesg.data;
70+
const key = stickyKey({ pattern, subject });
71+
let target = getStickyTarget({
72+
stickyCache,
73+
pattern,
74+
subject,
75+
});
76+
if (target == null || !targets.includes(target)) {
77+
// make a new choice
78+
target = consistentHashingChoice(targets, subject);
79+
stickyCache[key] = { target, expire: Date.now() + DEFAULT_TTL };
80+
}
81+
await mesg.respond({ target, ttl: DEFAULT_TTL });
82+
} catch (err) {
83+
logger.debug("WARNING: unable to handle routing message", err);
84+
}
85+
};
86+
const listen = async () => {
87+
for await (const mesg of sub) {
88+
handle(mesg);
89+
}
90+
};
91+
listen();
92+
}
93+
94+
const stickyRequest = reuseInFlight(
95+
async (
96+
client: Client,
97+
{
98+
subject,
99+
pattern,
100+
targets,
101+
}: {
102+
subject: string;
103+
pattern: string;
104+
targets: string[];
105+
},
106+
) => {
107+
return await client.request(SUBJECT, {
108+
pattern,
109+
subject,
110+
targets,
111+
});
112+
},
113+
{
114+
createKey: (args) =>
115+
args[0].id + " " + args[1].subject + " " + args[1].pattern,
116+
},
117+
);
118+
28119
export async function stickyChoice({
120+
client,
29121
subject,
30122
pattern,
31123
targets,
32124
updateSticky,
33125
getStickyTarget,
34126
}: {
127+
client: Client;
35128
subject: string;
36129
pattern: string;
37130
targets: Set<string>;
@@ -45,8 +138,13 @@ export async function stickyChoice({
45138
subject = v.slice(0, v.length - 1).join(".");
46139
const currentTarget = getStickyTarget({ pattern, subject });
47140
if (currentTarget === undefined || !targets.has(currentTarget)) {
48-
const target = consistentHashingChoice(targets, subject);
49-
updateSticky({ pattern, subject, target, ttl: 5_000 });
141+
const resp = await stickyRequest(client, {
142+
pattern,
143+
subject,
144+
targets: Array.from(targets),
145+
});
146+
const { target, ttl = DEFAULT_TTL } = resp.data;
147+
updateSticky({ pattern, subject, target, ttl });
50148
return target;
51149
}
52150
return currentTarget;

0 commit comments

Comments
 (0)