Skip to content

Commit 590866d

Browse files
committed
combined schema agreement tests to one
This way we save resources (less nodes) but prolong test duration (from 70 seconds to 90s, where 60s is lost on waiting for timeout in failing test)
1 parent 7beeaaf commit 590866d

File tree

1 file changed

+134
-143
lines changed

1 file changed

+134
-143
lines changed

scylla/tests/ccm_integration/schema_agreement.rs

Lines changed: 134 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -48,152 +48,143 @@ impl LoadBalancingPolicy for SingleTargetLBP {
4848

4949
#[tokio::test]
5050
#[cfg_attr(not(ccm_tests), ignore)]
51-
async fn test_schema_agreement_all_nodes() {
51+
async fn test_schema_agreement() {
5252
setup_tracing();
53-
async fn test(cluster: Arc<Mutex<Cluster>>) {
54-
let cluster = cluster.lock().await;
55-
let session = cluster.make_session_builder().await.build().await.unwrap();
56-
57-
// Create keyspace
58-
let keyspace = unique_keyspace_name();
59-
session
60-
.query_unpaged(
61-
format!(
62-
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
63-
keyspace
64-
),
65-
&[],
66-
)
67-
.await
68-
.unwrap();
69-
70-
// Use keyspace
71-
session.use_keyspace(keyspace, true).await.unwrap();
72-
73-
// Create a table and check schema agreement
74-
let _result = session
75-
.query_unpaged(
76-
"CREATE TABLE test_schema_agreement_all (k int primary key, v int)",
77-
&[],
78-
)
79-
.await
80-
.unwrap();
81-
82-
// Check if schema is in agreement
83-
let schema_agreement = session.check_schema_agreement().await.unwrap();
84-
assert!(schema_agreement.is_some());
85-
}
86-
run_ccm_test(cluster_3_nodes, test).await;
53+
run_ccm_test(cluster_3_nodes, test_schema_agreement_all_nodes).await;
54+
run_ccm_test(cluster_3_nodes, test_schema_agreement_with_stopped_node).await;
55+
run_ccm_test(cluster_3_nodes, test_schema_agreement_with_paused_node).await;
8756
}
8857

89-
#[tokio::test]
90-
#[cfg_attr(not(ccm_tests), ignore)]
91-
async fn test_schema_agreement_with_stopped_node() {
92-
setup_tracing();
93-
async fn test(cluster: Arc<Mutex<Cluster>>) {
94-
let cluster = cluster.lock().await;
95-
96-
// Create keyspace
97-
let session = cluster.make_session_builder().await.build().await.unwrap();
98-
99-
let keyspace = unique_keyspace_name();
100-
session
101-
.query_unpaged(
102-
format!(
103-
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
104-
keyspace
105-
),
106-
&[],
107-
)
108-
.await
109-
.unwrap();
110-
111-
// Use keyspace
112-
session.use_keyspace(keyspace, true).await.unwrap();
113-
114-
// Stop node 2
115-
let node = cluster.nodes().get_by_id(2).await.unwrap();
116-
node.write().await.stop(None).await.unwrap();
117-
118-
// Create a table while one node is stopped
119-
let _result = session
120-
.query_unpaged(
121-
"CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)",
122-
&[],
123-
)
124-
.await
125-
.unwrap();
126-
127-
// Schema agreement should succeed with remaining up nodes
128-
let schema_agreement = session.check_schema_agreement().await.unwrap();
129-
assert!(schema_agreement.is_some());
130-
131-
// Start the node back
132-
node.write().await.start(None).await.unwrap();
133-
let schema_agreement = session.check_schema_agreement().await.unwrap();
134-
assert!(schema_agreement.is_some());
135-
}
136-
run_ccm_test(cluster_3_nodes, test).await;
58+
async fn test_schema_agreement_all_nodes(cluster: Arc<Mutex<Cluster>>) {
59+
let cluster = cluster.lock().await;
60+
let session = cluster.make_session_builder().await.build().await.unwrap();
61+
62+
// Create keyspace
63+
let keyspace = unique_keyspace_name();
64+
session
65+
.query_unpaged(
66+
format!(
67+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
68+
keyspace
69+
),
70+
&[],
71+
)
72+
.await
73+
.unwrap();
74+
75+
// Use keyspace
76+
session.use_keyspace(keyspace, true).await.unwrap();
77+
78+
// Create a table and check schema agreement
79+
let _result = session
80+
.query_unpaged(
81+
"CREATE TABLE test_schema_agreement_all (k int primary key, v int)",
82+
&[],
83+
)
84+
.await
85+
.unwrap();
86+
87+
// Check if schema is in agreement
88+
let schema_agreement = session.check_schema_agreement().await.unwrap();
89+
assert!(schema_agreement.is_some());
13790
}
13891

139-
#[tokio::test]
140-
#[cfg_attr(not(ccm_tests), ignore)]
141-
async fn test_schema_agreement_with_paused_node() {
142-
setup_tracing();
143-
async fn test(cluster: Arc<Mutex<Cluster>>) {
144-
let cluster = cluster.lock().await;
145-
146-
let session = cluster.make_session_builder().await.build().await.unwrap();
147-
148-
let keyspace = unique_keyspace_name();
149-
session
150-
.query_unpaged(
151-
format!(
152-
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
153-
keyspace
154-
),
155-
&[],
156-
)
157-
.await
158-
.unwrap();
159-
160-
session.use_keyspace(keyspace, true).await.unwrap();
161-
162-
// Stop node 2
163-
let node_id = 2;
164-
let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap();
165-
let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone();
166-
ccm_node.write().await.pause().await.unwrap();
167-
168-
// Find the corresponding Scylla node from the session to avoid querying it directly
169-
let cluster_state = session.get_cluster_state();
170-
let scylla_node = cluster_state
171-
.get_nodes_info()
172-
.iter()
173-
.find(|n| n.address.ip() != ccm_node_addr)
174-
.expect("Could not find unpaused Scylla node for querying");
175-
176-
let policy = SingleTargetLBP {
177-
target: (scylla_node.clone(), Some(0)),
178-
};
179-
let execution_profile = ExecutionProfile::builder()
180-
.load_balancing_policy(Arc::new(policy))
181-
.build();
182-
let mut stmt =
183-
Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)");
184-
stmt.set_execution_profile_handle(Some(execution_profile.into_handle()));
185-
// Create a table while one node is paused
186-
let _result = session.query_unpaged(stmt, &[]).await.unwrap();
187-
188-
// Schema agreement should succeed with remaining up nodes
189-
let schema_agreement = session.check_schema_agreement().await.unwrap();
190-
assert!(schema_agreement.is_some());
191-
192-
// Start the node back
193-
ccm_node.write().await.resume().await.unwrap();
194-
195-
let schema_agreement = session.check_schema_agreement().await.unwrap();
196-
assert!(schema_agreement.is_some());
197-
}
198-
run_ccm_test(cluster_3_nodes, test).await;
92+
async fn test_schema_agreement_with_stopped_node(cluster: Arc<Mutex<Cluster>>) {
93+
let cluster = cluster.lock().await;
94+
95+
// Create keyspace
96+
let session = cluster.make_session_builder().await.build().await.unwrap();
97+
98+
let keyspace = unique_keyspace_name();
99+
session
100+
.query_unpaged(
101+
format!(
102+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
103+
keyspace
104+
),
105+
&[],
106+
)
107+
.await
108+
.unwrap();
109+
110+
// Use keyspace
111+
session.use_keyspace(keyspace, true).await.unwrap();
112+
113+
// Stop node 2
114+
let node = cluster.nodes().get_by_id(2).await.unwrap();
115+
node.write().await.stop(None).await.unwrap();
116+
117+
// Create a table while one node is stopped
118+
let _result = session
119+
.query_unpaged(
120+
"CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)",
121+
&[],
122+
)
123+
.await
124+
.unwrap();
125+
126+
// Schema agreement should succeed with remaining up nodes
127+
let schema_agreement = session.check_schema_agreement().await.unwrap();
128+
assert!(schema_agreement.is_some());
129+
130+
// Start the node back
131+
node.write().await.start(None).await.unwrap();
132+
let schema_agreement = session.check_schema_agreement().await.unwrap();
133+
assert!(schema_agreement.is_some());
134+
}
135+
136+
async fn test_schema_agreement_with_paused_node(cluster: Arc<Mutex<Cluster>>) {
137+
let cluster = cluster.lock().await;
138+
139+
let session = cluster.make_session_builder().await.build().await.unwrap();
140+
141+
let keyspace = unique_keyspace_name();
142+
session
143+
.query_unpaged(
144+
format!(
145+
"CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}",
146+
keyspace
147+
),
148+
&[],
149+
)
150+
.await
151+
.unwrap();
152+
153+
session.use_keyspace(keyspace, true).await.unwrap();
154+
155+
// Stop node 2
156+
let node_id = 2;
157+
let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap();
158+
let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone();
159+
ccm_node.write().await.pause().await.unwrap();
160+
161+
// Find the corresponding Scylla node from the session to avoid querying it directly
162+
let cluster_state = session.get_cluster_state();
163+
let scylla_node = cluster_state
164+
.get_nodes_info()
165+
.iter()
166+
.find(|n| n.address.ip() != ccm_node_addr)
167+
.expect("Could not find unpaused Scylla node for querying");
168+
169+
let policy = SingleTargetLBP {
170+
target: (scylla_node.clone(), Some(0)),
171+
};
172+
let execution_profile = ExecutionProfile::builder()
173+
.load_balancing_policy(Arc::new(policy))
174+
.build();
175+
let mut stmt =
176+
Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)");
177+
stmt.set_execution_profile_handle(Some(execution_profile.into_handle()));
178+
// Create a table while one node is paused
179+
let _result = session.query_unpaged(stmt, &[]).await.unwrap();
180+
181+
// Schema agreement should succeed with remaining up nodes
182+
let schema_agreement = session.check_schema_agreement().await.unwrap();
183+
assert!(schema_agreement.is_some());
184+
185+
// Start the node back
186+
ccm_node.write().await.resume().await.unwrap();
187+
188+
let schema_agreement = session.check_schema_agreement().await.unwrap();
189+
assert!(schema_agreement.is_some());
199190
}

0 commit comments

Comments
 (0)