Skip to content

Commit 0493ac5

Browse files
committed
tests: add integration tests for batch statements
1 parent ee98512 commit 0493ac5

File tree

2 files changed

+388
-0
lines changed

2 files changed

+388
-0
lines changed
Lines changed: 387 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL};
2+
use assert_matches::assert_matches;
3+
use scylla::batch::{Batch, BatchType};
4+
use scylla::client::session::Session;
5+
use scylla::errors::ExecutionError;
6+
use scylla::prepared_statement::PreparedStatement;
7+
use scylla::query::Query;
8+
use scylla::value::{CqlValue, MaybeUnset};
9+
use std::collections::HashMap;
10+
use std::string::String;
11+
12+
const BATCH_COUNT: usize = 100;
13+
14+
async fn create_test_session(table_name: &str) -> (Session, String) {
15+
let session = create_new_session_builder().build().await.unwrap();
16+
let ks = unique_keyspace_name();
17+
session
18+
.ddl(format!(
19+
"CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks))
20+
.await
21+
.unwrap();
22+
session.use_keyspace(&ks, false).await.unwrap();
23+
session
24+
.ddl(format!(
25+
"CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))",
26+
table_name
27+
))
28+
.await
29+
.unwrap();
30+
31+
(session, ks)
32+
}
33+
34+
async fn create_counter_tables(session: &Session, ks: &str) {
35+
for &table in ["counter1", "counter2", "counter3"].iter() {
36+
session
37+
.ddl(format!(
38+
"CREATE TABLE {}.{} (k0 text PRIMARY KEY, c counter)",
39+
ks, table
40+
))
41+
.await
42+
.unwrap();
43+
}
44+
}
45+
46+
async fn verify_batch_insert(session: &Session, keyspace: &str, test_name: &str, count: usize) {
47+
let select_query = format!(
48+
"SELECT k0, k1, v FROM {}.{} WHERE k0 = ?",
49+
keyspace, test_name
50+
);
51+
let query_result = session
52+
.query_unpaged(select_query, (test_name,))
53+
.await
54+
.unwrap()
55+
.into_rows_result()
56+
.unwrap();
57+
let rows: Vec<(String, i32, i32)> = query_result
58+
.rows::<(String, i32, i32)>()
59+
.unwrap()
60+
.map(|r| r.unwrap())
61+
.collect();
62+
assert_eq!(rows.len(), count);
63+
for (k0, k1, v) in rows {
64+
assert_eq!(k0, test_name);
65+
assert_eq!(v, k1 + 1);
66+
}
67+
}
68+
69+
async fn prepare_insert_statement(session: &Session, ks: &str, table: &str) -> PreparedStatement {
70+
let query_str = format!("INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)", ks, table);
71+
session.prepare(Query::new(query_str)).await.unwrap()
72+
}
73+
74+
#[tokio::test]
75+
async fn test_batch_of_simple_statements() {
76+
setup_tracing();
77+
let test_name = String::from("test_batch_simple_statements");
78+
let (session, ks) = create_test_session(&test_name).await;
79+
80+
let mut batch = Batch::new(BatchType::Unlogged);
81+
for i in 0..BATCH_COUNT {
82+
let simple_statement = Query::new(format!(
83+
"INSERT INTO {}.{} (k0, k1, v) VALUES ('{}', {}, {})",
84+
ks,
85+
&test_name,
86+
&test_name,
87+
i,
88+
i + 1
89+
));
90+
batch.append_statement(simple_statement);
91+
}
92+
session.batch(&batch, vec![(); BATCH_COUNT]).await.unwrap();
93+
verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await;
94+
}
95+
96+
#[tokio::test]
97+
async fn test_batch_of_prepared_statements() {
98+
setup_tracing();
99+
let test_name = String::from("test_batch_prepared_statements");
100+
let (session, ks) = create_test_session(&test_name).await;
101+
102+
let prepared = prepare_insert_statement(&session, &ks, &test_name).await;
103+
let mut batch = Batch::new(BatchType::Unlogged);
104+
let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT);
105+
for i in 0..BATCH_COUNT as i32 {
106+
batch.append_statement(prepared.clone());
107+
batch_values.push((test_name.as_str(), i, i + 1));
108+
}
109+
session.batch(&batch, batch_values).await.unwrap();
110+
verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await;
111+
}
112+
113+
#[tokio::test]
114+
async fn test_prepared_batch() {
115+
setup_tracing();
116+
let test_name = String::from("test_prepared_batch");
117+
let (session, ks) = create_test_session(&test_name).await;
118+
119+
let mut batch = Batch::new(BatchType::Unlogged);
120+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
121+
let query_str = format!(
122+
"INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)",
123+
ks, &test_name
124+
);
125+
for i in 0..BATCH_COUNT as i32 {
126+
batch.append_statement(Query::new(query_str.clone()));
127+
batch_values.push((&test_name, i, i + 1));
128+
}
129+
let prepared_batch = session.prepare_batch(&batch).await.unwrap();
130+
session.batch(&prepared_batch, batch_values).await.unwrap();
131+
verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await;
132+
}
133+
134+
#[tokio::test]
135+
async fn test_batch_of_bound_statements_with_unset_values() {
136+
setup_tracing();
137+
let test_name = String::from("test_batch_bound_statements_with_unset_values");
138+
let (session, ks) = create_test_session(&test_name).await;
139+
140+
let prepared = prepare_insert_statement(&session, &ks, &test_name).await;
141+
let mut batch1 = Batch::new(BatchType::Unlogged);
142+
let mut batch1_values = Vec::with_capacity(BATCH_COUNT);
143+
for i in 0..BATCH_COUNT as i32 {
144+
batch1.append_statement(prepared.clone());
145+
batch1_values.push((test_name.as_str(), i, i + 1));
146+
}
147+
session.batch(&batch1, batch1_values).await.unwrap();
148+
149+
// Update v to (k1 + 2), but for every 20th row leave v unset.
150+
let mut batch2 = Batch::new(BatchType::Unlogged);
151+
let mut batch2_values = Vec::with_capacity(BATCH_COUNT);
152+
for i in 0..BATCH_COUNT as i32 {
153+
batch2.append_statement(prepared.clone());
154+
if i % 20 == 0 {
155+
batch2_values.push((
156+
MaybeUnset::Set(&test_name),
157+
MaybeUnset::Set(i),
158+
MaybeUnset::Unset,
159+
));
160+
} else {
161+
batch2_values.push((
162+
MaybeUnset::Set(&test_name),
163+
MaybeUnset::Set(i),
164+
MaybeUnset::Set(i + 2),
165+
));
166+
}
167+
}
168+
session.batch(&batch2, batch2_values).await.unwrap();
169+
170+
// Verify that rows with k1 % 20 == 0 retain the original value.
171+
let select_query = format!("SELECT k0, k1, v FROM {}.{} WHERE k0 = ?", ks, &test_name);
172+
let query_result = session
173+
.query_unpaged(select_query, (&test_name,))
174+
.await
175+
.unwrap()
176+
.into_rows_result()
177+
.unwrap();
178+
let rows: Vec<(String, i32, i32)> = query_result
179+
.rows::<(String, i32, i32)>()
180+
.unwrap()
181+
.map(|r| r.unwrap())
182+
.collect();
183+
assert_eq!(
184+
rows.len(),
185+
BATCH_COUNT,
186+
"Expected {} rows, got {}",
187+
BATCH_COUNT,
188+
rows.len()
189+
);
190+
for (k0, k1, v) in rows {
191+
assert_eq!(k0, test_name);
192+
assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 });
193+
}
194+
}
195+
196+
#[tokio::test]
197+
async fn test_batch_of_bound_statements_named_variables() {
198+
setup_tracing();
199+
let test_name = String::from("test_batch_bound_statements_named_variables");
200+
let (session, ks) = create_test_session(&test_name).await;
201+
202+
let query_str = format!(
203+
"INSERT INTO {}.{} (k0, k1, v) VALUES (:k0, :k1, :v)",
204+
ks, &test_name
205+
);
206+
let prepared = session.prepare(query_str).await.unwrap();
207+
208+
let mut batch = Batch::new(BatchType::Unlogged);
209+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
210+
for i in 0..BATCH_COUNT as i32 {
211+
batch.append_statement(prepared.clone());
212+
let mut values = HashMap::new();
213+
values.insert("k0", CqlValue::Text(test_name.clone()));
214+
values.insert("k1", CqlValue::Int(i));
215+
values.insert("v", CqlValue::Int(i + 1));
216+
batch_values.push(values);
217+
}
218+
session.batch(&batch, batch_values).await.unwrap();
219+
verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await;
220+
}
221+
222+
#[tokio::test]
223+
async fn test_batch_of_mixed_bound_and_simple_statements() {
224+
setup_tracing();
225+
let test_name = String::from("test_batch_mixed_bound_and_simple_statements");
226+
let (session, ks) = create_test_session(&test_name).await;
227+
228+
let query_str = format!(
229+
"INSERT INTO {}.{} (k0, k1, v) VALUES (?, ?, ?)",
230+
ks, &test_name
231+
);
232+
let prepared_bound = session
233+
.prepare(Query::new(query_str.clone()))
234+
.await
235+
.unwrap();
236+
237+
let mut batch = Batch::new(BatchType::Unlogged);
238+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
239+
for i in 0..BATCH_COUNT as i32 {
240+
if i % 2 == 1 {
241+
let simple_statement = Query::new(format!(
242+
"INSERT INTO {}.{} (k0, k1, v) VALUES ('{}', {}, {})",
243+
ks,
244+
&test_name,
245+
&test_name,
246+
i,
247+
i + 1
248+
));
249+
batch.append_statement(simple_statement);
250+
batch_values.push(vec![]);
251+
} else {
252+
batch.append_statement(prepared_bound.clone());
253+
batch_values.push(vec![
254+
CqlValue::Text(test_name.clone()),
255+
CqlValue::Int(i),
256+
CqlValue::Int(i + 1),
257+
]);
258+
}
259+
}
260+
session.batch(&batch, batch_values).await.unwrap();
261+
verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await;
262+
}
263+
264+
/// TODO: Remove #[ignore] once LWTs are supported with tablets.
265+
#[tokio::test]
266+
#[ignore]
267+
async fn test_cas_batch() {
268+
setup_tracing();
269+
let test_name = String::from("test_cas_batch");
270+
let (session, ks) = create_test_session(&test_name).await;
271+
272+
let prepared = prepare_insert_statement(&session, &ks, &test_name).await;
273+
let mut batch = Batch::new(BatchType::Unlogged);
274+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
275+
for i in 0..BATCH_COUNT as i32 {
276+
batch.append_statement(prepared.clone());
277+
batch_values.push((&test_name, i, i + 1));
278+
}
279+
let result = session.batch(&batch, batch_values.clone()).await.unwrap();
280+
let (applied,): (bool,) = result
281+
.into_rows_result()
282+
.unwrap()
283+
.first_row::<(bool,)>()
284+
.unwrap();
285+
assert!(applied, "First CAS batch should be applied");
286+
287+
verify_batch_insert(&session, &ks, &test_name, BATCH_COUNT).await;
288+
289+
let result2 = session.batch(&batch, batch_values).await.unwrap();
290+
let (applied2,): (bool,) = result2
291+
.into_rows_result()
292+
.unwrap()
293+
.first_row::<(bool,)>()
294+
.unwrap();
295+
assert!(applied2, "Second CAS batch should not be applied");
296+
}
297+
298+
/// TODO: Remove #[ignore] once counters are supported with tablets.
299+
#[tokio::test]
300+
#[ignore]
301+
async fn test_counter_batch() {
302+
setup_tracing();
303+
let test_name = String::from("test_counter_batch");
304+
let (session, ks) = create_test_session(&test_name).await;
305+
create_counter_tables(&session, &ks).await;
306+
307+
let mut batch = Batch::new(BatchType::Counter);
308+
let mut batch_values = Vec::with_capacity(3);
309+
for i in 1..=3 {
310+
let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i);
311+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
312+
batch.append_statement(prepared);
313+
batch_values.push((i, &test_name));
314+
}
315+
session.batch(&batch, batch_values).await.unwrap();
316+
317+
for i in 1..=3 {
318+
let query_str = format!("SELECT c FROM {}.counter{} WHERE k0 = ?", ks, i);
319+
let query_result = session
320+
.query_unpaged(query_str, (&test_name,))
321+
.await
322+
.unwrap()
323+
.into_rows_result()
324+
.unwrap();
325+
let row = query_result.single_row::<(i64,)>().unwrap();
326+
let (c,) = row;
327+
assert_eq!(c, i as i64);
328+
}
329+
}
330+
331+
/// TODO: Remove #[ignore] once counters are supported with tablets.
332+
#[tokio::test]
333+
#[ignore]
334+
async fn test_fail_logged_batch_with_counter_increment() {
335+
setup_tracing();
336+
let test_name = String::from("test_fail_logged_batch");
337+
let (session, ks) = create_test_session(&test_name).await;
338+
create_counter_tables(&session, &ks).await;
339+
340+
let mut batch = Batch::new(BatchType::Logged);
341+
let mut batch_values: Vec<_> = Vec::with_capacity(3);
342+
for i in 1..=3 {
343+
let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i);
344+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
345+
batch.append_statement(prepared);
346+
batch_values.push((i, &test_name));
347+
}
348+
let err = session.batch(&batch, batch_values).await.unwrap_err();
349+
assert_matches!(
350+
err,
351+
ExecutionError::BadQuery(_),
352+
"Expected a BadQuery error when using counter statements in a LOGGED batch"
353+
);
354+
}
355+
356+
/// TODO: Remove #[ignore] once counters are supported with tablets.
357+
#[tokio::test]
358+
#[ignore]
359+
async fn test_fail_counter_batch_with_non_counter_increment() {
360+
setup_tracing();
361+
let test_name = String::from("test_fail_counter_batch");
362+
let (session, ks) = create_test_session(&test_name).await;
363+
create_counter_tables(&session, &ks).await;
364+
365+
let mut batch = Batch::new(BatchType::Counter);
366+
let mut batch_values: Vec<Vec<CqlValue>> = Vec::new();
367+
for i in 1..=3 {
368+
let query_str = format!("UPDATE {}.counter{} SET c = c + ? WHERE k0 = ?", ks, i);
369+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
370+
batch.append_statement(prepared);
371+
batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]);
372+
}
373+
374+
let prepared = prepare_insert_statement(&session, &ks, &test_name).await;
375+
batch.append_statement(prepared);
376+
batch_values.push(vec![
377+
CqlValue::Text(test_name.clone()),
378+
CqlValue::Int(0),
379+
CqlValue::Int(1),
380+
]);
381+
let err = session.batch(&batch, batch_values).await.unwrap_err();
382+
assert_matches!(
383+
err,
384+
ExecutionError::BadQuery(_),
385+
"Expected a BadQuery error when including a non-counter statement in a COUNTER batch"
386+
);
387+
}

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod batch;
2+
mod batch_statements;
23
mod consistency;
34
mod cql_collections;
45
mod cql_types;

0 commit comments

Comments
 (0)