Skip to content

Commit cb1d70a

Browse files
committed
refactor: schema agreement test
Make test easier to read and more robust.
1 parent 590866d commit cb1d70a

File tree

1 file changed

+176
-95
lines changed

1 file changed

+176
-95
lines changed
Lines changed: 176 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use crate::ccm::cluster::{Cluster, ClusterOptions};
45
use crate::ccm::{run_ccm_test, CLUSTER_VERSION};
56
use crate::common::utils::{setup_tracing, unique_keyspace_name};
67

78
use scylla::client::execution_profile::ExecutionProfile;
9+
use scylla::client::session::Session;
810
use scylla::cluster::{ClusterState, Node, NodeRef};
911
use scylla::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
1012
use scylla::query::Query;
1113
use tokio::sync::Mutex;
14+
use tracing::info;
15+
use uuid::Uuid;
1216

17+
/// Creates a cluster configuration with 3 nodes for schema agreement tests.
1318
fn cluster_3_nodes() -> ClusterOptions {
1419
ClusterOptions {
1520
name: "schema_agreement_test".to_string(),
@@ -19,6 +24,7 @@ fn cluster_3_nodes() -> ClusterOptions {
1924
}
2025
}
2126

27+
/// A load balancing policy that targets a single node.
2228
#[derive(Debug)]
2329
struct SingleTargetLBP {
2430
target: (Arc<Node>, Option<u32>),
@@ -46,145 +52,220 @@ impl LoadBalancingPolicy for SingleTargetLBP {
4652
}
4753
}
4854

55+
/// Waits for schema agreement with a timeout and retries.
56+
async fn wait_for_schema_agreement(
57+
session: &Session,
58+
timeout: Duration,
59+
retries: u32,
60+
) -> Result<Option<Uuid>, anyhow::Error> {
61+
let retry_interval = Duration::from_millis(500);
62+
let mut attempts = 0;
63+
64+
tokio::time::timeout(timeout, async {
65+
loop {
66+
match session.check_schema_agreement().await {
67+
Ok(Some(agreement)) => return Ok(Some(agreement)),
68+
Ok(None) => {
69+
attempts += 1;
70+
if attempts > retries {
71+
return Err(anyhow::anyhow!(
72+
"Schema agreement not reached after {} retries",
73+
retries
74+
));
75+
}
76+
info!("Schema agreement not yet reached, retrying ({}/{})", attempts, retries);
77+
tokio::time::sleep(retry_interval).await;
78+
}
79+
Err(e) => return Err(anyhow::anyhow!("Failed to check schema agreement: {}", e)),
80+
}
81+
}
82+
})
83+
.await
84+
.map_err(|_| anyhow::anyhow!("Schema agreement timed out after {:?}", timeout))?
85+
}
86+
87+
/// Sets up a keyspace with a given replication factor.
88+
async fn setup_keyspace(
89+
session: &Session,
90+
keyspace: &str,
91+
replication_factor: u32,
92+
) -> Result<(), anyhow::Error> {
93+
let query = format!(
94+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : {}}}",
95+
keyspace, replication_factor
96+
);
97+
session.query_unpaged(query, &[]).await?;
98+
session.use_keyspace(keyspace, true).await?;
99+
Ok(())
100+
}
101+
49102
#[tokio::test]
50103
#[cfg_attr(not(ccm_tests), ignore)]
51104
async fn test_schema_agreement() {
52105
setup_tracing();
53106
run_ccm_test(cluster_3_nodes, test_schema_agreement_all_nodes).await;
54107
run_ccm_test(cluster_3_nodes, test_schema_agreement_with_stopped_node).await;
55108
run_ccm_test(cluster_3_nodes, test_schema_agreement_with_paused_node).await;
109+
// TODO - multidc cases
56110
}
57111

