From 0493ac516f357eb35f696120f3e82f62e27f10bc Mon Sep 17 00:00:00 2001 From: Dmitriy Kruglov Date: Tue, 11 Feb 2025 13:43:36 +0100 Subject: [PATCH 1/3] tests: add integration tests for batch statements --- scylla/tests/integration/batch_statements.rs | 387 +++++++++++++++++++ scylla/tests/integration/main.rs | 1 + 2 files changed, 388 insertions(+) create mode 100644 scylla/tests/integration/batch_statements.rs diff --git a/scylla/tests/integration/batch_statements.rs b/scylla/tests/integration/batch_statements.rs new file mode 100644 index 0000000000..4403bdb7fc --- /dev/null +++ b/scylla/tests/integration/batch_statements.rs @@ -0,0 +1,387 @@ +use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; +use assert_matches::assert_matches; +use scylla::batch::{Batch, BatchType}; +use scylla::client::session::Session; +use scylla::errors::ExecutionError; +use scylla::prepared_statement::PreparedStatement; +use scylla::query::Query; +use scylla::value::{CqlValue, MaybeUnset}; +use std::collections::HashMap; +use std::string::String; + +const BATCH_COUNT: usize = 100; + +async fn create_test_session(table_name: &str) -> (Session, String) { + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + session + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) + .await + .unwrap(); + session.use_keyspace(&ks, false).await.unwrap(); + session + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))", + table_name + )) + .await + .unwrap(); + + (session, ks) +} + +async fn create_counter_tables(session: &Session, ks: &str) { + for &table in ["counter1", "counter2", "counter3"].iter() { + session + .ddl(format!( + "CREATE TABLE {}.{} (k0 text PRIMARY KEY, c counter)", + ks, table + )) + .await + .unwrap(); + } +} + +async fn verify_batch_insert(session: &Session, keyspace: &str, test_name: &str, count: usize) { + let select_query = format!( + "SELECT k0, k1, v FROM {}.{} WHERE k0 = ?", + keyspace, test_name + ); + let query_result = session + .query_unpaged(select_query, (test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let rows: Vec<(String, i32, i32)> = query_result + .rows::<(String, i32, i32)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!(rows.len(), count); + for (k0, k1, v) in rows { + assert_eq!(k0, test_name); + assert_eq!(v, k1 + 1); + } +} + +async fn prepare_insert_statement(session: &Session, ks: &str, table: &str) -> PreparedStatement { + let query_str = format!("INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", ks, table); + session.prepare(Query::new(query_str)).await.unwrap() +} + +#[tokio::test] +async fn test_batch_of_simple_statements() { + setup_tracing(); + let test_name = String::from("test_batch_simple_statements"); + let (session, ks) = create_test_session(&test_name).await; + + let mut batch = Batch::new(BatchType::Unlogged); + for i in 0..BATCH_COUNT { + let simple_statement = Query::new(format!( + "INSERT INTO {}.{} (k0, k1, v) VALUES ('{}', {}, {})", + ks, + &test_name, + &test_name, + i, + i + 1 + )); + batch.append_statement(simple_statement); + } + session.batch(&batch, vec![(); BATCH_COUNT]).await.unwrap(); + verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_prepared_statements() { + setup_tracing(); + let test_name = 
String::from("test_batch_prepared_statements"); + let (session, ks) = create_test_session(&test_name).await; + + let prepared = prepare_insert_statement(&session, &ks, &test_name).await; + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + batch_values.push((test_name.as_str(), i, i + 1)); + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_prepared_batch() { + setup_tracing(); + let test_name = String::from("test_prepared_batch"); + let (session, ks) = create_test_session(&test_name).await; + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + let query_str = format!( + "INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", + ks, &test_name + ); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(Query::new(query_str.clone())); + batch_values.push((&test_name, i, i + 1)); + } + let prepared_batch = session.prepare_batch(&batch).await.unwrap(); + session.batch(&prepared_batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_bound_statements_with_unset_values() { + setup_tracing(); + let test_name = String::from("test_batch_bound_statements_with_unset_values"); + let (session, ks) = create_test_session(&test_name).await; + + let prepared = prepare_insert_statement(&session, &ks, &test_name).await; + let mut batch1 = Batch::new(BatchType::Unlogged); + let mut batch1_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch1.append_statement(prepared.clone()); + batch1_values.push((test_name.as_str(), i, i + 1)); + } + session.batch(&batch1, batch1_values).await.unwrap(); + + // Update v to (k1 + 2), but for every 20th row leave v unset. + let mut batch2 = Batch::new(BatchType::Unlogged); + let mut batch2_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch2.append_statement(prepared.clone()); + if i % 20 == 0 { + batch2_values.push(( + MaybeUnset::Set(&test_name), + MaybeUnset::Set(i), + MaybeUnset::Unset, + )); + } else { + batch2_values.push(( + MaybeUnset::Set(&test_name), + MaybeUnset::Set(i), + MaybeUnset::Set(i + 2), + )); + } + } + session.batch(&batch2, batch2_values).await.unwrap(); + + // Verify that rows with k1 % 20 == 0 retain the original value. 
+ let select_query = format!("SELECT k0, k1, v FROM {}.{} WHERE k0 = ?", ks, &test_name); + let query_result = session + .query_unpaged(select_query, (&test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let rows: Vec<(String, i32, i32)> = query_result + .rows::<(String, i32, i32)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!( + rows.len(), + BATCH_COUNT, + "Expected {} rows, got {}", + BATCH_COUNT, + rows.len() + ); + for (k0, k1, v) in rows { + assert_eq!(k0, test_name); + assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 }); + } +} + +#[tokio::test] +async fn test_batch_of_bound_statements_named_variables() { + setup_tracing(); + let test_name = String::from("test_batch_bound_statements_named_variables"); + let (session, ks) = create_test_session(&test_name).await; + + let query_str = format!( + "INSERT INTO {}.{} (k0, k1, v) VALUES (:k0, :k1, :v)", + ks, &test_name + ); + let prepared = session.prepare(query_str).await.unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + let mut values = HashMap::new(); + values.insert("k0", CqlValue::Text(test_name.clone())); + values.insert("k1", CqlValue::Int(i)); + values.insert("v", CqlValue::Int(i + 1)); + batch_values.push(values); + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_mixed_bound_and_simple_statements() { + setup_tracing(); + let test_name = String::from("test_batch_mixed_bound_and_simple_statements"); + let (session, ks) = create_test_session(&test_name).await; + + let query_str = format!( + "INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", + ks, &test_name + ); + let prepared_bound = session + .prepare(Query::new(query_str.clone())) + .await + .unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + if i % 2 == 1 { + let simple_statement = Query::new(format!( + "INSERT INTO {}.{} (k0, k1, v) VALUES ('{}', {}, {})", + ks, + &test_name, + &test_name, + i, + i + 1 + )); + batch.append_statement(simple_statement); + batch_values.push(vec![]); + } else { + batch.append_statement(prepared_bound.clone()); + batch_values.push(vec![ + CqlValue::Text(test_name.clone()), + CqlValue::Int(i), + CqlValue::Int(i + 1), + ]); + } + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; +} + +/// TODO: Remove #[ignore] once LWTs are supported with tablets. 
+#[tokio::test] +#[ignore] +async fn test_cas_batch() { + setup_tracing(); + let test_name = String::from("test_cas_batch"); + let (session, ks) = create_test_session(&test_name).await; + + let prepared = prepare_insert_statement(&session, &ks, &test_name).await; + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + batch_values.push((&test_name, i, i + 1)); + } + let result = session.batch(&batch, batch_values.clone()).await.unwrap(); + let (applied,): (bool,) = result + .into_rows_result() + .unwrap() + .first_row::<(bool,)>() + .unwrap(); + assert!(applied, "First CAS batch should be applied"); + + verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; + + let result2 = session.batch(&batch, batch_values).await.unwrap(); + let (applied2,): (bool,) = result2 + .into_rows_result() + .unwrap() + .first_row::<(bool,)>() + .unwrap(); + assert!(applied2, "Second CAS batch should not be applied"); +} + +/// TODO: Remove #[ignore] once counters are supported with tablets. +#[tokio::test] +#[ignore] +async fn test_counter_batch() { + setup_tracing(); + let test_name = String::from("test_counter_batch"); + let (session, ks) = create_test_session(&test_name).await; + create_counter_tables(&session, &ks).await; + + let mut batch = Batch::new(BatchType::Counter); + let mut batch_values = Vec::with_capacity(3); + for i in 1..=3 { + let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push((i, &test_name)); + } + session.batch(&batch, batch_values).await.unwrap(); + + for i in 1..=3 { + let query_str = format!("SELECT c FROM {}.counter{} WHERE k0 = ?", ks, i); + let query_result = session + .query_unpaged(query_str, (&test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let row = query_result.single_row::<(i64,)>().unwrap(); + let (c,) = row; + assert_eq!(c, i as i64); + } +} + +/// TODO: Remove #[ignore] once counters are supported with tablets. +#[tokio::test] +#[ignore] +async fn test_fail_logged_batch_with_counter_increment() { + setup_tracing(); + let test_name = String::from("test_fail_logged_batch"); + let (session, ks) = create_test_session(&test_name).await; + create_counter_tables(&session, &ks).await; + + let mut batch = Batch::new(BatchType::Logged); + let mut batch_values: Vec<_> = Vec::with_capacity(3); + for i in 1..=3 { + let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push((i, &test_name)); + } + let err = session.batch(&batch, batch_values).await.unwrap_err(); + assert_matches!( + err, + ExecutionError::BadQuery(_), + "Expected a BadQuery error when using counter statements in a LOGGED batch" + ); +} + +/// TODO: Remove #[ignore] once counters are supported with tablets. +#[tokio::test] +#[ignore] +async fn test_fail_counter_batch_with_non_counter_increment() { + setup_tracing(); + let test_name = String::from("test_fail_counter_batch"); + let (session, ks) = create_test_session(&test_name).await; + create_counter_tables(&session, &ks).await; + + let mut batch = Batch::new(BatchType::Counter); + let mut batch_values: Vec> = Vec::new(); + for i in 1..=3 { + let query_str = format!("UPDATE {}.counter{} SET c = c + ? 
WHERE k0 = ?", ks, i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]); + } + + let prepared = prepare_insert_statement(&session, &ks, &test_name).await; + batch.append_statement(prepared); + batch_values.push(vec![ + CqlValue::Text(test_name.clone()), + CqlValue::Int(0), + CqlValue::Int(1), + ]); + let err = session.batch(&batch, batch_values).await.unwrap_err(); + assert_matches!( + err, + ExecutionError::BadQuery(_), + "Expected a BadQuery error when including a non-counter statement in a COUNTER batch" + ); +} diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index a87649ef48..a67a870dbc 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -1,4 +1,5 @@ mod batch; +mod batch_statements; mod consistency; mod cql_collections; mod cql_types; From 7d6dc0b2c9454ed3a0b3c3104eb00934dae7de83 Mon Sep 17 00:00:00 2001 From: Dmitriy Kruglov Date: Tue, 11 Feb 2025 22:38:47 +0100 Subject: [PATCH 2/3] tests: move batch statements tests to batch.rs module New batch statement test cases are moved to exising batch.rs test suite. Additionally, the test cases for LWT and counter tables are unskipped and adjusted to use keyspace with tablets disabled. This allows executing the underlying test scenarios even though LWT and counters are not yet supported with tablets. --- scylla/tests/integration/batch.rs | 366 +++++++++++++++++- scylla/tests/integration/batch_statements.rs | 387 ------------------- scylla/tests/integration/main.rs | 1 - 3 files changed, 364 insertions(+), 390 deletions(-) delete mode 100644 scylla/tests/integration/batch_statements.rs diff --git a/scylla/tests/integration/batch.rs b/scylla/tests/integration/batch.rs index 29868e7160..b6a017d17a 100644 --- a/scylla/tests/integration/batch.rs +++ b/scylla/tests/integration/batch.rs @@ -1,17 +1,82 @@ use scylla::batch::Batch; use scylla::batch::BatchType; -use scylla::errors::{ExecutionError, RequestAttemptError}; +use scylla::client::session::Session; +use scylla::errors::{DbError, ExecutionError, RequestAttemptError}; use scylla::frame::frame_errors::BatchSerializationError; use scylla::frame::frame_errors::CqlRequestSerializationError; +use scylla::prepared_statement::PreparedStatement; use scylla::query::Query; +use scylla::value::{Counter, CqlValue, MaybeUnset}; +use std::collections::HashMap; +use std::string::String; -use crate::utils::create_new_session_builder; use crate::utils::setup_tracing; use crate::utils::unique_keyspace_name; use crate::utils::PerformDDL; +use crate::utils::{create_new_session_builder, scylla_supports_tablets}; use assert_matches::assert_matches; +const BATCH_COUNT: usize = 100; + +async fn create_test_session(table_name: &str, supports_tablets: bool) -> Session { + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks); + + if !supports_tablets && scylla_supports_tablets(&session).await { + create_ks += " AND TABLETS = {'enabled': false}" + } + session.ddl(create_ks).await.unwrap(); + session.use_keyspace(&ks, false).await.unwrap(); + session + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))", + table_name + )) + .await + .unwrap(); + session +} + +async fn 
create_counter_tables(session: &Session) { + for &table in ["counter1", "counter2", "counter3"].iter() { + session + .ddl(format!( + "CREATE TABLE {} (k0 text PRIMARY KEY, c counter)", + table + )) + .await + .unwrap(); + } +} + +async fn verify_batch_insert(session: &Session, test_name: &str, count: usize) { + let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", test_name); + let query_result = session + .query_unpaged(select_query, (test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let rows: Vec<(String, i32, i32)> = query_result + .rows::<(String, i32, i32)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!(rows.len(), count); + for (k0, k1, v) in rows { + assert_eq!(k0, test_name); + assert_eq!(v, k1 + 1); + } +} + +async fn prepare_insert_statement(session: &Session, table: &str) -> PreparedStatement { + let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", table); + session.prepare(Query::new(query_str)).await.unwrap() +} + #[tokio::test] #[ntest::timeout(60000)] async fn batch_statements_and_values_mismatch_detected() { @@ -76,3 +141,300 @@ async fn batch_statements_and_values_mismatch_detected() { ) } } + +#[tokio::test] +async fn test_batch_of_simple_statements() { + setup_tracing(); + let test_name = String::from("test_batch_simple_statements"); + let session = create_test_session(&test_name, true).await; + + let mut batch = Batch::new(BatchType::Unlogged); + for i in 0..BATCH_COUNT { + let simple_statement = Query::new(format!( + "INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})", + &test_name, + &test_name, + i, + i + 1 + )); + batch.append_statement(simple_statement); + } + session.batch(&batch, vec![(); BATCH_COUNT]).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_prepared_statements() { + setup_tracing(); + let test_name = String::from("test_batch_prepared_statements"); + let session = create_test_session(&test_name, true).await; + + let prepared = prepare_insert_statement(&session, &test_name).await; + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + batch_values.push((test_name.as_str(), i, i + 1)); + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_prepared_batch() { + setup_tracing(); + let test_name = String::from("test_prepared_batch"); + let session = create_test_session(&test_name, true).await; + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(Query::new(query_str.clone())); + batch_values.push((&test_name, i, i + 1)); + } + let prepared_batch = session.prepare_batch(&batch).await.unwrap(); + session.batch(&prepared_batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_prepared_statements_with_unset_values() { + setup_tracing(); + let test_name = String::from("test_batch_prepared_statements_with_unset_values"); + let session = create_test_session(&test_name, true).await; + + let prepared = prepare_insert_statement(&session, &test_name).await; + let mut batch1 = 
Batch::new(BatchType::Unlogged); + let mut batch1_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch1.append_statement(prepared.clone()); + batch1_values.push((test_name.as_str(), i, i + 1)); + } + session.batch(&batch1, batch1_values).await.unwrap(); + + // Update v to (k1 + 2), but for every 20th row leave v unset. + let mut batch2 = Batch::new(BatchType::Unlogged); + let mut batch2_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch2.append_statement(prepared.clone()); + if i % 20 == 0 { + batch2_values.push(( + MaybeUnset::Set(&test_name), + MaybeUnset::Set(i), + MaybeUnset::Unset, + )); + } else { + batch2_values.push(( + MaybeUnset::Set(&test_name), + MaybeUnset::Set(i), + MaybeUnset::Set(i + 2), + )); + } + } + session.batch(&batch2, batch2_values).await.unwrap(); + + // Verify that rows with k1 % 20 == 0 retain the original value. + let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", &test_name); + let query_result = session + .query_unpaged(select_query, (&test_name,)) + .await + .unwrap() + .into_rows_result() + .unwrap(); + let rows: Vec<(String, i32, i32)> = query_result + .rows::<(String, i32, i32)>() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + assert_eq!( + rows.len(), + BATCH_COUNT, + "Expected {} rows, got {}", + BATCH_COUNT, + rows.len() + ); + for (k0, k1, v) in rows { + assert_eq!(k0, test_name); + assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 }); + } +} + +#[tokio::test] +async fn test_batch_of_prepared_statements_named_variables() { + setup_tracing(); + let test_name = String::from("test_batch_prepared_statements_named_variables"); + let session = create_test_session(&test_name, true).await; + + let query_str = format!( + "INSERT INTO {} (k0, k1, v) VALUES (:k0, :k1, :v)", + &test_name + ); + let prepared = session.prepare(query_str).await.unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + batch.append_statement(prepared.clone()); + let mut values = HashMap::new(); + values.insert("k0", CqlValue::Text(test_name.clone())); + values.insert("k1", CqlValue::Int(i)); + values.insert("v", CqlValue::Int(i + 1)); + batch_values.push(values); + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] +async fn test_batch_of_mixed_prepared_and_simple_statements() { + setup_tracing(); + let test_name = String::from("test_batch_mixed_prepared_and_simple_statements"); + let session = create_test_session(&test_name, true).await; + + let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name); + let prepared_bound = session + .prepare(Query::new(query_str.clone())) + .await + .unwrap(); + + let mut batch = Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::with_capacity(BATCH_COUNT); + for i in 0..BATCH_COUNT as i32 { + if i % 2 == 1 { + let simple_statement = Query::new(format!( + "INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})", + &test_name, + &test_name, + i, + i + 1 + )); + batch.append_statement(simple_statement); + batch_values.push(vec![]); + } else { + batch.append_statement(prepared_bound.clone()); + batch_values.push(vec![ + CqlValue::Text(test_name.clone()), + CqlValue::Int(i), + CqlValue::Int(i + 1), + ]); + } + } + session.batch(&batch, batch_values).await.unwrap(); + verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +} + +#[tokio::test] 
+async fn test_cas_batch() {
+    setup_tracing();
+    let test_name = String::from("test_cas_batch");
+    let session = create_test_session(&test_name, false).await;
+
+    let query_str = format!(
+        "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS",
+        &test_name
+    );
+    let prepared = session.prepare(Query::new(query_str)).await.unwrap();
+    let mut batch = Batch::new(BatchType::Unlogged);
+    let mut batch_values = Vec::with_capacity(BATCH_COUNT);
+    for i in 0..BATCH_COUNT as i32 {
+        batch.append_statement(prepared.clone());
+        batch_values.push((&test_name, i, i + 1));
+    }
+    let result = session.batch(&batch, batch_values.clone()).await.unwrap();
+    let row: (bool, Option<String>, Option<i32>, Option<i32>) =
+        result.into_rows_result().unwrap().first_row().unwrap();
+    assert!(row.0, "First CAS batch should be applied");
+
+    verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
+
+    let result2 = session.batch(&batch, batch_values).await.unwrap();
+    let row: (bool, Option<String>, Option<i32>, Option<i32>) =
+        result2.into_rows_result().unwrap().first_row().unwrap();
+    assert!(!row.0, "Second CAS batch should not be applied");
+}
+
+#[tokio::test]
+async fn test_counter_batch() {
+    setup_tracing();
+    let test_name = String::from("test_counter_batch");
+    let session = create_test_session(&test_name, false).await;
+    create_counter_tables(&session).await;
+
+    let mut batch = Batch::new(BatchType::Counter);
+    let mut batch_values = Vec::with_capacity(3);
+    for i in 1..=3 {
+        let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
+        let prepared = session.prepare(Query::new(query_str)).await.unwrap();
+        batch.append_statement(prepared);
+        batch_values.push((Counter(i), &test_name));
+    }
+    session.batch(&batch, batch_values).await.unwrap();
+
+    for i in 1..=3 {
+        let query_str = format!("SELECT c FROM counter{} WHERE k0 = ?", i);
+        let query_result = session
+            .query_unpaged(query_str, (&test_name,))
+            .await
+            .unwrap()
+            .into_rows_result()
+            .unwrap();
+        let row = query_result.single_row::<(Counter,)>().unwrap();
+        let (c,) = row;
+        assert_eq!(c, Counter(i));
+    }
+}
+
+#[tokio::test]
+async fn test_fail_logged_batch_with_counter_increment() {
+    setup_tracing();
+    let test_name = String::from("test_fail_logged_batch");
+    let session = create_test_session(&test_name, false).await;
+    create_counter_tables(&session).await;
+
+    let mut batch = Batch::new(BatchType::Logged);
+    let mut batch_values: Vec<_> = Vec::with_capacity(3);
+    for i in 1..=3 {
+        let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
+        let prepared = session.prepare(Query::new(query_str)).await.unwrap();
+        batch.append_statement(prepared);
+        batch_values.push((Counter(i), &test_name));
+    }
+    let err = session.batch(&batch, batch_values).await.unwrap_err();
+    assert_matches!(
+        err,
+        ExecutionError::LastAttemptError(RequestAttemptError::DbError(DbError::Invalid, _)),
+        "Expected an Invalid DbError when using counter statements in a LOGGED batch"
+    );
+}
+
+#[tokio::test]
+async fn test_fail_counter_batch_with_non_counter_increment() {
+    setup_tracing();
+    let test_name = String::from("test_fail_counter_batch");
+    let session = create_test_session(&test_name, false).await;
+    create_counter_tables(&session).await;
+
+    let mut batch = Batch::new(BatchType::Counter);
+    let mut batch_values: Vec<Vec<CqlValue>> = Vec::new();
+    for i in 1..=3 {
+        let query_str = format!("UPDATE counter{} SET c = c + ? 
WHERE k0 = ?", i); + let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + batch.append_statement(prepared); + batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]); + } + + let prepared = prepare_insert_statement(&session, &test_name).await; + batch.append_statement(prepared); + batch_values.push(vec![ + CqlValue::Text(test_name.clone()), + CqlValue::Int(0), + CqlValue::Int(1), + ]); + let err = session.batch(&batch, batch_values).await.unwrap_err(); + assert_matches!( + err, + ExecutionError::BadQuery(_), + "Expected a BadQuery error when including a non-counter statement in a COUNTER batch" + ); +} diff --git a/scylla/tests/integration/batch_statements.rs b/scylla/tests/integration/batch_statements.rs deleted file mode 100644 index 4403bdb7fc..0000000000 --- a/scylla/tests/integration/batch_statements.rs +++ /dev/null @@ -1,387 +0,0 @@ -use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; -use assert_matches::assert_matches; -use scylla::batch::{Batch, BatchType}; -use scylla::client::session::Session; -use scylla::errors::ExecutionError; -use scylla::prepared_statement::PreparedStatement; -use scylla::query::Query; -use scylla::value::{CqlValue, MaybeUnset}; -use std::collections::HashMap; -use std::string::String; - -const BATCH_COUNT: usize = 100; - -async fn create_test_session(table_name: &str) -> (Session, String) { - let session = create_new_session_builder().build().await.unwrap(); - let ks = unique_keyspace_name(); - session - .ddl(format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) - .await - .unwrap(); - session.use_keyspace(&ks, false).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))", - table_name - )) - .await - .unwrap(); - - (session, ks) -} - -async fn create_counter_tables(session: &Session, ks: &str) { - for &table in ["counter1", "counter2", "counter3"].iter() { - session - .ddl(format!( - "CREATE TABLE {}.{} (k0 text PRIMARY KEY, c counter)", - ks, table - )) - .await - .unwrap(); - } -} - -async fn verify_batch_insert(session: &Session, keyspace: &str, test_name: &str, count: usize) { - let select_query = format!( - "SELECT k0, k1, v FROM {}.{} WHERE k0 = ?", - keyspace, test_name - ); - let query_result = session - .query_unpaged(select_query, (test_name,)) - .await - .unwrap() - .into_rows_result() - .unwrap(); - let rows: Vec<(String, i32, i32)> = query_result - .rows::<(String, i32, i32)>() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - assert_eq!(rows.len(), count); - for (k0, k1, v) in rows { - assert_eq!(k0, test_name); - assert_eq!(v, k1 + 1); - } -} - -async fn prepare_insert_statement(session: &Session, ks: &str, table: &str) -> PreparedStatement { - let query_str = format!("INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", ks, table); - session.prepare(Query::new(query_str)).await.unwrap() -} - -#[tokio::test] -async fn test_batch_of_simple_statements() { - setup_tracing(); - let test_name = String::from("test_batch_simple_statements"); - let (session, ks) = create_test_session(&test_name).await; - - let mut batch = Batch::new(BatchType::Unlogged); - for i in 0..BATCH_COUNT { - let simple_statement = Query::new(format!( - "INSERT INTO {}.{} (k0, k1, v) VALUES ('{}', {}, {})", - ks, - &test_name, - &test_name, - i, - i + 1 - )); - batch.append_statement(simple_statement); - } - session.batch(&batch, vec![(); 
BATCH_COUNT]).await.unwrap(); - verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; -} - -#[tokio::test] -async fn test_batch_of_prepared_statements() { - setup_tracing(); - let test_name = String::from("test_batch_prepared_statements"); - let (session, ks) = create_test_session(&test_name).await; - - let prepared = prepare_insert_statement(&session, &ks, &test_name).await; - let mut batch = Batch::new(BatchType::Unlogged); - let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - batch.append_statement(prepared.clone()); - batch_values.push((test_name.as_str(), i, i + 1)); - } - session.batch(&batch, batch_values).await.unwrap(); - verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; -} - -#[tokio::test] -async fn test_prepared_batch() { - setup_tracing(); - let test_name = String::from("test_prepared_batch"); - let (session, ks) = create_test_session(&test_name).await; - - let mut batch = Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::with_capacity(BATCH_COUNT); - let query_str = format!( - "INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", - ks, &test_name - ); - for i in 0..BATCH_COUNT as i32 { - batch.append_statement(Query::new(query_str.clone())); - batch_values.push((&test_name, i, i + 1)); - } - let prepared_batch = session.prepare_batch(&batch).await.unwrap(); - session.batch(&prepared_batch, batch_values).await.unwrap(); - verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; -} - -#[tokio::test] -async fn test_batch_of_bound_statements_with_unset_values() { - setup_tracing(); - let test_name = String::from("test_batch_bound_statements_with_unset_values"); - let (session, ks) = create_test_session(&test_name).await; - - let prepared = prepare_insert_statement(&session, &ks, &test_name).await; - let mut batch1 = Batch::new(BatchType::Unlogged); - let mut batch1_values = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - batch1.append_statement(prepared.clone()); - batch1_values.push((test_name.as_str(), i, i + 1)); - } - session.batch(&batch1, batch1_values).await.unwrap(); - - // Update v to (k1 + 2), but for every 20th row leave v unset. - let mut batch2 = Batch::new(BatchType::Unlogged); - let mut batch2_values = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - batch2.append_statement(prepared.clone()); - if i % 20 == 0 { - batch2_values.push(( - MaybeUnset::Set(&test_name), - MaybeUnset::Set(i), - MaybeUnset::Unset, - )); - } else { - batch2_values.push(( - MaybeUnset::Set(&test_name), - MaybeUnset::Set(i), - MaybeUnset::Set(i + 2), - )); - } - } - session.batch(&batch2, batch2_values).await.unwrap(); - - // Verify that rows with k1 % 20 == 0 retain the original value. 
- let select_query = format!("SELECT k0, k1, v FROM {}.{} WHERE k0 = ?", ks, &test_name); - let query_result = session - .query_unpaged(select_query, (&test_name,)) - .await - .unwrap() - .into_rows_result() - .unwrap(); - let rows: Vec<(String, i32, i32)> = query_result - .rows::<(String, i32, i32)>() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - assert_eq!( - rows.len(), - BATCH_COUNT, - "Expected {} rows, got {}", - BATCH_COUNT, - rows.len() - ); - for (k0, k1, v) in rows { - assert_eq!(k0, test_name); - assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 }); - } -} - -#[tokio::test] -async fn test_batch_of_bound_statements_named_variables() { - setup_tracing(); - let test_name = String::from("test_batch_bound_statements_named_variables"); - let (session, ks) = create_test_session(&test_name).await; - - let query_str = format!( - "INSERT INTO {}.{} (k0, k1, v) VALUES (:k0, :k1, :v)", - ks, &test_name - ); - let prepared = session.prepare(query_str).await.unwrap(); - - let mut batch = Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - batch.append_statement(prepared.clone()); - let mut values = HashMap::new(); - values.insert("k0", CqlValue::Text(test_name.clone())); - values.insert("k1", CqlValue::Int(i)); - values.insert("v", CqlValue::Int(i + 1)); - batch_values.push(values); - } - session.batch(&batch, batch_values).await.unwrap(); - verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; -} - -#[tokio::test] -async fn test_batch_of_mixed_bound_and_simple_statements() { - setup_tracing(); - let test_name = String::from("test_batch_mixed_bound_and_simple_statements"); - let (session, ks) = create_test_session(&test_name).await; - - let query_str = format!( - "INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", - ks, &test_name - ); - let prepared_bound = session - .prepare(Query::new(query_str.clone())) - .await - .unwrap(); - - let mut batch = Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - if i % 2 == 1 { - let simple_statement = Query::new(format!( - "INSERT INTO {}.{} (k0, k1, v) VALUES ('{}', {}, {})", - ks, - &test_name, - &test_name, - i, - i + 1 - )); - batch.append_statement(simple_statement); - batch_values.push(vec![]); - } else { - batch.append_statement(prepared_bound.clone()); - batch_values.push(vec![ - CqlValue::Text(test_name.clone()), - CqlValue::Int(i), - CqlValue::Int(i + 1), - ]); - } - } - session.batch(&batch, batch_values).await.unwrap(); - verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; -} - -/// TODO: Remove #[ignore] once LWTs are supported with tablets. 
-#[tokio::test] -#[ignore] -async fn test_cas_batch() { - setup_tracing(); - let test_name = String::from("test_cas_batch"); - let (session, ks) = create_test_session(&test_name).await; - - let prepared = prepare_insert_statement(&session, &ks, &test_name).await; - let mut batch = Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - batch.append_statement(prepared.clone()); - batch_values.push((&test_name, i, i + 1)); - } - let result = session.batch(&batch, batch_values.clone()).await.unwrap(); - let (applied,): (bool,) = result - .into_rows_result() - .unwrap() - .first_row::<(bool,)>() - .unwrap(); - assert!(applied, "First CAS batch should be applied"); - - verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await; - - let result2 = session.batch(&batch, batch_values).await.unwrap(); - let (applied2,): (bool,) = result2 - .into_rows_result() - .unwrap() - .first_row::<(bool,)>() - .unwrap(); - assert!(applied2, "Second CAS batch should not be applied"); -} - -/// TODO: Remove #[ignore] once counters are supported with tablets. -#[tokio::test] -#[ignore] -async fn test_counter_batch() { - setup_tracing(); - let test_name = String::from("test_counter_batch"); - let (session, ks) = create_test_session(&test_name).await; - create_counter_tables(&session, &ks).await; - - let mut batch = Batch::new(BatchType::Counter); - let mut batch_values = Vec::with_capacity(3); - for i in 1..=3 { - let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i); - let prepared = session.prepare(Query::new(query_str)).await.unwrap(); - batch.append_statement(prepared); - batch_values.push((i, &test_name)); - } - session.batch(&batch, batch_values).await.unwrap(); - - for i in 1..=3 { - let query_str = format!("SELECT c FROM {}.counter{} WHERE k0 = ?", ks, i); - let query_result = session - .query_unpaged(query_str, (&test_name,)) - .await - .unwrap() - .into_rows_result() - .unwrap(); - let row = query_result.single_row::<(i64,)>().unwrap(); - let (c,) = row; - assert_eq!(c, i as i64); - } -} - -/// TODO: Remove #[ignore] once counters are supported with tablets. -#[tokio::test] -#[ignore] -async fn test_fail_logged_batch_with_counter_increment() { - setup_tracing(); - let test_name = String::from("test_fail_logged_batch"); - let (session, ks) = create_test_session(&test_name).await; - create_counter_tables(&session, &ks).await; - - let mut batch = Batch::new(BatchType::Logged); - let mut batch_values: Vec<_> = Vec::with_capacity(3); - for i in 1..=3 { - let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i); - let prepared = session.prepare(Query::new(query_str)).await.unwrap(); - batch.append_statement(prepared); - batch_values.push((i, &test_name)); - } - let err = session.batch(&batch, batch_values).await.unwrap_err(); - assert_matches!( - err, - ExecutionError::BadQuery(_), - "Expected a BadQuery error when using counter statements in a LOGGED batch" - ); -} - -/// TODO: Remove #[ignore] once counters are supported with tablets. -#[tokio::test] -#[ignore] -async fn test_fail_counter_batch_with_non_counter_increment() { - setup_tracing(); - let test_name = String::from("test_fail_counter_batch"); - let (session, ks) = create_test_session(&test_name).await; - create_counter_tables(&session, &ks).await; - - let mut batch = Batch::new(BatchType::Counter); - let mut batch_values: Vec> = Vec::new(); - for i in 1..=3 { - let query_str = format!("UPDATE {}.counter{} SET c = c + ? 
WHERE k0 = ?", ks, i); - let prepared = session.prepare(Query::new(query_str)).await.unwrap(); - batch.append_statement(prepared); - batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]); - } - - let prepared = prepare_insert_statement(&session, &ks, &test_name).await; - batch.append_statement(prepared); - batch_values.push(vec![ - CqlValue::Text(test_name.clone()), - CqlValue::Int(0), - CqlValue::Int(1), - ]); - let err = session.batch(&batch, batch_values).await.unwrap_err(); - assert_matches!( - err, - ExecutionError::BadQuery(_), - "Expected a BadQuery error when including a non-counter statement in a COUNTER batch" - ); -} diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index a67a870dbc..a87649ef48 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -1,5 +1,4 @@ mod batch; -mod batch_statements; mod consistency; mod cql_collections; mod cql_types; From fc9def4a1908e16df077d59e08dbf8f8317a565a Mon Sep 17 00:00:00 2001 From: Dmitriy Kruglov Date: Wed, 12 Feb 2025 23:32:00 +0100 Subject: [PATCH 3/3] tests: move batch tests from session.rs to batch.rs module --- scylla/tests/integration/batch.rs | 308 +++++++++++++++++++++++--- scylla/tests/integration/session.rs | 323 +--------------------------- 2 files changed, 283 insertions(+), 348 deletions(-) diff --git a/scylla/tests/integration/batch.rs b/scylla/tests/integration/batch.rs index b6a017d17a..65e22ec516 100644 --- a/scylla/tests/integration/batch.rs +++ b/scylla/tests/integration/batch.rs @@ -1,3 +1,7 @@ +use crate::utils::setup_tracing; +use crate::utils::unique_keyspace_name; +use crate::utils::PerformDDL; +use crate::utils::{create_new_session_builder, scylla_supports_tablets}; use scylla::batch::Batch; use scylla::batch::BatchType; use scylla::client::session::Session; @@ -9,13 +13,10 @@ use scylla::query::Query; use scylla::value::{Counter, CqlValue, MaybeUnset}; use std::collections::HashMap; use std::string::String; - -use crate::utils::setup_tracing; -use crate::utils::unique_keyspace_name; -use crate::utils::PerformDDL; -use crate::utils::{create_new_session_builder, scylla_supports_tablets}; +use std::sync::Arc; use assert_matches::assert_matches; +use scylla::response::query_result::{QueryResult, QueryRowsResult}; const BATCH_COUNT: usize = 100; @@ -25,6 +26,8 @@ async fn create_test_session(table_name: &str, supports_tablets: bool) -> Sessio let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks); + // Need to disable tablets in this test because they don't support counters yet. + // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e). 
     if !supports_tablets && scylla_supports_tablets(&session).await {
         create_ks += " AND TABLETS = {'enabled': false}"
     }
