8
8
delay ,
9
9
} from "@cocalc/backend/conat/test/setup" ;
10
10
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client" ;
11
+ import { randomId } from "@cocalc/conat/names" ;
11
12
12
13
beforeAll ( before ) ;
13
14
@@ -20,10 +21,10 @@ describe("ensure sticky state sync and use is working properly", () => {
20
21
clients = servers . map ( ( x ) => x . client ( ) ) ;
21
22
} ) ;
22
23
23
- const count = 1 ;
24
+ const count = 25 ;
24
25
const subs0 : any [ ] = [ ] ;
25
26
const subs1 : any [ ] = [ ] ;
26
- it ( `create ${ count } distinct sticky subscriptions and send one message to each to creat sticky routing state on servers[0]` , async ( ) => {
27
+ it ( `create ${ count } distinct sticky subscriptions and send one message to each to create sticky routing state on servers[0]` , async ( ) => {
27
28
clients . push ( servers [ 0 ] . client ( ) ) ;
28
29
clients . push ( servers [ 1 ] . client ( ) ) ;
29
30
for ( let i = 0 ; i < count ; i ++ ) {
@@ -52,6 +53,19 @@ describe("ensure sticky state sync and use is working properly", () => {
52
53
}
53
54
} ) ;
54
55
56
+ let chosen ;
57
+ it ( "see which subscription got chosen for subject.0.* -- this is useful later" , async ( ) => {
58
+ const p0 = async ( ) => {
59
+ await subs0 [ 0 ] . next ( ) ;
60
+ return 0 ;
61
+ } ;
62
+ const p1 = async ( ) => {
63
+ await subs1 [ 0 ] . next ( ) ;
64
+ return 1 ;
65
+ } ;
66
+ chosen = await Promise . race ( [ p0 ( ) , p1 ( ) ] ) ;
67
+ } ) ;
68
+
55
69
it ( `sticky on servers[0] should have ${ count } entries starting in "subject".` , async ( ) => {
56
70
const v = Object . keys ( servers [ 0 ] . sticky ) . filter ( ( s ) =>
57
71
s . startsWith ( "subject." ) ,
@@ -93,46 +107,48 @@ describe("ensure sticky state sync and use is working properly", () => {
93
107
} ) ;
94
108
95
109
async function deliveryTest ( ) {
110
+ const sub = chosen == 0 ? subs0 [ 0 ] : subs1 [ 0 ] ;
111
+
112
+ // clear up the subscription (we sent it stuff above)
113
+ const sentinel = randomId ( ) ;
114
+ await clients [ 0 ] . publish ( "subject.0.foo" , sentinel ) ;
115
+ while ( true ) {
116
+ const { value } = await sub . next ( ) ;
117
+ if ( value . data == sentinel ) {
118
+ break ;
119
+ }
120
+ }
96
121
for ( const server of servers ) {
97
- const { count } = await server . client ( ) . publish ( "subject.0.foo" , "hello" ) ;
122
+ const { count } = await server
123
+ . client ( )
124
+ . publish ( "subject.0.foo" , "delivery-test" ) ;
98
125
expect ( count ) . toBe ( 1 ) ;
99
126
}
100
- const ids : string [ ] = [ ] ;
127
+ const ids = new Set < string > ( ) ;
101
128
for ( let i = 0 ; i < servers . length ; i ++ ) {
102
129
// on of the subs will receive it and one will hang forever (which is fine)
103
- const { value } = await Promise . race ( [ subs0 [ 0 ] . next ( ) , subs1 [ 0 ] . next ( ) ] ) ;
104
- console . log ( i , value . data ) ;
105
- expect ( value . data ) . toBe ( "hello" ) ;
106
- ids . push ( value . client . id ) ;
130
+ const { value } = await sub . next ( ) ;
131
+ expect ( value . data ) . toBe ( "delivery-test" ) ;
132
+ ids . add ( value . client . id ) ;
107
133
}
108
- // all messages must go to the SAME sub , since sticky
109
- expect ( ids . length ) . toBe ( 1 ) ;
134
+ // all messages must go to the SAME subscriber , since sticky
135
+ expect ( ids . size ) . toBe ( 1 ) ;
110
136
}
111
137
112
138
it ( "publish from every node to subject.0.foo" , deliveryTest ) ;
113
139
114
- it . skip ( "unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]" , async ( ) => {
115
- await servers [ 1 ] . unjoin ( { address : servers [ 0 ] . address ( ) } ) ;
116
- const v = Object . keys ( servers [ 1 ] . sticky ) . filter ( ( s ) =>
117
- s . startsWith ( "subject." ) ,
118
- ) ;
119
- expect ( v . length ) . toBe ( count ) ;
120
- } ) ;
121
-
122
- it ( "rejoin node to cluster" , async ( ) => {
123
- await servers [ 1 ] . join ( servers [ 0 ] . address ( ) ) ;
124
- await waitForConsistentState ( servers ) ;
125
- } ) ;
126
-
127
140
const count2 = 5 ;
128
- it . skip ( `add ${ count2 } more nodes to the cluster should be reaonably fast and not blow up in a feedback loop` , async ( ) => {
141
+ it ( `add ${ count2 } more nodes to the cluster should be reaonably fast and not blow up in a feedback loop` , async ( ) => {
129
142
for ( let i = 0 ; i < count2 ; i ++ ) {
130
143
await addNodeToDefaultCluster ( ) ;
131
144
}
145
+ } ) ;
146
+
147
+ it ( "wait until cluster is consistent" , async ( ) => {
132
148
await waitForConsistentState ( servers ) ;
133
149
} ) ;
134
150
135
- it . skip ( "double check the links have the sticky state" , ( ) => {
151
+ it ( "double check the links have the sticky state" , ( ) => {
136
152
for ( const server of servers . slice ( 1 ) ) {
137
153
const link = server . clusterLinksByAddress [ servers [ 0 ] . address ( ) ] ;
138
154
const v = Object . keys ( link . sticky ) . filter ( ( s ) =>
@@ -142,20 +158,19 @@ describe("ensure sticky state sync and use is working properly", () => {
142
158
}
143
159
} ) ;
144
160
145
- it . skip ( "in bigger, cluster, publish from every node to subject.0.foo" , async ( ) => {
146
- for ( const server of servers ) {
147
- await server . client ( ) . publish ( "subject.0.foo" , "hello" ) ;
148
- }
149
- const ids : string [ ] = [ ] ;
150
- for ( const _ of servers ) {
151
- const { value } = await Promise . race ( [ subs0 [ 0 ] . next ( ) , subs1 [ 0 ] . next ( ) ] ) ;
152
- console . log ( value . data ) ;
153
- //expect(value.data).toBe("hello");
154
- ids . push ( value . client . id ) ;
155
- }
156
- // all messages must go to same sub, since sticky
157
- expect ( ids . length ) . toBe ( 1 ) ;
161
+ it (
162
+ "in bigger, cluster, publish from every node to subject.0.foo" ,
163
+ deliveryTest ,
164
+ ) ;
165
+
166
+ it ( "unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]" , async ( ) => {
167
+ await servers [ 1 ] . unjoin ( { address : servers [ 0 ] . address ( ) } ) ;
168
+ const v = Object . keys ( servers [ 1 ] . sticky ) . filter ( ( s ) =>
169
+ s . startsWith ( "subject." ) ,
170
+ ) ;
171
+ expect ( v . length ) . toBe ( count ) ;
158
172
} ) ;
173
+
159
174
} ) ;
160
175
161
176
afterAll ( after ) ;
0 commit comments