diff --git a/scylla-proxy/src/actions.rs b/scylla-proxy/src/actions.rs index 274b2c530d..29400b9dca 100644 --- a/scylla-proxy/src/actions.rs +++ b/scylla-proxy/src/actions.rs @@ -1,7 +1,7 @@ -use std::{fmt, sync::Arc, time::Duration}; - use bytes::Bytes; use rand::{Rng, RngCore}; +use std::fmt::Debug; +use std::{fmt, sync::Arc, time::Duration}; use tokio::sync::mpsc; #[cfg(test)] @@ -51,14 +51,41 @@ pub enum Condition { // True if any REGISTER was sent on this connection. Useful to filter out control connection messages. ConnectionRegisteredAnyEvent, + + // A custom condition handler, allows you to customize frame matching + CustomCondition(ConditionHandler), +} + +pub struct ConditionHandler(Arc bool + Send + Sync>); + +impl ConditionHandler { + pub fn new(handler: Arc bool + Send + Sync>) -> Self { + ConditionHandler(handler) + } + + fn execute(&mut self, ctx: &EvaluationContext) -> bool { + self.0(ctx) + } +} + +impl Debug for ConditionHandler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ConditionHandler") + } +} + +impl Clone for ConditionHandler { + fn clone(&self) -> Self { + Self(self.0.clone()) + } } /// The context in which [`Conditions`](Condition) are evaluated. -pub(crate) struct EvaluationContext { - pub(crate) connection_seq_no: usize, - pub(crate) connection_has_events: bool, - pub(crate) opcode: FrameOpcode, - pub(crate) frame_body: Bytes, +pub struct EvaluationContext { + pub connection_seq_no: usize, + pub connection_has_events: bool, + pub opcode: FrameOpcode, + pub frame_body: Bytes, } impl Condition { @@ -116,7 +143,11 @@ impl Condition { val }, - Condition::ConnectionRegisteredAnyEvent => ctx.connection_has_events + Condition::ConnectionRegisteredAnyEvent => ctx.connection_has_events, + + Condition::CustomCondition(cb) => { + cb.execute(ctx) + } } } @@ -135,6 +166,11 @@ impl Condition { pub fn or(self, c2: Self) -> Self { Self::Or(Box::new(self), Box::new(c2)) } + + /// A convenience function for creating [Condition::CustomCondition] variant. + pub fn custom(handler_fn: fn(&EvaluationContext) -> bool) -> Self { + Self::CustomCondition(ConditionHandler::new(Arc::new(handler_fn))) + } } /// Just a trait to unify API of both [RequestReaction] and [ResponseReaction]. diff --git a/scylla-proxy/src/frame.rs b/scylla-proxy/src/frame.rs index 435a164863..ae1b90efab 100644 --- a/scylla-proxy/src/frame.rs +++ b/scylla-proxy/src/frame.rs @@ -43,7 +43,7 @@ pub(crate) enum FrameType { } #[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub(crate) enum FrameOpcode { +pub enum FrameOpcode { Request(RequestOpcode), Response(ResponseOpcode), } diff --git a/scylla-proxy/src/lib.rs b/scylla-proxy/src/lib.rs index f4e3529acc..28e76c4597 100644 --- a/scylla-proxy/src/lib.rs +++ b/scylla-proxy/src/lib.rs @@ -6,11 +6,11 @@ mod proxy; pub type TargetShard = u16; pub use actions::{ - example_db_errors, Action, Condition, Reaction, RequestReaction, RequestRule, ResponseReaction, - ResponseRule, + example_db_errors, Action, Condition, ConditionHandler, Reaction, RequestReaction, RequestRule, + ResponseReaction, ResponseRule, }; pub use errors::{DoorkeeperError, ProxyError, WorkerError}; -pub use frame::{RequestFrame, RequestOpcode, ResponseFrame, ResponseOpcode}; +pub use frame::{FrameOpcode, RequestFrame, RequestOpcode, ResponseFrame, ResponseOpcode}; pub use proxy::{Node, Proxy, RunningProxy, ShardAwareness}; pub use proxy::get_exclusive_local_address; diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index d69c0aa0ca..4ca13f27f7 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -19,6 +19,8 @@ mod shards; mod silent_prepare_batch; mod silent_prepare_query; mod skip_metadata_optimization; +mod speculative_retry; +mod speculative_tests_utils; mod tablets; #[path = "../common/utils.rs"] mod utils; diff --git a/scylla/tests/integration/retries.rs b/scylla/tests/integration/retries.rs index 109c6bc1dc..873aa6087a 100644 --- a/scylla/tests/integration/retries.rs +++ b/scylla/tests/integration/retries.rs @@ -14,88 +14,6 @@ use scylla_proxy::{ WorkerError, }; -#[tokio::test] -#[ntest::timeout(30000)] -#[cfg(not(scylla_cloud_tests))] -async fn speculative_execution_is_fired() { - setup_tracing(); - const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000); - - let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { - // DB preparation phase - let simple_speculative_no_retry_profile = ExecutionProfile::builder().speculative_execution_policy(Some(Arc::new(SimpleSpeculativeExecutionPolicy { - max_retry_count: 2, - retry_interval: Duration::from_millis(10), - }))).retry_policy(Arc::new(FallthroughRetryPolicy)).build(); - let session: Session = SessionBuilder::new() - .known_node(proxy_uris[0].as_str()) - .default_execution_profile_handle(simple_speculative_no_retry_profile.into_handle()) - .address_translator(Arc::new(translation_map)) - .build() - .await - .unwrap(); - - let ks = unique_keyspace_name(); - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); - session.use_keyspace(ks, false).await.unwrap(); - session - .ddl("CREATE TABLE t (a int primary key)") - .await - .unwrap(); - - let mut q = Query::from("INSERT INTO t (a) VALUES (?)"); - q.set_is_idempotent(true); // this is to allow speculative execution to fire - - let drop_frame_rule = RequestRule( - Condition::RequestOpcode(RequestOpcode::Prepare) - .and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))), - RequestReaction::drop_frame(), - ); - - info!("--------------------- BEGINNING main test part ----------------"); - - info!("--------------------- first query - no rules ----------------"); - // first run before any rules - session.query_unpaged(q.clone(), (3,)).await.unwrap(); - - info!("--------------------- second query - 0 and 2 nodes not responding ----------------"); - running_proxy.running_nodes[0] - .change_request_rules(Some(vec![drop_frame_rule.clone()])); - running_proxy.running_nodes[2] - .change_request_rules(Some(vec![drop_frame_rule.clone()])); - - session.query_unpaged(q.clone(), (2,)).await.unwrap(); - - info!("--------------------- third query - 0 and 1 nodes not responding ----------------"); - running_proxy.running_nodes[2] - .change_request_rules(None); - running_proxy.running_nodes[1] - .change_request_rules(Some(vec![drop_frame_rule.clone()])); - - session.query_unpaged(q.clone(), (1,)).await.unwrap(); - - - info!("--------------------- fourth query - all nodes not responding ----------------"); - running_proxy.running_nodes[2] - .change_request_rules(Some(vec![drop_frame_rule])); - - tokio::select! { - res = session.query_unpaged(q, (0,)) => panic!("Rules did not work: received response {:?}", res), - _ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => (), - }; - - info!("--------------------- FINISHING main test part ----------------"); - - running_proxy - }).await; - - match res { - Ok(()) => (), - Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), - Err(err) => panic!("{}", err), - } -} - #[tokio::test] #[ntest::timeout(30000)] #[cfg(not(scylla_cloud_tests))] diff --git a/scylla/tests/integration/speculative_retry.rs b/scylla/tests/integration/speculative_retry.rs new file mode 100644 index 0000000000..aec645e712 --- /dev/null +++ b/scylla/tests/integration/speculative_retry.rs @@ -0,0 +1,539 @@ +use crate::speculative_tests_utils::{ + check_expectations, convert_into_simple_history, into_proxy_request_rules, + AttemptRecordExpectation, NodeExpectation, SimpleProxyRules, +}; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; +use scylla::client::execution_profile::ExecutionProfile; +use scylla::client::session::Session; +use scylla::client::session_builder; +use scylla::errors::RequestAttemptError; +use scylla::observability::history::HistoryCollector; +use scylla::policies::retry::{ + FallthroughRetryPolicy, RequestInfo, RetryDecision, RetryPolicy, + RetrySession, +}; +use scylla::policies::speculative_execution::{ + SimpleSpeculativeExecutionPolicy, SpeculativeExecutionPolicy, +}; +use scylla::query::Query; +use scylla_cql::frame::request::RequestOpcode; +use scylla_cql::frame::response::error::DbError; +use scylla_proxy::{example_db_errors, ProxyError, ShardAwareness, WorkerError}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +#[tokio::test] +#[ntest::timeout(300000)] +#[cfg(not(scylla_cloud_tests))] +async fn speculative_execution_is_fired2() { + setup_tracing(); + + let ks = unique_keyspace_name(); + + // There is no way to isolate attempts, test logic relays on ticks, + // which is time when one given attempts is going to be 100% executed + let tick = Duration::from_millis(200); + let never = tick * 10000; + + let target_opcode = RequestOpcode::Execute; + let simple_speculative = Some(Arc::new(SimpleSpeculativeExecutionPolicy { + max_retry_count: 3, + retry_interval: tick * 2, + }) as Arc); + + enum QueryResultExpectation { + Success, + Failed, + } + + struct TestCase { + name: String, + request_rules: Vec, + speculative_policy: Option>, + retry_policy: Arc, + expectation: Vec, + expected_query_result: QueryResultExpectation, + } + + // an error that is retried only once on the next host + let error_retry_next = example_db_errors::overloaded; + + // an error that is retried only once on the next host + let error_retry_once = example_db_errors::protocol_error; + + // an error that is retried only once by on the same host + let error_retry_same = example_db_errors::is_bootstrapping; + + // an error that is not retried + let error_noretry = example_db_errors::truncate_error; + + #[derive(Debug)] + struct TestRetryPolicy { + max_retry: usize, + } + + impl TestRetryPolicy { + fn new(max_retry: usize) -> Self { + Self { max_retry } + } + } + + impl RetryPolicy for TestRetryPolicy { + fn new_session(&self) -> Box { + Box::new(TestRetrySession::new(self.max_retry)) + } + } + + struct TestRetrySession { + try_done: AtomicUsize, + max_retry: usize, + } + + impl TestRetrySession { + fn new(max_retry: usize) -> Self { + TestRetrySession { + max_retry, + try_done: AtomicUsize::new(0), + } + } + } + + impl RetrySession for TestRetrySession { + fn decide_should_retry(&mut self, rq: RequestInfo) -> RetryDecision { + if self.try_done.fetch_add(1, Ordering::Relaxed) >= self.max_retry { + return RetryDecision::DontRetry; + } + match rq.error { + RequestAttemptError::DbError(DbError::Overloaded, _) => { + RetryDecision::RetryNextTarget(None) + } + RequestAttemptError::DbError(DbError::IsBootstrapping, _) => { + RetryDecision::RetrySameTarget(None) + } + _ => RetryDecision::DontRetry, + } + } + fn reset(&mut self) {} + } + + let test_cases = vec![ + TestCase { + name: "pass on first try".to_string(), + request_rules: vec![], // No error injected, query is going just pass + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // Should just succeed + AttemptRecordExpectation::RegularSuccess(NodeExpectation::AnyNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + TestCase { + name: "failure on first try then pass".to_string(), + request_rules: vec![ + // Will make query fail once with error that is retried on the next node + // And then it is going to pass + SimpleProxyRules::fail(None, error_retry_next), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // Should retry on next node once and then succeed + AttemptRecordExpectation::RegularFailure(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::RegularSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + TestCase { + name: "block regular fiber, succeeds on speculative".to_string(), + request_rules: vec![ + // Will make first query scheduled to fail too far in future + // Next retry should be speculative and should succeed, not waiting for regular fiber response + SimpleProxyRules::delay(never), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // First attempt on regular fiber stays without response (response comes too late) + // Second attempts on speculative fiber succeeds + AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + TestCase { + name: "block regular fiber, fails and then succeeds on speculative".to_string(), + request_rules: vec![ + // Will make query fail once with error that is retried on the next node + // Next retry should be speculative and should fail and to be retried on next node + // Next attempt (speculative as well) should succeed + SimpleProxyRules::delay(never), + SimpleProxyRules::fail(None, error_retry_next), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // First attempt on regular fiber stays without response (response comes too late) + // Second attempts on speculative fiber succeeds + AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + + TestCase { + name: "Regular fails after speculative fiber fails twice".to_string(), + request_rules: vec![ + // Will make query fail once with error that is retried on the next node + // Next retry should be speculative and should fail and to be retried on next node + // Next attempt (speculative as well) should succeed + SimpleProxyRules::fail(Some(tick*4), error_retry_next), + SimpleProxyRules::fail(None, error_retry_next), + SimpleProxyRules::fail(None, error_retry_next), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // First attempt on regular fiber stays without response (response comes too late) + // Second attempts on speculative fiber fails and only next one completes + // Regular fiber should stay locked. + AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::RegularFailure(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Failed, + }, + TestCase { + name: "Regular succeeds after speculative fiber fails twice".to_string(), + request_rules: vec![ + // Will make query fail once with error that is retried on the next node + // Next retry should be speculative and should fail and to be retried on next node + // Next attempt (speculative as well) should succeed + SimpleProxyRules::delay(tick*3), + SimpleProxyRules::fail(None, error_retry_next), + SimpleProxyRules::fail(None, error_retry_next), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // First attempt on regular fiber stays without response (response comes too late) + // Second attempts on speculative fiber fails and only next one completes + // Regular fiber should stay locked. + AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::RegularSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + + TestCase { + name: "regular fiber exhausting query plan".to_string(), + request_rules: vec![ + // Will make query fail all the time with error that is retried on the next node + SimpleProxyRules::fail_all(error_retry_next), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // Since there are three nodes only, it is going retry only 3 times, one time per node + AttemptRecordExpectation::RegularFailure(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::RegularFailure(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::RegularFailure(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Failed, + }, + TestCase { + name: "regular failed speculative failed exhausting query plan".to_string(), + request_rules: vec![ + // Will make query fail all the time with error that is retried on the next node + SimpleProxyRules::fail_all(error_retry_next), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(1)), + expectation: vec![ + // Since there are three nodes only, it is going retry only 3 times, one time per node + AttemptRecordExpectation::RegularFailure(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::RegularFailure(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Failed, + }, + TestCase { + name: "Speculative max_retry_count(1) - block one fiber, let last speculative to complete".to_string(), + request_rules: vec![ + // Blocks first query, next one is going to be delayed to give driver a chance to start new fiber + SimpleProxyRules::delay(never), + SimpleProxyRules::delay(tick*3), + ], + speculative_policy: Some(Arc::new(SimpleSpeculativeExecutionPolicy { + max_retry_count: 1, + retry_interval: tick * 2, + })), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // Since max_retry_count=1, it is going retry only 1 time, + // even though there are more nodes in the cluster, one time per node + AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + TestCase { + name: "Speculative max_retry_count(2) - block two fibers, let last one to complete".to_string(), + request_rules: vec![ + // Blocks first two queries, next one is going to be delayed to give driver a chance to start new fiber + SimpleProxyRules::delay(never), + SimpleProxyRules::delay(never), + SimpleProxyRules::delay(tick*3), + ], + speculative_policy: Some(Arc::new(SimpleSpeculativeExecutionPolicy { + max_retry_count: 2, + retry_interval: tick * 2, + })), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // Since there are three nodes only, it is going retry only 2 times, one attempt per node + AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeNoResponse(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + TestCase { + name: "Speculative max_retry_count(3) - block two fibers, let last one to complete".to_string(), + request_rules: vec![ + // Blocks first two queries, next one is going to be delayed to give driver a chance to start new fiber + // It should not start them since there is only three nodes in the cluster + SimpleProxyRules::delay(never), + SimpleProxyRules::delay(never), + SimpleProxyRules::delay(tick*5), + ], + speculative_policy: simple_speculative.clone(), + retry_policy: Arc::new(TestRetryPolicy::new(3)), + expectation: vec![ + // Since there are three nodes only, it is going retry only 2 times, one time per node + AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + AttemptRecordExpectation::SpeculativeNoResponse(NodeExpectation::UniqueNode(false)), + AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::UniqueNode(false)), + ], + expected_query_result: QueryResultExpectation::Success, + }, + + // ============= DONE =============== + + // TODO: speculative fiber starts prematurely, it still should be retried on regular fiber + // TestCase { + // name: "Speculative - error_retry_same, pass".to_string(), + // request_rules: vec![ + // // Will make query fail once with error that is retried on the next node + // // And then it is going to pass + // SimpleProxyRules::fail(None, error_retry_same), + // ], + // speculative_policy: simple_speculative.clone(), + // retry_policy: Arc::new(TestRetryPolicy::new(3)), + // expectation: vec![ + // // Should retry on next node once and then succeed + // AttemptRecordExpectation::RegularFailure(NodeExpectation::AnyNode(false)), + // AttemptRecordExpectation::RegularSuccess(NodeExpectation::SameNode(false)), + // ], + // expected_query_result: QueryResultExpectation::Success, + // }, + // TestCase { + // name: "Speculative - error_retry_same, pass".to_string(), + // request_rules: vec![ + // // Will make first query scheduled to fail too far in future + // // Next retry should be speculative and should succeed, not waiting for regular fiber response + // SimpleProxyRules::delay(never), + // ], + // speculative_policy: simple_speculative.clone(), + // retry_policy: Arc::new(TestRetryPolicy::new(3)), + // expectation: vec![ + // // First attempt on regular fiber stays without response (response comes too late) + // // Second attempts on speculative fiber succeeds + // AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + // AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::AnotherNode(false)), + // ], + // expected_query_result: QueryResultExpectation::Success, + // }, + // TODO: By some reason speculative fiber start prematurely + // TestCase { + // name: "Speculative - error_retry_same, error_retry_same, pass".to_string(), + // request_rules: vec![ + // // Will make query fail once with error that is retried on the next node + // // Next retry should be speculative and should fail and to be retried on next node + // // Next attempt (speculative as well) should succeed + // SimpleProxyRules::delay(never), + // SimpleProxyRules::fail(None, error_retry_same), + // ], + // speculative_policy: simple_speculative.clone(), + // retry_policy: Arc::new(TestRetryPolicy::new(3)), + // expectation: vec![ + // // First attempt on regular fiber stays without response (response comes too late) + // // Second attempts on speculative fiber succeeds + // AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + // AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::AnotherNode(false)), + // AttemptRecordExpectation::SpeculativeSuccess(NodeExpectation::SameNode(false)), + // ], + // expected_query_result: QueryResultExpectation::Success, + // }, + + // TODO: driver waits regular fiber when speculative fiber failed + // TestCase { + // name: "Speculative - error_retry_same, error_retry_same".to_string(), + // request_rules: vec![ + // // Will make query fail once with error that is retried on the next node + // // Next retry should be speculative and should fail and to be retried on next node + // // Next attempt (speculative as well) should succeed + // SimpleProxyRules::delay(never), + // SimpleProxyRules::fail_all(error_retry_same), + // ], + // speculative_policy: simple_speculative.clone(), + // retry_policy: Arc::new(TestRetryPolicy::new(3)), + // expectation: vec![ + // // First attempt on regular fiber stays without response (response comes too late) + // // Second attempts on speculative fiber fails and only next one completes + // // Regular fiber should stay locked. + // AttemptRecordExpectation::RegularNoResponse(NodeExpectation::AnyNode(false)), + // AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::SameNode(false)), + // AttemptRecordExpectation::SpeculativeFailure(NodeExpectation::SameNode(false)), + // ], + // expected_query_result: QueryResultExpectation::Failed, + // }, + + // TODO: speculative fiber starts prematurely, all requests should fail on regular fiber + // TestCase { + // name: "Speculative - error_retry_same".to_string(), + // request_rules: vec![ + // // Will make query fail all the time with error that is retried on the next node + // SimpleProxyRules::fail_all(error_retry_same), + // ], + // speculative_policy: simple_speculative.clone(), + // retry_policy: Arc::new(TestRetryPolicy::new(3)), + // expectation: vec![ + // // Since there are three nodes only, it is going retry only 3 times, one time per node + // AttemptRecordExpectation::RegularFailure(NodeExpectation::AnyNode(false)), + // AttemptRecordExpectation::RegularFailure(NodeExpectation::SameNode(false)), + // AttemptRecordExpectation::RegularFailure(NodeExpectation::SameNode(false)), + // ], + // expected_query_result: QueryResultExpectation::Failed, + // }, + ]; + + let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { + let translation_map = Arc::new(translation_map); + + // DB preparation phase + let setup_session: Session = session_builder::SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(translation_map.clone()) + .default_execution_profile_handle(ExecutionProfile::builder().retry_policy(Arc::new(FallthroughRetryPolicy)).build().into_handle()) + .build() + .await + .unwrap(); + + setup_session.ddl( + format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks) + ).await.expect("Failed to create keyspace"); + setup_session.use_keyspace(ks.clone(), false).await.unwrap(); + setup_session + .ddl("CREATE TABLE speculative_retry_tests (a int primary key)") + .await + .expect("Unable to create table"); + + // Test execution phase + let mut errors: Vec = Vec::new(); + for test_case in test_cases { + let case_name = test_case.name; + // Setup proxy request rules + if test_case.request_rules.is_empty() { + running_proxy.running_nodes.iter_mut().for_each(|node| { + node.change_request_rules(None); + }); + } else { + let rules = into_proxy_request_rules(test_case.request_rules, target_opcode); + running_proxy.running_nodes.iter_mut().for_each(|node| { + node.change_request_rules(Some(rules.clone())); + }); + } + + // Init session + + let profile = ExecutionProfile::builder() + .speculative_execution_policy(test_case.speculative_policy) + .retry_policy(test_case.retry_policy).build(); + + let hs = Arc::new(HistoryCollector::new()); + let session: Session = session_builder::SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .default_execution_profile_handle(profile.into_handle()) + .address_translator(translation_map.clone()) + .build() + .await + .unwrap(); + + session.use_keyspace(ks.clone(), false).await.unwrap(); + + // Init query with history listener to catch query attempts + let mut q = Query::from("INSERT INTO speculative_retry_tests (a) VALUES (?)"); + q.set_is_idempotent(true); // this is to allow speculative execution to fire + q.set_history_listener(hs.clone()); + + let start = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); + + // Execute query and collect result + let query_result = session.query_unpaged(q.clone(), (3,)).await; + + match test_case.expected_query_result { + QueryResultExpectation::Failed => { + match query_result { + Ok(_) => { + errors.push(format!("{:30} -FAILED-> expected query to fail, but it succeeded", case_name)); + } + Err(err) => { + println!("{:30} -> request failed as expected: {:?}", case_name, err); + } + } + } + QueryResultExpectation::Success => { + match query_result { + Ok(_) => { + println!("{:30} --> request succeeded as expected", case_name); + } + Err(err) => { + errors.push(format!("{:30} -FAILED-> expected query to succeed, but it failed: {:?}", case_name, err)); + } + } + } + } + + // Read query history and convert it to simple representation + let single_query_result = convert_into_simple_history(hs.clone_structured_history()); + let single_query_result = single_query_result[0].clone(); + + // Check if test case expectation met + if let Err(err) = check_expectations(test_case.expectation.clone(), single_query_result.clone()) { + errors.push(format!("{:30} -FAILED-> recorded history does not match expectations, details:\n\t\thistory :\n\t\t{:?}\n\t\texpectations:\n\t\t{:?}\n\t\tdifferences:\n\t\t{}", case_name, single_query_result, test_case.expectation, err)); + } else { + println!("{:30} -> request history expectations {:?} matches recorded one {:?}", case_name, test_case.expectation, single_query_result); + } + + let end = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); + println!("{:30} -> elapsed {:?}", case_name, end - start); + + } + + if !errors.is_empty() { + panic!("{}", errors.join("\n")); + } + + running_proxy + }).await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +} diff --git a/scylla/tests/integration/speculative_tests_utils.rs b/scylla/tests/integration/speculative_tests_utils.rs new file mode 100644 index 0000000000..0e4aac2520 --- /dev/null +++ b/scylla/tests/integration/speculative_tests_utils.rs @@ -0,0 +1,458 @@ +use chrono::{DateTime, Utc}; +use scylla::observability::history::{AttemptResult, StructuredHistory}; +use scylla_cql::frame::request::RequestOpcode; +use scylla_cql::frame::response::error::DbError; +use scylla_proxy::{Action, Condition, ConditionHandler, RequestReaction, RequestRule}; +use std::fmt; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +// Attempt fiber type: regular or speculative +#[derive(Debug, Copy, Clone, Hash, PartialEq)] +pub(crate) enum FiberType { + Regular, + Speculative, +} + +impl fmt::Display for FiberType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl Default for FiberType { + fn default() -> Self { + FiberType::Regular + } +} + +#[derive(Clone, Debug, PartialEq)] +pub(crate) enum SimpleAttemptResult { + // Attempt was registered, but no result was collected + NoResponse, + // Attempt succeeded + Success, + // Attempt failed + Failure, +} + +impl fmt::Display for SimpleAttemptResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +// Attempt record, collected from `HistoryListener` +// Needed for better visibility and better API when work with simple history +#[derive(Clone, Debug)] +pub(crate) enum AttemptSimpleHistoryRecord { + RegularNoResponse(DateTime, SocketAddr), + RegularSuccess(DateTime, DateTime, SocketAddr), + RegularFailure(DateTime, DateTime, SocketAddr), + SpeculativeNoResponse(DateTime, SocketAddr), + SpeculativeSuccess(DateTime, DateTime, SocketAddr), + SpeculativeFailure(DateTime, DateTime, SocketAddr), +} + +impl AttemptSimpleHistoryRecord { + fn start_time(&self) -> DateTime { + match self { + AttemptSimpleHistoryRecord::RegularNoResponse(start_time, _) => *start_time, + AttemptSimpleHistoryRecord::RegularFailure(start_time, _, _) => *start_time, + AttemptSimpleHistoryRecord::RegularSuccess(start_time, _, _) => *start_time, + AttemptSimpleHistoryRecord::SpeculativeNoResponse(start_time, _) => *start_time, + AttemptSimpleHistoryRecord::SpeculativeFailure(start_time, _, _) => *start_time, + AttemptSimpleHistoryRecord::SpeculativeSuccess(start_time, _, _) => *start_time, + } + } + + fn done_time(&self) -> Option> { + match self { + AttemptSimpleHistoryRecord::RegularNoResponse(_, _) => None, + AttemptSimpleHistoryRecord::RegularFailure(_, done_time, _) => Some(*done_time), + AttemptSimpleHistoryRecord::RegularSuccess(_, done_time, _) => Some(*done_time), + AttemptSimpleHistoryRecord::SpeculativeNoResponse(_, _) => None, + AttemptSimpleHistoryRecord::SpeculativeFailure(_, start_time, _) => Some(*start_time), + AttemptSimpleHistoryRecord::SpeculativeSuccess(_, start_time, _) => Some(*start_time), + } + } + + fn get_fiber(&self) -> FiberType { + match self { + AttemptSimpleHistoryRecord::RegularNoResponse(_, _) => FiberType::Regular, + AttemptSimpleHistoryRecord::RegularFailure(_, _, _) => FiberType::Regular, + AttemptSimpleHistoryRecord::RegularSuccess(_, _, _) => FiberType::Regular, + AttemptSimpleHistoryRecord::SpeculativeNoResponse(_, _) => FiberType::Speculative, + AttemptSimpleHistoryRecord::SpeculativeFailure(_, _, _) => FiberType::Speculative, + AttemptSimpleHistoryRecord::SpeculativeSuccess(_, _, _) => FiberType::Speculative, + } + } + + fn get_result(&self) -> SimpleAttemptResult { + match self { + AttemptSimpleHistoryRecord::RegularNoResponse(_, _) => SimpleAttemptResult::NoResponse, + AttemptSimpleHistoryRecord::RegularFailure(_, _, _) => SimpleAttemptResult::Failure, + AttemptSimpleHistoryRecord::RegularSuccess(_, _, _) => SimpleAttemptResult::Success, + AttemptSimpleHistoryRecord::SpeculativeNoResponse(_, _) => { + SimpleAttemptResult::NoResponse + } + AttemptSimpleHistoryRecord::SpeculativeFailure(_, _, _) => SimpleAttemptResult::Failure, + AttemptSimpleHistoryRecord::SpeculativeSuccess(_, _, _) => SimpleAttemptResult::Success, + } + } + + fn node_addr(&self) -> &SocketAddr { + match self { + AttemptSimpleHistoryRecord::RegularNoResponse(_, node_addr) => node_addr, + AttemptSimpleHistoryRecord::RegularFailure(_, _, node_addr) => node_addr, + AttemptSimpleHistoryRecord::RegularSuccess(_, _, node_addr) => node_addr, + AttemptSimpleHistoryRecord::SpeculativeNoResponse(_, node_addr) => node_addr, + AttemptSimpleHistoryRecord::SpeculativeFailure(_, _, node_addr) => node_addr, + AttemptSimpleHistoryRecord::SpeculativeSuccess(_, _, node_addr) => node_addr, + } + } +} + +impl fmt::Display for AttemptSimpleHistoryRecord { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +// Is used to be matched against node in `AttemptSimpleHistoryRecord` +#[derive(Clone, Debug)] +pub(crate) enum NodeExpectation { + // Any node + // if bool is true it will forget nodes was met before + AnyNode(bool), + // Node different from prior attempt, if it is first attempt, it can be any node + // if bool is true it will forget nodes was met before + AnotherNode(bool), + // Node is the same as last time, if it is first attempt, it can be any node + // if bool is true it will forget nodes was met before + SameNode(bool), + // Node different from any prior attempt, if it is first attempt, it can be any node + // if bool is true it will forget nodes was met before + UniqueNode(bool), +} + +impl NodeExpectation { + fn expectation_met(&self, nodes_met: &Vec, current_node: &SocketAddr) -> bool { + match &self { + NodeExpectation::AnyNode(_) => true, + NodeExpectation::AnotherNode(_) => { + if nodes_met.is_empty() { + return true; + } + nodes_met[nodes_met.len() - 1] != *current_node + } + NodeExpectation::UniqueNode(_) => !nodes_met.contains(¤t_node), + NodeExpectation::SameNode(_) => { + if nodes_met.is_empty() { + return true; + } + nodes_met[nodes_met.len() - 1] == *current_node + } + } + } + + fn forget_nodes(&self) -> bool { + match &self { + NodeExpectation::AnyNode(forget) => *forget, + NodeExpectation::AnotherNode(forget) => *forget, + NodeExpectation::UniqueNode(forget) => *forget, + NodeExpectation::SameNode(forget) => *forget, + } + } +} + +impl fmt::Display for NodeExpectation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +// Is used to match against whole `AttemptSimpleHistoryRecord` on the timeline +// Node expectation allows you to check on node in `AttemptSimpleHistoryRecord` +// You can check if node is the same as in prior attempt or completely unique +#[derive(Clone, Debug)] +pub(crate) enum AttemptRecordExpectation { + RegularNoResponse(NodeExpectation), + RegularSuccess(NodeExpectation), + RegularFailure(NodeExpectation), + SpeculativeNoResponse(NodeExpectation), + SpeculativeSuccess(NodeExpectation), + SpeculativeFailure(NodeExpectation), +} + +impl AttemptRecordExpectation { + fn node_expectation(&self) -> NodeExpectation { + match self { + AttemptRecordExpectation::RegularNoResponse(node_expectation) => { + node_expectation.clone() + } + AttemptRecordExpectation::RegularSuccess(node_expectation) => node_expectation.clone(), + AttemptRecordExpectation::RegularFailure(node_expectation) => node_expectation.clone(), + AttemptRecordExpectation::SpeculativeNoResponse(node_expectation) => { + node_expectation.clone() + } + AttemptRecordExpectation::SpeculativeSuccess(node_expectation) => { + node_expectation.clone() + } + AttemptRecordExpectation::SpeculativeFailure(node_expectation) => { + node_expectation.clone() + } + } + } + + fn get_fiber(&self) -> FiberType { + match self { + AttemptRecordExpectation::RegularNoResponse(_) => FiberType::Regular, + AttemptRecordExpectation::RegularSuccess(_) => FiberType::Regular, + AttemptRecordExpectation::RegularFailure(_) => FiberType::Regular, + AttemptRecordExpectation::SpeculativeNoResponse(_) => FiberType::Speculative, + AttemptRecordExpectation::SpeculativeSuccess(_) => FiberType::Speculative, + AttemptRecordExpectation::SpeculativeFailure(_) => FiberType::Speculative, + } + } + + fn get_result(&self) -> SimpleAttemptResult { + match self { + AttemptRecordExpectation::RegularNoResponse(_) => SimpleAttemptResult::NoResponse, + AttemptRecordExpectation::RegularSuccess(_) => SimpleAttemptResult::Success, + AttemptRecordExpectation::RegularFailure(_) => SimpleAttemptResult::Failure, + AttemptRecordExpectation::SpeculativeNoResponse(_) => SimpleAttemptResult::NoResponse, + AttemptRecordExpectation::SpeculativeSuccess(_) => SimpleAttemptResult::Success, + AttemptRecordExpectation::SpeculativeFailure(_) => SimpleAttemptResult::Failure, + } + } +} + +impl fmt::Display for AttemptRecordExpectation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +// Check if recorded attempt history meet expectations +pub(crate) fn check_expectations( + expectations: Vec, + result: Vec, +) -> Result<(), String> { + let mut nodes_met: Vec = Vec::new(); + let mut total_results = 0; + + for (attempt_id, ref attempt) in result.clone().into_iter().enumerate() { + total_results += 1; + let expectation = expectations.get(attempt_id); + if expectation.is_none() { + return Err(format!( + "[{}]: end of expectation reached, but extra attempt {} found", + attempt_id, attempt + ) + .into()); + } + let expectation = expectation.unwrap(); + let expected_fiber = expectation.get_fiber(); + let result_fiber = attempt.get_fiber(); + if result_fiber != expected_fiber { + return Err(format!( + "[{}]: attempt was executed on fiber {} instead of {}, result `{}`, expectation `{}`", + attempt_id, result_fiber, expected_fiber, attempt, expectation + ).into()); + } + + let expected_status = expectation.get_result(); + let result_status = attempt.get_result(); + + if expected_status != result_status { + return Err(format!( + "[{}] attempt resulted in {} instead of {}: result `{}`, expectation: `{}`", + attempt_id, result_status, expected_status, attempt, expectation + ) + .into()); + } + let node_expectation = expectation.node_expectation(); + + if !node_expectation.expectation_met(&nodes_met, attempt.node_addr()) { + return Err(format!( + "[{}] attempt was scheduled to the wrong node {}, while expectation `{}`, nodes observed `{}`", + attempt_id, attempt.node_addr(), node_expectation, nodes_met.iter().map(|n| n.to_string()).collect::>().join(", ") + ) + .into()); + } + + if node_expectation.forget_nodes() { + nodes_met.clear() + } + nodes_met.push(*attempt.node_addr()); + } + if expectations.len() > total_results { + return Err(format!( + "end of attempts reached, but there are more expected: {:?}", + expectations.get(total_results..).unwrap() + ) + .into()); + } + + Ok(()) +} + +pub(crate) fn convert_into_simple_history( + history: StructuredHistory, +) -> Vec> { + let mut timeline: Vec<(DateTime, Vec)> = Vec::new(); + history.requests.iter().for_each(|hist| { + let mut attempts_timeline: Vec = Vec::new(); + hist.non_speculative_fiber + .attempts + .iter() + .for_each(|attempt| match attempt.result { + Some(ref result) => match result { + AttemptResult::Success(time) => { + attempts_timeline.push(AttemptSimpleHistoryRecord::RegularSuccess( + attempt.send_time, + *time, + attempt.node_addr, + )) + } + AttemptResult::Error(time, _, _) => { + attempts_timeline.push(AttemptSimpleHistoryRecord::RegularFailure( + attempt.send_time, + *time, + attempt.node_addr, + )) + } + }, + None => attempts_timeline.push(AttemptSimpleHistoryRecord::RegularNoResponse( + attempt.send_time, + attempt.node_addr, + )), + }); + + hist.speculative_fibers.iter().for_each(|fiber| { + fiber + .attempts + .iter() + .for_each(|attempt| match attempt.result { + Some(ref result) => match result { + AttemptResult::Success(time) => { + attempts_timeline.push(AttemptSimpleHistoryRecord::SpeculativeSuccess( + attempt.send_time, + *time, + attempt.node_addr, + )) + } + AttemptResult::Error(time, _, _) => { + attempts_timeline.push(AttemptSimpleHistoryRecord::SpeculativeFailure( + attempt.send_time, + *time, + attempt.node_addr, + )) + } + }, + None => { + attempts_timeline.push(AttemptSimpleHistoryRecord::SpeculativeNoResponse( + attempt.send_time, + attempt.node_addr, + )) + } + }) + }); + attempts_timeline.sort_by(|a, b| { + a.done_time() + .unwrap_or(a.start_time()) + .cmp(&b.done_time().unwrap_or(b.start_time())) + }); + timeline.push((hist.start_time, attempts_timeline)); + }); + + timeline.sort_by(|a, b| a.0.cmp(&b.0)); + let timeline: Vec> = + timeline.iter().map(|(_, res)| res.clone()).collect(); + timeline.into() +} + +pub(crate) enum SimpleProxyRules { + Drop(Option), + Fail(Option, FailError), + Delay(Duration), + FailAll(FailError), +} + +impl SimpleProxyRules { + pub(crate) fn fail(wait: Option, err: fn() -> DbError) -> Self { + SimpleProxyRules::Fail(wait, FailError(Box::new(err))) + } + + pub(crate) fn drop(wait: Option) -> Self { + SimpleProxyRules::Drop(wait) + } + + pub(crate) fn fail_all(err: fn() -> DbError) -> Self { + SimpleProxyRules::FailAll(FailError(Box::new(err))) + } + + pub(crate) fn delay(wait: Duration) -> Self { + SimpleProxyRules::Delay(wait) + } +} + +pub(crate) struct FailError(Box DbError + Send + Sync>); + +fn nth(expected: usize) -> Condition { + let counter = AtomicUsize::new(0); + Condition::CustomCondition(ConditionHandler::new(Arc::new(move |_| { + let value = counter.fetch_add(1, Ordering::Relaxed) + 1; + expected == value + }))) +} + +pub(crate) fn into_proxy_request_rules( + val: Vec, + target_opcode: RequestOpcode, +) -> Vec { + val.into_iter() + .map(|action| match action { + SimpleProxyRules::Delay(d) => RequestRule( + Condition::not(Condition::ConnectionRegisteredAnyEvent) // Not control connection + .and(Condition::RequestOpcode(target_opcode)) // Query only + .and(nth(1)), + RequestReaction { + to_addressee: Some(Action { + delay: Some(d), + msg_processor: None, + }), + to_sender: None, + drop_connection: None, + feedback_channel: None, + } + ), + SimpleProxyRules::Drop(d) => RequestRule( + Condition::not(Condition::ConnectionRegisteredAnyEvent) // Not control connection + .and(Condition::RequestOpcode(target_opcode)) // Query only + .and(nth(1)), + RequestReaction { + to_addressee: None, + to_sender: None, + drop_connection: Some(d), + feedback_channel: None, + }, + ), + SimpleProxyRules::Fail(wait, err) => RequestRule( + Condition::not(Condition::ConnectionRegisteredAnyEvent) // Not control connection + .and(Condition::RequestOpcode(target_opcode)) // Query only + .and(nth(1)), + RequestReaction::forge_with_error_lazy_delay(err.0, wait), + ), + SimpleProxyRules::FailAll(err) => RequestRule( + Condition::not(Condition::ConnectionRegisteredAnyEvent) // Not control connection + .and(Condition::RequestOpcode(target_opcode)), // Query only + // no nth condition, we want to fail everything + RequestReaction::forge_with_error_lazy_delay(err.0, None), + ), + }) + .collect() +}