@@ -142,6 +145,145 @@ async fn batch_statements_and_values_mismatch_detected() {
     }
 }
+#[tokio::test]
+async fn test_batch() {
+    setup_tracing();
+    let session = Arc::new(create_new_session_builder().build().await.unwrap());
+    let ks = unique_keyspace_name();
+
+    session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
+    session
+        .ddl(format!(
+            "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))",
+            ks
+        ))
+        .await
+        .unwrap();
+
+    let prepared_statement = session
+        .prepare(format!(
+            "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)",
+            ks
+        ))
+        .await
+        .unwrap();
+
+    // TODO: Add API that supports binding values to statements in batch creation process,
+    // to avoid problem of statements/values count mismatch
+    use scylla::batch::Batch;
+    let mut batch: Batch = Default::default();
+    batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", ks)[..]);
+    batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')", ks)[..]);
+    batch.append_statement(prepared_statement.clone());
+
+    let four_value: i32 = 4;
+    let hello_value: String = String::from("hello");
+    let session_clone = session.clone();
+    // We're spawning to a separate task here to test that it works even in that case, because in some scenarios
+    // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries)
+    // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on
+    // some lifetime for some unknown reason), so can't be spawned on tokio.
+    // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details
+    tokio::spawn(async move {
+        let values = (
+            (1_i32, 2_i32, "abc"),
+            (),
+            (1_i32, &four_value, hello_value.as_str()),
+        );
+        session_clone.batch(&batch, values).await.unwrap();
+    })
+    .await
+    .unwrap();
+
+    let mut results: Vec<(i32, i32, String)> = session
+        .query_unpaged(format!("SELECT a, b, c FROM {}.t_batch", ks), &[])
+        .await
+        .unwrap()
+        .into_rows_result()
+        .unwrap()
+        .rows::<(i32, i32, String)>()
+        .unwrap()
+        .collect::<Result<_, _>>()
+        .unwrap();
+
+    results.sort();
+    assert_eq!(
+        results,
+        vec![
+            (1, 2, String::from("abc")),
+            (1, 4, String::from("hello")),
+            (7, 11, String::from(""))
+        ]
+    );
+
+    // Test repreparing statement inside a batch
+    let mut batch: Batch = Default::default();
+    batch.append_statement(prepared_statement);
+    let values = ((4_i32, 20_i32, "foobar"),);
+
+    // This statement flushes the prepared statement cache
+    session
+        .ddl(format!(
+            "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42",
+            ks
+        ))
+        .await
+        .unwrap();
+    session.batch(&batch, values).await.unwrap();
+
+    let results: Vec<(i32, i32, String)> = session
+        .query_unpaged(
+            format!("SELECT a, b, c FROM {}.t_batch WHERE a = 4", ks),
+            &[],
+        )
+        .await
+        .unwrap()
+        .into_rows_result()
+        .unwrap()
+        .rows::<(i32, i32, String)>()
+        .unwrap()
+        .collect::<Result<_, _>>()
+        .unwrap();
+
+    assert_eq!(results, vec![(4, 20, String::from("foobar"))]);
+}
+
+// This is a regression test for #1134. 
+#[tokio::test] +async fn test_batch_to_multiple_tables() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); + session.use_keyspace(&ks, true).await.unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))") + .await + .unwrap(); + session + .ddl("CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))") + .await + .unwrap(); + + let prepared_statement = session + .prepare( + " + BEGIN BATCH + INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?); + INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?); + APPLY BATCH; + ", + ) + .await + .unwrap(); + + session + .execute_unpaged(&prepared_statement, (1, 2, "ala", 4, 5, "ma")) + .await + .unwrap(); +} + #[tokio::test] async fn test_batch_of_simple_statements() { setup_tracing(); @@ -324,34 +466,148 @@ async fn test_batch_of_mixed_prepared_and_simple_statements() { verify_batch_insert(&session, &test_name, BATCH_COUNT).await; } +// Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied. #[tokio::test] -async fn test_cas_batch() { - setup_tracing(); - let test_name = String::from("test_cas_batch"); +async fn test_batch_lwts() { + let test_name = String::from("test_batch_lwts"); let session = create_test_session(&test_name, false).await; - let query_str = format!( - "INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS", - &test_name - ); - let prepared = session.prepare(Query::new(query_str)).await.unwrap(); + session + .query_unpaged( + format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name), + (&test_name, 0, 0), + ) + .await + .unwrap(); + let mut batch = Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::with_capacity(BATCH_COUNT); - for i in 0..BATCH_COUNT as i32 { - batch.append_statement(prepared.clone()); - batch_values.push((&test_name, i, i + 1)); + batch.append_statement( + format!( + "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0", + &test_name, &test_name + ) + .as_str(), + ); + batch.append_statement( + format!( + "INSERT INTO {} (k0, k1, v) VALUES ('{}', 123, 321)", + &test_name, &test_name + ) + .as_str(), + ); + batch.append_statement( + format!( + "UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0", + &test_name, &test_name + ) + .as_str(), + ); + + let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap(); + let batch_deserializer = batch_res.into_rows_result().unwrap(); + + // Scylla always returns a result set with additional 'applied' column, which indicates whether the statement was applied. + // In contrast, Cassandra returns only 'applied' column for applied transactions; and 'applied' column + // with result set for rejected ones. + // This is described in https://opensource.docs.scylladb.com/stable/features/lwt.html and + // https://opensource.docs.scylladb.com/stable/kb/lwt-differences.html). 
+ let is_scylla: bool = batch_deserializer.column_specs().len() == 4; + + if is_scylla { + test_batch_lwts_for_scylla(&session, &batch, &batch_deserializer, &test_name).await; + } else { + test_batch_lwts_for_cassandra(&session, &batch, &batch_deserializer, &test_name).await; } - let result = session.batch(&batch, batch_values.clone()).await.unwrap(); - let row: (bool, Option, Option, Option) = - result.into_rows_result().unwrap().first_row().unwrap(); - assert!(row.0, "First CAS batch should be applied"); +} - verify_batch_insert(&session, &test_name, BATCH_COUNT).await; +async fn test_batch_lwts_for_scylla( + session: &Session, + batch: &Batch, + query_rows_result: &QueryRowsResult, + k0_value: &str, +) { + // Alias required by clippy + type IntOrNull = Option; + type StrOrNull = Option; + + // Returned columns are: + // [applied], k0, k1, v + let batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = query_rows_result + .rows() + .unwrap() + .collect::>() + .unwrap(); + + let k0_value = k0_value.to_string(); + let expected_batch_res_rows = vec![ + (true, Some(k0_value.clone()), Some(0), Some(0)), + (true, None, None, None), + (true, Some(k0_value.clone()), Some(0), Some(0)), + ]; + + assert_eq!(batch_res_rows, expected_batch_res_rows); + + let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); + let prepared_batch_res: QueryResult = + session.batch(&prepared_batch, ((), (), ())).await.unwrap(); + + let prepared_batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = prepared_batch_res + .into_rows_result() + .unwrap() + .rows() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + let expected_prepared_batch_res_rows = vec![ + (false, Some(k0_value.clone()), Some(0), Some(1)), + (false, None, None, None), + (false, Some(k0_value.clone()), Some(0), Some(1)), + ]; + + assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); +} + +async fn test_batch_lwts_for_cassandra( + session: &Session, + batch: &Batch, + query_rows_result: &QueryRowsResult, + k0_value: &str, +) { + // Alias required by clippy + type IntOrNull = Option; + type StrOrNull = Option; + + // Returned columns are: + // [applied] + let batch_res_rows: Vec<(bool,)> = query_rows_result + .rows() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + let expected_batch_res_rows = vec![(true,)]; + + assert_eq!(batch_res_rows, expected_batch_res_rows); + + let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); + let prepared_batch_res: QueryResult = + session.batch(&prepared_batch, ((), (), ())).await.unwrap(); + + // Returned columns are: + // [applied], k0, k1, v + let prepared_batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = prepared_batch_res + .into_rows_result() + .unwrap() + .rows() + .unwrap() + .map(|r| r.unwrap()) + .collect(); + + let expected_prepared_batch_res_rows = + vec![(false, Some(k0_value.to_string()), Some(0), Some(1))]; - let result2 = session.batch(&batch, batch_values).await.unwrap(); - let row: (bool, Option, Option, Option) = - result2.into_rows_result().unwrap().first_row().unwrap(); - assert!(!row.0, "Second CAS batch should not be applied"); + assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); } #[tokio::test] diff --git a/scylla/tests/integration/session.rs b/scylla/tests/integration/session.rs index 6a6b4f4140..3dad603967 100644 --- a/scylla/tests/integration/session.rs +++ b/scylla/tests/integration/session.rs @@ -35,7 +35,7 @@ use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; use uuid::Uuid; -use 
scylla::response::query_result::{QueryResult, QueryRowsResult}; +use scylla::response::query_result::QueryResult; #[tokio::test] async fn test_connection_failure() { @@ -166,200 +166,6 @@ async fn test_unprepared_statement() { assert_eq!(results_from_manual_paging, results); } -#[tokio::test] -async fn test_counter_batch() { - use scylla::value::Counter; - use scylla_cql::frame::request::batch::BatchType; - - setup_tracing(); - let session = Arc::new(create_new_session_builder().build().await.unwrap()); - let ks = unique_keyspace_name(); - - // Need to disable tablets in this test because they don't support counters yet. - // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e). - let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks); - if scylla_supports_tablets(&session).await { - create_ks += " AND TABLETS = {'enabled': false}" - } - - session.ddl(create_ks).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {}.t_batch (key int PRIMARY KEY, value counter)", - ks - )) - .await - .unwrap(); - - let statement_str = format!("UPDATE {}.t_batch SET value = value + ? WHERE key = ?", ks); - let query = Query::from(statement_str); - let prepared = session.prepare(query.clone()).await.unwrap(); - - let mut counter_batch = Batch::new(BatchType::Counter); - counter_batch.append_statement(query.clone()); - counter_batch.append_statement(prepared.clone()); - counter_batch.append_statement(query.clone()); - counter_batch.append_statement(prepared.clone()); - counter_batch.append_statement(query.clone()); - counter_batch.append_statement(prepared.clone()); - - // Check that we do not get a server error - the driver - // should send a COUNTER batch instead of a LOGGED (default) one. 
- session - .batch( - &counter_batch, - ( - (Counter(1), 1), - (Counter(2), 2), - (Counter(3), 3), - (Counter(4), 4), - (Counter(5), 5), - (Counter(6), 6), - ), - ) - .await - .unwrap(); -} - -#[tokio::test] -async fn test_batch() { - setup_tracing(); - let session = Arc::new(create_new_session_builder().build().await.unwrap()); - let ks = unique_keyspace_name(); - - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session - .ddl(format!( - "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))", - ks - )) - .await - .unwrap(); - - let prepared_statement = session - .prepare(format!( - "INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", - ks - )) - .await - .unwrap(); - - // TODO: Add API that supports binding values to statements in batch creation process, - // to avoid problem of statements/values count mismatch - use scylla::batch::Batch; - let mut batch: Batch = Default::default(); - batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", ks)[..]); - batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')", ks)[..]); - batch.append_statement(prepared_statement.clone()); - - let four_value: i32 = 4; - let hello_value: String = String::from("hello"); - let session_clone = session.clone(); - // We're spawning to a separate task here to test that it works even in that case, because in some scenarios - // (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries) - // the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on - // some lifetime for some unknown reason), so can't be spawned on tokio. - // See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details - tokio::spawn(async move { - let values = ( - (1_i32, 2_i32, "abc"), - (), - (1_i32, &four_value, hello_value.as_str()), - ); - session_clone.batch(&batch, values).await.unwrap(); - }) - .await - .unwrap(); - - let mut results: Vec<(i32, i32, String)> = session - .query_unpaged(format!("SELECT a, b, c FROM {}.t_batch", ks), &[]) - .await - .unwrap() - .into_rows_result() - .unwrap() - .rows::<(i32, i32, String)>() - .unwrap() - .collect::>() - .unwrap(); - - results.sort(); - assert_eq!( - results, - vec![ - (1, 2, String::from("abc")), - (1, 4, String::from("hello")), - (7, 11, String::from("")) - ] - ); - - // Test repreparing statement inside a batch - let mut batch: Batch = Default::default(); - batch.append_statement(prepared_statement); - let values = ((4_i32, 20_i32, "foobar"),); - - // This statement flushes the prepared statement cache - session - .ddl(format!( - "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42", - ks - )) - .await - .unwrap(); - session.batch(&batch, values).await.unwrap(); - - let results: Vec<(i32, i32, String)> = session - .query_unpaged( - format!("SELECT a, b, c FROM {}.t_batch WHERE a = 4", ks), - &[], - ) - .await - .unwrap() - .into_rows_result() - .unwrap() - .rows::<(i32, i32, String)>() - .unwrap() - .collect::>() - .unwrap(); - - assert_eq!(results, vec![(4, 20, String::from("foobar"))]); -} - -// This is a regression test for #1134. 
-#[tokio::test] -async fn test_batch_to_multiple_tables() { - setup_tracing(); - let session = create_new_session_builder().build().await.unwrap(); - let ks = unique_keyspace_name(); - - session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - session.use_keyspace(&ks, true).await.unwrap(); - session - .ddl("CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))") - .await - .unwrap(); - session - .ddl("CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))") - .await - .unwrap(); - - let prepared_statement = session - .prepare( - " - BEGIN BATCH - INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?); - INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?); - APPLY BATCH; - ", - ) - .await - .unwrap(); - - session - .execute_unpaged(&prepared_statement, (1, 2, "ala", 4, 5, "ma")) - .await - .unwrap(); -} - #[tokio::test] async fn test_token_awareness() { setup_tracing(); @@ -2358,133 +2164,6 @@ async fn test_rate_limit_exceeded_exception() { } } -// Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied. -#[tokio::test] -async fn test_batch_lwts() { - let session = create_new_session_builder().build().await.unwrap(); - - let ks = unique_keyspace_name(); - let mut create_ks = format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}", ks); - if scylla_supports_tablets(&session).await { - create_ks += " and TABLETS = { 'enabled': false}"; - } - session.ddl(create_ks).await.unwrap(); - session.use_keyspace(ks.clone(), false).await.unwrap(); - - session - .ddl("CREATE TABLE tab (p1 int, c1 int, r1 int, r2 int, primary key (p1, c1))") - .await - .unwrap(); - - session - .query_unpaged("INSERT INTO tab (p1, c1, r1, r2) VALUES (0, 0, 0, 0)", ()) - .await - .unwrap(); - - let mut batch: Batch = Batch::default(); - batch.append_statement("UPDATE tab SET r2 = 1 WHERE p1 = 0 AND c1 = 0 IF r1 = 0"); - batch.append_statement("INSERT INTO tab (p1, c1, r1, r2) VALUES (0, 123, 321, 312)"); - batch.append_statement("UPDATE tab SET r1 = 1 WHERE p1 = 0 AND c1 = 0 IF r2 = 0"); - - let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap(); - let batch_deserializer = batch_res.into_rows_result().unwrap(); - - // Scylla returns 5 columns, but Cassandra returns only 1 - let is_scylla: bool = batch_deserializer.column_specs().len() == 5; - - if is_scylla { - test_batch_lwts_for_scylla(&session, &batch, &batch_deserializer).await; - } else { - test_batch_lwts_for_cassandra(&session, &batch, &batch_deserializer).await; - } -} - -async fn test_batch_lwts_for_scylla( - session: &Session, - batch: &Batch, - query_rows_result: &QueryRowsResult, -) { - // Alias required by clippy - type IntOrNull = Option; - - // Returned columns are: - // [applied], p1, c1, r1, r2 - let batch_res_rows: Vec<(bool, IntOrNull, IntOrNull, IntOrNull, IntOrNull)> = query_rows_result - .rows() - .unwrap() - .collect::>() - .unwrap(); - - let expected_batch_res_rows = vec![ - (true, Some(0), Some(0), Some(0), Some(0)), - (true, None, None, None, None), - (true, Some(0), Some(0), Some(0), Some(0)), - ]; - - assert_eq!(batch_res_rows, expected_batch_res_rows); - - let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); - let prepared_batch_res: QueryResult = - session.batch(&prepared_batch, ((), (), ())).await.unwrap(); - - let prepared_batch_res_rows: Vec<(bool, 
IntOrNull, IntOrNull, IntOrNull, IntOrNull)> = - prepared_batch_res - .into_rows_result() - .unwrap() - .rows() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - - let expected_prepared_batch_res_rows = vec![ - (false, Some(0), Some(0), Some(1), Some(1)), - (false, None, None, None, None), - (false, Some(0), Some(0), Some(1), Some(1)), - ]; - - assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); -} - -async fn test_batch_lwts_for_cassandra( - session: &Session, - batch: &Batch, - query_rows_result: &QueryRowsResult, -) { - // Alias required by clippy - type IntOrNull = Option; - - // Returned columns are: - // [applied] - let batch_res_rows: Vec<(bool,)> = query_rows_result - .rows() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - - let expected_batch_res_rows = vec![(true,)]; - - assert_eq!(batch_res_rows, expected_batch_res_rows); - - let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap(); - let prepared_batch_res: QueryResult = - session.batch(&prepared_batch, ((), (), ())).await.unwrap(); - - // Returned columns are: - // [applied], p1, c1, r1, r2 - let prepared_batch_res_rows: Vec<(bool, IntOrNull, IntOrNull, IntOrNull, IntOrNull)> = - prepared_batch_res - .into_rows_result() - .unwrap() - .rows() - .unwrap() - .map(|r| r.unwrap()) - .collect(); - - let expected_prepared_batch_res_rows = vec![(false, Some(0), Some(0), Some(1), Some(1))]; - - assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows); -} - #[tokio::test] async fn test_keyspaces_to_fetch() { let ks1 = unique_keyspace_name();