@@ -76,34 +76,34 @@ async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optima
7676 . await
7777 . unwrap ( ) ;
7878
79+ type Rxs = [ mpsc:: UnboundedReceiver < ( RequestFrame , Option < TargetShard > ) > ; 3 ] ;
7980
80- fn clear_rxs ( rxs : & mut [ mpsc :: UnboundedReceiver < ( RequestFrame , Option < TargetShard > ) > ; 3 ] ) {
81+ fn clear_rxs ( rxs : & mut Rxs ) {
8182 for rx in rxs. iter_mut ( ) {
8283 while rx. try_recv ( ) . is_ok ( ) { }
8384 }
8485 }
8586
86- async fn assert_all_replicas_queried ( rxs : & mut [ mpsc:: UnboundedReceiver < ( RequestFrame , Option < TargetShard > ) > ; 3 ] ) {
87- for rx in rxs. iter_mut ( ) {
88- rx. recv ( ) . await . unwrap ( ) ;
89- }
87+ fn get_num_queried ( rxs : & mut Rxs ) -> usize {
88+ let num_queried = rxs. iter_mut ( ) . filter_map ( |rx| rx. try_recv ( ) . ok ( ) ) . count ( ) ;
9089 clear_rxs ( rxs) ;
90+ num_queried
9191 }
9292
93- fn assert_one_replica_queried ( rxs : & mut [ mpsc :: UnboundedReceiver < ( RequestFrame , Option < TargetShard > ) > ; 3 ] ) {
94- let mut found_queried = false ;
95- for rx in rxs . iter_mut ( ) {
96- if rx . try_recv ( ) . is_ok ( ) {
97- assert ! ( !found_queried ) ;
98- found_queried = true ;
99- } ;
100- }
101- assert ! ( found_queried ) ;
102- clear_rxs ( rxs ) ;
93+ fn assert_multiple_replicas_queried ( rxs : & mut Rxs ) {
94+ let num_queried = get_num_queried ( rxs ) ;
95+
96+ assert ! ( num_queried > 1 ) ;
97+ }
98+
99+ fn assert_one_replica_queried ( rxs : & mut Rxs ) {
100+ let num_queried = get_num_queried ( rxs ) ;
101+
102+ assert ! ( num_queried == 1 ) ;
103103 }
104104
105105 #[ allow( unused) ]
106- fn who_was_queried ( rxs : & mut [ mpsc :: UnboundedReceiver < ( RequestFrame , Option < TargetShard > ) > ; 3 ] ) {
106+ fn who_was_queried ( rxs : & mut Rxs ) {
107107 for ( i, rx) in rxs. iter_mut ( ) . enumerate ( ) {
108108 if rx. try_recv ( ) . is_ok ( ) {
109109 println ! ( "{} was queried." , i) ;
@@ -113,7 +113,7 @@ async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optima
113113 println ! ( "NOBODY was queried!" ) ;
114114 }
115115
116- // We will check which nodes where queries , for both LWT and non-LWT prepared statements.
116+ // We will check which nodes were queried , for both LWT and non-LWT prepared statements.
117117 let prepared_non_lwt = session. prepare ( "INSERT INTO t (a, b) VALUES (?, 1)" ) . await . unwrap ( ) ;
118118 let prepared_lwt = session. prepare ( "UPDATE t SET b=3 WHERE a=? IF b=2" ) . await . unwrap ( ) ;
119119
@@ -126,12 +126,20 @@ async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optima
126126 assert ! ( prepared_non_lwt. is_token_aware( ) ) ;
127127 assert ! ( prepared_lwt. is_token_aware( ) ) ;
128128
129- // We execute non-LWT statements and ensure that all nodes were queried (due to RoundRobin).
130- for _ in 0 ..15 {
129+ // We execute non-LWT statements and ensure that multiple nodes were queried.
130+ //
131+ // Note that our DefaultPolicy no longer performs round robin, but instead randomly picks a replica.
132+ // To see multiple replicas here, we cannot choose a fixed pick seed, so we must rely on randomness.
133+ // It happened several times in CI that *not all* replicas were queried, but now we only
134+ // assert that *more than one* replica is queried. Moreover, we increased iterations
135+ // from 15 to 30 in hope this will suffice to prevent flakiness.
136+ // Alternatively, we could give up this part of the test and only test LWT part, but then
137+ // we couldn't be sure that in non-LWT case the driver truly chooses various replicas.
138+ for _ in 0 ..30 {
131139 session. execute ( & prepared_non_lwt, ( MAGIC_MARK , ) ) . await . unwrap ( ) ;
132140 }
133141
134- assert_all_replicas_queried ( & mut prepared_rxs) . await ;
142+ assert_multiple_replicas_queried ( & mut prepared_rxs) ;
135143
136144 // We execute LWT statements, and...
137145 for _ in 0 ..15 {
@@ -143,7 +151,7 @@ async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optima
143151 assert_one_replica_queried ( & mut prepared_rxs) ;
144152 } else {
145153 // ...else we assert that replicas were shuffled as in case of non-LWT.
146- assert_all_replicas_queried ( & mut prepared_rxs) . await ;
154+ assert_multiple_replicas_queried ( & mut prepared_rxs) ;
147155 }
148156
149157 running_proxy
0 commit comments