@@ -512,7 +512,6 @@ mod reconf_tests {
512512 ( ( x_tick, x_publisher_task) , ( y_tick, y_publisher_task) )
513513 }
514514
515- #[ ignore = "Ignore while using Paho MQTT client. This does not go well with creating multiple MQTT clients making test too flaky." ]
516515 #[ apply( async_test) ]
517516 async fn test_reconf_simple_add_no_reconf ( executor : Rc < LocalExecutor < ' static > > ) {
518517 // Tests the ReconfSemiSyncMonitor with the simple add monitor, without actually sending a
@@ -647,7 +646,6 @@ mod reconf_tests {
647646 . expect ( "y publisher task should finish" ) ;
648647 }
649648
650- #[ ignore = "Ignore while using Paho MQTT client. This does not go well with creating multiple MQTT clients making test too flaky." ]
651649 #[ apply( async_test) ]
652650 async fn test_reconf_no_change_of_streams ( executor : Rc < LocalExecutor < ' static > > ) {
653651 // Tests the ReconfSemiSyncMonitor with the simple add monitor, where we reconfigure but do
@@ -873,7 +871,6 @@ mod reconf_tests {
873871 . expect ( "y publisher task should finish" ) ;
874872 }
875873
876- #[ ignore = "Ignore while using Paho MQTT client. This does not go well with creating multiple MQTT clients making test too flaky." ]
877874 #[ apply( async_test) ]
878875 async fn test_reconf_delete_input_stream ( executor : Rc < LocalExecutor < ' static > > ) {
879876 // Tests the ReconfSemiSyncMonitor with the simple add monitor, where we reconfigure to a
@@ -1069,7 +1066,6 @@ mod reconf_tests {
10691066 . expect ( "x publisher task should finish" ) ;
10701067 }
10711068
1072- #[ ignore = "Ignore while using Paho MQTT client. This does not go well with creating multiple MQTT clients making test too flaky." ]
10731069 #[ apply( async_test) ]
10741070 async fn test_reconf_add_input_stream ( executor : Rc < LocalExecutor < ' static > > ) {
10751071 // Tests the ReconfSemiSyncMonitor with the acc spec, where we reconfigure to
@@ -1263,7 +1259,6 @@ mod reconf_tests {
12631259 . expect ( "y publisher task should finish" ) ;
12641260 }
12651261
1266- #[ ignore = "Ignore while using Paho MQTT client. This does not go well with creating multiple MQTT clients making test too flaky." ]
12671262 #[ apply( async_test) ]
12681263 async fn test_reconf_delete_output_stream ( executor : Rc < LocalExecutor < ' static > > ) {
12691264 // Tests the ReconfSemiSyncMonitor with the where we initally have two output streams,
@@ -1429,7 +1424,6 @@ mod reconf_tests {
14291424 . expect ( "x publisher task should finish" ) ;
14301425 }
14311426
1432- #[ ignore = "Ignore while using Paho MQTT client. This does not go well with creating multiple MQTT clients making test too flaky." ]
14331427 #[ apply( async_test) ]
14341428 async fn test_reconf_add_output_stream ( executor : Rc < LocalExecutor < ' static > > ) {
14351429 // Tests the ReconfSemiSyncMonitor with the where we initally have one output streams,
@@ -1587,94 +1581,4 @@ mod reconf_tests {
15871581 . await
15881582 . expect ( "x publisher task should finish" ) ;
15891583 }
1590-
1591- #[ ignore = "Read the note" ]
1592- #[ apply( async_test) ]
1593- async fn test_paho_bench ( executor : Rc < LocalExecutor < ' static > > ) -> anyhow:: Result < ( ) > {
1594- // NOTE: This is not really a test but doing a Criterion test with the exact same code of the paho client does not
1595- // work.
1596- //
1597- // This benchmarks the hypothesis that generating publishers and subscribers
1598- // disproportionally slow down performance. Note that size only changes number of
1599- // pubs/subs.
1600- // There is still the possibility that it is due to us doing something wrong in either
1601- // get_mqtt_outputs (and in the call to connect_and_receive) or dummy_stream_mqtt_publisher.
1602- //
1603- // Unignore to run locally - but it takes a while to run. I recommend running with
1604- // nocapture.
1605- //
1606- // Results:
1607- // bench_paho_clients(1) completed in 2.275242525s
1608- // bench_paho_clients(2) completed in 3.264509601s
1609- // bench_paho_clients(3) completed in 70.210330716s
1610- // bench_paho_clients(4) completed in 105.445208314s
1611- // bench_paho_clients(5) completed in 141.807053791s
1612- // bench_paho_clients(6) completed in 174.717985402s
1613-
1614- const MQTT_PORT : u16 = 1883 ;
1615- const PUBLISH_VALUES : [ i64 ; 4 ] = [ 1 , 2 , 3 , 4 ] ;
1616-
1617- async fn bench_paho_clients (
1618- executor : Rc < LocalExecutor < ' static > > ,
1619- size : usize ,
1620- ) -> anyhow:: Result < ( ) > {
1621- let topics: Vec < String > = ( 0 ..size)
1622- . map ( |idx| format ! ( "bench/paho/topic/x{}" , idx) )
1623- . collect ( ) ;
1624-
1625- let mut subscribers: Vec < trustworthiness_checker:: OutputStream < Value > > = Vec :: new ( ) ;
1626- let mut publishers = Vec :: new ( ) ;
1627- let mut ticks: Vec < TickSender > = Vec :: new ( ) ;
1628-
1629- let values: Vec < Value > = PUBLISH_VALUES . iter ( ) . copied ( ) . map ( Value :: Int ) . collect ( ) ;
1630- for ( idx, topic) in topics. iter ( ) . enumerate ( ) {
1631- let subscriber = get_mqtt_outputs (
1632- topic. clone ( ) ,
1633- format ! ( "bench_subscriber_{}" , idx) ,
1634- MQTT_PORT ,
1635- )
1636- . await ;
1637- subscribers. push ( subscriber) ;
1638- let ( mut tick, pub_stream) = tick_stream ( stream:: iter ( values. clone ( ) ) . boxed ( ) ) ;
1639- let publisher_task = executor. spawn ( dummy_stream_mqtt_publisher (
1640- format ! ( "bench_publisher_{}" , idx) ,
1641- topic. clone ( ) ,
1642- pub_stream,
1643- values. len ( ) ,
1644- MQTT_PORT ,
1645- ) ) ;
1646- publishers. push ( publisher_task) ;
1647- ticks. push ( tick) ;
1648- }
1649-
1650- for value in values {
1651- for tick in ticks. iter_mut ( ) {
1652- with_timeout_res ( tick. send ( ( ) ) , 3 , "tick.send" ) . await ?;
1653- }
1654- for ( idx, topic) in topics. iter ( ) . enumerate ( ) {
1655- let subscriber = subscribers. get_mut ( idx) . expect ( "Subscriber not found" ) ;
1656- let received = with_timeout ( subscriber. next ( ) , 120 , "subscriber.next" ) . await ?;
1657- let received = received. expect ( "Subscriber stream ended" ) ;
1658-
1659- assert_eq ! ( received, value) ;
1660- }
1661- }
1662-
1663- Ok ( ( ) )
1664- }
1665-
1666- let mut times = std:: collections:: HashMap :: new ( ) ;
1667- for i in 1 ..=5 {
1668- let start = std:: time:: Instant :: now ( ) ;
1669- bench_paho_clients ( executor. clone ( ) , i)
1670- . await
1671- . expect ( & format ! ( "Bench with {} clients failed" , i) ) ;
1672- let elapsed = start. elapsed ( ) ;
1673- println ! ( "bench_paho_clients({}) completed in {:?}" , i, elapsed) ;
1674- times. insert ( i, elapsed) ;
1675- }
1676- let start = std:: time:: Instant :: now ( ) ;
1677- assert ! ( false ) ;
1678- Ok ( ( ) )
1679- }
16801584}
0 commit comments