112+
/// Tests schema agreement with all nodes running.
58113
async fn test_schema_agreement_all_nodes(cluster: Arc<Mutex<Cluster>>) {
59114
let cluster = cluster.lock().await;
60-
let session = cluster.make_session_builder().await.build().await.unwrap();
115+
let session = cluster
116+
.make_session_builder()
117+
.await
118+
.build()
119+
.await
120+
.expect("Failed to create session");
61121

62-
// Create keyspace
63122
let keyspace = unique_keyspace_name();
123+
setup_keyspace(&session, &keyspace, 3)
124+
.await
125+
.expect("Failed to setup keyspace");
126+
127+
info!("Creating table in test_schema_agreement_all_nodes");
64128
session
65-
.query_unpaged(
66-
format!(
67-
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
68-
keyspace
69-
),
70-
&[],
71-
)
72-
.await
73-
.unwrap();
74-
75-
// Use keyspace
76-
session.use_keyspace(keyspace, true).await.unwrap();
77-
78-
// Create a table and check schema agreement
79-
let _result = session
80-
.query_unpaged(
81-
"CREATE TABLE test_schema_agreement_all (k int primary key, v int)",
82-
&[],
83-
)
84-
.await
85-
.unwrap();
86-
87-
// Check if schema is in agreement
88-
let schema_agreement = session.check_schema_agreement().await.unwrap();
89-
assert!(schema_agreement.is_some());
129+
.query_unpaged("CREATE TABLE test_table (k int primary key, v int)", &[])
130+
.await
131+
.expect("Failed to create table");
132+
133+
let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
134+
.await
135+
.expect("Schema agreement failed");
136+
assert!(agreement.is_some(), "Schema agreement should be reached");
137+
info!("Schema agreement achieved with all nodes");
90138
}
91139

140+
/// Tests schema agreement with one node stopped.
92141
async fn test_schema_agreement_with_stopped_node(cluster: Arc<Mutex<Cluster>>) {
93142
let cluster = cluster.lock().await;
94-
95-
// Create keyspace
96-
let session = cluster.make_session_builder().await.build().await.unwrap();
143+
let session = cluster
144+
.make_session_builder()
145+
.await
146+
.build()
147+
.await
148+
.expect("Failed to create session");
97149

98150
let keyspace = unique_keyspace_name();
151+
setup_keyspace(&session, &keyspace, 3)
152+
.await
153+
.expect("Failed to setup keyspace");
154+
155+
let node = cluster
156+
.nodes()
157+
.get_by_id(2)
158+
.await
159+
.expect("Failed to get node 2");
160+
info!("Stopping node 2");
161+
node.write().await.stop(None).await.expect("Failed to stop node");
162+
163+
info!("Creating table with one node stopped");
99164
session
100-
.query_unpaged(
101-
format!(
102-
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
103-
keyspace
104-
),
105-
&[],
106-
)
107-
.await
108-
.unwrap();
109-
110-
// Use keyspace
111-
session.use_keyspace(keyspace, true).await.unwrap();
112-
113-
// Stop node 2
114-
let node = cluster.nodes().get_by_id(2).await.unwrap();
115-
node.write().await.stop(None).await.unwrap();
116-
117-
// Create a table while one node is stopped
118-
let _result = session
119-
.query_unpaged(
120-
"CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)",
121-
&[],
122-
)
123-
.await
124-
.unwrap();
125-
126-
// Schema agreement should succeed with remaining up nodes
127-
let schema_agreement = session.check_schema_agreement().await.unwrap();
128-
assert!(schema_agreement.is_some());
129-
130-
// Start the node back
131-
node.write().await.start(None).await.unwrap();
132-
let schema_agreement = session.check_schema_agreement().await.unwrap();
133-
assert!(schema_agreement.is_some());
165+
.query_unpaged("CREATE TABLE test_table (k int primary key, v int)", &[])
166+
.await
167+
.expect("Failed to create table");
168+
169+
let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
170+
.await
171+
.expect("Schema agreement failed with stopped node");
172+
assert!(
173+
agreement.is_some(),
174+
"Schema agreement should be reached with remaining nodes"
175+
);
176+
177+
info!("Restarting node 2");
178+
node.write()
179+
.await
180+
.start(None)
181+
.await
182+
.expect("Failed to restart node");
183+
let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
184+
.await
185+
.expect("Schema agreement failed after restart");
186+
assert!(
187+
agreement.is_some(),
188+
"Schema agreement should be reached after node restart"
189+
);
190+
info!("Schema agreement achieved after node restart");
134191
}
135192

