diff --git a/scylla/tests/integration/batch.rs b/scylla/tests/integration/batch.rs index 29868e7160..65e22ec516 100644 --- a/scylla/tests/integration/batch.rs +++ b/scylla/tests/integration/batch.rs @@ -1,16 +1,84 @@ +use crate::utils::setup_tracing; +use crate::utils::unique_keyspace_name; +use crate::utils::PerformDDL; +use crate::utils::{create_new_session_builder, scylla_supports_tablets}; use scylla::batch::Batch; use scylla::batch::BatchType; -use scylla::errors::{ExecutionError, RequestAttemptError}; +use scylla::client::session::Session; +use scylla::errors::{DbError, ExecutionError, RequestAttemptError}; use scylla::frame::frame_errors::BatchSerializationError; use scylla::frame::frame_errors::CqlRequestSerializationError; +use scylla::prepared_statement::PreparedStatement; use scylla::query::Query; - -use crate::utils::create_new_session_builder; -use crate::utils::setup_tracing; -use crate::utils::unique_keyspace_name; -use crate::utils::PerformDDL; +use scylla::value::{Counter, CqlValue, MaybeUnset}; +use std::collections::HashMap; +use std::string::String; +use std::sync::Arc; use assert_matches::assert_matches; +use scylla::response::query_result::{QueryResult, QueryRowsResult}; + +const BATCH_COUNT: usize = 100; + +async fn create_test_session(table_name: &str, supports_tablets: bool) -> Session { + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks); + + // Need to disable tablets in this test because they don't support counters yet. + // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e). + if !supports_tablets && scylla_supports_tablets(&session).await { + create_ks += " AND TABLETS = {'enabled': false}" + } + session.ddl(create_ks).await.unwrap(); + session.use_keyspace(&ks, false).await.unwrap(); + session + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))", + table_name + )) + .await + .unwrap(); + session +} + +async fn create_counter_tables(session: &Session) { + for &table in ["counter1", "counter2", "counter3"].iter() { + session + .ddl(format!( + "CREATE TABLE {} (k0 text PRIMARY KEY, c counter)", + table + )) + .await + .unwrap(); + } +} + +async fn verify_batch_insert(session: &Session, test_name: &str, count: usize) { + let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", test_name); + let query_result = session + .query_unpaged(select_query, (test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let rows: Vec<(String, i32, i32)> = query_result + .rows::<(String, i32, i32)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!(rows.len(), count); + for (k0, k1, v) in rows { + assert_eq!(k0, test_name); + assert_eq!(v, k1 + 1); + } +} + +async fn prepare_insert_statement(session: &Session, table: &str) -> PreparedStatement { + let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", table); + session.prepare(Query::new(query_str)).await.unwrap() +} #[tokio::test] #[ntest::timeout(60000)] @@ -76,3 +144,553 @@ async fn batch_statements_and_values_mismatch_detected() { ) } } + +#[tokio::test] +async fn test_batch() { + setup_tracing(); + let session = Arc::new(create_new_session_builder().build().await.unwrap()); + let ks = unique_keyspace_name(); + + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))", + ks + )) + .await + .unwrap(); + + let prepared_statement = session + .prepare(format!( + "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", + ks + )) + .await + .unwrap(); + + // TODO: Add API that supports binding values to statements in batch creation process, + // to avoid problem of statements/values count mismatch + use scylla::batch::Batch; + let mut batch: Batch = Default::default(); + batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", ks)[..]); + batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')", ks)[..]); + batch.append_statement(prepared_statement.clone()); + + let four_value: i32 = 4; + let hello_value: String = String::from("hello"); + let session_clone = session.clone(); + // We're spawning to a separate task here to test that it works even in that case, because in some scenarios + // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries) + // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on + // some lifetime for some unknown reason), so can't be spawned on tokio. + // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details + tokio::spawn(async move { + let values = ( + (1_i32, 2_i32, "abc"), + (), + (1_i32, &four_value, hello_value.as_str()), + ); + session_clone.batch(&batch, values).await.unwrap(); + }) + .await + .unwrap(); + + let mut results: Vec<(i32, i32, String)> = session + .query_unpaged(format!("SELECT a, b, c FROM {}.t_batch", ks), &[]) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(i32, i32, String)>() + .unwrap() + .collect::>() + .unwrap(); + + results.sort(); + assert_eq!( + results, + vec![ + (1, 2, String::from("abc")), + (1, 4, String::from("hello")), + (7, 11, String::from("")) + ] + ); + + // Test repreparing statement inside a batch + let mut batch: Batch = Default::default(); + batch.append_statement(prepared_statement); + let values = ((4_i32, 20_i32, "foobar"),); + + // This statement flushes the prepared statement cache + session + .ddl(format!( + "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42", + ks + )) + .await + .unwrap(); + session.batch(&batch, values).await.unwrap(); + + let results: Vec<(i32, i32, String)> = session + .query_unpaged( + format!("SELECT a, b, c FROM {}.t_batch WHERE a = 4", ks), + &[], + ) + .await + .unwrap() + .into_rows_result() + .unwrap() + .rows::<(i32, i32, String)>() + .unwrap() + .collect::>() + .unwrap(); + + assert_eq!(results, vec![(4, 20, String::from("foobar"))]); +} + +// This is a regression test for #1134. +#[tokio::test] +async fn test_batch_to_multiple_tables() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(&ks, true).await.unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))") + .await + .unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))") + .await + .unwrap(); + + let prepared_statement = session + .prepare( + " + BEGIN BATCH + INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?); + INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?); + APPLY BATCH; + ", + ) + .await + .unwrap(); + + session + .execute_unpaged(&prepared_statement, (1, 2, "ala", 4, 5, "ma")) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_batch_of_simple_statements() { + setup_tracing(); + let test_name = String::from("test_batch_simple_statements"); + let session = create_test_session(&test_name, true).await; + + let mut batch = Batch::new(BatchType::Unlogged); + for i in 0..BATCH_COUNT { + let simple_statement = Query::new(format!( + "INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})", + &test_name, + &test_name, + i, + i + 1 + )); + batch.append_statement(simple_statement); + } + session.batch(&batch, vec![(); BATCH_COUNT]).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_prepared_statements() { + setup_tracing(); + let test_name = String::from("test_batch_prepared_statements"); + let session = create_test_session(&test_name, true).await; + + let prepared = prepare_insert_statement(&session, &test_name).await; + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + batch_values.push((test_name.as_str(), i, i + 1)); + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_prepared_batch() { + setup_tracing(); + let test_name = String::from("test_prepared_batch"); + let session = create_test_session(&test_name, true).await; + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(Query::new(query_str.clone())); + batch_values.push((&test_name, i, i + 1)); + } + let prepared_batch = session.prepare_batch(&batch).await.unwrap(); + session.batch(&prepared_batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_prepared_statements_with_unset_values() { + setup_tracing(); + let test_name = String::from("test_batch_prepared_statements_with_unset_values"); + let session = create_test_session(&test_name, true).await; + + let prepared = prepare_insert_statement(&session, &test_name).await; + let mut batch1 = Batch::new(BatchType::Unlogged); + let mut batch1_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch1.append_statement(prepared.clone()); + batch1_values.push((test_name.as_str(), i, i + 1)); + } + session.batch(&batch1, batch1_values).await.unwrap(); + + // Update v to (k1 + 2), but for every 20th row leave v unset. + let mut batch2 = Batch::new(BatchType::Unlogged); + let mut batch2_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch2.append_statement(prepared.clone()); + if i % 20 == 0 { + batch2_values.push(( + MaybeUnset::Set(&test_name), + MaybeUnset::Set(i), + MaybeUnset::Unset, + )); + } else { + batch2_values.push(( + MaybeUnset::Set(&test_name), + MaybeUnset::Set(i), + MaybeUnset::Set(i + 2), + )); + } + } + session.batch(&batch2, batch2_values).await.unwrap(); + + // Verify that rows with k1 % 20 == 0 retain the original value. + let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", &test_name); + let query_result = session + .query_unpaged(select_query, (&test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let rows: Vec<(String, i32, i32)> = query_result + .rows::<(String, i32, i32)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!( + rows.len(), + BATCH_COUNT, + "Expected {} rows, got {}", + BATCH_COUNT, + rows.len() + ); + for (k0, k1, v) in rows { + assert_eq!(k0, test_name); + assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 }); + } +} + +#[tokio::test] +async fn test_batch_of_prepared_statements_named_variables() { + setup_tracing(); + let test_name = String::from("test_batch_prepared_statements_named_variables"); + let session = create_test_session(&test_name, true).await; + + let query_str = format!( + "INSERT INTO {} (k0, k1, v) VALUES (:k0, :k1, :v)", + &test_name + ); + let prepared = session.prepare(query_str).await.unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + let mut values = HashMap::new(); + values.insert("k0", CqlValue::Text(test_name.clone())); + values.insert("k1", CqlValue::Int(i)); + values.insert("v", CqlValue::Int(i + 1)); + batch_values.push(values); + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_mixed_prepared_and_simple_statements() { + setup_tracing(); + let test_name = String::from("test_batch_mixed_prepared_and_simple_statements"); + let session = create_test_session(&test_name, true).await; + + let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name); + let prepared_bound = session + .prepare(Query::new(query_str.clone())) + .await + .unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + if i % 2 == 1 { + let simple_statement = Query::new(format!( + "INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})", + &test_name, + &test_name, + i, + i + 1 + )); + batch.append_statement(simple_statement); + batch_values.push(vec![]); + } else { + batch.append_statement(prepared_bound.clone()); + batch_values.push(vec![ + CqlValue::Text(test_name.clone()), + CqlValue::Int(i), + CqlValue::Int(i + 1), + ]); + } + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +// Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied. +#[tokio::test] +async fn test_batch_lwts() { + let test_name = String::from("test_batch_lwts"); + let session = create_test_session(&test_name, false).await; + + session + .query_unpaged( + format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name), + (&test_name, 0, 0), + ) + .await + .unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + batch.append_statement( + format!( + "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0", + &test_name, &test_name + ) + .as_str(), + ); + batch.append_statement( + format!( + "INSERT INTO {} (k0, k1, v) VALUES ('{}', 123, 321)", + &test_name, &test_name + ) + .as_str(), + ); + batch.append_statement( + format!( + "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0", + &test_name, &test_name + ) + .as_str(), + ); + + let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap(); + let batch_deserializer = batch_res.into_rows_result().unwrap(); + + // Scylla always returns a result set with additional 'applied' column, which indicates whether the statement was applied. + // In contrast, Cassandra returns only 'applied' column for applied transactions; and 'applied' column + // with result set for rejected ones. + // This is described in https://opensource.docs.scylladb.com/stable/features/lwt.html and + // https://opensource.docs.scylladb.com/stable/kb/lwt-differences.html). + let is_scylla: bool = batch_deserializer.column_specs().len() == 4; + + if is_scylla { + test_batch_lwts_for_scylla(&session, &batch, &batch_deserializer, &test_name).await; + } else { + test_batch_lwts_for_cassandra(&session, &batch, &batch_deserializer, &test_name).await; + } +} + +async fn test_batch_lwts_for_scylla( + session: &Session, + batch: &Batch, + query_rows_result: &QueryRowsResult, + k0_value: &str, +) { + // Alias required by clippy + type IntOrNull = Option; + type StrOrNull = Option; + + // Returned columns are: + // [applied], k0, k1, v + let batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = query_rows_result + .rows() + .unwrap() + .collect::>() + .unwrap(); + + let k0_value = k0_value.to_string(); + let expected_batch_res_rows = vec![ + (true, Some(k0_value.clone()), Some(0), Some(0)), + (true, None, None, None), + (true, Some(k0_value.clone()), Some(0), Some(0)), + ]; + + assert_eq!(batch_res_rows, expected_batch_res_rows); + + let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); + let prepared_batch_res: QueryResult = + session.batch(&prepared_batch, ((), (), ())).await.unwrap(); + + let prepared_batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = prepared_batch_res + .into_rows_result() + .unwrap() + .rows() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + let expected_prepared_batch_res_rows = vec![ + (false, Some(k0_value.clone()), Some(0), Some(1)), + (false, None, None, None), + (false, Some(k0_value.clone()), Some(0), Some(1)), + ]; + + assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); +} + +async fn test_batch_lwts_for_cassandra( + session: &Session, + batch: &Batch, + query_rows_result: &QueryRowsResult, + k0_value: &str, +) { + // Alias required by clippy + type IntOrNull = Option; + type StrOrNull = Option; + + // Returned columns are: + // [applied] + let batch_res_rows: Vec<(bool,)> = query_rows_result + .rows() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + let expected_batch_res_rows = vec![(true,)]; + + assert_eq!(batch_res_rows, expected_batch_res_rows); + + let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); + let prepared_batch_res: QueryResult = + session.batch(&prepared_batch, ((), (), ())).await.unwrap(); + + // Returned columns are: + // [applied], k0, k1, v + let prepared_batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = prepared_batch_res + .into_rows_result() + .unwrap() + .rows() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + let expected_prepared_batch_res_rows = + vec![(false, Some(k0_value.to_string()), Some(0), Some(1))]; + + assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); +} + +#[tokio::test] +async fn test_counter_batch() { + setup_tracing(); + let test_name = String::from("test_counter_batch"); + let session = create_test_session(&test_name, false).await; + create_counter_tables(&session).await; + + let mut batch = Batch::new(BatchType::Counter); + let mut batch_values = Vec::with_capacity(3); + for i in 1..=3 { + let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push((Counter(i), &test_name)); + } + session.batch(&batch, batch_values).await.unwrap(); + + for i in 1..=3 { + let query_str = format!("SELECT c FROM counter{} WHERE k0 = ?", i); + let query_result = session + .query_unpaged(query_str, (&test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let row = query_result.single_row::<(Counter,)>().unwrap(); + let (c,) = row; + assert_eq!(c, Counter(i)); + } +} + +#[tokio::test] +async fn test_fail_logged_batch_with_counter_increment() { + setup_tracing(); + let test_name = String::from("test_fail_logged_batch"); + let session = create_test_session(&test_name, false).await; + create_counter_tables(&session).await; + + let mut batch = Batch::new(BatchType::Logged); + let mut batch_values: Vec<_> = Vec::with_capacity(3); + for i in 1..=3 { + let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push((Counter(i), &test_name)); + } + let err = session.batch(&batch, batch_values).await.unwrap_err(); + assert_matches!( + err, + ExecutionError::LastAttemptError(RequestAttemptError::DbError(DbError::Invalid, _)), + "Expected an Invalid DbError when using counter statements in a LOGGED batch" + ); +} + +#[tokio::test] +async fn test_fail_counter_batch_with_non_counter_increment() { + setup_tracing(); + let test_name = String::from("test_fail_counter_batch"); + let session = create_test_session(&test_name, false).await; + create_counter_tables(&session).await; + + let mut batch = Batch::new(BatchType::Counter); + let mut batch_values: Vec> = Vec::new(); + for i in 1..=3 { + let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]); + } + + let prepared = prepare_insert_statement(&session, &test_name).await; + batch.append_statement(prepared); + batch_values.push(vec![ + CqlValue::Text(test_name.clone()), + CqlValue::Int(0), + CqlValue::Int(1), + ]); + let err = session.batch(&batch, batch_values).await.unwrap_err(); + assert_matches!( + err, + ExecutionError::BadQuery(_), + "Expected a BadQuery error when including a non-counter statement in a COUNTER batch" + ); +} diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 6a6b4f4140..3dad603967 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -35,7 +35,7 @@ use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; use uuid::Uuid; -use scylla::response::query_result::{QueryResult, QueryRowsResult}; +use scylla::response::query_result::QueryResult; #[tokio::test] async fn test_connection_failure() { @@ -166,200 +166,6 @@ async fn test_unprepared_statement() { assert_eq!(results_from_manual_paging, results); } -#[tokio::test] -async fn test_counter_batch() { - use scylla::value::Counter; - use scylla_cql::frame::request::batch::BatchType; - - setup_tracing(); - let session = Arc::new(create_new_session_builder().build().await.unwrap()); - let ks = unique_keyspace_name(); - - // Need to disable tablets in this test because they don't support counters yet. - // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e). - let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks); - if scylla_supports_tablets(&session).await { - create_ks += " AND TABLETS = {'enabled': false}" - } - - session.ddl(create_ks).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {}.t_batch (key int PRIMARY KEY, value counter)", - ks - )) - .await - .unwrap(); - - let statement_str = format!("UPDATE {}.t_batch SET value = value + ? WHERE key = ?", ks); - let query = Query::from(statement_str); - let prepared = session.prepare(query.clone()).await.unwrap(); - - let mut counter_batch = Batch::new(BatchType::Counter); - counter_batch.append_statement(query.clone()); - counter_batch.append_statement(prepared.clone()); - counter_batch.append_statement(query.clone()); - counter_batch.append_statement(prepared.clone()); - counter_batch.append_statement(query.clone()); - counter_batch.append_statement(prepared.clone()); - - // Check that we do not get a server error - the driver - // should send a COUNTER batch instead of a LOGGED (default) one. - session - .batch( - &counter_batch, - ( - (Counter(1), 1), - (Counter(2), 2), - (Counter(3), 3), - (Counter(4), 4), - (Counter(5), 5), - (Counter(6), 6), - ), - ) - .await - .unwrap(); -} - -#[tokio::test] -async fn test_batch() { - setup_tracing(); - let session = Arc::new(create_new_session_builder().build().await.unwrap()); - let ks = unique_keyspace_name(); - - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))", - ks - )) - .await - .unwrap(); - - let prepared_statement = session - .prepare(format!( - "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", - ks - )) - .await - .unwrap(); - - // TODO: Add API that supports binding values to statements in batch creation process, - // to avoid problem of statements/values count mismatch - use scylla::batch::Batch; - let mut batch: Batch = Default::default(); - batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", ks)[..]); - batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')", ks)[..]); - batch.append_statement(prepared_statement.clone()); - - let four_value: i32 = 4; - let hello_value: String = String::from("hello"); - let session_clone = session.clone(); - // We're spawning to a separate task here to test that it works even in that case, because in some scenarios - // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries) - // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on - // some lifetime for some unknown reason), so can't be spawned on tokio. - // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details - tokio::spawn(async move { - let values = ( - (1_i32, 2_i32, "abc"), - (), - (1_i32, &four_value, hello_value.as_str()), - ); - session_clone.batch(&batch, values).await.unwrap(); - }) - .await - .unwrap(); - - let mut results: Vec<(i32, i32, String)> = session - .query_unpaged(format!("SELECT a, b, c FROM {}.t_batch", ks), &[]) - .await - .unwrap() - .into_rows_result() - .unwrap() - .rows::<(i32, i32, String)>() - .unwrap() - .collect::>() - .unwrap(); - - results.sort(); - assert_eq!( - results, - vec![ - (1, 2, String::from("abc")), - (1, 4, String::from("hello")), - (7, 11, String::from("")) - ] - ); - - // Test repreparing statement inside a batch - let mut batch: Batch = Default::default(); - batch.append_statement(prepared_statement); - let values = ((4_i32, 20_i32, "foobar"),); - - // This statement flushes the prepared statement cache - session - .ddl(format!( - "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42", - ks - )) - .await - .unwrap(); - session.batch(&batch, values).await.unwrap(); - - let results: Vec<(i32, i32, String)> = session - .query_unpaged( - format!("SELECT a, b, c FROM {}.t_batch WHERE a = 4", ks), - &[], - ) - .await - .unwrap() - .into_rows_result() - .unwrap() - .rows::<(i32, i32, String)>() - .unwrap() - .collect::>() - .unwrap(); - - assert_eq!(results, vec![(4, 20, String::from("foobar"))]); -} - -// This is a regression test for #1134. -#[tokio::test] -async fn test_batch_to_multiple_tables() { - setup_tracing(); - let session = create_new_session_builder().build().await.unwrap(); - let ks = unique_keyspace_name(); - - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session.use_keyspace(&ks, true).await.unwrap(); - session - .ddl("CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))") - .await - .unwrap(); - session - .ddl("CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))") - .await - .unwrap(); - - let prepared_statement = session - .prepare( - " - BEGIN BATCH - INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?); - INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?); - APPLY BATCH; - ", - ) - .await - .unwrap(); - - session - .execute_unpaged(&prepared_statement, (1, 2, "ala", 4, 5, "ma")) - .await - .unwrap(); -} - #[tokio::test] async fn test_token_awareness() { setup_tracing(); @@ -2358,133 +2164,6 @@ async fn test_rate_limit_exceeded_exception() { } } -// Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied. -#[tokio::test] -async fn test_batch_lwts() { - let session = create_new_session_builder().build().await.unwrap(); - - let ks = unique_keyspace_name(); - let mut create_ks = format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}", ks); - if scylla_supports_tablets(&session).await { - create_ks += " and TABLETS = { 'enabled': false}"; - } - session.ddl(create_ks).await.unwrap(); - session.use_keyspace(ks.clone(), false).await.unwrap(); - - session - .ddl("CREATE TABLE tab (p1 int, c1 int, r1 int, r2 int, primary key (p1, c1))") - .await - .unwrap(); - - session - .query_unpaged("INSERT INTO tab (p1, c1, r1, r2) VALUES (0, 0, 0, 0)", ()) - .await - .unwrap(); - - let mut batch: Batch = Batch::default(); - batch.append_statement("UPDATE tab SET r2 = 1 WHERE p1 = 0 AND c1 = 0 IF r1 = 0"); - batch.append_statement("INSERT INTO tab (p1, c1, r1, r2) VALUES (0, 123, 321, 312)"); - batch.append_statement("UPDATE tab SET r1 = 1 WHERE p1 = 0 AND c1 = 0 IF r2 = 0"); - - let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap(); - let batch_deserializer = batch_res.into_rows_result().unwrap(); - - // Scylla returns 5 columns, but Cassandra returns only 1 - let is_scylla: bool = batch_deserializer.column_specs().len() == 5; - - if is_scylla { - test_batch_lwts_for_scylla(&session, &batch, &batch_deserializer).await; - } else { - test_batch_lwts_for_cassandra(&session, &batch, &batch_deserializer).await; - } -} - -async fn test_batch_lwts_for_scylla( - session: &Session, - batch: &Batch, - query_rows_result: &QueryRowsResult, -) { - // Alias required by clippy - type IntOrNull = Option; - - // Returned columns are: - // [applied], p1, c1, r1, r2 - let batch_res_rows: Vec<(bool, IntOrNull, IntOrNull, IntOrNull, IntOrNull)> = query_rows_result - .rows() - .unwrap() - .collect::>() - .unwrap(); - - let expected_batch_res_rows = vec![ - (true, Some(0), Some(0), Some(0), Some(0)), - (true, None, None, None, None), - (true, Some(0), Some(0), Some(0), Some(0)), - ]; - - assert_eq!(batch_res_rows, expected_batch_res_rows); - - let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); - let prepared_batch_res: QueryResult = - session.batch(&prepared_batch, ((), (), ())).await.unwrap(); - - let prepared_batch_res_rows: Vec<(bool, IntOrNull, IntOrNull, IntOrNull, IntOrNull)> = - prepared_batch_res - .into_rows_result() - .unwrap() - .rows() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - - let expected_prepared_batch_res_rows = vec![ - (false, Some(0), Some(0), Some(1), Some(1)), - (false, None, None, None, None), - (false, Some(0), Some(0), Some(1), Some(1)), - ]; - - assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); -} - -async fn test_batch_lwts_for_cassandra( - session: &Session, - batch: &Batch, - query_rows_result: &QueryRowsResult, -) { - // Alias required by clippy - type IntOrNull = Option; - - // Returned columns are: - // [applied] - let batch_res_rows: Vec<(bool,)> = query_rows_result - .rows() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - - let expected_batch_res_rows = vec![(true,)]; - - assert_eq!(batch_res_rows, expected_batch_res_rows); - - let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); - let prepared_batch_res: QueryResult = - session.batch(&prepared_batch, ((), (), ())).await.unwrap(); - - // Returned columns are: - // [applied], p1, c1, r1, r2 - let prepared_batch_res_rows: Vec<(bool, IntOrNull, IntOrNull, IntOrNull, IntOrNull)> = - prepared_batch_res - .into_rows_result() - .unwrap() - .rows() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - - let expected_prepared_batch_res_rows = vec![(false, Some(0), Some(0), Some(1), Some(1))]; - - assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); -} - #[tokio::test] async fn test_keyspaces_to_fetch() { let ks1 = unique_keyspace_name();