@@ -22,10 +22,9 @@ use std::time::Duration;
2222use zenoh_config:: WhatAmI :: { Client , Peer , Router } ;
2323use zenoh_core:: { lazy_static, ztimeout} ;
2424
25- use crate :: { count, json, loc, Node } ;
25+ use crate :: { count, json, loc, skip_fmt , Node , SubUtils } ;
2626
27- const TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
28- const SLEEP : Duration = Duration :: from_secs ( 5 ) ;
27+ const TIMEOUT : Duration = Duration :: from_secs ( 10 ) ;
2928
3029lazy_static ! {
3130 static ref STORAGE : tracing_capture:: SharedStorage = tracing_capture:: SharedStorage :: default ( ) ;
@@ -94,39 +93,46 @@ async fn test_regions_scenario1_order1_putsub() {
9493 . multicast( "224.1.1.1:9300" )
9594 . open( ) ) ;
9695
97- let s9110 = _z9110. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
98- let s9120 = _z9120. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
99- let s9130 = _z9130. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
100- let s9210 = _z9210. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
101- let s9220 = _z9220. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
102- let s9230 = _z9230. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
103- let s9310 = _z9310. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
104- let s9320 = _z9320. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
105- let s9330 = _z9330. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
106-
107- tokio:: time:: sleep ( SLEEP ) . await ;
108-
109- _z9110. put ( "test/9110" , "9110" ) . await . unwrap ( ) ;
110- _z9120. put ( "test/9120" , "9120" ) . await . unwrap ( ) ;
111- _z9130. put ( "test/9130" , "9130" ) . await . unwrap ( ) ;
112- _z9210. put ( "test/9210" , "9210" ) . await . unwrap ( ) ;
113- _z9220. put ( "test/9220" , "9220" ) . await . unwrap ( ) ;
114- _z9230. put ( "test/9230" , "9230" ) . await . unwrap ( ) ;
115- _z9310. put ( "test/9310" , "9310" ) . await . unwrap ( ) ;
116- _z9320. put ( "test/9320" , "9320" ) . await . unwrap ( ) ;
117- _z9330. put ( "test/9330" , "9330" ) . await . unwrap ( ) ;
118-
119- tokio:: time:: sleep ( SLEEP ) . await ;
120-
121- assert_eq ! ( s9110. drain( ) . count( ) , 9 ) ;
122- assert_eq ! ( s9120. drain( ) . count( ) , 9 ) ;
123- assert_eq ! ( s9130. drain( ) . count( ) , 9 ) ;
124- assert_eq ! ( s9210. drain( ) . count( ) , 9 ) ;
125- assert_eq ! ( s9220. drain( ) . count( ) , 9 ) ;
126- assert_eq ! ( s9230. drain( ) . count( ) , 9 ) ;
127- assert_eq ! ( s9310. drain( ) . count( ) , 9 ) ;
128- assert_eq ! ( s9320. drain( ) . count( ) , 9 ) ;
129- assert_eq ! ( s9330. drain( ) . count( ) , 9 ) ;
96+ skip_fmt ! {
97+ let s9110 = _z9110. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
98+ let s9120 = _z9120. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
99+ let s9130 = _z9130. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
100+ let s9210 = _z9210. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
101+ let s9220 = _z9220. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
102+ let s9230 = _z9230. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
103+ let s9310 = _z9310. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
104+ let s9320 = _z9320. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
105+ let s9330 = _z9330. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
106+ }
107+
108+ ztimeout ! ( async {
109+ loop {
110+ _z9110. put( "test/9110" , "9110" ) . await . unwrap( ) ;
111+ _z9120. put( "test/9120" , "9120" ) . await . unwrap( ) ;
112+ _z9130. put( "test/9130" , "9130" ) . await . unwrap( ) ;
113+ _z9210. put( "test/9210" , "9210" ) . await . unwrap( ) ;
114+ _z9220. put( "test/9220" , "9220" ) . await . unwrap( ) ;
115+ _z9230. put( "test/9230" , "9230" ) . await . unwrap( ) ;
116+ _z9310. put( "test/9310" , "9310" ) . await . unwrap( ) ;
117+ _z9320. put( "test/9320" , "9320" ) . await . unwrap( ) ;
118+ _z9330. put( "test/9330" , "9330" ) . await . unwrap( ) ;
119+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
120+
121+ if true
122+ && s9110. count_keys( ) == 9
123+ && s9120. count_keys( ) == 9
124+ && s9130. count_keys( ) == 9
125+ && s9210. count_keys( ) == 9
126+ && s9220. count_keys( ) == 9
127+ && s9230. count_keys( ) == 9
128+ && s9310. count_keys( ) == 9
129+ && s9320. count_keys( ) == 9
130+ && s9330. count_keys( ) == 9
131+ {
132+ break ;
133+ }
134+ }
135+ } ) ;
130136
131137 let s = STORAGE . lock ( ) ;
132138
@@ -193,15 +199,17 @@ async fn test_regions_scenario1_order1_pubsub() {
193199 . multicast( "224.1.1.2:9300" )
194200 . open( ) ) ;
195201
196- let s9110 = _z9110. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
197- let s9120 = _z9120. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
198- let s9130 = _z9130. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
199- let s9210 = _z9210. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
200- let s9220 = _z9220. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
201- let s9230 = _z9230. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
202- let s9310 = _z9310. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
203- let s9320 = _z9320. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
204- let s9330 = _z9330. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
202+ skip_fmt ! {
203+ let s9110 = _z9110. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
204+ let s9120 = _z9120. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
205+ let s9130 = _z9130. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
206+ let s9210 = _z9210. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
207+ let s9220 = _z9220. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
208+ let s9230 = _z9230. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
209+ let s9310 = _z9310. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
210+ let s9320 = _z9320. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
211+ let s9330 = _z9330. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
212+ }
205213
206214 let p9110 = _z9110. declare_publisher ( "test/9110" ) . await . unwrap ( ) ;
207215 let p9120 = _z9120. declare_publisher ( "test/9120" ) . await . unwrap ( ) ;
@@ -211,35 +219,36 @@ async fn test_regions_scenario1_order1_pubsub() {
211219 let p9230 = _z9230. declare_publisher ( "test/9230" ) . await . unwrap ( ) ;
212220 let p9310 = _z9310. declare_publisher ( "test/9310" ) . await . unwrap ( ) ;
213221 let p9320 = _z9320. declare_publisher ( "test/9320" ) . await . unwrap ( ) ;
214- let p9330 = _z9330
215- . declare_publisher ( "test/9330" )
216- . express ( true )
217- . await
218- . unwrap ( ) ;
219-
220- tokio:: time:: sleep ( SLEEP ) . await ;
221-
222- p9110. put ( "9110" ) . await . unwrap ( ) ;
223- p9120. put ( "9120" ) . await . unwrap ( ) ;
224- p9130. put ( "9130" ) . await . unwrap ( ) ;
225- p9210. put ( "9210" ) . await . unwrap ( ) ;
226- p9220. put ( "9220" ) . await . unwrap ( ) ;
227- p9230. put ( "9230" ) . await . unwrap ( ) ;
228- p9310. put ( "9310" ) . await . unwrap ( ) ;
229- p9320. put ( "9320" ) . await . unwrap ( ) ;
230- p9330. put ( "9330" ) . await . unwrap ( ) ;
231-
232- tokio:: time:: sleep ( SLEEP ) . await ;
233-
234- assert_eq ! ( s9110. drain( ) . count( ) , 9 ) ;
235- assert_eq ! ( s9120. drain( ) . count( ) , 9 ) ;
236- assert_eq ! ( s9130. drain( ) . count( ) , 9 ) ;
237- assert_eq ! ( s9210. drain( ) . count( ) , 9 ) ;
238- assert_eq ! ( s9220. drain( ) . count( ) , 9 ) ;
239- assert_eq ! ( s9230. drain( ) . count( ) , 9 ) ;
240- assert_eq ! ( s9310. drain( ) . count( ) , 9 ) ;
241- assert_eq ! ( s9320. drain( ) . count( ) , 9 ) ;
242- assert_eq ! ( s9330. drain( ) . count( ) , 9 ) ;
222+ let p9330 = _z9330. declare_publisher ( "test/9330" ) . await . unwrap ( ) ;
223+
224+ ztimeout ! ( async {
225+ loop {
226+ p9110. put( "9110" ) . await . unwrap( ) ;
227+ p9120. put( "9120" ) . await . unwrap( ) ;
228+ p9130. put( "9130" ) . await . unwrap( ) ;
229+ p9210. put( "9210" ) . await . unwrap( ) ;
230+ p9220. put( "9220" ) . await . unwrap( ) ;
231+ p9230. put( "9230" ) . await . unwrap( ) ;
232+ p9310. put( "9310" ) . await . unwrap( ) ;
233+ p9320. put( "9320" ) . await . unwrap( ) ;
234+ p9330. put( "9330" ) . await . unwrap( ) ;
235+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
236+
237+ if true
238+ && s9110. count_keys( ) == 9
239+ && s9120. count_keys( ) == 9
240+ && s9130. count_keys( ) == 9
241+ && s9210. count_keys( ) == 9
242+ && s9220. count_keys( ) == 9
243+ && s9230. count_keys( ) == 9
244+ && s9310. count_keys( ) == 9
245+ && s9320. count_keys( ) == 9
246+ && s9330. count_keys( ) == 9
247+ {
248+ break ;
249+ }
250+ }
251+ } ) ;
243252
244253 let s = STORAGE . lock ( ) ;
245254
@@ -288,15 +297,17 @@ async fn test_regions_scenario1_order2_putsub() {
288297 . multicast( "224.1.2.1:9300" )
289298 . open( ) ) ;
290299
291- let s9110 = _z9110. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
292- let s9120 = _z9120. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
293- let s9130 = _z9130. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
294- let s9210 = _z9210. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
295- let s9220 = _z9220. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
296- let s9230 = _z9230. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
297- let s9310 = _z9310. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
298- let s9320 = _z9320. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
299- let s9330 = _z9330. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
300+ skip_fmt ! {
301+ let s9110 = _z9110. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
302+ let s9120 = _z9120. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
303+ let s9130 = _z9130. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
304+ let s9210 = _z9210. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
305+ let s9220 = _z9220. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
306+ let s9230 = _z9230. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
307+ let s9310 = _z9310. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
308+ let s9320 = _z9320. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
309+ let s9330 = _z9330. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
310+ }
300311
301312 let _z9000 = ztimeout ! ( Node :: new( Router , "12aa9000" ) . listen( "tcp/[::]:0" ) . open( ) ) ;
302313
@@ -316,29 +327,34 @@ async fn test_regions_scenario1_order2_putsub() {
316327 . gateway( json!( { "south" : [ { "filters" : [ { "modes" : [ "client" , "peer" ] } ] } ] } ) )
317328 . open( ) ) ;
318329
319- tokio:: time:: sleep ( Duration :: from_secs ( 8 ) ) . await ;
320-
321- _z9110. put ( "test/9110" , "9110" ) . await . unwrap ( ) ;
322- _z9120. put ( "test/9120" , "9120" ) . await . unwrap ( ) ;
323- _z9130. put ( "test/9130" , "9130" ) . await . unwrap ( ) ;
324- _z9210. put ( "test/9210" , "9210" ) . await . unwrap ( ) ;
325- _z9220. put ( "test/9220" , "9220" ) . await . unwrap ( ) ;
326- _z9230. put ( "test/9230" , "9230" ) . await . unwrap ( ) ;
327- _z9310. put ( "test/9310" , "9310" ) . await . unwrap ( ) ;
328- _z9320. put ( "test/9320" , "9320" ) . await . unwrap ( ) ;
329- _z9330. put ( "test/9330" , "9330" ) . await . unwrap ( ) ;
330-
331- tokio:: time:: sleep ( SLEEP ) . await ;
332-
333- assert_eq ! ( s9110. drain( ) . count( ) , 9 ) ;
334- assert_eq ! ( s9120. drain( ) . count( ) , 9 ) ;
335- assert_eq ! ( s9130. drain( ) . count( ) , 9 ) ;
336- assert_eq ! ( s9210. drain( ) . count( ) , 9 ) ;
337- assert_eq ! ( s9220. drain( ) . count( ) , 9 ) ;
338- assert_eq ! ( s9230. drain( ) . count( ) , 9 ) ;
339- assert_eq ! ( s9310. drain( ) . count( ) , 9 ) ;
340- assert_eq ! ( s9320. drain( ) . count( ) , 9 ) ;
341- assert_eq ! ( s9330. drain( ) . count( ) , 9 ) ;
330+ ztimeout ! ( async {
331+ loop {
332+ _z9110. put( "test/9110" , "9110" ) . await . unwrap( ) ;
333+ _z9120. put( "test/9120" , "9120" ) . await . unwrap( ) ;
334+ _z9130. put( "test/9130" , "9130" ) . await . unwrap( ) ;
335+ _z9210. put( "test/9210" , "9210" ) . await . unwrap( ) ;
336+ _z9220. put( "test/9220" , "9220" ) . await . unwrap( ) ;
337+ _z9230. put( "test/9230" , "9230" ) . await . unwrap( ) ;
338+ _z9310. put( "test/9310" , "9310" ) . await . unwrap( ) ;
339+ _z9320. put( "test/9320" , "9320" ) . await . unwrap( ) ;
340+ _z9330. put( "test/9330" , "9330" ) . await . unwrap( ) ;
341+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
342+
343+ if true
344+ && s9110. count_keys( ) == 9
345+ && s9120. count_keys( ) == 9
346+ && s9130. count_keys( ) == 9
347+ && s9210. count_keys( ) == 9
348+ && s9220. count_keys( ) == 9
349+ && s9230. count_keys( ) == 9
350+ && s9310. count_keys( ) == 9
351+ && s9320. count_keys( ) == 9
352+ && s9330. count_keys( ) == 9
353+ {
354+ break ;
355+ }
356+ }
357+ } ) ;
342358
343359 let s = STORAGE . lock ( ) ;
344360
@@ -387,15 +403,17 @@ async fn test_regions_scenario1_order2_pubsub() {
387403 . multicast( "224.1.2.2:9300" )
388404 . open( ) ) ;
389405
390- let s9110 = _z9110. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
391- let s9120 = _z9120. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
392- let s9130 = _z9130. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
393- let s9210 = _z9210. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
394- let s9220 = _z9220. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
395- let s9230 = _z9230. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
396- let s9310 = _z9310. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
397- let s9320 = _z9320. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
398- let s9330 = _z9330. declare_subscriber ( "test/**" ) . await . unwrap ( ) ;
406+ skip_fmt ! {
407+ let s9110 = _z9110. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
408+ let s9120 = _z9120. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
409+ let s9130 = _z9130. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
410+ let s9210 = _z9210. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
411+ let s9220 = _z9220. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
412+ let s9230 = _z9230. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
413+ let s9310 = _z9310. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
414+ let s9320 = _z9320. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
415+ let s9330 = _z9330. declare_subscriber( "test/**" ) . with( flume:: unbounded( ) ) . await . unwrap( ) ;
416+ }
399417
400418 let p9110 = _z9110. declare_publisher ( "test/9110" ) . await . unwrap ( ) ;
401419 let p9120 = _z9120. declare_publisher ( "test/9120" ) . await . unwrap ( ) ;
@@ -425,29 +443,34 @@ async fn test_regions_scenario1_order2_pubsub() {
425443 . gateway( json!( { "south" : [ { "filters" : [ { "modes" : [ "client" , "peer" ] } ] } ] } ) )
426444 . open( ) ) ;
427445
428- tokio:: time:: sleep ( Duration :: from_secs ( 8 ) ) . await ;
429-
430- p9110. put ( "9110" ) . await . unwrap ( ) ;
431- p9120. put ( "9120" ) . await . unwrap ( ) ;
432- p9130. put ( "9130" ) . await . unwrap ( ) ;
433- p9210. put ( "9210" ) . await . unwrap ( ) ;
434- p9220. put ( "9220" ) . await . unwrap ( ) ;
435- p9230. put ( "9230" ) . await . unwrap ( ) ;
436- p9310. put ( "9310" ) . await . unwrap ( ) ;
437- p9320. put ( "9320" ) . await . unwrap ( ) ;
438- p9330. put ( "9330" ) . await . unwrap ( ) ;
439-
440- tokio:: time:: sleep ( SLEEP ) . await ;
441-
442- assert_eq ! ( s9110. drain( ) . count( ) , 9 ) ;
443- assert_eq ! ( s9120. drain( ) . count( ) , 9 ) ;
444- assert_eq ! ( s9130. drain( ) . count( ) , 9 ) ;
445- assert_eq ! ( s9210. drain( ) . count( ) , 9 ) ;
446- assert_eq ! ( s9220. drain( ) . count( ) , 9 ) ;
447- assert_eq ! ( s9230. drain( ) . count( ) , 9 ) ;
448- assert_eq ! ( s9310. drain( ) . count( ) , 9 ) ;
449- assert_eq ! ( s9320. drain( ) . count( ) , 9 ) ;
450- assert_eq ! ( s9330. drain( ) . count( ) , 9 ) ;
446+ ztimeout ! ( async {
447+ loop {
448+ p9110. put( "9110" ) . await . unwrap( ) ;
449+ p9120. put( "9120" ) . await . unwrap( ) ;
450+ p9130. put( "9130" ) . await . unwrap( ) ;
451+ p9210. put( "9210" ) . await . unwrap( ) ;
452+ p9220. put( "9220" ) . await . unwrap( ) ;
453+ p9230. put( "9230" ) . await . unwrap( ) ;
454+ p9310. put( "9310" ) . await . unwrap( ) ;
455+ p9320. put( "9320" ) . await . unwrap( ) ;
456+ p9330. put( "9330" ) . await . unwrap( ) ;
457+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
458+
459+ if true
460+ && s9110. count_keys( ) == 9
461+ && s9120. count_keys( ) == 9
462+ && s9130. count_keys( ) == 9
463+ && s9210. count_keys( ) == 9
464+ && s9220. count_keys( ) == 9
465+ && s9230. count_keys( ) == 9
466+ && s9310. count_keys( ) == 9
467+ && s9320. count_keys( ) == 9
468+ && s9330. count_keys( ) == 9
469+ {
470+ break ;
471+ }
472+ }
473+ } ) ;
451474
452475 let s = STORAGE . lock ( ) ;
453476
0 commit comments