1
1
import { type Client , connect } from "./client" ;
2
2
import { Patterns } from "./patterns" ;
3
- import { updateInterest , type InterestUpdate } from "@cocalc/conat/core/server" ;
3
+ import {
4
+ updateInterest ,
5
+ updateSticky ,
6
+ type InterestUpdate ,
7
+ type StickyUpdate ,
8
+ } from "@cocalc/conat/core/server" ;
4
9
import type { DStream } from "@cocalc/conat/sync/dstream" ;
5
10
import { once } from "@cocalc/util/async-utils" ;
6
11
import { server as createPersistServer } from "@cocalc/conat/persist/server" ;
@@ -42,12 +47,14 @@ export async function clusterLink(
42
47
return link ;
43
48
}
44
49
50
+ export type Sticky = { [ pattern : string ] : { [ subject : string ] : string } } ;
45
51
export type Interest = Patterns < { [ queue : string ] : Set < string > } > ;
46
52
47
53
export { type ClusterLink } ;
48
54
49
55
class ClusterLink {
50
56
public interest : Interest = new Patterns ( ) ;
57
+ public sticky : Sticky = { } ;
51
58
private streams : ClusterStreams ;
52
59
private state : "init" | "ready" | "closed" = "init" ;
53
60
private clientStateChanged = Date . now ( ) ; // when client status last changed
@@ -78,7 +85,10 @@ class ClusterLink {
78
85
clusterName : this . clusterName ,
79
86
} ) ;
80
87
for ( const update of this . streams . interest . getAll ( ) ) {
81
- updateInterest ( update , this . interest ) ;
88
+ updateInterest ( update , this . interest , this . sticky ) ;
89
+ }
90
+ for ( const update of this . streams . sticky . getAll ( ) ) {
91
+ updateSticky ( update , this . sticky ) ;
82
92
}
83
93
// I have a slight concern about this because updates might not
84
94
// arrive in order during automatic failover. That said, maybe
@@ -87,6 +97,7 @@ class ClusterLink {
87
97
// it is about, and when that server goes down none of this state
88
98
// matters anymore.
89
99
this . streams . interest . on ( "change" , this . handleInterestUpdate ) ;
100
+ this . streams . sticky . on ( "change" , this . handleStickyUpdate ) ;
90
101
this . state = "ready" ;
91
102
} ;
92
103
@@ -95,7 +106,11 @@ class ClusterLink {
95
106
} ;
96
107
97
108
handleInterestUpdate = ( update : InterestUpdate ) => {
98
- updateInterest ( update , this . interest ) ;
109
+ updateInterest ( update , this . interest , this . sticky ) ;
110
+ } ;
111
+
112
+ handleStickyUpdate = ( update : StickyUpdate ) => {
113
+ updateSticky ( update , this . sticky ) ;
99
114
} ;
100
115
101
116
private handleClientStateChanged = ( ) => {
@@ -119,6 +134,7 @@ class ClusterLink {
119
134
if ( this . streams != null ) {
120
135
this . streams . interest . removeListener ( "change" , this . handleInterestUpdate ) ;
121
136
this . streams . interest . close ( ) ;
137
+ this . streams . sticky . close ( ) ;
122
138
// @ts -ignore
123
139
delete this . streams ;
124
140
}
@@ -162,9 +178,10 @@ class ClusterLink {
162
178
return false ;
163
179
} ;
164
180
165
- hash = ( ) : { interest : number } => {
181
+ hash = ( ) : { interest : number ; sticky : number } => {
166
182
return {
167
183
interest : hashInterest ( this . interest ) ,
184
+ sticky : hashSticky ( this . sticky ) ,
168
185
} ;
169
186
} ;
170
187
}
@@ -178,6 +195,7 @@ function clusterStreamNames({
178
195
} ) {
179
196
return {
180
197
interest : `cluster/${ clusterName } /${ id } /interest` ,
198
+ sticky : `cluster/${ clusterName } /${ id } /sticky` ,
181
199
} ;
182
200
}
183
201
@@ -207,6 +225,7 @@ export async function createClusterPersistServer({
207
225
208
226
export interface ClusterStreams {
209
227
interest : DStream < InterestUpdate > ;
228
+ sticky : DStream < StickyUpdate > ;
210
229
}
211
230
212
231
export async function clusterStreams ( {
@@ -233,21 +252,27 @@ export async function clusterStreams({
233
252
name : names . interest ,
234
253
...opts ,
235
254
} ) ;
255
+ const sticky = await client . sync . dstream < StickyUpdate > ( {
256
+ noInventory : true ,
257
+ name : names . sticky ,
258
+ ...opts ,
259
+ } ) ;
236
260
logger . debug ( "clusterStreams: got them" , { clusterName } ) ;
237
- return { interest } ;
261
+ return { interest, sticky } ;
238
262
}
239
263
240
264
// Periodically delete not-necessary updates from the interest stream
241
265
export async function trimClusterStreams (
242
266
streams : ClusterStreams ,
243
267
data : {
244
268
interest : Patterns < { [ queue : string ] : Set < string > } > ;
269
+ sticky : { [ pattern : string ] : { [ subject : string ] : string } } ;
245
270
links : { interest : Patterns < { [ queue : string ] : Set < string > } > } [ ] ;
246
271
} ,
247
272
// don't delete anything that isn't at lest minAge ms old.
248
273
minAge : number ,
249
- ) : Promise < { seqsInterest : number [ ] } > {
250
- const { interest } = streams ;
274
+ ) : Promise < { seqsInterest : number [ ] ; seqsSticky : number [ ] } > {
275
+ const { interest, sticky } = streams ;
251
276
// First deal with interst
252
277
// we iterate over the interest stream checking for subjects
253
278
// with no current interest at all; in such cases it is safe
@@ -275,7 +300,45 @@ export async function trimClusterStreams(
275
300
logger . debug ( "trimClusterStream: successfully trimmed interest" , { seqs } ) ;
276
301
}
277
302
278
- return { seqsInterest : seqs } ;
303
+ // Next deal with sticky -- trim ones where the pattern is no longer of interest.
304
+ // There could be other reasons to trim but it gets much trickier. This one is more
305
+ // obvious, except we have to check for any interest in the whole cluster, not
306
+ // just this node.
307
+ const seqs2 : number [ ] = [ ] ;
308
+ function noInterest ( pattern : string ) {
309
+ if ( data . interest . hasPattern ( pattern ) ) {
310
+ return false ;
311
+ }
312
+ for ( const link of data . links ) {
313
+ if ( link . interest . hasPattern ( pattern ) ) {
314
+ return false ;
315
+ }
316
+ }
317
+ // nobody cares
318
+ return true ;
319
+ }
320
+ for ( let n = 0 ; n < sticky . length ; n ++ ) {
321
+ const time = sticky . time ( n ) ;
322
+ if ( time == null ) continue ;
323
+ if ( now - time . valueOf ( ) <= minAge ) {
324
+ break ;
325
+ }
326
+ const update = sticky . get ( n ) as StickyUpdate ;
327
+ if ( noInterest ( update . pattern ) ) {
328
+ const seq = sticky . seq ( n ) ;
329
+ if ( seq != null ) {
330
+ seqs2 . push ( seq ) ;
331
+ }
332
+ }
333
+ }
334
+ if ( seqs2 . length > 0 ) {
335
+ // [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
336
+ logger . debug ( "trimClusterStream: trimming sticky" , { seqs2 } ) ;
337
+ await sticky . delete ( { seqs : seqs2 } ) ;
338
+ logger . debug ( "trimClusterStream: successfully trimmed sticky" , { seqs2 } ) ;
339
+ }
340
+
341
+ return { seqsInterest : seqs , seqsSticky : seqs2 } ;
279
342
}
280
343
281
344
function hashSet ( X : Set < string > ) : number {
@@ -299,3 +362,15 @@ export function hashInterest(
299
362
) : number {
300
363
return interest . hash ( hashInterestValue ) ;
301
364
}
365
+
366
+ export function hashSticky ( sticky : Sticky ) : number {
367
+ let h = 0 ;
368
+ for ( const pattern in sticky ) {
369
+ h += hash_string ( pattern ) ;
370
+ const x = sticky [ pattern ] ;
371
+ for ( const subject in x ) {
372
+ h += hash_string ( x [ subject ] ) ;
373
+ }
374
+ }
375
+ return h ;
376
+ }
0 commit comments