Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions scylla/tests/ccm_integration/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#[path = "../common/mod.rs"]
mod common;
mod new_session;
165 changes: 165 additions & 0 deletions scylla/tests/ccm_integration/new_session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use assert_matches::assert_matches;
use scylla::client::execution_profile::ExecutionProfile;
use scylla::client::session_builder::SessionBuilder;
use scylla::errors::{ConnectionError, ConnectionPoolError, ExecutionError, NewSessionError};
use scylla::policies::retry::DefaultRetryPolicy;
use std::sync::Arc;

use crate::common::retry_policy::NoRetryPolicy;
use crate::common::utils::setup_tracing;

fn get_scylla() -> (String, String, String) {
let uri1 = std::env::var("SCYLLA_URI1").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let uri2 = std::env::var("SCYLLA_URI2").unwrap_or_else(|_| "127.0.0.2:9042".to_string());
let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string());

(uri1, uri2, uri3)
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn proceed_if_only_some_hostnames_are_invalid() {
setup_tracing();
// on purpose left without port
let uri1 = "scylladbisthefastestdb.invalid".to_owned();
// correctly provided port, but unknown domain
let uri2 = "cassandrasuckssomuch.invalid:9042".to_owned();
let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string());

let session = SessionBuilder::new()
.known_nodes([uri1, uri2, uri3])
.build()
.await
.unwrap();

assert!(session
.query_unpaged("SELECT host_id FROM system.local", &[])
.await
.is_ok());
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn all_hostnames_invalid() {
setup_tracing();
let uri = "cassandrasuckssomuch.invalid:9042".to_owned();

assert_matches!(
SessionBuilder::new().known_node(uri).build().await,
Err(NewSessionError::FailedToResolveAnyHostname(_))
);
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn no_nodes_available_reconnection_enabled() {
setup_tracing();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(DefaultRetryPolicy))
.build();

assert!(SessionBuilder::new()
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.is_ok());
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn no_nodes_available_reconnection_disabled() {
setup_tracing();
// TODO: Replace with CCM
let (uri1, uri2, uri3) = get_scylla();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(NoRetryPolicy))
.build();

assert_matches!(
SessionBuilder::new()
.known_nodes([uri1, uri2, uri3])
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await,
Err(NewSessionError::FailedToResolveAnyHostname(_))
);
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn no_nodes_available_reconnection_enabled_nodes_coming_back() {
setup_tracing();
// TODO: Setup CCM

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(DefaultRetryPolicy))
.build();

assert!(SessionBuilder::new()
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.is_ok());
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn session_created_nodes_away_reconnection_enabled() {
setup_tracing();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(DefaultRetryPolicy))
.build();

let _session = SessionBuilder::new()
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.unwrap();

assert!(true);
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn session_created_nodes_away_reconnection_disabled() {
setup_tracing();

// TODO: Replace with CCM
let (uri1, uri2, uri3) = get_scylla();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(NoRetryPolicy))
.build();

let session = SessionBuilder::new()
.known_nodes([uri1, uri2, uri3])
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.unwrap();

// TODO: Everything should be fine
assert!(session
.query_unpaged("SELECT host_id FROM system.local", &[])
.await
.is_ok());

// TODO: Stop the nodes

// TODO: Check the connection -> fails to execute query
assert_matches!(
session
.query_unpaged("SELECT host_id FROM system.local", &[])
.await,
Err(ExecutionError::ConnectionPoolError(
ConnectionPoolError::Broken {
last_connection_error: ConnectionError::BrokenConnection(_),
}
))
);

assert!(true);
}
2 changes: 2 additions & 0 deletions scylla/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod retry_policy;
pub(crate) mod utils;
26 changes: 26 additions & 0 deletions scylla/tests/common/retry_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use scylla::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use std::fmt::Debug;

#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub(crate) struct NoRetryPolicy;

#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub(crate) struct NoRetrySession;

impl RetryPolicy for NoRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(NoRetrySession)
}
}

