Skip to content

Commit 6a1bb35

Browse files
committed
test(schema_agreement): added ccm based test for schema agreement
Tested scenarios (positive: schema agreement is ok): 1. when all nodes up 2. when one node is stopped 3. when one node is paused (looks like disconnected)
1 parent ee98512 commit 6a1bb35

File tree

3 files changed

+198
-0
lines changed

3 files changed

+198
-0
lines changed

scylla/tests/ccm_integration/ccm/cluster.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ pub(crate) enum NodeStatus {
105105
Stopped,
106106
Started,
107107
Deleted,
108+
Paused,
108109
}
109110

110111
/// Options to start the node with.
@@ -385,6 +386,38 @@ impl Node {
385386
Ok(())
386387
}
387388

389+
/// Pauses the node by sending SIGSTOP signal to the process.
390+
pub(crate) async fn pause(&mut self) -> Result<(), Error> {
391+
let args: Vec<String> = vec![
392+
self.opts.name(),
393+
"pause".to_string(),
394+
"--config-dir".to_string(),
395+
self.config_dir.to_string_lossy().to_string(),
396+
];
397+
398+
self.logged_cmd
399+
.run_command("ccm", &args, RunOptions::new().with_env(self.get_ccm_env()))
400+
.await?;
401+
self.set_status(NodeStatus::Paused);
402+
Ok(())
403+
}
404+
405+
/// Resumes the node by sending SIGCONT signal to the process.
406+
pub(crate) async fn resume(&mut self) -> Result<(), Error> {
407+
let args: Vec<String> = vec![
408+
self.opts.name(),
409+
"resume".to_string(),
410+
"--config-dir".to_string(),
411+
self.config_dir.to_string_lossy().to_string(),
412+
];
413+
414+
self.logged_cmd
415+
.run_command("ccm", &args, RunOptions::new().with_env(self.get_ccm_env()))
416+
.await?;
417+
self.set_status(NodeStatus::Started);
418+
Ok(())
419+
}
420+
388421
pub(crate) async fn delete(&mut self) -> Result<(), Error> {
389422
if self.status == NodeStatus::Deleted {
390423
return Ok(());

scylla/tests/ccm_integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ pub(crate) mod ccm;
66
mod test_example;
77
#[cfg(feature = "ssl")]
88
mod tls;
9+
mod schema_agreement;
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
use std::sync::Arc;
2+
3+
use crate::ccm::cluster::{Cluster, ClusterOptions};
4+
use crate::ccm::{run_ccm_test, CLUSTER_VERSION};
5+
use crate::common::utils::{setup_tracing, unique_keyspace_name};
6+
7+
use tokio::sync::Mutex;
8+
9+
fn cluster_3_nodes() -> ClusterOptions {
10+
ClusterOptions {
11+
name: "schema_agreement_test".to_string(),
12+
version: CLUSTER_VERSION.clone(),
13+
nodes: vec![3],
14+
..ClusterOptions::default()
15+
}
16+
}
17+
18+
#[tokio::test]
19+
#[cfg_attr(not(ccm_tests), ignore)]
20+
async fn test_schema_agreement_all_nodes() {
21+
setup_tracing();
22+
async fn test(cluster: Arc<Mutex<Cluster>>) {
23+
let cluster = cluster.lock().await;
24+
let session = cluster.make_session_builder().await.build().await.unwrap();
25+
26+
// Create keyspace
27+
let keyspace = unique_keyspace_name();
28+
session
29+
.query_unpaged(
30+
format!(
31+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
32+
keyspace
33+
),
34+
&[],
35+
)
36+
.await
37+
.unwrap();
38+
39+
// Use keyspace
40+
session.use_keyspace(keyspace, true).await.unwrap();
41+
42+
// Create a table and check schema agreement
43+
let _result = session
44+
.query_unpaged(
45+
"CREATE TABLE test_schema_agreement_all (k int primary key, v int)",
46+
&[],
47+
)
48+
.await
49+
.unwrap();
50+
51+
// Check if schema is in agreement
52+
let schema_agreement = session.check_schema_agreement().await.unwrap();
53+
assert!(schema_agreement.is_some());
54+
}
55+
run_ccm_test(cluster_3_nodes, test).await;
56+
}
57+
58+
#[tokio::test]
59+
#[cfg_attr(not(ccm_tests), ignore)]
60+
async fn test_schema_agreement_with_stopped_node() {
61+
setup_tracing();
62+
async fn test(cluster: Arc<Mutex<Cluster>>) {
63+
let cluster = cluster.lock().await;
64+
65+
// Create keyspace
66+
let session = cluster.make_session_builder().await.build().await.unwrap();
67+
68+
let keyspace = unique_keyspace_name();
69+
session
70+
.query_unpaged(
71+
format!(
72+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
73+
keyspace
74+
),
75+
&[],
76+
)
77+
.await
78+
.unwrap();
79+
80+
// Use keyspace
81+
session.use_keyspace(keyspace, true).await.unwrap();
82+
83+
// Stop node 2
84+
let node = cluster.nodes().get_by_id(2).await.unwrap();
85+
node.write().await.stop(None).await.unwrap();
86+
87+
// Create a table while one node is stopped
88+
let _result = session
89+
.query_unpaged(
90+
"CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)",
91+
&[],
92+
)
93+
.await
94+
.unwrap();
95+
96+
// Schema agreement should succeed with remaining up nodes
97+
let schema_agreement = session.check_schema_agreement().await.unwrap();
98+
assert!(schema_agreement.is_some());
99+
100+
// Start the node back
101+
node.write().await.start(None).await.unwrap();
102+
let schema_agreement = session.check_schema_agreement().await.unwrap();
103+
assert!(schema_agreement.is_some());
104+
}
105+
run_ccm_test(cluster_3_nodes, test).await;
106+
}
107+
108+
109+
#[tokio::test]
110+
#[cfg_attr(not(ccm_tests), ignore)]
111+
async fn test_schema_agreement_with_paused_node() {
112+
setup_tracing();
113+
async fn test(cluster: Arc<Mutex<Cluster>>) {
114+
let cluster = cluster.lock().await;
115+
116+
// Create keyspace
117+
let session = cluster.make_session_builder().await.build().await.unwrap();
118+
119+
let keyspace = unique_keyspace_name();
120+
session
121+
.query_unpaged(
122+
format!(
123+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
124+
keyspace
125+
),
126+
&[],
127+
)
128+
.await
129+
.unwrap();
130+
131+
// Use keyspace
132+
session.use_keyspace(keyspace, true).await.unwrap();
133+
134+
// Stop node 2
135+
let node = cluster.nodes().get_by_id(2).await.unwrap();
136+
node.write().await.pause().await.unwrap();
137+
138+
// Try a simple query to verify database is responsive
139+
let result = session
140+
.query_unpaged("SELECT * FROM system.local", &[])
141+
.await
142+
.unwrap();
143+
println!("Result after pause: {result:?}");
144+
145+
// Create a table while one node is paused
146+
let _result = session
147+
.query_unpaged(
148+
"CREATE TABLE test_schema_agreement_paused (k int primary key, v int)",
149+
&[],
150+
)
151+
.await
152+
.unwrap();
153+
154+
// Schema agreement should succeed with remaining up nodes
155+
let schema_agreement = session.check_schema_agreement().await.unwrap();
156+
assert!(schema_agreement.is_some());
157+
158+
// Start the node back
159+
node.write().await.resume().await.unwrap();
160+
let schema_agreement = session.check_schema_agreement().await.unwrap();
161+
assert!(schema_agreement.is_some());
162+
}
163+
run_ccm_test(cluster_3_nodes, test).await;
164+
}

0 commit comments

Comments
 (0)