Skip to content

Commit 2f37929

Browse files
committed
conat server: delete all of the distributed state involving stickiness -- we will redo this next to be much simpler
1 parent 45bcc84 commit 2f37929

File tree

3 files changed

+44
-265
lines changed

3 files changed

+44
-265
lines changed

src/packages/conat/core/cluster.ts

Lines changed: 8 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
import { type Client, connect } from "./client";
22
import { Patterns } from "./patterns";
3-
import {
4-
updateInterest,
5-
updateSticky,
6-
type InterestUpdate,
7-
type StickyUpdate,
8-
} from "@cocalc/conat/core/server";
3+
import { updateInterest, type InterestUpdate } from "@cocalc/conat/core/server";
94
import type { DStream } from "@cocalc/conat/sync/dstream";
105
import { once } from "@cocalc/util/async-utils";
116
import { server as createPersistServer } from "@cocalc/conat/persist/server";
@@ -47,14 +42,12 @@ export async function clusterLink(
4742
return link;
4843
}
4944

50-
export type Sticky = { [pattern: string]: { [subject: string]: string } };
5145
export type Interest = Patterns<{ [queue: string]: Set<string> }>;
5246

5347
export { type ClusterLink };
5448

5549
class ClusterLink {
5650
public interest: Interest = new Patterns();
57-
public sticky: Sticky = {};
5851
private streams: ClusterStreams;
5952
private state: "init" | "ready" | "closed" = "init";
6053
private clientStateChanged = Date.now(); // when client status last changed
@@ -85,10 +78,7 @@ class ClusterLink {
8578
clusterName: this.clusterName,
8679
});
8780
for (const update of this.streams.interest.getAll()) {
88-
updateInterest(update, this.interest, this.sticky);
89-
}
90-
for (const update of this.streams.sticky.getAll()) {
91-
updateSticky(update, this.sticky);
81+
updateInterest(update, this.interest);
9282
}
9383
// I have a slight concern about this because updates might not
9484
// arrive in order during automatic failover. That said, maybe
@@ -97,7 +87,6 @@ class ClusterLink {
9787
// it is about, and when that server goes down none of this state
9888
// matters anymore.
9989
this.streams.interest.on("change", this.handleInterestUpdate);
100-
this.streams.sticky.on("change", this.handleStickyUpdate);
10190
this.state = "ready";
10291
};
10392

@@ -106,11 +95,7 @@ class ClusterLink {
10695
};
10796

10897
handleInterestUpdate = (update: InterestUpdate) => {
109-
updateInterest(update, this.interest, this.sticky);
110-
};
111-
112-
handleStickyUpdate = (update: StickyUpdate) => {
113-
updateSticky(update, this.sticky);
98+
updateInterest(update, this.interest);
11499
};
115100

116101
private handleClientStateChanged = () => {
@@ -134,7 +119,6 @@ class ClusterLink {
134119
if (this.streams != null) {
135120
this.streams.interest.removeListener("change", this.handleInterestUpdate);
136121
this.streams.interest.close();
137-
this.streams.sticky.close();
138122
// @ts-ignore
139123
delete this.streams;
140124
}
@@ -178,10 +162,9 @@ class ClusterLink {
178162
return false;
179163
};
180164

181-
hash = (): { interest: number; sticky: number } => {
165+
hash = (): { interest: number } => {
182166
return {
183167
interest: hashInterest(this.interest),
184-
sticky: hashSticky(this.sticky),
185168
};
186169
};
187170
}
@@ -195,7 +178,6 @@ function clusterStreamNames({
195178
}) {
196179
return {
197180
interest: `cluster/${clusterName}/${id}/interest`,
198-
sticky: `cluster/${clusterName}/${id}/sticky`,
199181
};
200182
}
201183

@@ -225,7 +207,6 @@ export async function createClusterPersistServer({
225207

226208
export interface ClusterStreams {
227209
interest: DStream<InterestUpdate>;
228-
sticky: DStream<StickyUpdate>;
229210
}
230211

231212
export async function clusterStreams({
@@ -252,27 +233,21 @@ export async function clusterStreams({
252233
name: names.interest,
253234
...opts,
254235
});
255-
const sticky = await client.sync.dstream<StickyUpdate>({
256-
noInventory: true,
257-
name: names.sticky,
258-
...opts,
259-
});
260236
logger.debug("clusterStreams: got them", { clusterName });
261-
return { interest, sticky };
237+
return { interest };
262238
}
263239

264240
// Periodically delete not-necessary updates from the interest stream
265241
export async function trimClusterStreams(
266242
streams: ClusterStreams,
267243
data: {
268244
interest: Patterns<{ [queue: string]: Set<string> }>;
269-
sticky: { [pattern: string]: { [subject: string]: string } };
270245
links: { interest: Patterns<{ [queue: string]: Set<string> }> }[];
271246
},
272247
// don't delete anything that isn't at lest minAge ms old.
273248
minAge: number,
274-
): Promise<{ seqsInterest: number[]; seqsSticky: number[] }> {
275-
const { interest, sticky } = streams;
249+
): Promise<{ seqsInterest: number[] }> {
250+
const { interest } = streams;
276251
// First deal with interst
277252
// we iterate over the interest stream checking for subjects
278253
// with no current interest at all; in such cases it is safe
@@ -300,45 +275,7 @@ export async function trimClusterStreams(
300275
logger.debug("trimClusterStream: successfully trimmed interest", { seqs });
301276
}
302277

303-
// Next deal with sticky -- trim ones where the pattern is no longer of interest.
304-
// There could be other reasons to trim but it gets much trickier. This one is more
305-
// obvious, except we have to check for any interest in the whole cluster, not
306-
// just this node.
307-
const seqs2: number[] = [];
308-
function noInterest(pattern: string) {
309-
if (data.interest.hasPattern(pattern)) {
310-
return false;
311-
}
312-
for (const link of data.links) {
313-
if (link.interest.hasPattern(pattern)) {
314-
return false;
315-
}
316-
}
317-
// nobody cares
318-
return true;
319-
}
320-
for (let n = 0; n < sticky.length; n++) {
321-
const time = sticky.time(n);
322-
if (time == null) continue;
323-
if (now - time.valueOf() <= minAge) {
324-
break;
325-
}
326-
const update = sticky.get(n) as StickyUpdate;
327-
if (noInterest(update.pattern)) {
328-
const seq = sticky.seq(n);
329-
if (seq != null) {
330-
seqs2.push(seq);
331-
}
332-
}
333-
}
334-
if (seqs2.length > 0) {
335-
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
336-
logger.debug("trimClusterStream: trimming sticky", { seqs2 });
337-
await sticky.delete({ seqs: seqs2 });
338-
logger.debug("trimClusterStream: successfully trimmed sticky", { seqs2 });
339-
}
340-
341-
return { seqsInterest: seqs, seqsSticky: seqs2 };
278+
return { seqsInterest: seqs };
342279
}
343280

344281
function hashSet(X: Set<string>): number {
@@ -362,15 +299,3 @@ export function hashInterest(
362299
): number {
363300
return interest.hash(hashInterestValue);
364301
}
365-
366-
export function hashSticky(sticky: Sticky): number {
367-
let h = 0;
368-
for (const pattern in sticky) {
369-
h += hash_string(pattern);
370-
const x = sticky[pattern];
371-
for (const subject in x) {
372-
h += hash_string(x[subject]);
373-
}
374-
}
375-
return h;
376-
}

0 commit comments

Comments
 (0)