@@ -18,6 +18,7 @@ import { delay } from "awaiting";
18
18
export { delay } ;
19
19
export { setDefaultTimeouts } from "@cocalc/conat/core/client" ;
20
20
export { once } from "@cocalc/util/async-utils" ;
21
+ import { until } from "@cocalc/util/async-utils" ;
21
22
import { randomId } from "@cocalc/conat/names" ;
22
23
import { isEqual } from "lodash" ;
23
24
@@ -169,8 +170,8 @@ export async function waitForConsistentState(
169
170
}
170
171
171
172
const start = Date . now ( ) ;
172
- await wait ( {
173
- until : ( ) => {
173
+ await until (
174
+ ( ) => {
174
175
for ( let i = 0 ; i < servers . length ; i ++ ) {
175
176
if ( servers [ i ] . state == "closed" ) {
176
177
return true ;
@@ -186,33 +187,50 @@ export async function waitForConsistentState(
186
187
throw Error ( `node ${ j } is not connected to node ${ i } ` ) ;
187
188
}
188
189
const x = link . interest . serialize ( ) . patterns ;
190
+ const showInfo = ( ) => {
191
+ console . log (
192
+ "server stream getAll: " ,
193
+ // @ts -ignore
194
+ servers [ i ] . clusterStreams . interest . stream . client . id ,
195
+ // @ts -ignore
196
+ servers [ i ] . clusterStreams . interest . stream . storage . path ,
197
+ // @ts -ignore
198
+ servers [ i ] . clusterStreams . interest . seqs ( ) ,
199
+ // @ts -ignore
200
+ //servers[i].clusterStreams.interest.getAll(),
201
+ ) ;
202
+ console . log (
203
+ "link stream getAll: " ,
204
+ // @ts -ignore
205
+ link . streams . interest . stream . client . id ,
206
+ // @ts -ignore
207
+ link . streams . interest . stream . storage . path ,
208
+ // @ts -ignore
209
+ link . streams . interest . seqs ( ) ,
210
+ // @ts -ignore
211
+ //link.streams.interest.getAll(),
212
+ ) ;
213
+ console . log ( "waitForConsistentState" , { i, j, a, x } ) ;
214
+ } ;
189
215
if ( ! isEqual ( a , x ) ) {
216
+ // @ts -ignore
217
+ const seqs0 = servers [ i ] . clusterStreams . interest . seqs ( ) ;
218
+ const seqs1 = link . streams . interest . seqs ( ) ;
219
+ if (
220
+ ! isEqual (
221
+ seqs0 . slice ( 0 , seqs1 . length ) ,
222
+ seqs1 . slice ( 0 , seqs0 . length ) ,
223
+ )
224
+ ) {
225
+ showInfo ( ) ;
226
+ throw Error ( `inconsistent initial sequences` ) ;
227
+ }
228
+
190
229
if ( Date . now ( ) - start > 3000 ) {
191
230
// likely going to fail
192
- console . log (
193
- "server stream getAll: " ,
194
- // @ts -ignore
195
- servers [ i ] . clusterStreams . interest . stream . client . id ,
196
- // @ts -ignore
197
- servers [ i ] . clusterStreams . interest . stream . storage . path ,
198
- // @ts -ignore
199
- servers [ i ] . clusterStreams . interest . seqs ( ) ,
200
- // @ts -ignore
201
- //servers[i].clusterStreams.interest.getAll(),
202
- ) ;
203
- console . log (
204
- "link stream getAll: " ,
205
- // @ts -ignore
206
- link . streams . interest . stream . client . id ,
207
- // @ts -ignore
208
- link . streams . interest . stream . storage . path ,
209
- // @ts -ignore
210
- link . streams . interest . seqs ( ) ,
211
- // @ts -ignore
212
- //link.streams.interest.getAll(),
213
- ) ;
214
- console . log ( "waitForConsistentState" , { i, j, a, x } ) ;
231
+ showInfo ( ) ;
215
232
}
233
+
216
234
// not yet equal
217
235
return false ;
218
236
}
@@ -221,8 +239,8 @@ export async function waitForConsistentState(
221
239
}
222
240
return true ;
223
241
} ,
224
- timeout,
225
- } ) ;
242
+ { timeout } ,
243
+ ) ;
226
244
}
227
245
228
246
export async function after ( ) {
0 commit comments