Skip to content

Commit 771521a

Browse files
committed
ccm: Add DowngradingConsistencyRetryPolicy test
1 parent c839847 commit 771521a

File tree

2 files changed

+119
-0
lines changed

2 files changed

+119
-0
lines changed

scylla/tests/ccm_integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
mod common;
33

44
pub(crate) mod ccm;
5+
mod retry_policies;
56
mod test_example;
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use std::collections::HashSet;
2+
use std::sync::Arc;
3+
use std::time::Instant;
4+
use tokio::sync::Mutex;
5+
6+
use scylla::client::execution_profile::ExecutionProfile;
7+
use scylla::client::session::Session;
8+
use scylla::policies::retry::DowngradingConsistencyRetryPolicy;
9+
use scylla::statement::Consistency;
10+
11+
use crate::ccm::cluster::{Cluster, ClusterOptions};
12+
use crate::ccm::{run_ccm_test, CLUSTER_VERSION};
13+
use crate::common::utils::setup_tracing;
14+
15+
fn cluster_config() -> ClusterOptions {
16+
ClusterOptions {
17+
name: "ccm_retry_policies".to_string(),
18+
version: CLUSTER_VERSION.clone(),
19+
nodes: vec![3],
20+
..ClusterOptions::default()
21+
}
22+
}
23+
24+
async fn cql_read_cl_all(session: &Session) {
25+
let cql = "SELECT range_end FROM system_distributed_everywhere.cdc_generation_descriptions_v2";
26+
for is_idempotent in [false, true] {
27+
let mut prep_stmt = session.prepare(cql).await.unwrap();
28+
prep_stmt.set_is_idempotent(is_idempotent);
29+
prep_stmt.set_retry_policy(Some(Arc::new(DowngradingConsistencyRetryPolicy::new())));
30+
prep_stmt.set_consistency(Consistency::All);
31+
session
32+
.execute_unpaged(&prep_stmt, &[])
33+
.await
34+
.expect("failed to execute CL=ALL read query");
35+
}
36+
}
37+
38+
async fn get_alive_nodes_number(session: &Session) -> usize {
39+
let cluster_state = session.get_cluster_state();
40+
let alive_nodes: HashSet<_> = cluster_state
41+
.get_nodes_info()
42+
.iter()
43+
.filter(|node| node.is_connected())
44+
.map(|node| node.address)
45+
.collect();
46+
alive_nodes.len()
47+
}
48+
49+
#[tokio::test]
50+
#[ntest::timeout(30000)]
51+
#[cfg_attr(not(ccm_tests), ignore)]
52+
async fn test_downgrading_cl_dbnode_unavailable() {
53+
// NOTE: whole test takes 15-20 seconds
54+
setup_tracing();
55+
async fn test(cluster: Arc<Mutex<Cluster>>) {
56+
let handle = ExecutionProfile::builder()
57+
.retry_policy(Arc::new(DowngradingConsistencyRetryPolicy::new()))
58+
.consistency(Consistency::All)
59+
.build()
60+
.into_handle();
61+
let cluster = cluster.lock().await;
62+
let session = cluster
63+
.make_session_builder()
64+
.await
65+
.default_execution_profile_handle(handle)
66+
.build()
67+
.await
68+
.unwrap();
69+
70+
cql_read_cl_all(&session).await;
71+
let target_node = cluster.nodes().iter().next();
72+
73+
let alive_nodes_num = get_alive_nodes_number(&session).await;
74+
let all_nodes_num: usize = cluster_config().nodes.iter().map(|&n| n as usize).sum();
75+
assert_eq!(all_nodes_num, alive_nodes_num);
76+
77+
println!("Going to stop first node");
78+
target_node
79+
.expect("failed to get DB node")
80+
.write()
81+
.await
82+
.stop(None)
83+
.await
84+
.unwrap();
85+
86+
// NOTE: make sure we have "ALL-1" active DB nodes
87+
let alive_nodes_num_after_stop = get_alive_nodes_number(&session).await;
88+
assert_eq!(all_nodes_num, alive_nodes_num_after_stop + 1);
89+
90+
// NOTE: make a CL=ALL query, it should succeed having "ALL-1" alive nodes
91+
cql_read_cl_all(&session).await;
92+
93+
println!("Going to start first node");
94+
target_node
95+
.expect("failed to get DB node")
96+
.write()
97+
.await
98+
.start(None)
99+
.await
100+
.unwrap();
101+
102+
// NOTE: wait while driver detects the node availability back again.
103+
// During the test development the waiting loop was taking ~1.2s
104+
let loop_start_time = Instant::now();
105+
loop {
106+
let alive_nodes_num_after_start = get_alive_nodes_number(&session).await;
107+
if alive_nodes_num_after_start == all_nodes_num {
108+
break;
109+
}
110+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
111+
}
112+
println!(
113+
"Waiting for the node availability took {:#?}",
114+
Instant::now() - loop_start_time
115+
);
116+
}
117+
run_ccm_test(cluster_config, test).await;
118+
}

0 commit comments

Comments
 (0)