@@ -4,6 +4,10 @@ use crate::ccm::cluster::{Cluster, ClusterOptions};
44use crate :: ccm:: { run_ccm_test, CLUSTER_VERSION } ;
55use crate :: common:: utils:: { setup_tracing, unique_keyspace_name} ;
66
7+ use scylla:: client:: execution_profile:: ExecutionProfile ;
8+ use scylla:: cluster:: { ClusterState , Node , NodeRef } ;
9+ use scylla:: policies:: load_balancing:: { FallbackPlan , LoadBalancingPolicy , RoutingInfo } ;
10+ use scylla:: query:: Query ;
711use tokio:: sync:: Mutex ;
812
913fn cluster_3_nodes ( ) -> ClusterOptions {
@@ -15,6 +19,33 @@ fn cluster_3_nodes() -> ClusterOptions {
1519 }
1620}
1721
22+ #[ derive( Debug ) ]
23+ struct SingleTargetLBP {
24+ target : ( Arc < Node > , Option < u32 > ) ,
25+ }
26+
27+ impl LoadBalancingPolicy for SingleTargetLBP {
28+ fn pick < ' a > (
29+ & ' a self ,
30+ _query : & ' a RoutingInfo ,
31+ _cluster : & ' a ClusterState ,
32+ ) -> Option < ( NodeRef < ' a > , Option < u32 > ) > {
33+ Some ( ( & self . target . 0 , self . target . 1 ) )
34+ }
35+
36+ fn fallback < ' a > (
37+ & ' a self ,
38+ _query : & ' a RoutingInfo ,
39+ _cluster : & ' a ClusterState ,
40+ ) -> FallbackPlan < ' a > {
41+ Box :: new ( std:: iter:: empty ( ) )
42+ }
43+
44+ fn name ( & self ) -> String {
45+ "SingleTargetLBP" . to_owned ( )
46+ }
47+ }
48+
1849#[ tokio:: test]
1950#[ cfg_attr( not( ccm_tests) , ignore) ]
2051async fn test_schema_agreement_all_nodes ( ) {
@@ -105,15 +136,13 @@ async fn test_schema_agreement_with_stopped_node() {
105136 run_ccm_test ( cluster_3_nodes, test) . await ;
106137}
107138
108-
109139#[ tokio:: test]
110140#[ cfg_attr( not( ccm_tests) , ignore) ]
111141async fn test_schema_agreement_with_paused_node ( ) {
112142 setup_tracing ( ) ;
113143 async fn test ( cluster : Arc < Mutex < Cluster > > ) {
114144 let cluster = cluster. lock ( ) . await ;
115145
116- // Create keyspace
117146 let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
118147
119148 let keyspace = unique_keyspace_name ( ) ;
@@ -128,35 +157,41 @@ async fn test_schema_agreement_with_paused_node() {
128157 . await
129158 . unwrap ( ) ;
130159
131- // Use keyspace
132160 session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
133161
134162 // Stop node 2
135- let node = cluster. nodes ( ) . get_by_id ( 2 ) . await . unwrap ( ) ;
136- node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
137-
138- // Try a simple query to verify database is responsive
139- let result = session
140- . query_unpaged ( "SELECT * FROM system.local" , & [ ] )
141- . await
142- . unwrap ( ) ;
143- println ! ( "Result after pause: {result:?}" ) ;
144-
163+ let node_id = 2 ;
164+ let ccm_node = cluster. nodes ( ) . get_by_id ( node_id) . await . unwrap ( ) ;
165+ let ccm_node_addr = ccm_node. read ( ) . await . broadcast_rpc_address ( ) . clone ( ) ;
166+ ccm_node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
167+
168+ // Find the corresponding Scylla node from the session to avoid querying it directly
169+ let cluster_state = session. get_cluster_state ( ) ;
170+ let scylla_node = cluster_state
171+ . get_nodes_info ( )
172+ . iter ( )
173+ . find ( |n| n. address . ip ( ) != ccm_node_addr)
174+ . expect ( "Could not find unpaused Scylla node for querying" ) ;
175+
176+ let policy = SingleTargetLBP {
177+ target : ( scylla_node. clone ( ) , Some ( 0 ) ) ,
178+ } ;
179+ let execution_profile = ExecutionProfile :: builder ( )
180+ . load_balancing_policy ( Arc :: new ( policy) )
181+ . build ( ) ;
182+ let mut stmt =
183+ Query :: new ( "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ) ;
184+ stmt. set_execution_profile_handle ( Some ( execution_profile. into_handle ( ) ) ) ;
145185 // Create a table while one node is paused
146- let _result = session
147- . query_unpaged (
148- "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ,
149- & [ ] ,
150- )
151- . await
152- . unwrap ( ) ;
186+ let _result = session. query_unpaged ( stmt, & [ ] ) . await . unwrap ( ) ;
153187
154188 // Schema agreement should succeed with remaining up nodes
155189 let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
156190 assert ! ( schema_agreement. is_some( ) ) ;
157191
158192 // Start the node back
159- node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
193+ ccm_node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
194+
160195 let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
161196 assert ! ( schema_agreement. is_some( ) ) ;
162197 }
0 commit comments