From 6a1bb35baf5043733b21d94496851ae4edf46c2b Mon Sep 17 00:00:00 2001 From: Lukasz Sojka Date: Thu, 13 Feb 2025 12:16:08 +0100 Subject: [PATCH 1/4] test(schema_agreement): added ccm based test for schema agreement Tested scenarios (positive: schema agreement is ok): 1. when all nodes up 2. when one node is stopped 3. when one node is paused (looks like disconnected) --- scylla/tests/ccm_integration/ccm/cluster.rs | 33 ++++ scylla/tests/ccm_integration/main.rs | 1 + .../tests/ccm_integration/schema_agreement.rs | 164 ++++++++++++++++++ 3 files changed, 198 insertions(+) create mode 100644 scylla/tests/ccm_integration/schema_agreement.rs diff --git a/scylla/tests/ccm_integration/ccm/cluster.rs b/scylla/tests/ccm_integration/ccm/cluster.rs index 84d621b496..5a8f758b69 100644 --- a/scylla/tests/ccm_integration/ccm/cluster.rs +++ b/scylla/tests/ccm_integration/ccm/cluster.rs @@ -105,6 +105,7 @@ pub(crate) enum NodeStatus { Stopped, Started, Deleted, + Paused, } /// Options to start the node with. @@ -385,6 +386,38 @@ impl Node { Ok(()) } + /// Pauses the node by sending SIGSTOP signal to the process. + pub(crate) async fn pause(&mut self) -> Result<(), Error> { + let args: Vec = vec![ + self.opts.name(), + "pause".to_string(), + "--config-dir".to_string(), + self.config_dir.to_string_lossy().to_string(), + ]; + + self.logged_cmd + .run_command("ccm", &args, RunOptions::new().with_env(self.get_ccm_env())) + .await?; + self.set_status(NodeStatus::Paused); + Ok(()) + } + + /// Resumes the node by sending SIGCONT signal to the process. + pub(crate) async fn resume(&mut self) -> Result<(), Error> { + let args: Vec = vec![ + self.opts.name(), + "resume".to_string(), + "--config-dir".to_string(), + self.config_dir.to_string_lossy().to_string(), + ]; + + self.logged_cmd + .run_command("ccm", &args, RunOptions::new().with_env(self.get_ccm_env())) + .await?; + self.set_status(NodeStatus::Started); + Ok(()) + } + pub(crate) async fn delete(&mut self) -> Result<(), Error> { if self.status == NodeStatus::Deleted { return Ok(()); diff --git a/scylla/tests/ccm_integration/main.rs b/scylla/tests/ccm_integration/main.rs index 3f9373b477..8fd9692ed0 100644 --- a/scylla/tests/ccm_integration/main.rs +++ b/scylla/tests/ccm_integration/main.rs @@ -6,3 +6,4 @@ pub(crate) mod ccm; mod test_example; #[cfg(feature = "ssl")] mod tls; +mod schema_agreement; \ No newline at end of file diff --git a/scylla/tests/ccm_integration/schema_agreement.rs b/scylla/tests/ccm_integration/schema_agreement.rs new file mode 100644 index 0000000000..987a40ed2d --- /dev/null +++ b/scylla/tests/ccm_integration/schema_agreement.rs @@ -0,0 +1,164 @@ +use std::sync::Arc; + +use crate::ccm::cluster::{Cluster, ClusterOptions}; +use crate::ccm::{run_ccm_test, CLUSTER_VERSION}; +use crate::common::utils::{setup_tracing, unique_keyspace_name}; + +use tokio::sync::Mutex; + +fn cluster_3_nodes() -> ClusterOptions { + ClusterOptions { + name: "schema_agreement_test".to_string(), + version: CLUSTER_VERSION.clone(), + nodes: vec![3], + ..ClusterOptions::default() + } +} + +#[tokio::test] +#[cfg_attr(not(ccm_tests), ignore)] +async fn test_schema_agreement_all_nodes() { + setup_tracing(); + async fn test(cluster: Arc>) { + let cluster = cluster.lock().await; + let session = cluster.make_session_builder().await.build().await.unwrap(); + + // Create keyspace + let keyspace = unique_keyspace_name(); + session + .query_unpaged( + format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", + keyspace + ), + &[], + ) + .await + .unwrap(); + + // Use keyspace + session.use_keyspace(keyspace, true).await.unwrap(); + + // Create a table and check schema agreement + let _result = session + .query_unpaged( + "CREATE TABLE test_schema_agreement_all (k int primary key, v int)", + &[], + ) + .await + .unwrap(); + + // Check if schema is in agreement + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + } + run_ccm_test(cluster_3_nodes, test).await; +} + +#[tokio::test] +#[cfg_attr(not(ccm_tests), ignore)] +async fn test_schema_agreement_with_stopped_node() { + setup_tracing(); + async fn test(cluster: Arc>) { + let cluster = cluster.lock().await; + + // Create keyspace + let session = cluster.make_session_builder().await.build().await.unwrap(); + + let keyspace = unique_keyspace_name(); + session + .query_unpaged( + format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", + keyspace + ), + &[], + ) + .await + .unwrap(); + + // Use keyspace + session.use_keyspace(keyspace, true).await.unwrap(); + + // Stop node 2 + let node = cluster.nodes().get_by_id(2).await.unwrap(); + node.write().await.stop(None).await.unwrap(); + + // Create a table while one node is stopped + let _result = session + .query_unpaged( + "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)", + &[], + ) + .await + .unwrap(); + + // Schema agreement should succeed with remaining up nodes + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + + // Start the node back + node.write().await.start(None).await.unwrap(); + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + } + run_ccm_test(cluster_3_nodes, test).await; +} + + +#[tokio::test] +#[cfg_attr(not(ccm_tests), ignore)] +async fn test_schema_agreement_with_paused_node() { + setup_tracing(); + async fn test(cluster: Arc>) { + let cluster = cluster.lock().await; + + // Create keyspace + let session = cluster.make_session_builder().await.build().await.unwrap(); + + let keyspace = unique_keyspace_name(); + session + .query_unpaged( + format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", + keyspace + ), + &[], + ) + .await + .unwrap(); + + // Use keyspace + session.use_keyspace(keyspace, true).await.unwrap(); + + // Stop node 2 + let node = cluster.nodes().get_by_id(2).await.unwrap(); + node.write().await.pause().await.unwrap(); + + // Try a simple query to verify database is responsive + let result = session + .query_unpaged("SELECT * FROM system.local", &[]) + .await + .unwrap(); + println!("Result after pause: {result:?}"); + + // Create a table while one node is paused + let _result = session + .query_unpaged( + "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)", + &[], + ) + .await + .unwrap(); + + // Schema agreement should succeed with remaining up nodes + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + + // Start the node back + node.write().await.resume().await.unwrap(); + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + } + run_ccm_test(cluster_3_nodes, test).await; +} From 7beeaaf33a9f39f0ea1ba24f94a54667296a4c91 Mon Sep 17 00:00:00 2001 From: Lukasz Sojka Date: Fri, 14 Feb 2025 16:41:39 +0100 Subject: [PATCH 2/4] added load balancing policy to schema agreement test with paused node to avoid querying paused node directly. --- scylla/tests/ccm_integration/main.rs | 2 +- .../tests/ccm_integration/schema_agreement.rs | 77 ++++++++++++++----- 2 files changed, 57 insertions(+), 22 deletions(-) diff --git a/scylla/tests/ccm_integration/main.rs b/scylla/tests/ccm_integration/main.rs index 8fd9692ed0..d0390521e0 100644 --- a/scylla/tests/ccm_integration/main.rs +++ b/scylla/tests/ccm_integration/main.rs @@ -3,7 +3,7 @@ mod common; mod authenticate; pub(crate) mod ccm; +mod schema_agreement; mod test_example; #[cfg(feature = "ssl")] mod tls; -mod schema_agreement; \ No newline at end of file diff --git a/scylla/tests/ccm_integration/schema_agreement.rs b/scylla/tests/ccm_integration/schema_agreement.rs index 987a40ed2d..d7e4f2acda 100644 --- a/scylla/tests/ccm_integration/schema_agreement.rs +++ b/scylla/tests/ccm_integration/schema_agreement.rs @@ -4,6 +4,10 @@ use crate::ccm::cluster::{Cluster, ClusterOptions}; use crate::ccm::{run_ccm_test, CLUSTER_VERSION}; use crate::common::utils::{setup_tracing, unique_keyspace_name}; +use scylla::client::execution_profile::ExecutionProfile; +use scylla::cluster::{ClusterState, Node, NodeRef}; +use scylla::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo}; +use scylla::query::Query; use tokio::sync::Mutex; fn cluster_3_nodes() -> ClusterOptions { @@ -15,6 +19,33 @@ fn cluster_3_nodes() -> ClusterOptions { } } +#[derive(Debug)] +struct SingleTargetLBP { + target: (Arc, Option), +} + +impl LoadBalancingPolicy for SingleTargetLBP { + fn pick<'a>( + &'a self, + _query: &'a RoutingInfo, + _cluster: &'a ClusterState, + ) -> Option<(NodeRef<'a>, Option)> { + Some((&self.target.0, self.target.1)) + } + + fn fallback<'a>( + &'a self, + _query: &'a RoutingInfo, + _cluster: &'a ClusterState, + ) -> FallbackPlan<'a> { + Box::new(std::iter::empty()) + } + + fn name(&self) -> String { + "SingleTargetLBP".to_owned() + } +} + #[tokio::test] #[cfg_attr(not(ccm_tests), ignore)] async fn test_schema_agreement_all_nodes() { @@ -105,7 +136,6 @@ async fn test_schema_agreement_with_stopped_node() { run_ccm_test(cluster_3_nodes, test).await; } - #[tokio::test] #[cfg_attr(not(ccm_tests), ignore)] async fn test_schema_agreement_with_paused_node() { @@ -113,7 +143,6 @@ async fn test_schema_agreement_with_paused_node() { async fn test(cluster: Arc>) { let cluster = cluster.lock().await; - // Create keyspace let session = cluster.make_session_builder().await.build().await.unwrap(); let keyspace = unique_keyspace_name(); @@ -128,35 +157,41 @@ async fn test_schema_agreement_with_paused_node() { .await .unwrap(); - // Use keyspace session.use_keyspace(keyspace, true).await.unwrap(); // Stop node 2 - let node = cluster.nodes().get_by_id(2).await.unwrap(); - node.write().await.pause().await.unwrap(); - - // Try a simple query to verify database is responsive - let result = session - .query_unpaged("SELECT * FROM system.local", &[]) - .await - .unwrap(); - println!("Result after pause: {result:?}"); - + let node_id = 2; + let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap(); + let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone(); + ccm_node.write().await.pause().await.unwrap(); + + // Find the corresponding Scylla node from the session to avoid querying it directly + let cluster_state = session.get_cluster_state(); + let scylla_node = cluster_state + .get_nodes_info() + .iter() + .find(|n| n.address.ip() != ccm_node_addr) + .expect("Could not find unpaused Scylla node for querying"); + + let policy = SingleTargetLBP { + target: (scylla_node.clone(), Some(0)), + }; + let execution_profile = ExecutionProfile::builder() + .load_balancing_policy(Arc::new(policy)) + .build(); + let mut stmt = + Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)"); + stmt.set_execution_profile_handle(Some(execution_profile.into_handle())); // Create a table while one node is paused - let _result = session - .query_unpaged( - "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)", - &[], - ) - .await - .unwrap(); + let _result = session.query_unpaged(stmt, &[]).await.unwrap(); // Schema agreement should succeed with remaining up nodes let schema_agreement = session.check_schema_agreement().await.unwrap(); assert!(schema_agreement.is_some()); // Start the node back - node.write().await.resume().await.unwrap(); + ccm_node.write().await.resume().await.unwrap(); + let schema_agreement = session.check_schema_agreement().await.unwrap(); assert!(schema_agreement.is_some()); } From 590866d665719a2f45cd316bd53342a1f2e35512 Mon Sep 17 00:00:00 2001 From: Lukasz Sojka Date: Fri, 14 Feb 2025 17:00:49 +0100 Subject: [PATCH 3/4] combined schema agreement tests to one This way we save resources (less nodes) but prolong test duration (from 70 seconds to 90s, where 60s is lost on waiting for timeout in failing test) --- .../tests/ccm_integration/schema_agreement.rs | 277 +++++++++--------- 1 file changed, 134 insertions(+), 143 deletions(-) diff --git a/scylla/tests/ccm_integration/schema_agreement.rs b/scylla/tests/ccm_integration/schema_agreement.rs index d7e4f2acda..cbe92ae3d0 100644 --- a/scylla/tests/ccm_integration/schema_agreement.rs +++ b/scylla/tests/ccm_integration/schema_agreement.rs @@ -48,152 +48,143 @@ impl LoadBalancingPolicy for SingleTargetLBP { #[tokio::test] #[cfg_attr(not(ccm_tests), ignore)] -async fn test_schema_agreement_all_nodes() { +async fn test_schema_agreement() { setup_tracing(); - async fn test(cluster: Arc>) { - let cluster = cluster.lock().await; - let session = cluster.make_session_builder().await.build().await.unwrap(); - - // Create keyspace - let keyspace = unique_keyspace_name(); - session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", - keyspace - ), - &[], - ) - .await - .unwrap(); - - // Use keyspace - session.use_keyspace(keyspace, true).await.unwrap(); - - // Create a table and check schema agreement - let _result = session - .query_unpaged( - "CREATE TABLE test_schema_agreement_all (k int primary key, v int)", - &[], - ) - .await - .unwrap(); - - // Check if schema is in agreement - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); - } - run_ccm_test(cluster_3_nodes, test).await; + run_ccm_test(cluster_3_nodes, test_schema_agreement_all_nodes).await; + run_ccm_test(cluster_3_nodes, test_schema_agreement_with_stopped_node).await; + run_ccm_test(cluster_3_nodes, test_schema_agreement_with_paused_node).await; } -#[tokio::test] -#[cfg_attr(not(ccm_tests), ignore)] -async fn test_schema_agreement_with_stopped_node() { - setup_tracing(); - async fn test(cluster: Arc>) { - let cluster = cluster.lock().await; - - // Create keyspace - let session = cluster.make_session_builder().await.build().await.unwrap(); - - let keyspace = unique_keyspace_name(); - session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", - keyspace - ), - &[], - ) - .await - .unwrap(); - - // Use keyspace - session.use_keyspace(keyspace, true).await.unwrap(); - - // Stop node 2 - let node = cluster.nodes().get_by_id(2).await.unwrap(); - node.write().await.stop(None).await.unwrap(); - - // Create a table while one node is stopped - let _result = session - .query_unpaged( - "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)", - &[], - ) - .await - .unwrap(); - - // Schema agreement should succeed with remaining up nodes - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); - - // Start the node back - node.write().await.start(None).await.unwrap(); - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); - } - run_ccm_test(cluster_3_nodes, test).await; +async fn test_schema_agreement_all_nodes(cluster: Arc>) { + let cluster = cluster.lock().await; + let session = cluster.make_session_builder().await.build().await.unwrap(); + + // Create keyspace + let keyspace = unique_keyspace_name(); + session + .query_unpaged( + format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", + keyspace + ), + &[], + ) + .await + .unwrap(); + + // Use keyspace + session.use_keyspace(keyspace, true).await.unwrap(); + + // Create a table and check schema agreement + let _result = session + .query_unpaged( + "CREATE TABLE test_schema_agreement_all (k int primary key, v int)", + &[], + ) + .await + .unwrap(); + + // Check if schema is in agreement + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); } -#[tokio::test] -#[cfg_attr(not(ccm_tests), ignore)] -async fn test_schema_agreement_with_paused_node() { - setup_tracing(); - async fn test(cluster: Arc>) { - let cluster = cluster.lock().await; - - let session = cluster.make_session_builder().await.build().await.unwrap(); - - let keyspace = unique_keyspace_name(); - session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", - keyspace - ), - &[], - ) - .await - .unwrap(); - - session.use_keyspace(keyspace, true).await.unwrap(); - - // Stop node 2 - let node_id = 2; - let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap(); - let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone(); - ccm_node.write().await.pause().await.unwrap(); - - // Find the corresponding Scylla node from the session to avoid querying it directly - let cluster_state = session.get_cluster_state(); - let scylla_node = cluster_state - .get_nodes_info() - .iter() - .find(|n| n.address.ip() != ccm_node_addr) - .expect("Could not find unpaused Scylla node for querying"); - - let policy = SingleTargetLBP { - target: (scylla_node.clone(), Some(0)), - }; - let execution_profile = ExecutionProfile::builder() - .load_balancing_policy(Arc::new(policy)) - .build(); - let mut stmt = - Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)"); - stmt.set_execution_profile_handle(Some(execution_profile.into_handle())); - // Create a table while one node is paused - let _result = session.query_unpaged(stmt, &[]).await.unwrap(); - - // Schema agreement should succeed with remaining up nodes - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); - - // Start the node back - ccm_node.write().await.resume().await.unwrap(); - - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); - } - run_ccm_test(cluster_3_nodes, test).await; +async fn test_schema_agreement_with_stopped_node(cluster: Arc>) { + let cluster = cluster.lock().await; + + // Create keyspace + let session = cluster.make_session_builder().await.build().await.unwrap(); + + let keyspace = unique_keyspace_name(); + session + .query_unpaged( + format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", + keyspace + ), + &[], + ) + .await + .unwrap(); + + // Use keyspace + session.use_keyspace(keyspace, true).await.unwrap(); + + // Stop node 2 + let node = cluster.nodes().get_by_id(2).await.unwrap(); + node.write().await.stop(None).await.unwrap(); + + // Create a table while one node is stopped + let _result = session + .query_unpaged( + "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)", + &[], + ) + .await + .unwrap(); + + // Schema agreement should succeed with remaining up nodes + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + + // Start the node back + node.write().await.start(None).await.unwrap(); + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); +} + +async fn test_schema_agreement_with_paused_node(cluster: Arc>) { + let cluster = cluster.lock().await; + + let session = cluster.make_session_builder().await.build().await.unwrap(); + + let keyspace = unique_keyspace_name(); + session + .query_unpaged( + format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", + keyspace + ), + &[], + ) + .await + .unwrap(); + + session.use_keyspace(keyspace, true).await.unwrap(); + + // Stop node 2 + let node_id = 2; + let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap(); + let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone(); + ccm_node.write().await.pause().await.unwrap(); + + // Find the corresponding Scylla node from the session to avoid querying it directly + let cluster_state = session.get_cluster_state(); + let scylla_node = cluster_state + .get_nodes_info() + .iter() + .find(|n| n.address.ip() != ccm_node_addr) + .expect("Could not find unpaused Scylla node for querying"); + + let policy = SingleTargetLBP { + target: (scylla_node.clone(), Some(0)), + }; + let execution_profile = ExecutionProfile::builder() + .load_balancing_policy(Arc::new(policy)) + .build(); + let mut stmt = + Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)"); + stmt.set_execution_profile_handle(Some(execution_profile.into_handle())); + // Create a table while one node is paused + let _result = session.query_unpaged(stmt, &[]).await.unwrap(); + + // Schema agreement should succeed with remaining up nodes + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); + + // Start the node back + ccm_node.write().await.resume().await.unwrap(); + + let schema_agreement = session.check_schema_agreement().await.unwrap(); + assert!(schema_agreement.is_some()); } From 1e821ef54df20ded4db4163664cdc85820c48d99 Mon Sep 17 00:00:00 2001 From: Lukasz Sojka Date: Tue, 4 Mar 2025 11:36:56 +0100 Subject: [PATCH 4/4] refactor: schema agreement test Make test easier to read and more robust. --- .../tests/ccm_integration/schema_agreement.rs | 276 ++++++++++++------ 1 file changed, 181 insertions(+), 95 deletions(-) diff --git a/scylla/tests/ccm_integration/schema_agreement.rs b/scylla/tests/ccm_integration/schema_agreement.rs index cbe92ae3d0..5352ca574a 100644 --- a/scylla/tests/ccm_integration/schema_agreement.rs +++ b/scylla/tests/ccm_integration/schema_agreement.rs @@ -1,15 +1,20 @@ use std::sync::Arc; +use std::time::Duration; use crate::ccm::cluster::{Cluster, ClusterOptions}; use crate::ccm::{run_ccm_test, CLUSTER_VERSION}; use crate::common::utils::{setup_tracing, unique_keyspace_name}; use scylla::client::execution_profile::ExecutionProfile; +use scylla::client::session::Session; use scylla::cluster::{ClusterState, Node, NodeRef}; use scylla::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo}; use scylla::query::Query; use tokio::sync::Mutex; +use tracing::info; +use uuid::Uuid; +/// Creates a cluster configuration with 3 nodes for schema agreement tests. fn cluster_3_nodes() -> ClusterOptions { ClusterOptions { name: "schema_agreement_test".to_string(), @@ -19,6 +24,7 @@ fn cluster_3_nodes() -> ClusterOptions { } } +/// A load balancing policy that targets a single node. #[derive(Debug)] struct SingleTargetLBP { target: (Arc, Option), @@ -46,6 +52,56 @@ impl LoadBalancingPolicy for SingleTargetLBP { } } +/// Waits for schema agreement with a timeout and retries. +async fn wait_for_schema_agreement( + session: &Session, + timeout: Duration, + retries: u32, +) -> Result, anyhow::Error> { + let retry_interval = Duration::from_millis(500); + let mut attempts = 0; + + tokio::time::timeout(timeout, async { + loop { + match session.check_schema_agreement().await { + Ok(Some(agreement)) => return Ok(Some(agreement)), + Ok(None) => { + attempts += 1; + if attempts > retries { + return Err(anyhow::anyhow!( + "Schema agreement not reached after {} retries", + retries + )); + } + info!( + "Schema agreement not yet reached, retrying ({}/{})", + attempts, retries + ); + tokio::time::sleep(retry_interval).await; + } + Err(e) => return Err(anyhow::anyhow!("Failed to check schema agreement: {}", e)), + } + } + }) + .await + .map_err(|_| anyhow::anyhow!("Schema agreement timed out after {:?}", timeout))? +} + +/// Sets up a keyspace with a given replication factor. +async fn setup_keyspace( + session: &Session, + keyspace: &str, + replication_factor: u32, +) -> Result<(), anyhow::Error> { + let query = format!( + "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : {}}}", + keyspace, replication_factor + ); + session.query_unpaged(query, &[]).await?; + session.use_keyspace(keyspace, true).await?; + Ok(()) +} + #[tokio::test] #[cfg_attr(not(ccm_tests), ignore)] async fn test_schema_agreement() { @@ -53,138 +109,168 @@ async fn test_schema_agreement() { run_ccm_test(cluster_3_nodes, test_schema_agreement_all_nodes).await; run_ccm_test(cluster_3_nodes, test_schema_agreement_with_stopped_node).await; run_ccm_test(cluster_3_nodes, test_schema_agreement_with_paused_node).await; + // TODO - multidc cases } +/// Tests schema agreement with all nodes running. async fn test_schema_agreement_all_nodes(cluster: Arc>) { let cluster = cluster.lock().await; - let session = cluster.make_session_builder().await.build().await.unwrap(); + let session = cluster + .make_session_builder() + .await + .build() + .await + .expect("Failed to create session"); - // Create keyspace let keyspace = unique_keyspace_name(); + setup_keyspace(&session, &keyspace, 3) + .await + .expect("Failed to setup keyspace"); + + info!("Creating table in test_schema_agreement_all_nodes"); session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", - keyspace - ), - &[], - ) - .await - .unwrap(); - - // Use keyspace - session.use_keyspace(keyspace, true).await.unwrap(); - - // Create a table and check schema agreement - let _result = session - .query_unpaged( - "CREATE TABLE test_schema_agreement_all (k int primary key, v int)", - &[], - ) - .await - .unwrap(); - - // Check if schema is in agreement - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); + .query_unpaged("CREATE TABLE test_table (k int primary key, v int)", &[]) + .await + .expect("Failed to create table"); + + let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20) + .await + .expect("Schema agreement failed"); + assert!(agreement.is_some(), "Schema agreement should be reached"); + info!("Schema agreement achieved with all nodes"); } +/// Tests schema agreement with one node stopped. async fn test_schema_agreement_with_stopped_node(cluster: Arc>) { let cluster = cluster.lock().await; - - // Create keyspace - let session = cluster.make_session_builder().await.build().await.unwrap(); + let session = cluster + .make_session_builder() + .await + .build() + .await + .expect("Failed to create session"); let keyspace = unique_keyspace_name(); + setup_keyspace(&session, &keyspace, 3) + .await + .expect("Failed to setup keyspace"); + + let node = cluster + .nodes() + .get_by_id(2) + .await + .expect("Failed to get node 2"); + info!("Stopping node 2"); + node.write() + .await + .stop(None) + .await + .expect("Failed to stop node"); + + info!("Creating table with one node stopped"); session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", - keyspace - ), - &[], - ) - .await - .unwrap(); - - // Use keyspace - session.use_keyspace(keyspace, true).await.unwrap(); - - // Stop node 2 - let node = cluster.nodes().get_by_id(2).await.unwrap(); - node.write().await.stop(None).await.unwrap(); - - // Create a table while one node is stopped - let _result = session - .query_unpaged( - "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)", - &[], - ) - .await - .unwrap(); - - // Schema agreement should succeed with remaining up nodes - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); - - // Start the node back - node.write().await.start(None).await.unwrap(); - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); + .query_unpaged("CREATE TABLE test_table (k int primary key, v int)", &[]) + .await + .expect("Failed to create table"); + + let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20) + .await + .expect("Schema agreement failed with stopped node"); + assert!( + agreement.is_some(), + "Schema agreement should be reached with remaining nodes" + ); + + info!("Restarting node 2"); + node.write() + .await + .start(None) + .await + .expect("Failed to restart node"); + let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20) + .await + .expect("Schema agreement failed after restart"); + assert!( + agreement.is_some(), + "Schema agreement should be reached after node restart" + ); + info!("Schema agreement achieved after node restart"); } +/// Tests schema agreement with one node paused. async fn test_schema_agreement_with_paused_node(cluster: Arc>) { let cluster = cluster.lock().await; - - let session = cluster.make_session_builder().await.build().await.unwrap(); + let session = cluster + .make_session_builder() + .await + .build() + .await + .expect("Failed to create session"); let keyspace = unique_keyspace_name(); - session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", - keyspace - ), - &[], - ) + setup_keyspace(&session, &keyspace, 3) .await - .unwrap(); - - session.use_keyspace(keyspace, true).await.unwrap(); + .expect("Failed to setup keyspace"); - // Stop node 2 let node_id = 2; - let ccm_node = cluster.nodes().get_by_id(node_id).await.unwrap(); + let ccm_node = cluster + .nodes() + .get_by_id(node_id) + .await + .expect("Failed to get node 2"); let ccm_node_addr = ccm_node.read().await.broadcast_rpc_address().clone(); - ccm_node.write().await.pause().await.unwrap(); + info!("Pausing node 2"); + ccm_node + .write() + .await + .pause() + .await + .expect("Failed to pause node"); - // Find the corresponding Scylla node from the session to avoid querying it directly let cluster_state = session.get_cluster_state(); - let scylla_node = cluster_state + let running_scylla_node = cluster_state .get_nodes_info() .iter() .find(|n| n.address.ip() != ccm_node_addr) - .expect("Could not find unpaused Scylla node for querying"); + .expect("Could not find unpaused Scylla node"); let policy = SingleTargetLBP { - target: (scylla_node.clone(), Some(0)), + target: (running_scylla_node.clone(), Some(0)), }; let execution_profile = ExecutionProfile::builder() .load_balancing_policy(Arc::new(policy)) .build(); - let mut stmt = - Query::new("CREATE TABLE test_schema_agreement_paused (k int primary key, v int)"); + let mut stmt = Query::new("CREATE TABLE test_table (k int primary key, v int)"); stmt.set_execution_profile_handle(Some(execution_profile.into_handle())); - // Create a table while one node is paused - let _result = session.query_unpaged(stmt, &[]).await.unwrap(); - // Schema agreement should succeed with remaining up nodes - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); + info!("Creating table with one node paused"); + session + .query_unpaged(stmt, &[]) + .await + .expect("Failed to create table"); + + let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20) + .await + .expect("Schema agreement failed with paused node"); + assert!( + agreement.is_some(), + "Schema agreement should be reached with remaining nodes" + ); - // Start the node back - ccm_node.write().await.resume().await.unwrap(); + info!("Resuming node 2"); + ccm_node + .write() + .await + .resume() + .await + .expect("Failed to resume node"); - let schema_agreement = session.check_schema_agreement().await.unwrap(); - assert!(schema_agreement.is_some()); + let agreement = wait_for_schema_agreement(&session, Duration::from_secs(10), 20) + .await + .expect("Schema agreement failed after resume"); + assert!( + agreement.is_some(), + "Schema agreement should be reached after node resume" + ); + info!("Schema agreement achieved after node resume"); }