193+
/// Tests schema agreement with one node paused.
136194
async fn test_schema_agreement_with_paused_node(cluster: Arc<Mutex<Cluster>>) {
137195
let cluster = cluster.lock().await;
138-
139-
let session = cluster.make_session_builder().await.build().await.unwrap();
196+
let session = cluster
197+
.make_session_builder()
198+
.await
199+
.build()
200+
.await
201+
.expect("Failed to create session");
140202

141203
let keyspace = unique_keyspace_name();
142-
session
143-
.query_unpaged(
144-
format!(
145-
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
146-
keyspace
147-
),
148-
&[],
149-
)
204+
setup_keyspace(&session, &keyspace, 3)
150205
.await
151-
.unwrap();
152-
153-
session.use_keyspace(keyspace, true).await.unwrap();
206+
.expect("Failed to setup keyspace");
154207

155-
// Stop node 2
156208
let node_id = 2;
157-
let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap();
209+
let ccm_node = cluster
210+
.nodes()
211+
.get_by_id(node_id)
212+
.await
213+
.expect("Failed to get node 2");
158214
let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone();
159-
ccm_node.write().await.pause().await.unwrap();
215+
info!("Pausing node 2");
216+
ccm_node
217+
.write()
218+
.await
219+
.pause()
220+
.await
221+
.expect("Failed to pause node");
160222

161-
// Find the corresponding Scylla node from the session to avoid querying it directly
162223
let cluster_state = session.get_cluster_state();
163-
let scylla_node = cluster_state
224+
let running_scylla_node = cluster_state
164225
.get_nodes_info()
165226
.iter()
166227
.find(|n| n.address.ip() != ccm_node_addr)
167-
.expect("Could not find unpaused Scylla node for querying");
228+
.expect("Could not find unpaused Scylla node");
168229

169230
let policy = SingleTargetLBP {
170-
target: (scylla_node.clone(), Some(0)),
231+
target: (running_scylla_node.clone(), Some(0)),
171232
};
172233
let execution_profile = ExecutionProfile::builder()
173234
.load_balancing_policy(Arc::new(policy))
174235
.build();
175-
let mut stmt =
176-
Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)");
236+
let mut stmt = Query::new("CREATE TABLE test_table (k int primary key, v int)");
177237
stmt.set_execution_profile_handle(Some(execution_profile.into_handle()));
178-
// Create a table while one node is paused
179-
let _result = session.query_unpaged(stmt, &[]).await.unwrap();
180238

181-
// Schema agreement should succeed with remaining up nodes
182-
let schema_agreement = session.check_schema_agreement().await.unwrap();
183-
assert!(schema_agreement.is_some());
239+
info!("Creating table with one node paused");
240+
session
241+
.query_unpaged(stmt, &[])
242+
.await
243+
.expect("Failed to create table");
244+
245+
let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
246+
.await
247+
.expect("Schema agreement failed with paused node");
248+
assert!(
249+
agreement.is_some(),
250+
"Schema agreement should be reached with remaining nodes"
251+
);
184252

185-
// Start the node back
186-
ccm_node.write().await.resume().await.unwrap();
253+
info!("Resuming node 2");
254+
ccm_node
255+
.write()
256+
.await
257+
.resume()
258+
.await
259+
.expect("Failed to resume node");
187260

188-
let schema_agreement = session.check_schema_agreement().await.unwrap();
189-
assert!(schema_agreement.is_some());
261+
let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20)
262+
.await
263+
.expect("Schema agreement failed after resume");
264+
assert!(
265+
agreement.is_some(),
266+
"Schema agreement should be reached after node resume"
267+
);
268+
info!("Schema agreement achieved after node resume");
190269
}
270+
271+

0 commit comments

Comments
 (0)