Skip to content

Commit 74be1ef

Browse files
authored
Merge pull request #1216 from sylwiaszunejko/hackathon_query_execute_iter
Add tests for {query,execute}_iter API
2 parents a86d390 + 69290df commit 74be1ef

File tree

2 files changed

+298
-0
lines changed

2 files changed

+298
-0
lines changed

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod hygiene;
1111
mod large_batch_statements;
1212
mod lwt_optimisation;
1313
mod new_session;
14+
mod query_result;
1415
mod retries;
1516
mod self_identity;
1617
mod shards;
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
use assert_matches::assert_matches;
2+
use futures::TryStreamExt;
3+
use scylla::errors::PagerExecutionError;
4+
use scylla::{
5+
batch::{Batch, BatchType},
6+
client::session::Session,
7+
query::Query,
8+
};
9+
use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};
10+
11+
use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL};
12+
13+
const PAGE_SIZE: i32 = 100;
14+
const ROWS_PER_PARTITION: i32 = 1000;
15+
const PARTITION_KEY1: &str = "part";
16+
const PARTITION_KEY2: &str = "part2";
17+
18+
/// Initialize a cluster with a table and insert data into two partitions.
19+
/// Returns a session and the keyspace name.
20+
///
21+
/// # Example
22+
/// ```rust
23+
/// let (session, ks) = initialize_cluster_two_partitions().await;
24+
/// ```
25+
async fn initialize_cluster_two_partitions() -> (Session, String) {
26+
setup_tracing();
27+
let session = create_new_session_builder().build().await.unwrap();
28+
29+
let ks = unique_keyspace_name();
30+
31+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
32+
session
33+
.ddl(format!(
34+
"CREATE TABLE IF NOT EXISTS {}.t (k0 text, k1 int, v int, PRIMARY KEY(k0, k1))",
35+
ks
36+
))
37+
.await
38+
.unwrap();
39+
40+
let prepared = session
41+
.prepare(format!("INSERT INTO {}.t (k0, k1, v) VALUES (?, ?, ?)", ks))
42+
.await
43+
.unwrap();
44+
45+
let mut batch_part1 = Batch::new(BatchType::Unlogged);
46+
let mut batch_part2 = Batch::new(BatchType::Unlogged);
47+
let mut batch_values1 = Vec::new();
48+
let mut batch_values2 = Vec::new();
49+
50+
for i in 0..ROWS_PER_PARTITION {
51+
batch_part1.append_statement(prepared.clone());
52+
batch_values1.push((PARTITION_KEY1, i, i));
53+
batch_part2.append_statement(prepared.clone());
54+
batch_values2.push((
55+
PARTITION_KEY2,
56+
i + ROWS_PER_PARTITION,
57+
i + ROWS_PER_PARTITION,
58+
));
59+
}
60+
61+
session.batch(&batch_part1, &batch_values1).await.unwrap();
62+
session.batch(&batch_part2, &batch_values2).await.unwrap();
63+
64+
(session, ks)
65+
}
66+
67+
#[tokio::test]
68+
async fn query_single_page_should_only_iterate_over_rows_in_current_page() {
69+
let (session, ks) = initialize_cluster_two_partitions().await;
70+
71+
let mut query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
72+
query.set_page_size(PAGE_SIZE);
73+
74+
let paging_state = PagingState::start();
75+
let (rs_manual, paging_state_response) = session
76+
.query_single_page(query, (PARTITION_KEY1,), paging_state)
77+
.await
78+
.unwrap();
79+
let page_results = rs_manual
80+
.into_rows_result()
81+
.unwrap()
82+
.rows::<(String, i32, i32)>()
83+
.unwrap()
84+
.collect::<Result<Vec<_>, _>>()
85+
.unwrap();
86+
87+
assert!(page_results.len() <= PAGE_SIZE as usize);
88+
match paging_state_response {
89+
PagingStateResponse::HasMorePages { state: _ } => {}
90+
PagingStateResponse::NoMorePages => {
91+
panic!("Expected more pages");
92+
}
93+
}
94+
}
95+
96+
#[tokio::test]
97+
async fn query_iter_should_iterate_over_all_pages_asynchronously_single_partition() {
98+
let (session, ks) = initialize_cluster_two_partitions().await;
99+
100+
let mut query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
101+
query.set_page_size(PAGE_SIZE);
102+
103+
let query_result = session.query_iter(query, (PARTITION_KEY1,)).await.unwrap();
104+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
105+
106+
let mut i = 0;
107+
108+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
109+
assert_eq!(a, PARTITION_KEY1);
110+
assert_eq!(b, i);
111+
assert_eq!(c, i);
112+
i += 1;
113+
}
114+
assert_eq!(i, ROWS_PER_PARTITION);
115+
}
116+
117+
#[tokio::test]
118+
async fn query_iter_should_iterate_over_all_pages_asynchronously_cross_partition() {
119+
let (session, ks) = initialize_cluster_two_partitions().await;
120+
121+
let mut query = Query::new(format!("SELECT * FROM {}.t", ks));
122+
query.set_page_size(PAGE_SIZE);
123+
124+
let query_result = session.query_iter(query, ()).await.unwrap();
125+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
126+
127+
let mut i = 0;
128+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
129+
if i < ROWS_PER_PARTITION {
130+
assert_eq!(a, PARTITION_KEY1);
131+
} else {
132+
assert_eq!(a, PARTITION_KEY2);
133+
}
134+
assert_eq!(b, i);
135+
assert_eq!(c, i);
136+
i += 1;
137+
}
138+
assert_eq!(i, 2 * ROWS_PER_PARTITION);
139+
}
140+
141+
#[tokio::test]
142+
async fn query_iter_no_results() {
143+
let (session, ks) = initialize_cluster_two_partitions().await;
144+
145+
let query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
146+
147+
let query_result = session.query_iter(query, ("part3",)).await.unwrap();
148+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
149+
150+
assert_eq!(iter.try_next().await.unwrap(), None);
151+
}
152+
153+
#[tokio::test]
154+
async fn query_iter_prepare_error() {
155+
let (session, ks) = initialize_cluster_two_partitions().await;
156+
157+
// Wrong table name
158+
let query = Query::new(format!("SELECT * FROM {}.test where k0 = ?", ks));
159+
160+
assert_matches!(
161+
session.query_iter(query, (PARTITION_KEY1,)).await,
162+
Err(PagerExecutionError::PrepareError(_))
163+
);
164+
}
165+
166+
#[tokio::test]
167+
async fn query_iter_serialization_error() {
168+
let (session, ks) = initialize_cluster_two_partitions().await;
169+
170+
let query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
171+
172+
// Wrong value type
173+
assert_matches!(
174+
session.query_iter(query, (1,)).await,
175+
Err(PagerExecutionError::SerializationError(_))
176+
);
177+
}
178+
179+
#[tokio::test]
180+
async fn execute_single_page_should_only_iterate_over_rows_in_current_page() {
181+
let (session, ks) = initialize_cluster_two_partitions().await;
182+
183+
let mut prepared_query = session
184+
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
185+
.await
186+
.unwrap();
187+
prepared_query.set_page_size(PAGE_SIZE);
188+
189+
let paging_state = PagingState::start();
190+
let (rs_manual, paging_state_response) = session
191+
.execute_single_page(&prepared_query, (PARTITION_KEY1,), paging_state)
192+
.await
193+
.unwrap();
194+
let page_results = rs_manual
195+
.into_rows_result()
196+
.unwrap()
197+
.rows::<(String, i32, i32)>()
198+
.unwrap()
199+
.collect::<Result<Vec<_>, _>>()
200+
.unwrap();
201+
202+
assert!(page_results.len() <= PAGE_SIZE as usize);
203+
match paging_state_response {
204+
PagingStateResponse::HasMorePages { state: _ } => {}
205+
PagingStateResponse::NoMorePages => {
206+
panic!("Expected more pages");
207+
}
208+
}
209+
}
210+
211+
#[tokio::test]
212+
async fn execute_iter_should_iterate_over_all_pages_asynchronously_single_partition() {
213+
let (session, ks) = initialize_cluster_two_partitions().await;
214+
215+
let mut prepared_query = session
216+
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
217+
.await
218+
.unwrap();
219+
prepared_query.set_page_size(PAGE_SIZE);
220+
221+
let query_result = session
222+
.execute_iter(prepared_query, (PARTITION_KEY1,))
223+
.await
224+
.unwrap();
225+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
226+
227+
let mut i = 0;
228+
229+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
230+
assert_eq!(a, PARTITION_KEY1);
231+
assert_eq!(b, i);
232+
assert_eq!(c, i);
233+
i += 1;
234+
}
235+
assert_eq!(i, ROWS_PER_PARTITION);
236+
}
237+
238+
#[tokio::test]
239+
async fn execute_iter_should_iterate_over_all_pages_asynchronously_cross_partition() {
240+
let (session, ks) = initialize_cluster_two_partitions().await;
241+
242+
let mut prepared_query = session
243+
.prepare(format!("SELECT * FROM {}.t", ks))
244+
.await
245+
.unwrap();
246+
prepared_query.set_page_size(PAGE_SIZE);
247+
248+
let query_result = session.execute_iter(prepared_query, ()).await.unwrap();
249+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
250+
251+
let mut i = 0;
252+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
253+
if i < ROWS_PER_PARTITION {
254+
assert_eq!(a, PARTITION_KEY1);
255+
} else {
256+
assert_eq!(a, PARTITION_KEY2);
257+
}
258+
assert_eq!(b, i);
259+
assert_eq!(c, i);
260+
i += 1;
261+
}
262+
assert_eq!(i, 2 * ROWS_PER_PARTITION);
263+
}
264+
265+
#[tokio::test]
266+
async fn execute_iter_no_results() {
267+
let (session, ks) = initialize_cluster_two_partitions().await;
268+
269+
let prepared_query = session
270+
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
271+
.await
272+
.unwrap();
273+
274+
let query_result = session
275+
.execute_iter(prepared_query, ("part3",))
276+
.await
277+
.unwrap();
278+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
279+
280+
assert_eq!(iter.try_next().await.unwrap(), None);
281+
}
282+
283+
#[tokio::test]
284+
async fn execute_iter_serialization_error() {
285+
let (session, ks) = initialize_cluster_two_partitions().await;
286+
287+
let prepared_query = session
288+
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
289+
.await
290+
.unwrap();
291+
292+
// Wrong value type
293+
assert_matches!(
294+
session.execute_iter(prepared_query, (1,)).await,
295+
Err(PagerExecutionError::SerializationError(_))
296+
)
297+
}

0 commit comments

Comments
 (0)