Skip to content

Commit 7d6dc0b

Browse files
committed
tests: move batch statements tests to batch.rs module
New batch statement test cases are moved to exising batch.rs test suite. Additionally, the test cases for LWT and counter tables are unskipped and adjusted to use keyspace with tablets disabled. This allows executing the underlying test scenarios even though LWT and counters are not yet supported with tablets.
1 parent 0493ac5 commit 7d6dc0b

File tree

3 files changed

+364
-390
lines changed

3 files changed

+364
-390
lines changed

scylla/tests/integration/batch.rs

Lines changed: 364 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,82 @@
11
use scylla::batch::Batch;
22
use scylla::batch::BatchType;
3-
use scylla::errors::{ExecutionError, RequestAttemptError};
3+
use scylla::client::session::Session;
4+
use scylla::errors::{DbError, ExecutionError, RequestAttemptError};
45
use scylla::frame::frame_errors::BatchSerializationError;
56
use scylla::frame::frame_errors::CqlRequestSerializationError;
7+
use scylla::prepared_statement::PreparedStatement;
68
use scylla::query::Query;
9+
use scylla::value::{Counter, CqlValue, MaybeUnset};
10+
use std::collections::HashMap;
11+
use std::string::String;
712

8-
use crate::utils::create_new_session_builder;
913
use crate::utils::setup_tracing;
1014
use crate::utils::unique_keyspace_name;
1115
use crate::utils::PerformDDL;
16+
use crate::utils::{create_new_session_builder, scylla_supports_tablets};
1217

1318
use assert_matches::assert_matches;
1419

