Skip to content

Commit 3dabed3

Browse files
committed
fix off-by-one issue that made publishing in a cluster less efficient when there were no headers
1 parent 9f062e9 commit 3dabed3

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

src/packages/backend/conat/test/cluster/cluster-sticky-state.test.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
waitForConsistentState,
66
wait,
77
addNodeToDefaultCluster,
8+
delay,
89
} from "@cocalc/backend/conat/test/setup";
910
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
1011

@@ -19,7 +20,7 @@ describe("ensure sticky state sync and use is working properly", () => {
1920
clients = servers.map((x) => x.client());
2021
});
2122

22-
const count = 10;
23+
const count = 1;
2324
it(`create ${count} distinct sticky subscriptions and send one message to each to creat sticky routing state on servers[0]`, async () => {
2425
clients.push(servers[0].client());
2526
clients.push(servers[1].client());
@@ -37,8 +38,11 @@ describe("ensure sticky state sync and use is working properly", () => {
3738
await clients[0].subscribe(`subject.${i}.*`, {
3839
queue: STICKY_QUEUE_GROUP,
3940
});
40-
// cause choice to be made and saved on servers[0]
41-
await clients[0].publish(`subject.${i}.foo`);
41+
// publishing causes a choice to be made and saved on servers[0]
42+
await clients[0].publish(`subject.${i}.foo`, "hello");
43+
expect(servers[0].sticky[`subject.${i}.*`]).not.toBe(undefined);
44+
// but no choice on servers[1]
45+
expect(servers[1].sticky[`subject.${i}.*`]).toBe(undefined);
4246
}
4347
});
4448

@@ -49,7 +53,7 @@ describe("ensure sticky state sync and use is working properly", () => {
4953
expect(v.length).toBe(count);
5054
});
5155

52-
it.skip(`sticky on servers[1] should have no entries starting in "subject".`, async () => {
56+
it(`sticky on servers[1] should have no entries starting in "subject".`, async () => {
5357
const v = Object.keys(servers[1].sticky).filter((s) =>
5458
s.startsWith("subject."),
5559
);
@@ -72,7 +76,6 @@ describe("ensure sticky state sync and use is working properly", () => {
7276
it("send message from clients[1] to each subject", async () => {
7377
for (let i = 0; i < count; i++) {
7478
await clients[1].publish(`subject.${i}.foo`);
75-
console.log(servers[1].sticky);
7679
}
7780
});
7881

src/packages/conat/core/server.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -715,9 +715,10 @@ export class ConatServer extends EventEmitter {
715715
});
716716
}
717717

718-
// note -- position 6 of data is a no-forward flag, to avoid
718+
// note -- position 6 of data is special cluster delivery data, to avoid
719719
// a message bouncing back and forth in case the interest stream
720-
// were slightly out of sync.
720+
// were slightly out of sync and also so we no exactly where
721+
// to deliver the message.
721722
const targets = data[6];
722723
if (targets != null) {
723724
return this.deliver({ subject, data, targets });
@@ -798,7 +799,10 @@ export class ConatServer extends EventEmitter {
798799
// sending this...
799800
for (const id in outsideTargets) {
800801
const link = this.clusterLinks[this.clusterName]?.[id];
801-
const data1 = [subject, ...data, outsideTargets[id]];
802+
const data1 = [subject, ...data];
803+
// use explicit index since length of data depends on
804+
// whether or not there are headers!
805+
data1[7] = outsideTargets[id];
802806
count += 1;
803807
link?.client.conn.emit("publish", data1);
804808
}

0 commit comments

Comments
 (0)