5
5
waitForConsistentState ,
6
6
wait ,
7
7
addNodeToDefaultCluster ,
8
+ delay ,
8
9
} from "@cocalc/backend/conat/test/setup" ;
9
10
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client" ;
10
11
@@ -19,24 +20,30 @@ describe("ensure sticky state sync and use is working properly", () => {
19
20
clients = servers . map ( ( x ) => x . client ( ) ) ;
20
21
} ) ;
21
22
22
- const count = 25 ;
23
+ const count = 1 ;
24
+ const subs0 : any [ ] = [ ] ;
25
+ const subs1 : any [ ] = [ ] ;
23
26
it ( `create ${ count } distinct sticky subscriptions and send one message to each to creat sticky routing state on servers[0]` , async ( ) => {
24
27
clients . push ( servers [ 0 ] . client ( ) ) ;
25
28
clients . push ( servers [ 1 ] . client ( ) ) ;
26
29
for ( let i = 0 ; i < count ; i ++ ) {
27
- await clients [ 1 ] . subscribe ( `subject.${ i } .*` , {
28
- queue : STICKY_QUEUE_GROUP ,
29
- } ) ;
30
+ subs0 . push (
31
+ await clients [ 1 ] . subscribe ( `subject.${ i } .*` , {
32
+ queue : STICKY_QUEUE_GROUP ,
33
+ } ) ,
34
+ ) ;
30
35
// wait so above subscription is known to *both* servers:
31
36
// @ts -ignore
32
37
await servers [ 0 ] . waitForInterest (
33
38
`subject.${ i } .0` ,
34
39
5000 ,
35
40
clients [ 0 ] . conn . id ,
36
41
) ;
37
- await clients [ 0 ] . subscribe ( `subject.${ i } .*` , {
38
- queue : STICKY_QUEUE_GROUP ,
39
- } ) ;
42
+ subs1 . push (
43
+ await clients [ 0 ] . subscribe ( `subject.${ i } .*` , {
44
+ queue : STICKY_QUEUE_GROUP ,
45
+ } ) ,
46
+ ) ;
40
47
// publishing causes a choice to be made and saved on servers[0]
41
48
await clients [ 0 ] . publish ( `subject.${ i } .foo` , "hello" ) ;
42
49
expect ( servers [ 0 ] . sticky [ `subject.${ i } .*` ] ) . not . toBe ( undefined ) ;
@@ -85,13 +92,70 @@ describe("ensure sticky state sync and use is working properly", () => {
85
92
expect ( v . length ) . toBe ( 0 ) ;
86
93
} ) ;
87
94
88
- it ( "unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]" , async ( ) => {
95
+ async function deliveryTest ( ) {
96
+ for ( const server of servers ) {
97
+ const { count } = await server . client ( ) . publish ( "subject.0.foo" , "hello" ) ;
98
+ expect ( count ) . toBe ( 1 ) ;
99
+ }
100
+ const ids : string [ ] = [ ] ;
101
+ for ( let i = 0 ; i < servers . length ; i ++ ) {
102
+ // on of the subs will receive it and one will hang forever (which is fine)
103
+ const { value } = await Promise . race ( [ subs0 [ 0 ] . next ( ) , subs1 [ 0 ] . next ( ) ] ) ;
104
+ console . log ( i , value . data ) ;
105
+ expect ( value . data ) . toBe ( "hello" ) ;
106
+ ids . push ( value . client . id ) ;
107
+ }
108
+ // all messages must go to the SAME sub, since sticky
109
+ expect ( ids . length ) . toBe ( 1 ) ;
110
+ }
111
+
112
+ it ( "publish from every node to subject.0.foo" , deliveryTest ) ;
113
+
114
+ it . skip ( "unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]" , async ( ) => {
89
115
await servers [ 1 ] . unjoin ( { address : servers [ 0 ] . address ( ) } ) ;
90
116
const v = Object . keys ( servers [ 1 ] . sticky ) . filter ( ( s ) =>
91
117
s . startsWith ( "subject." ) ,
92
118
) ;
93
119
expect ( v . length ) . toBe ( count ) ;
94
120
} ) ;
121
+
122
+ it ( "rejoin node to cluster" , async ( ) => {
123
+ await servers [ 1 ] . join ( servers [ 0 ] . address ( ) ) ;
124
+ await waitForConsistentState ( servers ) ;
125
+ } ) ;
126
+
127
+ const count2 = 5 ;
128
+ it . skip ( `add ${ count2 } more nodes to the cluster should be reaonably fast and not blow up in a feedback loop` , async ( ) => {
129
+ for ( let i = 0 ; i < count2 ; i ++ ) {
130
+ await addNodeToDefaultCluster ( ) ;
131
+ }
132
+ await waitForConsistentState ( servers ) ;
133
+ } ) ;
134
+
135
+ it . skip ( "double check the links have the sticky state" , ( ) => {
136
+ for ( const server of servers . slice ( 1 ) ) {
137
+ const link = server . clusterLinksByAddress [ servers [ 0 ] . address ( ) ] ;
138
+ const v = Object . keys ( link . sticky ) . filter ( ( s ) =>
139
+ s . startsWith ( "subject." ) ,
140
+ ) ;
141
+ expect ( v . length ) . toBe ( count ) ;
142
+ }
143
+ } ) ;
144
+
145
+ it . skip ( "in bigger, cluster, publish from every node to subject.0.foo" , async ( ) => {
146
+ for ( const server of servers ) {
147
+ await server . client ( ) . publish ( "subject.0.foo" , "hello" ) ;
148
+ }
149
+ const ids : string [ ] = [ ] ;
150
+ for ( const _ of servers ) {
151
+ const { value } = await Promise . race ( [ subs0 [ 0 ] . next ( ) , subs1 [ 0 ] . next ( ) ] ) ;
152
+ console . log ( value . data ) ;
153
+ //expect(value.data).toBe("hello");
154
+ ids . push ( value . client . id ) ;
155
+ }
156
+ // all messages must go to same sub, since sticky
157
+ expect ( ids . length ) . toBe ( 1 ) ;
158
+ } ) ;
95
159
} ) ;
96
160
97
161
afterAll ( after ) ;
0 commit comments