impl RetrySession for NoRetrySession {
fn decide_should_retry(&mut self, _request_info: RequestInfo) -> RetryDecision {
RetryDecision::DontRetry
}

fn reset(&mut self) {
*self = Default::default()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ pub(crate) fn setup_tracing() {
.try_init();
}

#[allow(dead_code)]
static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0);

#[allow(dead_code)]
pub(crate) fn unique_keyspace_name() -> String {
let cnt = UNIQUE_COUNTER.fetch_add(1, Ordering::SeqCst);
let name = format!(
Expand All @@ -45,6 +47,7 @@ pub(crate) fn unique_keyspace_name() -> String {
name
}

#[allow(dead_code)]
pub(crate) async fn test_with_3_node_cluster<F, Fut>(
shard_awareness: ShardAwareness,
test: F,
Expand Down Expand Up @@ -95,6 +98,7 @@ where
running_proxy.finish().await
}

#[allow(dead_code)]
pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool {
// Cassandra doesn't have a concept of features, so first detect
// if there is the `supported_features` column in system.local
Expand Down Expand Up @@ -127,13 +131,15 @@ pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool {
.any(|f| f == feature)
}

#[allow(dead_code)]
pub(crate) async fn scylla_supports_tablets(session: &Session) -> bool {
supports_feature(session, "TABLETS").await
}

// Creates a generic session builder based on conditional compilation configuration
// For SessionBuilder of DefaultMode type, adds localhost to known hosts, as all of the tests
// connect to localhost.
#[allow(dead_code)]
pub(crate) fn create_new_session_builder() -> GenericSessionBuilder<impl SessionBuilderKind> {
let session_builder = {
#[cfg(not(scylla_cloud_tests))]
Expand Down Expand Up @@ -168,6 +174,7 @@ pub(crate) fn create_new_session_builder() -> GenericSessionBuilder<impl Session

// Shorthands for better readability.
// Copied from Scylla because we don't want to make it public there.
#[allow(dead_code)]
pub(crate) trait DeserializeOwnedValue:
for<'frame, 'metadata> DeserializeValue<'frame, 'metadata>
{
Expand All @@ -182,6 +189,7 @@ impl<T> DeserializeOwnedValue for T where
// This is to make sure that all DDL queries land on the same node,
// to prevent errors from concurrent DDL queries executed on different nodes.
#[derive(Debug)]
#[allow(dead_code)]
struct SchemaQueriesLBP;

impl LoadBalancingPolicy for SchemaQueriesLBP {
Expand Down Expand Up @@ -210,6 +218,7 @@ impl LoadBalancingPolicy for SchemaQueriesLBP {
}

#[derive(Debug, Default)]
#[allow(dead_code)]
struct SchemaQueriesRetrySession {
count: usize,
}
Expand Down Expand Up @@ -242,14 +251,16 @@ impl RetrySession for SchemaQueriesRetrySession {
}

#[derive(Debug)]
#[allow(dead_code)]
struct SchemaQueriesRetryPolicy;

impl RetryPolicy for SchemaQueriesRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(SchemaQueriesRetrySession::default())
Box::<SchemaQueriesRetrySession>::default()
}
}

#[allow(dead_code)]
fn apply_ddl_lbp(query: &mut Query) {
let policy = query
.get_execution_profile_handle()
Expand All @@ -265,6 +276,7 @@ fn apply_ddl_lbp(query: &mut Query) {
// we'll be able to do session.ddl(...) instead of perform_ddl(&session, ...)
// or something like that.
#[async_trait::async_trait]
#[allow(dead_code)]
pub(crate) trait PerformDDL {
async fn ddl(&self, query: impl Into<Query> + Send) -> Result<(), ExecutionError>;
}
Expand Down
4 changes: 2 additions & 2 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ mod history;
mod hygiene;
mod large_batch_statements;
mod lwt_optimisation;
mod new_session;
mod retries;
mod self_identity;
mod shards;
mod silent_prepare_batch;
mod silent_prepare_query;
mod skip_metadata_optimization;
mod tablets;
pub(crate) mod utils;
#[path = "../common/utils.rs"]
mod utils;
38 changes: 0 additions & 38 deletions scylla/tests/integration/new_session.rs

This file was deleted.

Loading