20+
const BATCH_COUNT: usize = 100;
21+
22+
async fn create_test_session(table_name: &str, supports_tablets: bool) -> Session {
23+
let session = create_new_session_builder().build().await.unwrap();
24+
let ks = unique_keyspace_name();
25+
26+
let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks);
27+
28+
if !supports_tablets && scylla_supports_tablets(&session).await {
29+
create_ks += " AND TABLETS = {'enabled': false}"
30+
}
31+
session.ddl(create_ks).await.unwrap();
32+
session.use_keyspace(&ks, false).await.unwrap();
33+
session
34+
.ddl(format!(
35+
"CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))",
36+
table_name
37+
))
38+
.await
39+
.unwrap();
40+
session
41+
}
42+
43+
async fn create_counter_tables(session: &Session) {
44+
for &table in ["counter1", "counter2", "counter3"].iter() {
45+
session
46+
.ddl(format!(
47+
"CREATE TABLE {} (k0 text PRIMARY KEY, c counter)",
48+
table
49+
))
50+
.await
51+
.unwrap();
52+
}
53+
}
54+
55+
async fn verify_batch_insert(session: &Session, test_name: &str, count: usize) {
56+
let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", test_name);
57+
let query_result = session
58+
.query_unpaged(select_query, (test_name,))
59+
.await
60+
.unwrap()
61+
.into_rows_result()
62+
.unwrap();
63+
let rows: Vec<(String, i32, i32)> = query_result
64+
.rows::<(String, i32, i32)>()
65+
.unwrap()
66+
.map(|r| r.unwrap())
67+
.collect();
68+
assert_eq!(rows.len(), count);
69+
for (k0, k1, v) in rows {
70+
assert_eq!(k0, test_name);
71+
assert_eq!(v, k1 + 1);
72+
}
73+
}
74+
75+
async fn prepare_insert_statement(session: &Session, table: &str) -> PreparedStatement {
76+
let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", table);
77+
session.prepare(Query::new(query_str)).await.unwrap()
78+
}
79+
1580
#[tokio::test]
1681
#[ntest::timeout(60000)]
1782
async fn batch_statements_and_values_mismatch_detected() {
@@ -76,3 +141,300 @@ async fn batch_statements_and_values_mismatch_detected() {
76141
)
77142
}
78143
}
144+
145+
#[tokio::test]
146+
async fn test_batch_of_simple_statements() {
147+
setup_tracing();
148+
let test_name = String::from("test_batch_simple_statements");
149+
let session = create_test_session(&test_name, true).await;
150+
151+
let mut batch = Batch::new(BatchType::Unlogged);
152+
for i in 0..BATCH_COUNT {
153+
let simple_statement = Query::new(format!(
154+
"INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})",
155+
&test_name,
156+
&test_name,
157+
i,
158+
i + 1
159+
));
160+
batch.append_statement(simple_statement);
161+
}
162+
session.batch(&batch, vec![(); BATCH_COUNT]).await.unwrap();
163+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
164+
}
165+
166+
#[tokio::test]
167+
async fn test_batch_of_prepared_statements() {
168+
setup_tracing();
169+
let test_name = String::from("test_batch_prepared_statements");
170+
let session = create_test_session(&test_name, true).await;
171+
172+
let prepared = prepare_insert_statement(&session, &test_name).await;
173+
let mut batch = Batch::new(BatchType::Unlogged);
174+
let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT);
175+
for i in 0..BATCH_COUNT as i32 {
176+
batch.append_statement(prepared.clone());
177+
batch_values.push((test_name.as_str(), i, i + 1));
178+
}
179+
session.batch(&batch, batch_values).await.unwrap();
180+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
181+
}
182+
183+
#[tokio::test]
184+
async fn test_prepared_batch() {
185+
setup_tracing();
186+
let test_name = String::from("test_prepared_batch");
187+
let session = create_test_session(&test_name, true).await;
188+
189+
let mut batch = Batch::new(BatchType::Unlogged);
190+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
191+
let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name);
192+
for i in 0..BATCH_COUNT as i32 {
193+
batch.append_statement(Query::new(query_str.clone()));
194+
batch_values.push((&test_name, i, i + 1));
195+
}
196+
let prepared_batch = session.prepare_batch(&batch).await.unwrap();
197+
session.batch(&prepared_batch, batch_values).await.unwrap();
198+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
199+
}
200+
201+
#[tokio::test]
202+
async fn test_batch_of_prepared_statements_with_unset_values() {
203+
setup_tracing();
204+
let test_name = String::from("test_batch_prepared_statements_with_unset_values");
205+
let session = create_test_session(&test_name, true).await;
206+
207+
let prepared = prepare_insert_statement(&session, &test_name).await;
208+
let mut batch1 = Batch::new(BatchType::Unlogged);
209+
let mut batch1_values = Vec::with_capacity(BATCH_COUNT);
210+
for i in 0..BATCH_COUNT as i32 {
211+
batch1.append_statement(prepared.clone());
212+
batch1_values.push((test_name.as_str(), i, i + 1));
213+
}
214+
session.batch(&batch1, batch1_values).await.unwrap();
215+
216+
// Update v to (k1 + 2), but for every 20th row leave v unset.
217+
let mut batch2 = Batch::new(BatchType::Unlogged);
218+
let mut batch2_values = Vec::with_capacity(BATCH_COUNT);
219+
for i in 0..BATCH_COUNT as i32 {
220+
batch2.append_statement(prepared.clone());
221+
if i % 20 == 0 {
222+
batch2_values.push((
223+
MaybeUnset::Set(&test_name),
224+
MaybeUnset::Set(i),
225+
MaybeUnset::Unset,
226+
));
227+
} else {
228+
batch2_values.push((
229+
MaybeUnset::Set(&test_name),
230+
MaybeUnset::Set(i),
231+
MaybeUnset::Set(i + 2),
232+
));
233+
}
234+
}
235+
session.batch(&batch2, batch2_values).await.unwrap();
236+
237+
// Verify that rows with k1 % 20 == 0 retain the original value.
238+
let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", &test_name);
239+
let query_result = session
240+
.query_unpaged(select_query, (&test_name,))
241+
.await
242+
.unwrap()
243+
.into_rows_result()
244+
.unwrap();
245+
let rows: Vec<(String, i32, i32)> = query_result
246+
.rows::<(String, i32, i32)>()
247+
.unwrap()
248+
.map(|r| r.unwrap())
249+
.collect();
250+
assert_eq!(
251+
rows.len(),
252+
BATCH_COUNT,
253+
"Expected {} rows, got {}",
254+
BATCH_COUNT,
255+
rows.len()
256+
);
257+
for (k0, k1, v) in rows {
258+
assert_eq!(k0, test_name);
259+
assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 });
260+
}
261+
}
262+
263+
#[tokio::test]
264+
async fn test_batch_of_prepared_statements_named_variables() {
265+
setup_tracing();
266+
let test_name = String::from("test_batch_prepared_statements_named_variables");
267+
let session = create_test_session(&test_name, true).await;
268+
269+
let query_str = format!(
270+
"INSERT INTO {} (k0, k1, v) VALUES (:k0, :k1, :v)",
271+
&test_name
272+
);
273+
let prepared = session.prepare(query_str).await.unwrap();
274+
275+
let mut batch = Batch::new(BatchType::Unlogged);
276+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
277+
for i in 0..BATCH_COUNT as i32 {
278+
batch.append_statement(prepared.clone());
279+
let mut values = HashMap::new();
280+
values.insert("k0", CqlValue::Text(test_name.clone()));
281+
values.insert("k1", CqlValue::Int(i));
282+
values.insert("v", CqlValue::Int(i + 1));
283+
batch_values.push(values);
284+
}
285+
session.batch(&batch, batch_values).await.unwrap();
286+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
287+
}
288+
289+
#[tokio::test]
290+
async fn test_batch_of_mixed_prepared_and_simple_statements() {
291+
setup_tracing();
292+
let test_name = String::from("test_batch_mixed_prepared_and_simple_statements");
293+
let session = create_test_session(&test_name, true).await;
294+
295+
let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name);
296+
let prepared_bound = session
297+
.prepare(Query::new(query_str.clone()))
298+
.await
299+
.unwrap();
300+
301+
let mut batch = Batch::new(BatchType::Unlogged);
302+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
303+
for i in 0..BATCH_COUNT as i32 {
304+
if i % 2 == 1 {
305+
let simple_statement = Query::new(format!(
306+
"INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})",
307+
&test_name,
308+
&test_name,
309+
i,
310+
i + 1
311+
));
312+
batch.append_statement(simple_statement);
313+
batch_values.push(vec![]);
314+
} else {
315+
batch.append_statement(prepared_bound.clone());
316+
batch_values.push(vec![
317+
CqlValue::Text(test_name.clone()),
318+
CqlValue::Int(i),
319+
CqlValue::Int(i + 1),
320+
]);
321+
}
322+
}
323+
session.batch(&batch, batch_values).await.unwrap();
324+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
325+
}
326+
327+
#[tokio::test]
328+
async fn test_cas_batch() {
329+
setup_tracing();
330+
let test_name = String::from("test_cas_batch");
331+
let session = create_test_session(&test_name, false).await;
332+
333+
let query_str = format!(
334+
"INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS",
335+
&test_name
336+
);
337+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
338+
let mut batch = Batch::new(BatchType::Unlogged);
339+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
340+
for i in 0..BATCH_COUNT as i32 {
341+
batch.append_statement(prepared.clone());
342+
batch_values.push((&test_name, i, i + 1));
343+
}
344+
let result = session.batch(&batch, batch_values.clone()).await.unwrap();
345+
let row: (bool, Option<String>, Option<i32>, Option<i32>) =
346+
result.into_rows_result().unwrap().first_row().unwrap();
347+
assert!(row.0, "First CAS batch should be applied");
348+
349+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
350+
351+
let result2 = session.batch(&batch, batch_values).await.unwrap();
352+
let row: (bool, Option<String>, Option<i32>, Option<i32>) =
353+
result2.into_rows_result().unwrap().first_row().unwrap();
354+
assert!(!row.0, "Second CAS batch should not be applied");
355+
}
356+
357+
#[tokio::test]
358+
async fn test_counter_batch() {
359+
setup_tracing();
360+
let test_name = String::from("test_counter_batch");
361+
let session = create_test_session(&test_name, false).await;
362+
create_counter_tables(&session).await;
363+
364+
let mut batch = Batch::new(BatchType::Counter);
365+
let mut batch_values = Vec::with_capacity(3);
366+
for i in 1..=3 {
367+
let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
368+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
369+
batch.append_statement(prepared);
370+
batch_values.push((Counter(i), &test_name));
371+
}
372+
session.batch(&batch, batch_values).await.unwrap();
373+
374+
for i in 1..=3 {
375+
let query_str = format!("SELECT c FROM counter{} WHERE k0 = ?", i);
376+
let query_result = session
377+
.query_unpaged(query_str, (&test_name,))
378+
.await
379+
.unwrap()
380+
.into_rows_result()
381+
.unwrap();
382+
let row = query_result.single_row::<(Counter,)>().unwrap();
383+
let (c,) = row;
384+
assert_eq!(c, Counter(i));
385+
}
386+
}
387+
388+
#[tokio::test]
389+
async fn test_fail_logged_batch_with_counter_increment() {
390+
setup_tracing();
391+
let test_name = String::from("test_fail_logged_batch");
392+
let session = create_test_session(&test_name, false).await;
393+
create_counter_tables(&session).await;
394+
395+
let mut batch = Batch::new(BatchType::Logged);
396+
let mut batch_values: Vec<_> = Vec::with_capacity(3);
397+
for i in 1..=3 {
398+
let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
399+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
400+
batch.append_statement(prepared);
401+
batch_values.push((Counter(i), &test_name));
402+
}
403+
let err = session.batch(&batch, batch_values).await.unwrap_err();
404+
assert_matches!(
405+
err,
406+
ExecutionError::LastAttemptError(RequestAttemptError::DbError(DbError::Invalid, _)),
407+
"Expected an Invalid DbError when using counter statements in a LOGGED batch"
408+
);
409+
}
410+
411+
#[tokio::test]
412+
async fn test_fail_counter_batch_with_non_counter_increment() {
413+
setup_tracing();
414+
let test_name = String::from("test_fail_counter_batch");
415+
let session = create_test_session(&test_name, false).await;
416+
create_counter_tables(&session).await;
417+
418+
let mut batch = Batch::new(BatchType::Counter);
419+
let mut batch_values: Vec<Vec<CqlValue>> = Vec::new();
420+
for i in 1..=3 {
421+
let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
422+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
423+
batch.append_statement(prepared);
424+
batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]);
425+
}
426+
427+
let prepared = prepare_insert_statement(&session, &test_name).await;
428+
batch.append_statement(prepared);
429+
batch_values.push(vec![
430+
CqlValue::Text(test_name.clone()),
431+
CqlValue::Int(0),
432+
CqlValue::Int(1),
433+
]);
434+
let err = session.batch(&batch, batch_values).await.unwrap_err();
435+
assert_matches!(
436+
err,
437+
ExecutionError::BadQuery(_),
438+
"Expected a BadQuery error when including a non-counter statement in a COUNTER batch"
439+
);
440+
}

0 commit comments

Comments
 (0)