From f1b259d1ba0886791a43b9c400121a4a71db3efc Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 11 Feb 2025 17:00:53 +0100 Subject: [PATCH 1/8] Remove duplicate test --- scylla/src/client/session_test.rs | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/scylla/src/client/session_test.rs b/scylla/src/client/session_test.rs index 267b57fc34..849fa9f5dc 100644 --- a/scylla/src/client/session_test.rs +++ b/scylla/src/client/session_test.rs @@ -1,4 +1,3 @@ -use super::session_builder::SessionBuilder; use crate as scylla; use crate::cluster::metadata::{ColumnType, NativeType}; use crate::query::Query; @@ -7,35 +6,8 @@ use crate::routing::Token; use crate::utils::test_utils::{ create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL, }; -use futures::FutureExt; use scylla_cql::frame::request::query::{PagingState, PagingStateResponse}; use scylla_cql::serialize::row::SerializedValues; -use tokio::net::TcpListener; - -#[tokio::test] -async fn test_connection_failure() { - setup_tracing(); - // Make sure that Session::create fails when the control connection - // fails to connect. - - // Create a dummy server which immediately closes the connection. - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - let (fut, _handle) = async move { - loop { - let _ = listener.accept().await; - } - } - .remote_handle(); - tokio::spawn(fut); - - let res = SessionBuilder::new().known_node_addr(addr).build().await; - match res { - Ok(_) => panic!("Unexpected success"), - Err(err) => println!("Connection error (it was expected): {:?}", err), - } -} #[tokio::test] async fn test_prepared_statement() { From e2a08ab1353c03ca77d9cd59a5ddd084958ee65e Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 12 Feb 2025 13:44:26 +0100 Subject: [PATCH 2/8] Fix assert_matches! 
usage in tests/integration/session.rs --- scylla/tests/integration/session.rs | 32 ++++++++++++++--------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 6a6b4f4140..9ce6ac2156 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -1153,10 +1153,10 @@ async fn test_request_timeout() { let mut query: Query = Query::new("SELECT * FROM system_schema.tables"); query.set_request_timeout(Some(Duration::from_millis(1))); - match session.query_unpaged(query, &[]).await { - Ok(_) => panic!("the query should have failed due to a client-side timeout"), - Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)), - } + assert_matches!( + session.query_unpaged(query, &[]).await, + Err(ExecutionError::RequestTimeout(_)) + ); let mut prepared = session .prepare("SELECT * FROM system_schema.tables") @@ -1164,10 +1164,10 @@ async fn test_request_timeout() { .unwrap(); prepared.set_request_timeout(Some(Duration::from_millis(1))); - match session.execute_unpaged(&prepared, &[]).await { - Ok(_) => panic!("the prepared query should have failed due to a client-side timeout"), - Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)), - }; + assert_matches!( + session.execute_unpaged(&prepared, &[]).await, + Err(ExecutionError::RequestTimeout(_)) + ); } { let timeouting_session = create_new_session_builder() @@ -1178,10 +1178,10 @@ async fn test_request_timeout() { let mut query = Query::new("SELECT * FROM system_schema.tables"); - match timeouting_session.query_unpaged(query.clone(), &[]).await { - Ok(_) => panic!("the query should have failed due to a client-side timeout"), - Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)), - }; + assert_matches!( + timeouting_session.query_unpaged(query.clone(), &[]).await, + Err(ExecutionError::RequestTimeout(_)) + ); query.set_request_timeout(Some(Duration::from_secs(10000))); @@ 
-1194,10 +1194,10 @@ async fn test_request_timeout() { .await .unwrap(); - match timeouting_session.execute_unpaged(&prepared, &[]).await { - Ok(_) => panic!("the prepared query should have failed due to a client-side timeout"), - Err(e) => assert_matches!(e, ExecutionError::RequestTimeout(_)), - }; + assert_matches!( + timeouting_session.execute_unpaged(&prepared, &[]).await, + Err(ExecutionError::RequestTimeout(_)) + ); prepared.set_request_timeout(Some(Duration::from_secs(10000))); From c223d1a2d21fa39d43a961fd04eb744f4f75a9de Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 12 Feb 2025 16:16:43 +0100 Subject: [PATCH 3/8] Move test_deserialize_empty_collections to cql_types.rs --- scylla/tests/integration/cql_types.rs | 65 +++++++++++++++++++++++++ scylla/tests/integration/session.rs | 70 ++------------------------- 2 files changed, 68 insertions(+), 67 deletions(-) diff --git a/scylla/tests/integration/cql_types.rs b/scylla/tests/integration/cql_types.rs index bec26c382c..3124fae18e 100644 --- a/scylla/tests/integration/cql_types.rs +++ b/scylla/tests/integration/cql_types.rs @@ -4,6 +4,7 @@ use scylla::serialize::value::SerializeValue; use scylla::value::{Counter, CqlDate, CqlTime, CqlTimestamp, CqlTimeuuid, CqlValue, CqlVarint}; use scylla::{DeserializeValue, SerializeValue}; use std::cmp::PartialEq; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; @@ -1863,3 +1864,67 @@ async fn test_udt_with_missing_field() { ) .await; } + +/// ScyllaDB does not distinguish empty collections from nulls. That is, INSERTing an empty collection +/// is equivalent to nullifying the corresponding column. +/// As pointed out in [#1001](https://github.com/scylladb/scylla-rust-driver/issues/1001), it's a nice +/// QOL feature to be able to deserialize empty CQL collections to empty Rust collections instead of +/// `None::`. This test checks that. 
+#[tokio::test] +async fn test_deserialize_empty_collections() { + // Setup session. + let ks = unique_keyspace_name(); + let session = create_new_session_builder().build().await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(&ks, true).await.unwrap(); + + async fn deserialize_empty_collection< + Collection: Default + DeserializeOwnedValue + SerializeValue, + >( + session: &Session, + collection_name: &str, + collection_type_params: &str, + ) -> Collection { + // Create a table for the given collection type. + let table_name = "test_empty_".to_owned() + collection_name; + let query = format!( + "CREATE TABLE {} (n int primary key, c {}<{}>)", + table_name, collection_name, collection_type_params + ); + session.ddl(query).await.unwrap(); + + // Populate the table with an empty collection, effectively inserting null as the collection. + session + .query_unpaged( + format!("INSERT INTO {} (n, c) VALUES (?, ?)", table_name,), + (0, Collection::default()), + ) + .await + .unwrap(); + + let query_rows_result = session + .query_unpaged(format!("SELECT c FROM {}", table_name), ()) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let (collection,) = query_rows_result.first_row::<(Collection,)>().unwrap(); + + // Drop the table + collection + } + + let list = deserialize_empty_collection::>(&session, "list", "int").await; + assert!(list.is_empty()); + + let set = deserialize_empty_collection::>(&session, "set", "bigint").await; + assert!(set.is_empty()); + + let map = deserialize_empty_collection::>( + &session, + "map", + "boolean, varint", + ) + .await; + assert!(map.is_empty()); +} diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 9ce6ac2156..7de14dd5de 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -1,4 +1,3 @@ -use 
crate::utils::DeserializeOwnedValue; use crate::utils::{ create_new_session_builder, scylla_supports_tablets, setup_tracing, supports_feature, unique_keyspace_name, PerformDDL, @@ -27,9 +26,10 @@ use scylla::statement::Consistency; use scylla_cql::frame::request::query::{PagingState, PagingStateResponse}; use scylla_cql::serialize::row::{SerializeRow, SerializedValues}; use scylla_cql::serialize::value::SerializeValue; -use scylla_cql::value::{CqlVarint, Row}; +use scylla_cql::value::Row; +use std::collections::BTreeSet; use std::collections::{BTreeMap, HashMap}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::HashSet; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; @@ -2854,70 +2854,6 @@ async fn test_manual_primary_key_computation() { } } -/// ScyllaDB does not distinguish empty collections from nulls. That is, INSERTing an empty collection -/// is equivalent to nullifying the corresponding column. -/// As pointed out in [#1001](https://github.com/scylladb/scylla-rust-driver/issues/1001), it's a nice -/// QOL feature to be able to deserialize empty CQL collections to empty Rust collections instead of -/// `None::`. This test checks that. -#[tokio::test] -async fn test_deserialize_empty_collections() { - // Setup session. - let ks = unique_keyspace_name(); - let session = create_new_session_builder().build().await.unwrap(); - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session.use_keyspace(&ks, true).await.unwrap(); - - async fn deserialize_empty_collection< - Collection: Default + DeserializeOwnedValue + SerializeValue, - >( - session: &Session, - collection_name: &str, - collection_type_params: &str, - ) -> Collection { - // Create a table for the given collection type. 
- let table_name = "test_empty_".to_owned() + collection_name; - let query = format!( - "CREATE TABLE {} (n int primary key, c {}<{}>)", - table_name, collection_name, collection_type_params - ); - session.ddl(query).await.unwrap(); - - // Populate the table with an empty collection, effectively inserting null as the collection. - session - .query_unpaged( - format!("INSERT INTO {} (n, c) VALUES (?, ?)", table_name,), - (0, Collection::default()), - ) - .await - .unwrap(); - - let query_rows_result = session - .query_unpaged(format!("SELECT c FROM {}", table_name), ()) - .await - .unwrap() - .into_rows_result() - .unwrap(); - let (collection,) = query_rows_result.first_row::<(Collection,)>().unwrap(); - - // Drop the table - collection - } - - let list = deserialize_empty_collection::>(&session, "list", "int").await; - assert!(list.is_empty()); - - let set = deserialize_empty_collection::>(&session, "set", "bigint").await; - assert!(set.is_empty()); - - let map = deserialize_empty_collection::>( - &session, - "map", - "boolean, varint", - ) - .await; - assert!(map.is_empty()); -} - #[cfg(cassandra_tests)] #[tokio::test] async fn test_vector_type_metadata() { From 72b59c7e843c0c2abd9e4e7bebad4ef192bff2bf Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 12 Feb 2025 16:19:59 +0100 Subject: [PATCH 4/8] Move timestamp test to statement.rs and add missing cases --- scylla/tests/integration/main.rs | 1 + scylla/tests/integration/session.rs | 196 +----------------- scylla/tests/integration/statement.rs | 273 ++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 194 deletions(-) create mode 100644 scylla/tests/integration/statement.rs diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index d69c0aa0ca..3a42d21a70 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -19,6 +19,7 @@ mod shards; mod silent_prepare_batch; mod silent_prepare_query; mod skip_metadata_optimization; +mod statement; mod 
tablets; #[path = "../common/utils.rs"] mod utils; diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 7de14dd5de..82d87b8c7d 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -5,7 +5,7 @@ use crate::utils::{ use assert_matches::assert_matches; use futures::{FutureExt, StreamExt as _, TryStreamExt}; use itertools::Itertools; -use scylla::batch::{Batch, BatchStatement, BatchType}; +use scylla::batch::{Batch, BatchStatement}; use scylla::client::caching_session::CachingSession; use scylla::client::execution_profile::ExecutionProfile; use scylla::client::session::Session; @@ -31,7 +31,7 @@ use std::collections::BTreeSet; use std::collections::{BTreeMap, HashMap}; use std::collections::HashSet; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::net::TcpListener; use uuid::Uuid; @@ -945,198 +945,6 @@ async fn test_await_schema_agreement() { let _schema_version = session.await_schema_agreement().await.unwrap(); } -#[tokio::test] -async fn test_timestamp() { - setup_tracing(); - let session = create_new_session_builder().build().await.unwrap(); - let ks = unique_keyspace_name(); - - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {}.t_timestamp (a text, b text, primary key (a))", - ks - )) - .await - .unwrap(); - - session.await_schema_agreement().await.unwrap(); - - let query_str = format!("INSERT INTO {}.t_timestamp (a, b) VALUES (?, ?)", ks); - - // test regular query timestamps - - let mut regular_query = Query::new(query_str.to_string()); - - regular_query.set_timestamp(Some(420)); - session - .query_unpaged(regular_query.clone(), ("regular query", "higher timestamp")) - .await - .unwrap(); - - regular_query.set_timestamp(Some(42)); - session - 
.query_unpaged(regular_query.clone(), ("regular query", "lower timestamp")) - .await - .unwrap(); - - // test prepared statement timestamps - - let mut prepared_statement = session.prepare(query_str).await.unwrap(); - - prepared_statement.set_timestamp(Some(420)); - session - .execute_unpaged(&prepared_statement, ("prepared query", "higher timestamp")) - .await - .unwrap(); - - prepared_statement.set_timestamp(Some(42)); - session - .execute_unpaged(&prepared_statement, ("prepared query", "lower timestamp")) - .await - .unwrap(); - - // test batch statement timestamps - - let mut batch: Batch = Default::default(); - batch.append_statement(regular_query); - batch.append_statement(prepared_statement); - - batch.set_timestamp(Some(420)); - session - .batch( - &batch, - ( - ("first query in batch", "higher timestamp"), - ("second query in batch", "higher timestamp"), - ), - ) - .await - .unwrap(); - - batch.set_timestamp(Some(42)); - session - .batch( - &batch, - ( - ("first query in batch", "lower timestamp"), - ("second query in batch", "lower timestamp"), - ), - ) - .await - .unwrap(); - - let query_rows_result = session - .query_unpaged( - format!("SELECT a, b, WRITETIME(b) FROM {}.t_timestamp", ks), - &[], - ) - .await - .unwrap() - .into_rows_result() - .unwrap(); - - let mut results = query_rows_result - .rows::<(&str, &str, i64)>() - .unwrap() - .map(Result::unwrap) - .collect::>(); - results.sort(); - - let expected_results = [ - ("first query in batch", "higher timestamp", 420), - ("prepared query", "higher timestamp", 420), - ("regular query", "higher timestamp", 420), - ("second query in batch", "higher timestamp", 420), - ] - .into_iter() - .collect::>(); - - assert_eq!(results, expected_results); -} - -#[tokio::test] -async fn test_timestamp_generator() { - use rand::random; - use scylla::policies::timestamp_generator::TimestampGenerator; - - setup_tracing(); - struct LocalTimestampGenerator { - generated_timestamps: Arc>>, - } - - impl TimestampGenerator 
for LocalTimestampGenerator { - fn next_timestamp(&self) -> i64 { - let timestamp = random::().abs(); - self.generated_timestamps.lock().unwrap().insert(timestamp); - timestamp - } - } - - let timestamps = Arc::new(Mutex::new(HashSet::new())); - let generator = LocalTimestampGenerator { - generated_timestamps: timestamps.clone(), - }; - - let session = create_new_session_builder() - .timestamp_generator(Arc::new(generator)) - .build() - .await - .unwrap(); - let ks = unique_keyspace_name(); - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)", - ks - )) - .await - .unwrap(); - - let prepared = session - .prepare(format!( - "INSERT INTO {}.t_generator (a, b) VALUES (1, 1)", - ks - )) - .await - .unwrap(); - session.execute_unpaged(&prepared, []).await.unwrap(); - - let unprepared = Query::new(format!( - "INSERT INTO {}.t_generator (a, b) VALUES (2, 2)", - ks - )); - session.query_unpaged(unprepared, []).await.unwrap(); - - let mut batch = Batch::new(BatchType::Unlogged); - let stmt = session - .prepare(format!( - "INSERT INTO {}.t_generator (a, b) VALUES (3, 3)", - ks - )) - .await - .unwrap(); - batch.append_statement(stmt); - session.batch(&batch, &((),)).await.unwrap(); - - let query_rows_result = session - .query_unpaged( - format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks), - &[], - ) - .await - .unwrap() - .into_rows_result() - .unwrap(); - - let timestamps_locked = timestamps.lock().unwrap(); - assert!(query_rows_result - .rows::<(i32, i32, i64)>() - .unwrap() - .map(|row_result| row_result.unwrap()) - .all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime))); -} - #[ignore = "works on remote Scylla instances only (local ones are too fast)"] #[tokio::test] async fn test_request_timeout() { diff --git 
a/scylla/tests/integration/statement.rs b/scylla/tests/integration/statement.rs new file mode 100644 index 0000000000..fe255551b8 --- /dev/null +++ b/scylla/tests/integration/statement.rs @@ -0,0 +1,273 @@ +use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; +use scylla::{ + batch::{Batch, BatchType}, + query::Query, + response::PagingState, +}; +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; + +#[tokio::test] +async fn test_timestamp() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS t_timestamp (a text, b text, primary key (a))") + .await + .unwrap(); + + let query_str = "INSERT INTO t_timestamp (a, b) VALUES (?, ?)"; + + // test regular query timestamps + + let mut regular_query = Query::new(query_str.to_string()); + + regular_query.set_timestamp(Some(420)); + session + .query_unpaged(regular_query.clone(), ("regular query", "higher timestamp")) + .await + .unwrap(); + + regular_query.set_timestamp(Some(42)); + session + .query_unpaged(regular_query.clone(), ("regular query", "lower timestamp")) + .await + .unwrap(); + + // test regular query iter timestamps + + let mut regular_query = Query::new(query_str.to_string()); + + regular_query.set_timestamp(Some(420)); + session + .query_iter( + regular_query.clone(), + ("regular query iter", "higher timestamp"), + ) + .await + .unwrap(); + + regular_query.set_timestamp(Some(42)); + session + .query_iter( + regular_query.clone(), + ("regular query iter", "lower timestamp"), + ) + .await + .unwrap(); + + // test regular query single page timestamps + + let mut regular_query = Query::new(query_str.to_string()); + + 
regular_query.set_timestamp(Some(420)); + session + .query_single_page( + regular_query.clone(), + ("regular query single page", "higher timestamp"), + PagingState::start(), + ) + .await + .unwrap(); + + regular_query.set_timestamp(Some(42)); + session + .query_single_page( + regular_query.clone(), + ("regular query single page", "lower timestamp"), + PagingState::start(), + ) + .await + .unwrap(); + + // test prepared statement timestamps + + let mut prepared_statement = session.prepare(query_str).await.unwrap(); + + prepared_statement.set_timestamp(Some(420)); + session + .execute_unpaged(&prepared_statement, ("prepared query", "higher timestamp")) + .await + .unwrap(); + + prepared_statement.set_timestamp(Some(42)); + session + .execute_unpaged(&prepared_statement, ("prepared query", "lower timestamp")) + .await + .unwrap(); + + // test prepared statement iter timestamps + + let mut prepared_statement = session.prepare(query_str).await.unwrap(); + + prepared_statement.set_timestamp(Some(420)); + session + .execute_iter( + prepared_statement.clone(), + ("prepared query iter", "higher timestamp"), + ) + .await + .unwrap(); + + prepared_statement.set_timestamp(Some(42)); + session + .execute_iter( + prepared_statement, + ("prepared query iter", "lower timestamp"), + ) + .await + .unwrap(); + + // test prepared statement single page timestamps + + let mut prepared_statement = session.prepare(query_str).await.unwrap(); + + prepared_statement.set_timestamp(Some(420)); + session + .execute_single_page( + &prepared_statement, + ("prepared query single page", "higher timestamp"), + PagingState::start(), + ) + .await + .unwrap(); + + prepared_statement.set_timestamp(Some(42)); + session + .execute_single_page( + &prepared_statement, + ("prepared query single page", "lower timestamp"), + PagingState::start(), + ) + .await + .unwrap(); + + // test batch statement timestamps + + let mut batch: Batch = Default::default(); + batch.append_statement(regular_query); + 
batch.append_statement(prepared_statement); + + batch.set_timestamp(Some(420)); + session + .batch( + &batch, + ( + ("first query in batch", "higher timestamp"), + ("second query in batch", "higher timestamp"), + ), + ) + .await + .unwrap(); + + batch.set_timestamp(Some(42)); + session + .batch( + &batch, + ( + ("first query in batch", "lower timestamp"), + ("second query in batch", "lower timestamp"), + ), + ) + .await + .unwrap(); + + let query_rows_result = session + .query_unpaged("SELECT a, b, WRITETIME(b) FROM t_timestamp", &[]) + .await + .unwrap() + .into_rows_result() + .unwrap(); + + let mut results = query_rows_result + .rows::<(&str, &str, i64)>() + .unwrap() + .map(Result::unwrap) + .collect::>(); + results.sort_unstable(); + + let expected_results = [ + ("first query in batch", "higher timestamp", 420), + ("prepared query", "higher timestamp", 420), + ("prepared query iter", "higher timestamp", 420), + ("prepared query single page", "higher timestamp", 420), + ("regular query", "higher timestamp", 420), + ("regular query iter", "higher timestamp", 420), + ("regular query single page", "higher timestamp", 420), + ("second query in batch", "higher timestamp", 420), + ]; + + assert_eq!(results, expected_results); +} + +#[tokio::test] +async fn test_timestamp_generator() { + use rand::random; + use scylla::policies::timestamp_generator::TimestampGenerator; + + setup_tracing(); + struct LocalTimestampGenerator { + generated_timestamps: Arc>>, + } + + impl TimestampGenerator for LocalTimestampGenerator { + fn next_timestamp(&self) -> i64 { + let timestamp = random::().abs(); + self.generated_timestamps.lock().unwrap().insert(timestamp); + timestamp + } + } + + let timestamps = Arc::new(Mutex::new(HashSet::new())); + let generator = LocalTimestampGenerator { + generated_timestamps: timestamps.clone(), + }; + + let session = create_new_session_builder() + .timestamp_generator(Arc::new(generator)) + .build() + .await + .unwrap(); + let ks = 
unique_keyspace_name(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS t_generator (a int primary key, b int)") + .await + .unwrap(); + + let prepared = session + .prepare("INSERT INTO t_generator (a, b) VALUES (1, 1)") + .await + .unwrap(); + session.execute_unpaged(&prepared, []).await.unwrap(); + + let unprepared = Query::new("INSERT INTO t_generator (a, b) VALUES (2, 2)"); + session.query_unpaged(unprepared, []).await.unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let stmt = session + .prepare("INSERT INTO t_generator (a, b) VALUES (3, 3)") + .await + .unwrap(); + batch.append_statement(stmt); + session.batch(&batch, &((),)).await.unwrap(); + + let query_rows_result = session + .query_unpaged("SELECT a, b, WRITETIME(b) FROM t_generator", &[]) + .await + .unwrap() + .into_rows_result() + .unwrap(); + + let timestamps_locked = timestamps.lock().unwrap(); + assert!(query_rows_result + .rows::<(i32, i32, i64)>() + .unwrap() + .map(|row_result| row_result.unwrap()) + .all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime))); +} From d9af125a9aa924317879bab00720721663f43bd3 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 12 Feb 2025 10:40:24 +0100 Subject: [PATCH 5/8] Move test_prepared_config to statement.rs and add additional cases to it --- scylla/tests/integration/session.rs | 15 ------------ scylla/tests/integration/statement.rs | 35 +++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 82d87b8c7d..0e9bb0905a 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -1013,21 +1013,6 @@ async fn test_request_timeout() { } } -#[tokio::test] -async fn 
test_prepared_config() { - setup_tracing(); - let session = create_new_session_builder().build().await.unwrap(); - - let mut query = Query::new("SELECT * FROM system_schema.tables"); - query.set_is_idempotent(true); - query.set_page_size(42); - - let prepared_statement = session.prepare(query).await.unwrap(); - - assert!(prepared_statement.get_is_idempotent()); - assert_eq!(prepared_statement.get_page_size(), 42); -} - fn udt_type_a_def(ks: &str) -> Arc> { Arc::new(UserDefinedType { name: "type_a".into(), diff --git a/scylla/tests/integration/statement.rs b/scylla/tests/integration/statement.rs index fe255551b8..6b8e2bc5bf 100644 --- a/scylla/tests/integration/statement.rs +++ b/scylla/tests/integration/statement.rs @@ -3,10 +3,45 @@ use scylla::{ batch::{Batch, BatchType}, query::Query, response::PagingState, + statement::SerialConsistency, }; +use scylla_cql::Consistency; use std::collections::HashSet; use std::sync::{Arc, Mutex}; +#[tokio::test] +async fn test_prepared_config() { + use std::time::Duration; + + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + + let mut query = Query::new("SELECT * FROM system_schema.tables"); + query.set_is_idempotent(true); + query.set_page_size(42); + query.set_consistency(Consistency::One); + query.set_serial_consistency(Some(SerialConsistency::LocalSerial)); + query.set_tracing(true); + query.set_request_timeout(Some(Duration::from_millis(1))); + query.set_timestamp(Some(42)); + + let prepared_statement = session.prepare(query).await.unwrap(); + + assert!(prepared_statement.get_is_idempotent()); + assert_eq!(prepared_statement.get_page_size(), 42); + assert_eq!(prepared_statement.get_consistency(), Some(Consistency::One)); + assert_eq!( + prepared_statement.get_serial_consistency(), + Some(SerialConsistency::LocalSerial) + ); + assert!(prepared_statement.get_tracing()); + assert_eq!( + prepared_statement.get_request_timeout(), + Some(Duration::from_millis(1)) + ); + 
assert_eq!(prepared_statement.get_timestamp(), Some(42)); +} + #[tokio::test] async fn test_timestamp() { setup_tracing(); From d3cbf9dd608e9f8aeb76555d985ebb9bb3abc8e3 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Thu, 13 Feb 2025 15:04:37 +0100 Subject: [PATCH 6/8] Move bind markers and value list tests to statement.rs Extra cases and some small fixes added. --- scylla/tests/integration/session.rs | 120 +------------------- scylla/tests/integration/statement.rs | 153 ++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 119 deletions(-) diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 0e9bb0905a..bab199f10d 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -25,11 +25,9 @@ use scylla::routing::partitioner::{calculate_token_for_partition_key, Partitione use scylla::statement::Consistency; use scylla_cql::frame::request::query::{PagingState, PagingStateResponse}; use scylla_cql::serialize::row::{SerializeRow, SerializedValues}; -use scylla_cql::serialize::value::SerializeValue; use scylla_cql::value::Row; use std::collections::BTreeSet; -use std::collections::{BTreeMap, HashMap}; -use std::collections::HashSet; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::net::TcpListener; @@ -1514,62 +1512,6 @@ async fn test_turning_off_schema_fetching() { assert_eq!(keyspace.user_defined_types.len(), 0); } -#[tokio::test] -async fn test_named_bind_markers() { - let session = create_new_session_builder().build().await.unwrap(); - let ks = unique_keyspace_name(); - - session - .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) - .await - .unwrap(); - - session - .query_unpaged(format!("USE {}", ks), &[]) - .await - .unwrap(); - - session - .ddl("CREATE TABLE t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v))") - .await - .unwrap(); - - 
session.await_schema_agreement().await.unwrap(); - - let prepared = session - .prepare("INSERT INTO t (pk, ck, v) VALUES (:pk, :ck, :v)") - .await - .unwrap(); - let hashmap: HashMap<&str, i32> = HashMap::from([("pk", 7), ("v", 42), ("ck", 13)]); - session.execute_unpaged(&prepared, &hashmap).await.unwrap(); - - let btreemap: BTreeMap<&str, i32> = BTreeMap::from([("ck", 113), ("v", 142), ("pk", 17)]); - session.execute_unpaged(&prepared, &btreemap).await.unwrap(); - - let rows: Vec<(i32, i32, i32)> = session - .query_unpaged("SELECT pk, ck, v FROM t", &[]) - .await - .unwrap() - .into_rows_result() - .unwrap() - .rows::<(i32, i32, i32)>() - .unwrap() - .map(|res| res.unwrap()) - .collect(); - - assert_eq!(rows, vec![(7, 13, 42), (17, 113, 142)]); - - let wrongmaps: Vec> = vec![ - HashMap::from([("pk", 7), ("fefe", 42), ("ck", 13)]), - HashMap::from([("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", 7)]), - HashMap::new(), - HashMap::from([("ck", 9)]), - ]; - for wrongmap in wrongmaps { - assert!(session.execute_unpaged(&prepared, &wrongmap).await.is_err()); - } -} - #[tokio::test] async fn test_prepared_partitioner() { let session = create_new_session_builder().build().await.unwrap(); @@ -1707,66 +1649,6 @@ async fn test_unprepared_reprepare_in_execute() { assert_eq!(all_rows, vec![(1, 2, 3), (1, 3, 2)]); } -#[tokio::test] -async fn test_unusual_valuelists() { - let _ = tracing_subscriber::fmt::try_init(); - - let session = create_new_session_builder().build().await.unwrap(); - let ks = unique_keyspace_name(); - - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session.use_keyspace(ks, false).await.unwrap(); - - session - .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c varchar, primary key (a, b, c))") - .await - .unwrap(); - - let insert_a_b_c = session - .prepare("INSERT INTO tab (a, b, c) VALUES (?, ?, ?)") - .await - .unwrap(); - - 
let values_dyn: Vec<&dyn SerializeValue> = vec![ - &1 as &dyn SerializeValue, - &2 as &dyn SerializeValue, - &"&dyn" as &dyn SerializeValue, - ]; - session - .execute_unpaged(&insert_a_b_c, values_dyn) - .await - .unwrap(); - - let values_box_dyn: Vec> = vec![ - Box::new(1) as Box, - Box::new(3) as Box, - Box::new("Box dyn") as Box, - ]; - session - .execute_unpaged(&insert_a_b_c, values_box_dyn) - .await - .unwrap(); - - let mut all_rows: Vec<(i32, i32, String)> = session - .query_unpaged("SELECT a, b, c FROM tab", ()) - .await - .unwrap() - .into_rows_result() - .unwrap() - .rows::<(i32, i32, String)>() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - all_rows.sort(); - assert_eq!( - all_rows, - vec![ - (1i32, 2i32, "&dyn".to_owned()), - (1, 3, "Box dyn".to_owned()) - ] - ); -} - // A tests which checks that Session::batch automatically reprepares PreparedStatemtns if they become unprepared. // Doing an ALTER TABLE statement clears prepared statement cache and all prepared statements need // to be prepared again. 
diff --git a/scylla/tests/integration/statement.rs b/scylla/tests/integration/statement.rs index 6b8e2bc5bf..67690b74d6 100644 --- a/scylla/tests/integration/statement.rs +++ b/scylla/tests/integration/statement.rs @@ -1,11 +1,18 @@ use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; +use assert_matches::assert_matches; +use scylla::errors::BadQuery; +use scylla::errors::ExecutionError; +use scylla::serialize::value::SerializeValue; use scylla::{ batch::{Batch, BatchType}, query::Query, response::PagingState, + serialize, statement::SerialConsistency, }; use scylla_cql::Consistency; +use std::collections::BTreeMap; +use std::collections::HashMap; use std::collections::HashSet; use std::sync::{Arc, Mutex}; @@ -306,3 +313,149 @@ async fn test_timestamp_generator() { .map(|row_result| row_result.unwrap()) .all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime))); } + +#[tokio::test] +async fn test_named_bind_markers() { + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) + .await + .unwrap(); + + session.use_keyspace(ks, false).await.unwrap(); + + session + .ddl("CREATE TABLE t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v))") + .await + .unwrap(); + + let prepared = session + .prepare("INSERT INTO t (pk, ck, v) VALUES (:pk, :ck, :v)") + .await + .unwrap(); + let hashmap: HashMap<&str, i32> = HashMap::from([("pk", 7), ("v", 42), ("ck", 13)]); + session.execute_unpaged(&prepared, &hashmap).await.unwrap(); + + let btreemap: BTreeMap<&str, i32> = BTreeMap::from([("ck", 113), ("v", 142), ("pk", 17)]); + session.execute_unpaged(&prepared, &btreemap).await.unwrap(); + + let rows: Vec<(i32, i32, i32)> = session + .query_unpaged("SELECT pk, ck, v FROM t", &[]) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(i32, 
i32, i32)>() + .unwrap() + .collect::>() + .unwrap(); + + assert_eq!(rows, vec![(7, 13, 42), (17, 113, 142)]); + + let wrongmaps = [ + HashMap::from([("v", 7), ("fefe", 42), ("ck", 13)]), + HashMap::from([("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", 7)]), + HashMap::new(), + HashMap::from([("ck", 9)]), + ]; + for wrongmap in wrongmaps { + let result = session.execute_unpaged(&prepared, &wrongmap).await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "pk"); + } +} + +#[tokio::test] +async fn test_unusual_valuelists() { + setup_tracing(); + + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + + session + .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c varchar, primary key (a, b, c))") + .await + .unwrap(); + + let insert_a_b_c = session + .prepare("INSERT INTO tab (a, b, c) VALUES (?, ?, ?)") + .await + .unwrap(); + + let values_dyn: Vec<&dyn SerializeValue> = vec![ + &1 as &dyn SerializeValue, + &2 as &dyn SerializeValue, + &"&dyn" as &dyn SerializeValue, + ]; + session + .execute_unpaged(&insert_a_b_c, values_dyn) + .await + .unwrap(); + + let values_box_dyn: Vec> = vec![ + Box::new(1) as Box, + Box::new(3) as Box, + Box::new("Box dyn") as Box, + ]; + session + .execute_unpaged(&insert_a_b_c, values_box_dyn) + .await + .unwrap(); + + session + .execute_unpaged( + &insert_a_b_c, + ( + Box::new(1) as Box, + &2 as &dyn SerializeValue, + &"Box,&dyn" as &dyn SerializeValue, + ), + ) + .await + .unwrap(); + + session + .execute_unpaged( + 
&insert_a_b_c, + ( + &2 as &dyn SerializeValue, + Box::new(1) as Box, + &"&dyn,Box" as &dyn SerializeValue, + ), + ) + .await + .unwrap(); + + let mut all_rows: Vec<(i32, i32, String)> = session + .query_unpaged("SELECT a, b, c FROM tab", ()) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(i32, i32, String)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + all_rows.sort_unstable(); + assert_eq!( + all_rows, + vec![ + (1i32, 2i32, "&dyn".to_owned()), + (1, 2i32, "Box,&dyn".to_owned()), + (1, 3, "Box dyn".to_owned()), + (2i32, 1, "&dyn,Box".to_owned()) + ] + ); +} From ba2c83fdbcffc1fd7aff429b2110c654a1cc9af5 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Thu, 13 Feb 2025 15:05:39 +0100 Subject: [PATCH 7/8] Add integration tests for named/unnamed bind markers Tests are inspired by https://github.com/scylladb/java-driver/blob/48a9ea0496e5fe6c75eb481789490731c8e33050/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/SimpleStatementCcmIT.java --- scylla/tests/integration/statement.rs | 676 +++++++++++++++++++++++++- 1 file changed, 669 insertions(+), 7 deletions(-) diff --git a/scylla/tests/integration/statement.rs b/scylla/tests/integration/statement.rs index 67690b74d6..d0afe73f03 100644 --- a/scylla/tests/integration/statement.rs +++ b/scylla/tests/integration/statement.rs @@ -1,19 +1,20 @@ use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; use assert_matches::assert_matches; -use scylla::errors::BadQuery; -use scylla::errors::ExecutionError; -use scylla::serialize::value::SerializeValue; +use futures::TryStreamExt; use scylla::{ batch::{Batch, BatchType}, + client::session::Session, + errors::{ + BadQuery, DbError, ExecutionError, NextPageError, PagerExecutionError, RequestAttemptError, + RequestError, + }, query::Query, response::PagingState, - serialize, + serialize::{self, value::SerializeValue}, statement::SerialConsistency, }; use scylla_cql::Consistency; -use 
std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::{Arc, Mutex}; #[tokio::test] @@ -459,3 +460,664 @@ async fn test_unusual_valuelists() { ] ); } + +const KEY: &str = "test"; + +/// Initialize a cluster with two tables table and insert some data into one of them. +/// Returns a session. +/// +/// # Example +/// ```rust +/// let session = initialize_cluster_two_tables().await; +/// ``` +async fn initialize_cluster_two_tables() -> Session { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + + let ks = unique_keyspace_name(); + + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS test (k text, v int, PRIMARY KEY(k, v))") + .await + .unwrap(); + + for i in 0..100 { + session + .query_unpaged("INSERT INTO test (k, v) VALUES (?, ?)", (KEY, i)) + .await + .unwrap(); + } + + session + .ddl("CREATE TABLE IF NOT EXISTS test2 (k text primary key, v int)") + .await + .unwrap(); + + session +} + +#[tokio::test] +async fn should_fail_when_too_many_positional_values_provided() { + let session = initialize_cluster_two_tables().await; + let query = Query::new("SELECT v FROM test WHERE k=?"); + + let result = session.query_unpaged(query.clone(), (KEY, 1)).await; + let Err(ExecutionError::LastAttemptError(RequestAttemptError::SerializationError(e))) = result + else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 2, + cql_cols: 1, + } + ); + + let result = session.query_iter(query.clone(), (KEY, 1)).await; + let Err(PagerExecutionError::SerializationError(e)) = result else { + panic!("Expected 
WrongColumnCount error"); + }; + + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 2, + cql_cols: 1, + } + ); + + let result = session + .query_single_page(query.clone(), (KEY, 1), PagingState::start()) + .await; + let Err(ExecutionError::LastAttemptError(RequestAttemptError::SerializationError(e))) = result + else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 2, + cql_cols: 1, + } + ); + + let prepared_query = session.prepare(query).await.unwrap(); + + let result = session.execute_unpaged(&prepared_query, (KEY, 1)).await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 2, + cql_cols: 1, + } + ); + + let result = session.execute_iter(prepared_query.clone(), (KEY, 1)).await; + let Err(PagerExecutionError::SerializationError(e)) = result else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 2, + cql_cols: 1, + } + ); + + let result = session + .execute_single_page(&prepared_query, (KEY, 1), PagingState::start()) + .await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 2, + cql_cols: 1, + } + ); +} + +#[tokio::test] +async fn should_fail_when_not_enough_positional_values_provided() { + let session = initialize_cluster_two_tables().await; + + let query = 
Query::new("SELECT v FROM test WHERE k=?"); + + let result = session.query_unpaged(query.clone(), ()).await; + assert_matches!( + result, + Err(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::Invalid, message) + )) if message.contains("Invalid amount of bind variables") + ); + + let result = session.query_iter(query.clone(), ()).await; + assert_matches!( + result, + Err(PagerExecutionError::NextPageError( + NextPageError::RequestFailure(RequestError::LastAttemptError( + RequestAttemptError::DbError(DbError::Invalid, message) + )) + )) if message.contains("Invalid amount of bind variables") + ); + + let result = session + .query_single_page(query.clone(), (), PagingState::start()) + .await; + assert_matches!( + result, + Err(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::Invalid, message) + )) if message.contains("Invalid amount of bind variables") + ); + + let prepared_query = session.prepare(query).await.unwrap(); + + let result = session.execute_unpaged(&prepared_query, ()).await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 0, + cql_cols: 1, + } + ); + + let result = session.execute_iter(prepared_query.clone(), ()).await; + let Err(PagerExecutionError::SerializationError(e)) = result else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 0, + cql_cols: 1, + } + ); + + let result = session + .execute_single_page(&prepared_query, (), PagingState::start()) + .await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected WrongColumnCount error"); + }; + assert_matches!( + e.downcast_ref::() + .unwrap() + 
.kind, + serialize::row::BuiltinTypeCheckErrorKind::WrongColumnCount { + rust_cols: 0, + cql_cols: 1, + } + ); +} + +#[tokio::test] +async fn should_allow_nulls_in_positional_values() { + let session = initialize_cluster_two_tables().await; + + let insert_query = Query::new("INSERT INTO test2 (k, v) VALUES (?, ?)"); + let select_query = Query::new("SELECT k, v FROM test2 WHERE k=?"); + + { + let name = "name1"; + let result = session + .query_unpaged(insert_query.clone(), (name, None::)) + .await; + assert!(result.is_ok()); + + let rows = session + .query_unpaged(select_query.clone(), (name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + assert_eq!(rows.rows_num(), 1); + let row = rows.single_row::<(String, Option)>().unwrap(); + assert_eq!(row, (name.to_string(), None)); + } + { + let name = "name2"; + let result = session + .query_iter(insert_query.clone(), (name, None::)) + .await; + assert!(result.is_ok()); + + let mut iter = session + .query_iter(select_query.clone(), (name,)) + .await + .unwrap() + .rows_stream::<(String, Option)>() + .unwrap(); + + let mut i = 0; + while let Some((a, b)) = iter.try_next().await.unwrap() { + assert_eq!(a, name); + assert!(b.is_none()); + i += 1; + } + assert_eq!(i, 1); + } + + let result = session + .query_single_page( + insert_query.clone(), + ("name3", None::), + PagingState::start(), + ) + .await; + assert!(result.is_ok()); + + let prepared_insert_query = session.prepare(insert_query).await.unwrap(); + let prepared_select_query = session.prepare(select_query).await.unwrap(); + + { + let name = "name4"; + let result = session + .execute_unpaged(&prepared_insert_query, (name, None::)) + .await; + assert!(result.is_ok()); + + let rows = session + .execute_unpaged(&prepared_select_query, (name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + assert_eq!(rows.rows_num(), 1); + let row = rows.single_row::<(String, Option)>().unwrap(); + assert_eq!(row, (name.to_string(), None)); + } + { + let name = 
"name5"; + let result = session + .execute_iter(prepared_insert_query.clone(), (name, None::)) + .await; + assert!(result.is_ok()); + + let mut iter = session + .execute_iter(prepared_select_query.clone(), (name,)) + .await + .unwrap() + .rows_stream::<(String, Option)>() + .unwrap(); + + let mut i = 0; + while let Some((a, b)) = iter.try_next().await.unwrap() { + assert_eq!(a, name); + assert!(b.is_none()); + i += 1; + } + assert_eq!(i, 1); + } + + let result = session + .execute_single_page( + &prepared_insert_query, + ("name6", None::), + PagingState::start(), + ) + .await; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn should_allow_nulls_in_named_bind_markers() { + let session = initialize_cluster_two_tables().await; + + let query = Query::new("INSERT INTO test2 (k, v) VALUES (:k, :v)"); + let select_query = Query::new("SELECT k, v FROM test2 WHERE k=?"); + + let prepared_query = session.prepare(query.clone()).await.unwrap(); + + #[derive(scylla::SerializeRow)] + struct TestStruct<'a> { + k: &'a str, + v: Option, + } + + { + let name = "name1"; + session + .query_unpaged( + query.clone(), + TestStruct { + k: name, + v: None::, + }, + ) + .await + .unwrap(); + let rows: Vec<(String, Option)> = session + .query_unpaged(select_query.clone(), (name,)) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(String, Option)>() + .unwrap() + .map(|res| res.unwrap()) + .collect(); + + assert_eq!(rows, vec![(name.to_string(), None::)]); + } + { + let name = "name2"; + session + .query_iter( + query.clone(), + TestStruct { + k: name, + v: None::, + }, + ) + .await + .unwrap(); + let rows: Vec<(String, Option)> = session + .query_unpaged(select_query.clone(), (name,)) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(String, Option)>() + .unwrap() + .map(|res| res.unwrap()) + .collect(); + + assert_eq!(rows, vec![(name.to_string(), None::)]); + } + { + let name = "name3"; + session + .execute_unpaged( + &prepared_query, + TestStruct { 
+ k: name, + v: None::, + }, + ) + .await + .unwrap(); + let rows: Vec<(String, Option)> = session + .query_unpaged(select_query.clone(), (name,)) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(String, Option)>() + .unwrap() + .map(|res| res.unwrap()) + .collect(); + + assert_eq!(rows, vec![(name.to_string(), None::)]); + } + { + let name = "name4"; + session + .execute_iter( + prepared_query.clone(), + TestStruct { + k: name, + v: None::, + }, + ) + .await + .unwrap(); + let rows: Vec<(String, Option)> = session + .query_unpaged(select_query.clone(), (name,)) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(String, Option)>() + .unwrap() + .map(|res| res.unwrap()) + .collect(); + + assert_eq!(rows, vec![(name.to_string(), None::)]); + } +} + +#[tokio::test] +async fn test_named_bind_markers_with_case_sensitive_id() { + let session = initialize_cluster_two_tables().await; + + let regular_query = Query::new("SELECT count(*) FROM test2 WHERE k=:theKey"); + let prepared_regular_query = session.prepare(regular_query.clone()).await.unwrap(); + let case_sensitive_query = Query::new("SELECT count(*) FROM test2 WHERE k=:\"theKey\""); + let prepared_case_sensitive_query = + session.prepare(case_sensitive_query.clone()).await.unwrap(); + + let hashmap_case_sensitive: HashMap<&str, &str> = HashMap::from([("theKey", "name")]); + let hashmap_regular: HashMap<&str, &str> = HashMap::from([("thekey", "name")]); + { + let result = session + .query_unpaged(regular_query.clone(), &hashmap_case_sensitive) + .await; + let Err(ExecutionError::LastAttemptError(RequestAttemptError::SerializationError(e))) = + result + else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "thekey"); + + let rows = session + .query_unpaged(regular_query.clone(), &hashmap_regular) + .await + .unwrap() + 
.into_rows_result() + .unwrap(); + assert_eq!(rows.rows_num(), 1); + assert!(rows.single_row::<(i64,)>().unwrap().0 == 0); + + let result = session + .query_unpaged(case_sensitive_query.clone(), &hashmap_regular) + .await; + let Err(ExecutionError::LastAttemptError(RequestAttemptError::SerializationError(e))) = + result + else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "theKey"); + + let rows = session + .query_unpaged(case_sensitive_query.clone(), &hashmap_case_sensitive) + .await + .unwrap() + .into_rows_result() + .unwrap(); + assert_eq!(rows.rows_num(), 1); + assert!(rows.single_row::<(i64,)>().unwrap().0 == 0); + } + { + let result = session + .query_iter(regular_query.clone(), &hashmap_case_sensitive) + .await; + let Err(PagerExecutionError::SerializationError(e)) = result else { + panic!("Expected ValueMissingForColumn error"); + }; + + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "thekey"); + + let mut iter = session + .query_iter(regular_query.clone(), &hashmap_regular) + .await + .unwrap() + .rows_stream::<(i64,)>() + .unwrap(); + let mut i = 0; + while let Some((a,)) = iter.try_next().await.unwrap() { + assert_eq!(a, 0); + i += 1; + } + assert_eq!(i, 1); + + let result = session + .query_iter(case_sensitive_query.clone(), &hashmap_regular) + .await; + let Err(PagerExecutionError::SerializationError(e)) = result else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "theKey"); + + let mut iter = session + .query_iter(case_sensitive_query.clone(), &hashmap_case_sensitive) + .await + .unwrap() + .rows_stream::<(i64,)>() + .unwrap(); + let 
mut i = 0; + while let Some((a,)) = iter.try_next().await.unwrap() { + assert_eq!(a, 0); + i += 1; + } + assert_eq!(i, 1); + } + { + let result = session + .execute_unpaged(&prepared_regular_query, &hashmap_case_sensitive) + .await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "thekey"); + + let rows = session + .execute_unpaged(&prepared_regular_query, &hashmap_regular) + .await + .unwrap() + .into_rows_result() + .unwrap(); + assert_eq!(rows.rows_num(), 1); + assert!(rows.single_row::<(i64,)>().unwrap().0 == 0); + + let result = session + .execute_unpaged(&prepared_case_sensitive_query, &hashmap_regular) + .await; + let Err(ExecutionError::BadQuery(BadQuery::SerializationError(e))) = result else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "theKey"); + + let rows = session + .execute_unpaged(&prepared_case_sensitive_query, &hashmap_case_sensitive) + .await + .unwrap() + .into_rows_result() + .unwrap(); + assert_eq!(rows.rows_num(), 1); + assert!(rows.single_row::<(i64,)>().unwrap().0 == 0); + } + { + let result = session + .execute_iter(prepared_regular_query.clone(), &hashmap_case_sensitive) + .await; + let Err(PagerExecutionError::SerializationError(e)) = result else { + panic!("Expected ValueMissingForColumn error"); + }; + assert_matches!(&e + .downcast_ref::() + .unwrap() + .kind, + serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name } + if name == "thekey"); + + let mut iter = session + .execute_iter(prepared_regular_query.clone(), &hashmap_regular) + .await + .unwrap() + .rows_stream::<(i64,)>() + .unwrap(); + let mut i = 0; + while 
let Some((a,)) = iter.try_next().await.unwrap() {
+            assert_eq!(a, 0);
+            i += 1;
+        }
+        assert_eq!(i, 1);
+
+        let result = session
+            .execute_iter(prepared_case_sensitive_query.clone(), &hashmap_regular)
+            .await;
+        let Err(PagerExecutionError::SerializationError(e)) = result else {
+            panic!("Expected ValueMissingForColumn error");
+        };
+        assert_matches!(&e
+            .downcast_ref::<serialize::row::BuiltinTypeCheckError>()
+            .unwrap()
+            .kind,
+            serialize::row::BuiltinTypeCheckErrorKind::ValueMissingForColumn { name }
+            if name == "theKey");
+
+        let mut iter = session
+            .execute_iter(
+                prepared_case_sensitive_query.clone(),
+                &hashmap_case_sensitive,
+            )
+            .await
+            .unwrap()
+            .rows_stream::<(i64,)>()
+            .unwrap();
+        let mut i = 0;
+        while let Some((a,)) = iter.try_next().await.unwrap() {
+            assert_eq!(a, 0);
+            i += 1;
+        }
+        assert_eq!(i, 1);
+    }
+}

From 8c417708392aa981781c75f5e41becf17c3e79a8 Mon Sep 17 00:00:00 2001
From: sylwiaszunejko
Date: Thu, 13 Feb 2025 15:26:23 +0100
Subject: [PATCH 8/8] Fix panic while generating a timestamp

If random::<i64>() produces i64::MIN, calling .abs() on it causes an
overflow. This commit fixes that by using random::<u32>().
---
 scylla/tests/integration/statement.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scylla/tests/integration/statement.rs b/scylla/tests/integration/statement.rs
index d0afe73f03..864df956b5 100644
--- a/scylla/tests/integration/statement.rs
+++ b/scylla/tests/integration/statement.rs
@@ -259,7 +259,7 @@ async fn test_timestamp_generator() {
 
     impl TimestampGenerator for LocalTimestampGenerator {
         fn next_timestamp(&self) -> i64 {
-            let timestamp = random::<i64>().abs();
+            let timestamp = (random::<u32>() as i64).abs();
             self.generated_timestamps.lock().unwrap().insert(timestamp);
             timestamp
         }