11use std:: sync:: Arc ;
2+ use std:: time:: Duration ;
23
34use crate :: ccm:: cluster:: { Cluster , ClusterOptions } ;
45use crate :: ccm:: { run_ccm_test, CLUSTER_VERSION } ;
56use crate :: common:: utils:: { setup_tracing, unique_keyspace_name} ;
67
78use scylla:: client:: execution_profile:: ExecutionProfile ;
9+ use scylla:: client:: session:: Session ;
810use scylla:: cluster:: { ClusterState , Node , NodeRef } ;
911use scylla:: policies:: load_balancing:: { FallbackPlan , LoadBalancingPolicy , RoutingInfo } ;
1012use scylla:: query:: Query ;
1113use tokio:: sync:: Mutex ;
14+ use tracing:: info;
15+ use uuid:: Uuid ;
1216
17+ /// Creates a cluster configuration with 3 nodes for schema agreement tests.
1318fn cluster_3_nodes ( ) -> ClusterOptions {
1419 ClusterOptions {
1520 name : "schema_agreement_test" . to_string ( ) ,
@@ -19,6 +24,7 @@ fn cluster_3_nodes() -> ClusterOptions {
1924 }
2025}
2126
27+ /// A load balancing policy that targets a single node.
2228#[ derive( Debug ) ]
2329struct SingleTargetLBP {
2430 target : ( Arc < Node > , Option < u32 > ) ,
@@ -46,145 +52,225 @@ impl LoadBalancingPolicy for SingleTargetLBP {
4652 }
4753}
4854
55+ /// Waits for schema agreement with a timeout and retries.
56+ async fn wait_for_schema_agreement (
57+ session : & Session ,
58+ timeout : Duration ,
59+ retries : u32 ,
60+ ) -> Result < Option < Uuid > , anyhow:: Error > {
61+ let retry_interval = Duration :: from_millis ( 500 ) ;
62+ let mut attempts = 0 ;
63+
64+ tokio:: time:: timeout ( timeout, async {
65+ loop {
66+ match session. check_schema_agreement ( ) . await {
67+ Ok ( Some ( agreement) ) => return Ok ( Some ( agreement) ) ,
68+ Ok ( None ) => {
69+ attempts += 1 ;
70+ if attempts > retries {
71+ return Err ( anyhow:: anyhow!(
72+ "Schema agreement not reached after {} retries" ,
73+ retries
74+ ) ) ;
75+ }
76+ info ! (
77+ "Schema agreement not yet reached, retrying ({}/{})" ,
78+ attempts, retries
79+ ) ;
80+ tokio:: time:: sleep ( retry_interval) . await ;
81+ }
82+ Err ( e) => return Err ( anyhow:: anyhow!( "Failed to check schema agreement: {}" , e) ) ,
83+ }
84+ }
85+ } )
86+ . await
87+ . map_err ( |_| anyhow:: anyhow!( "Schema agreement timed out after {:?}" , timeout) ) ?
88+ }
89+
90+ /// Sets up a keyspace with a given replication factor.
91+ async fn setup_keyspace (
92+ session : & Session ,
93+ keyspace : & str ,
94+ replication_factor : u32 ,
95+ ) -> Result < ( ) , anyhow:: Error > {
96+ let query = format ! (
97+ "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : {}}}" ,
98+ keyspace, replication_factor
99+ ) ;
100+ session. query_unpaged ( query, & [ ] ) . await ?;
101+ session. use_keyspace ( keyspace, true ) . await ?;
102+ Ok ( ( ) )
103+ }
104+
49105#[ tokio:: test]
50106#[ cfg_attr( not( ccm_tests) , ignore) ]
51107async fn test_schema_agreement ( ) {
52108 setup_tracing ( ) ;
53109 run_ccm_test ( cluster_3_nodes, test_schema_agreement_all_nodes) . await ;
54110 run_ccm_test ( cluster_3_nodes, test_schema_agreement_with_stopped_node) . await ;
55111 run_ccm_test ( cluster_3_nodes, test_schema_agreement_with_paused_node) . await ;
112+ // TODO - multidc cases
56113}
57114
115+ /// Tests schema agreement with all nodes running.
58116async fn test_schema_agreement_all_nodes ( cluster : Arc < Mutex < Cluster > > ) {
59117 let cluster = cluster. lock ( ) . await ;
60- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
118+ let session = cluster
119+ . make_session_builder ( )
120+ . await
121+ . build ( )
122+ . await
123+ . expect ( "Failed to create session" ) ;
61124
62- // Create keyspace
63125 let keyspace = unique_keyspace_name ( ) ;
126+ setup_keyspace ( & session, & keyspace, 3 )
127+ . await
128+ . expect ( "Failed to setup keyspace" ) ;
129+
130+ info ! ( "Creating table in test_schema_agreement_all_nodes" ) ;
64131 session
65- . query_unpaged (
66- format ! (
67- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
68- keyspace
69- ) ,
70- & [ ] ,
71- )
72- . await
73- . unwrap ( ) ;
74-
75- // Use keyspace
76- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
77-
78- // Create a table and check schema agreement
79- let _result = session
80- . query_unpaged (
81- "CREATE TABLE test_schema_agreement_all (k int primary key, v int)" ,
82- & [ ] ,
83- )
84- . await
85- . unwrap ( ) ;
86-
87- // Check if schema is in agreement
88- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
89- assert ! ( schema_agreement. is_some( ) ) ;
132+ . query_unpaged ( "CREATE TABLE test_table (k int primary key, v int)" , & [ ] )
133+ . await
134+ . expect ( "Failed to create table" ) ;
135+
136+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
137+ . await
138+ . expect ( "Schema agreement failed" ) ;
139+ assert ! ( agreement. is_some( ) , "Schema agreement should be reached" ) ;
140+ info ! ( "Schema agreement achieved with all nodes" ) ;
90141}
91142
143+ /// Tests schema agreement with one node stopped.
92144async fn test_schema_agreement_with_stopped_node ( cluster : Arc < Mutex < Cluster > > ) {
93145 let cluster = cluster. lock ( ) . await ;
94-
95- // Create keyspace
96- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
146+ let session = cluster
147+ . make_session_builder ( )
148+ . await
149+ . build ( )
150+ . await
151+ . expect ( "Failed to create session" ) ;
97152
98153 let keyspace = unique_keyspace_name ( ) ;
154+ setup_keyspace ( & session, & keyspace, 3 )
155+ . await
156+ . expect ( "Failed to setup keyspace" ) ;
157+
158+ let node = cluster
159+ . nodes ( )
160+ . get_by_id ( 2 )
161+ . await
162+ . expect ( "Failed to get node 2" ) ;
163+ info ! ( "Stopping node 2" ) ;
164+ node. write ( )
165+ . await
166+ . stop ( None )
167+ . await
168+ . expect ( "Failed to stop node" ) ;
169+
170+ info ! ( "Creating table with one node stopped" ) ;
99171 session
100- . query_unpaged (
101- format ! (
102- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
103- keyspace
104- ) ,
105- & [ ] ,
106- )
107- . await
108- . unwrap ( ) ;
109-
110- // Use keyspace
111- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
112-
113- // Stop node 2
114- let node = cluster. nodes ( ) . get_by_id ( 2 ) . await . unwrap ( ) ;
115- node. write ( ) . await . stop ( None ) . await . unwrap ( ) ;
116-
117- // Create a table while one node is stopped
118- let _result = session
119- . query_unpaged (
120- "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)" ,
121- & [ ] ,
122- )
123- . await
124- . unwrap ( ) ;
125-
126- // Schema agreement should succeed with remaining up nodes
127- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
128- assert ! ( schema_agreement. is_some( ) ) ;
129-
130- // Start the node back
131- node. write ( ) . await . start ( None ) . await . unwrap ( ) ;
132- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
133- assert ! ( schema_agreement. is_some( ) ) ;
172+ . query_unpaged ( "CREATE TABLE test_table (k int primary key, v int)" , & [ ] )
173+ . await
174+ . expect ( "Failed to create table" ) ;
175+
176+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
177+ . await
178+ . expect ( "Schema agreement failed with stopped node" ) ;
179+ assert ! (
180+ agreement. is_some( ) ,
181+ "Schema agreement should be reached with remaining nodes"
182+ ) ;
183+
184+ info ! ( "Restarting node 2" ) ;
185+ node. write ( )
186+ . await
187+ . start ( None )
188+ . await
189+ . expect ( "Failed to restart node" ) ;
190+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
191+ . await
192+ . expect ( "Schema agreement failed after restart" ) ;
193+ assert ! (
194+ agreement. is_some( ) ,
195+ "Schema agreement should be reached after node restart"
196+ ) ;
197+ info ! ( "Schema agreement achieved after node restart" ) ;
134198}
135199
200+ /// Tests schema agreement with one node paused.
136201async fn test_schema_agreement_with_paused_node ( cluster : Arc < Mutex < Cluster > > ) {
137202 let cluster = cluster. lock ( ) . await ;
138-
139- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
203+ let session = cluster
204+ . make_session_builder ( )
205+ . await
206+ . build ( )
207+ . await
208+ . expect ( "Failed to create session" ) ;
140209
141210 let keyspace = unique_keyspace_name ( ) ;
142- session
143- . query_unpaged (
144- format ! (
145- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
146- keyspace
147- ) ,
148- & [ ] ,
149- )
211+ setup_keyspace ( & session, & keyspace, 3 )
150212 . await
151- . unwrap ( ) ;
152-
153- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
213+ . expect ( "Failed to setup keyspace" ) ;
154214
155- // Stop node 2
156215 let node_id = 2 ;
157- let ccm_node = cluster. nodes ( ) . get_by_id ( node_id) . await . unwrap ( ) ;
216+ let ccm_node = cluster
217+ . nodes ( )
218+ . get_by_id ( node_id)
219+ . await
220+ . expect ( "Failed to get node 2" ) ;
158221 let ccm_node_addr = ccm_node. read ( ) . await . broadcast_rpc_address ( ) . clone ( ) ;
159- ccm_node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
222+ info ! ( "Pausing node 2" ) ;
223+ ccm_node
224+ . write ( )
225+ . await
226+ . pause ( )
227+ . await
228+ . expect ( "Failed to pause node" ) ;
160229
161- // Find the corresponding Scylla node from the session to avoid querying it directly
162230 let cluster_state = session. get_cluster_state ( ) ;
163- let scylla_node = cluster_state
231+ let running_scylla_node = cluster_state
164232 . get_nodes_info ( )
165233 . iter ( )
166234 . find ( |n| n. address . ip ( ) != ccm_node_addr)
167- . expect ( "Could not find unpaused Scylla node for querying " ) ;
235+ . expect ( "Could not find unpaused Scylla node" ) ;
168236
169237 let policy = SingleTargetLBP {
170- target : ( scylla_node . clone ( ) , Some ( 0 ) ) ,
238+ target : ( running_scylla_node . clone ( ) , Some ( 0 ) ) ,
171239 } ;
172240 let execution_profile = ExecutionProfile :: builder ( )
173241 . load_balancing_policy ( Arc :: new ( policy) )
174242 . build ( ) ;
175- let mut stmt =
176- Query :: new ( "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ) ;
243+ let mut stmt = Query :: new ( "CREATE TABLE test_table (k int primary key, v int)" ) ;
177244 stmt. set_execution_profile_handle ( Some ( execution_profile. into_handle ( ) ) ) ;
178- // Create a table while one node is paused
179- let _result = session. query_unpaged ( stmt, & [ ] ) . await . unwrap ( ) ;
180245
181- // Schema agreement should succeed with remaining up nodes
182- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
183- assert ! ( schema_agreement. is_some( ) ) ;
246+ info ! ( "Creating table with one node paused" ) ;
247+ session
248+ . query_unpaged ( stmt, & [ ] )
249+ . await
250+ . expect ( "Failed to create table" ) ;
251+
252+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
253+ . await
254+ . expect ( "Schema agreement failed with paused node" ) ;
255+ assert ! (
256+ agreement. is_some( ) ,
257+ "Schema agreement should be reached with remaining nodes"
258+ ) ;
184259
185- // Start the node back
186- ccm_node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
260+ info ! ( "Resuming node 2" ) ;
261+ ccm_node
262+ . write ( )
263+ . await
264+ . resume ( )
265+ . await
266+ . expect ( "Failed to resume node" ) ;
187267
188- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
189- assert ! ( schema_agreement. is_some( ) ) ;
268+ let agreement = wait_for_schema_agreement ( & session, Duration :: from_secs ( 10 ) , 20 )
269+ . await
270+ . expect ( "Schema agreement failed after resume" ) ;
271+ assert ! (
272+ agreement. is_some( ) ,
273+ "Schema agreement should be reached after node resume"
274+ ) ;
275+ info ! ( "Schema agreement achieved after node resume" ) ;
190276}
0 commit comments