Skip to content

Commit d3d8c3e

Browse files
committed
make listening on > be silent and document the subtle issue
1 parent 48942fe commit d3d8c3e

File tree

6 files changed

+73
-18
lines changed

6 files changed

+73
-18
lines changed

src/packages/backend/conat/test/cluster/cluster-sticky-state.test.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
waitForConsistentState,
66
wait,
77
addNodeToDefaultCluster,
8-
delay,
98
} from "@cocalc/backend/conat/test/setup";
109
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
1110
import { randomId } from "@cocalc/conat/names";
@@ -152,6 +151,7 @@ describe("ensure sticky state sync and use is working properly", () => {
152151

153152
it("double check the links have the sticky state", () => {
154153
for (const server of servers.slice(1)) {
154+
// @ts-ignore
155155
const link = server.clusterLinksByAddress[servers[0].address()];
156156
const v = Object.keys(link.sticky).filter((s) =>
157157
s.startsWith("subject."),
@@ -165,6 +165,17 @@ describe("ensure sticky state sync and use is working properly", () => {
165165
deliveryTest,
166166
);
167167

168+
it("listen on > and note that it doesn't impact the count", async () => {
169+
const sub = await clients[0].subscribe(">");
170+
for (let i = 0; i < servers.length; i++) {
171+
const { count } = await servers[i]
172+
.client()
173+
.publish("subject.0.foo", "hi");
174+
expect(count).toBe(1);
175+
}
176+
sub.close();
177+
});
178+
168179
it("unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]", async () => {
169180
await servers[1].unjoin({ address: servers[0].address() });
170181
const v = Object.keys(servers[1].sticky).filter((s) =>

src/packages/backend/conat/test/persist/multiple-servers.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Unit tests of multiple persist servers at once:
44
- making numerous distinct clients and seeing that they are distributed across persist servers
55
- stopping a persist server and seeing that failover happens without data loss
66
7-
pnpm test `pwd`/multiple-servers.test.ts
7+
pnpm test `pwd`/multiple-servers.test.ts
88
99
*/
1010

src/packages/backend/conat/test/socket/basic.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
3-
pnpm test `pwd`/basic.test.ts
3+
pnpm test `pwd`/basic.test.ts
44
55
*/
66

src/packages/backend/conat/test/socket/restarts.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ beforeAll(async () => {
1818
setDefaultTimeouts({ request: 500, publish: 500 });
1919
});
2020

21-
//jest.setTimeout(25000);
22-
2321
describe("create a client and server and socket, verify it works, restart conat server, then confirm that socket still works", () => {
2422
const SUBJECT = "reconnect.one";
2523

src/packages/conat/core/client.ts

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,22 @@ much better in so many ways:
2222
- JsonCodec: uses JSON.stringify and TextEncoder. This does not work
2323
with Buffer or Date and is less compact, but can be very fast.
2424
25+
- One BIG DIFFERENCE from NATS is that when a message is sent the sender
26+
can optionally find out how many clients received it. They can also wait
27+
until there is interest and send the message again. This is automated so
28+
it's very easy to use, and it makes it much easier to write distributed
29+
services that are not temporarily broken by race conditions. HOWEVER,
30+
there is one caveat -- if as an admin you create a "tap", i.e., you
31+
subscribe to all messages matching some pattern just to see what's going
32+
on, then currently that counts in the delivery count and interest, and that
33+
would then cause these race conditions to happen again. E.g., a user
34+
signs in, subscribes to their INBOX, sends a request, and gets a response
35+
to that inbox, but does this all quickly, and in a cluster, the server doesn't
36+
see the inbox yet, so it fails. As a workaround, subscriptions to the
37+
subject pattern '>' are invisible, so you can always tap into '>' for debugging
38+
purposes. TODO: implement a general way of making an invisible subscriber that
39+
doesn't count.
40+
2541
2642
THE CORE API
2743
@@ -604,13 +620,14 @@ export class Client extends EventEmitter {
604620
);
605621
}
606622
timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
607-
const response = await this.conn
608-
.timeout(timeout ? timeout : 10000)
609-
.emitWithAck("wait-for-interest", { subject, timeout });
610-
if (response.error) {
611-
throw new ConatError(response.error, { code: response.code });
623+
try {
624+
const response = await this.conn
625+
.timeout(timeout ? timeout : 10000)
626+
.emitWithAck("wait-for-interest", { subject, timeout });
627+
return response;
628+
} catch (err) {
629+
throw toConatError(err);
612630
}
613-
return response;
614631
};
615632

616633
recvStats = (bytes: number) => {
@@ -954,7 +971,7 @@ export class Client extends EventEmitter {
954971
});
955972
}
956973
} catch (err) {
957-
throw new ConatError(`${err}`, { code: 408 });
974+
throw toConatError(err);
958975
}
959976
if (response?.error) {
960977
throw new ConatError(response.error, { code: response.code });
@@ -1300,9 +1317,7 @@ export class Client extends EventEmitter {
13001317
return response;
13011318
}
13021319
} catch (err) {
1303-
throw new ConatError(`timeout - ${subject} - ${err}`, {
1304-
code: 408,
1305-
});
1320+
throw toConatError(err);
13061321
}
13071322
} else {
13081323
return await this.conn.emitWithAck("publish", v);
@@ -1910,3 +1925,15 @@ function isEmpty(obj: object): boolean {
19101925
}
19111926
return true;
19121927
}
1928+
1929+
/**
 * Normalize a rejection from a socket.io ack call into a value that is safe
 * to `throw`.
 *
 * The only failures expected here are a "disconnected" error and an ack
 * timeout.
 *
 * @param socketIoError - whatever the socket.io call rejected with.
 * @returns For a "disconnected" failure, an Error: the original Error
 *   instance when there is one (so its string form is unchanged), otherwise
 *   a new Error wrapping the stringified value. For anything else, a
 *   ConatError with code 408 whose message is prefixed with "timeout - ".
 */
function toConatError(socketIoError: unknown) {
  const text = `${socketIoError}`;
  if (text.includes("disconnected")) {
    // Callers do `throw toConatError(err)`. Returning the bare string (as was
    // done before) made them throw a string, which has no stack trace and
    // fails `instanceof Error` checks. Always hand back an Error object.
    return socketIoError instanceof Error ? socketIoError : new Error(text);
  }
  return new ConatError(`timeout - ${text}`, { code: 408 });
}

src/packages/conat/core/server.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,9 @@ export class ConatServer extends EventEmitter {
743743
});
744744
if (target !== undefined) {
745745
this.io.to(target).emit(pattern, { subject, data });
746-
count += 1;
746+
if (!isSilentPattern(pattern)) {
747+
count++;
748+
}
747749
}
748750
}
749751
}
@@ -773,7 +775,9 @@ export class ConatServer extends EventEmitter {
773775
if (id == this.id) {
774776
// another client of this server
775777
this.io.to(target).emit(pattern, { subject, data });
776-
count += 1;
778+
if (!isSilentPattern(pattern)) {
779+
count++;
780+
}
777781
} else {
778782
// client connected to a different server -- we note this, and
779783
// will send everything for each server at once, instead of
@@ -803,7 +807,11 @@ export class ConatServer extends EventEmitter {
803807
// use explicit index since length of data depends on
804808
// whether or not there are headers!
805809
data1[7] = outsideTargets[id];
806-
count += 1;
810+
for (const { pattern } of data1[7]) {
811+
if (!isSilentPattern(pattern)) {
812+
count++;
813+
}
814+
}
807815
link?.client.conn.emit("publish", data1);
808816
}
809817

@@ -1797,6 +1805,17 @@ function getServerAddress(options: Options) {
17971805
return `http${options.ssl || port == 443 ? "s" : ""}://${options.clusterIpAddress ?? "localhost"}:${port}${path}`;
17981806
}
17991807

1808+
/*
1809+
Without this, if an admin subscribed to '>' then suddenly all the algorithms
1810+
for responding to messages, sockets, etc., based on "waiting for interest"
1811+
would start failing. The following is a hack to allow subscribing to '>'.
1812+
Really we need something more general for other admin "wire taps", but
1813+
this will have to do for now.
1814+
*/
1815+
/**
 * Report whether a subscription pattern is "silent", i.e. messages are still
 * delivered to it but it is left out of the delivery count.
 *
 * Only the exact catch-all pattern ">" is treated as silent.
 *
 * @param pattern - the subscription subject pattern to test.
 * @returns true exactly when `pattern` is ">".
 */
function isSilentPattern(pattern: string): boolean {
  return pattern === ">";
}
1818+
18001819
/*
18011820
const watching = new Set(["xyz"]);
18021821
let last = Date.now();

0 commit comments

Comments
 (0)