Skip to content

Commit 9f062e9

Browse files
committed
implement algorithm for saving sticky routing data when node leaves cluster
1 parent b8278f9 commit 9f062e9

File tree

2 files changed

+84
-3
lines changed

2 files changed

+84
-3
lines changed

src/packages/backend/conat/test/cluster/cluster-sticky-state.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ describe("ensure sticky state sync and use is working properly", () => {
1919
clients = servers.map((x) => x.client());
2020
});
2121

22-
const count = 5;
22+
const count = 10;
2323
it(`create ${count} distinct sticky subscriptions and send one message to each to creat sticky routing state on servers[0]`, async () => {
2424
clients.push(servers[0].client());
2525
clients.push(servers[1].client());
@@ -72,10 +72,11 @@ describe("ensure sticky state sync and use is working properly", () => {
7272
it("send message from clients[1] to each subject", async () => {
7373
for (let i = 0; i < count; i++) {
7474
await clients[1].publish(`subject.${i}.foo`);
75+
console.log(servers[1].sticky);
7576
}
7677
});
7778

78-
it.skip(`sticky on servers[1] should STILL have no entries starting in "subject", since no choices had to be made`, async () => {
79+
it(`sticky on servers[1] should STILL have no entries starting in "subject", since no choices had to be made`, async () => {
7980
const v = Object.keys(servers[1].sticky).filter((s) =>
8081
s.startsWith("subject."),
8182
);

src/packages/conat/core/server.ts

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,87 @@ export class ConatServer extends EventEmitter {
505505
// effort. The main advantage is that no communication
506506
// or coordination between nodes is needed to "fix or agree
507507
// on something", and that's a huge advantage!!
508+
509+
// This choice algorithm is used in saveNonredundantStickyState below!
510+
// **Don't change this without changing that!!**
508511
return Array.from(targets).sort()[0];
509512
};
510513

514+
/**
 * Preserve sticky routing choices that would otherwise be lost when `link`
 * goes away.
 *
 * When a link is about to be closed (e.g., the node died or is lost), we
 * copy its non-redundant sticky state into our own sticky state, so those
 * routing choices aren't lost. Hopefully only a single node in the cluster
 * stores this info (whichever happens to do this first), but if more than
 * one does, it's just less efficient.
 *
 * NOTE: the "smaller target wins" test below mirrors the sticky choice
 * algorithm (smallest target in sort order). Don't change one without
 * changing the other.
 *
 * @param link - the cluster link that is about to be removed; links that
 *   belong to a different cluster are ignored.
 */
private saveNonredundantStickyState = (link: ClusterLink) => {
  if (link.clusterName != this.clusterName) {
    // only worry about nodes in the same cluster
    return;
  }
  const cluster = this.clusterLinks[this.clusterName];

  // Returns true if the assignment (pattern, subject) -> target from the
  // departing link does NOT need to be saved in our own sticky state.
  const isRedundant = (
    pattern: string,
    subject: string,
    target: string,
  ): boolean => {
    // Smallest conflicting target known so far, starting with our own.
    let smallest = this.sticky[pattern]?.[subject];
    if (smallest == target) {
      // we already have it -- redundant
      return true;
    }

    for (const id in cluster) {
      if (id == link.id) {
        // skip the departing link itself: its state is what we are saving
        continue;
      }
      // Index by the *value* of subject (not a property literally named
      // "subject").
      const theirs = cluster[id].sticky[pattern]?.[subject];
      if (theirs !== undefined) {
        if (theirs == target) {
          // someone else has it, so definitely redundant
          return true;
        }
        if (smallest === undefined || theirs < smallest) {
          smallest = theirs;
        }
      }
    }

    // nobody else has this mapping... but maybe it's not used
    // due to other conflicting ones?
    if (smallest !== undefined && smallest < target) {
      // target wouldn't be used since there's a conflicting one that is smaller
      return true;
    }

    // target *would* be used, but nobody else knows it, so we probably must
    // save it. Make sure the pattern is still of interest first.
    if (this.interest.hasPattern(pattern)) {
      // we need it!
      return false;
    }
    for (const id in cluster) {
      if (id == link.id) {
        // the departing link's interest doesn't count
        continue;
      }
      if (cluster[id].interest.hasPattern(pattern)) {
        return false;
      }
    }
    // nothing in the remaining cluster is subscribed to this pattern, so
    // no point in preserving this sticky routing info
    return true;
  };

  // link.sticky has shape { [pattern: string]: { [subject: string]: string } }
  for (const pattern in link.sticky) {
    const bySubject = link.sticky[pattern];
    for (const subject in bySubject) {
      const target = bySubject[subject];
      if (!isRedundant(pattern, subject, target)) {
        // we save the assignment
        this.updateSticky({ pattern, subject, target });
      }
    }
  }
};
588+
511589
///////////////////////////////////////
512590
// SUBSCRIBE and PUBLISH
513591
///////////////////////////////////////
@@ -1223,6 +1301,8 @@ export class ConatServer extends EventEmitter {
12231301
// already gone
12241302
return;
12251303
}
1304+
1305+
this.saveNonredundantStickyState(link);
12261306
link.close();
12271307
delete this.clusterLinks[link.clusterName][link.id];
12281308
delete this.clusterLinksByAddress[link.address];
@@ -1474,7 +1554,7 @@ export class ConatServer extends EventEmitter {
14741554
signal,
14751555
);
14761556
}
1477-
// check if there is already interest in the local cluster
1557+
// check if there is already known interest
14781558
const links = this.superclusterLinks();
14791559
for (const link of links) {
14801560
if (link.hasInterest(subject)) {

0 commit comments

Comments
 (0)