Skip to content

Commit 8206bc0

Browse files

File tree

2 files changed

+224
-0
lines changed

2 files changed

+224
-0
lines changed

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod hygiene;
1111
mod large_batch_statements;
1212
mod lwt_optimisation;
1313
mod new_session;
14+
mod query_result;
1415
mod retries;
1516
mod self_identity;
1617
mod shards;
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
use futures::TryStreamExt;
2+
use scylla::{
3+
batch::{Batch, BatchType},
4+
client::session::Session,
5+
query::Query,
6+
};
7+
use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};
8+
9+
use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL};
10+
11+
const PAGE_SIZE: i32 = 100;
12+
const ROWS_PER_PARTITION: i32 = 1000;
13+
const PARTITION_KEY1: &str = "part";
14+
const PARTITION_KEY2: &str = "part2";
15+
16+
/// Initialize a cluster with a table and insert data into two partitions.
17+
/// Returns a session and the keyspace name.
18+
///
19+
/// # Example
20+
/// ```rust
21+
/// let (session, ks) = initialize_cluster_two_partitions().await;
22+
/// ```
23+
async fn initialize_cluster_two_partitions() -> (Session, String) {
24+
setup_tracing();
25+
let session = create_new_session_builder().build().await.unwrap();
26+
27+
let ks = unique_keyspace_name();
28+
29+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
30+
session
31+
.ddl(format!(
32+
"CREATE TABLE IF NOT EXISTS {}.t (k0 text, k1 int, v int, PRIMARY KEY(k0, k1))",
33+
ks
34+
))
35+
.await
36+
.unwrap();
37+
38+
let prepared = session
39+
.prepare(format!("INSERT INTO {}.t (k0, k1, v) VALUES (?, ?, ?)", ks))
40+
.await
41+
.unwrap();
42+
43+
let mut batch_part1 = Batch::new(BatchType::Unlogged);
44+
let mut batch_part2 = Batch::new(BatchType::Unlogged);
45+
let mut batch_values1 = Vec::new();
46+
let mut batch_values2 = Vec::new();
47+
48+
for i in 0..ROWS_PER_PARTITION {
49+
batch_part1.append_statement(prepared.clone());
50+
batch_values1.push((PARTITION_KEY1, i, i));
51+
batch_part2.append_statement(prepared.clone());
52+
batch_values2.push((
53+
PARTITION_KEY2,
54+
i + ROWS_PER_PARTITION,
55+
i + ROWS_PER_PARTITION,
56+
));
57+
}
58+
59+
session.batch(&batch_part1, &batch_values1).await.unwrap();
60+
session.batch(&batch_part2, &batch_values2).await.unwrap();
61+
62+
(session, ks)
63+
}
64+
65+
#[tokio::test]
66+
async fn query_single_page_should_only_iterate_over_rows_in_current_page() {
67+
let (session, ks) = initialize_cluster_two_partitions().await;
68+
69+
let mut query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
70+
query.set_page_size(PAGE_SIZE);
71+
72+
let paging_state = PagingState::start();
73+
let (rs_manual, paging_state_response) = session
74+
.query_single_page(query, (PARTITION_KEY1,), paging_state)
75+
.await
76+
.unwrap();
77+
let page_results = rs_manual
78+
.into_rows_result()
79+
.unwrap()
80+
.rows::<(String, i32, i32)>()
81+
.unwrap()
82+
.collect::<Result<Vec<_>, _>>()
83+
.unwrap();
84+
85+
assert!(page_results.len() <= PAGE_SIZE as usize);
86+
match paging_state_response {
87+
PagingStateResponse::HasMorePages { state: _ } => {}
88+
PagingStateResponse::NoMorePages => {
89+
panic!("Expected more pages");
90+
}
91+
}
92+
}
93+
94+
#[tokio::test]
95+
async fn query_iter_should_iterate_over_all_pages_asynchronously_single_partition() {
96+
let (session, ks) = initialize_cluster_two_partitions().await;
97+
98+
let mut query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
99+
query.set_page_size(PAGE_SIZE);
100+
101+
let query_result = session.query_iter(query, (PARTITION_KEY1,)).await.unwrap();
102+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
103+
104+
let mut i = 0;
105+
106+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
107+
assert_eq!(a, PARTITION_KEY1);
108+
assert_eq!(b, i);
109+
assert_eq!(c, i);
110+
i += 1;
111+
}
112+
assert_eq!(i, ROWS_PER_PARTITION);
113+
}
114+
115+
#[tokio::test]
116+
async fn query_iter_should_iterate_over_all_pages_asynchronously_cross_partition() {
117+
let (session, ks) = initialize_cluster_two_partitions().await;
118+
119+
let mut query = Query::new(format!("SELECT * FROM {}.t", ks));
120+
query.set_page_size(PAGE_SIZE);
121+
122+
let query_result = session.query_iter(query, ()).await.unwrap();
123+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
124+
125+
let mut i = 0;
126+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
127+
if i < ROWS_PER_PARTITION {
128+
assert_eq!(a, PARTITION_KEY1);
129+
} else {
130+
assert_eq!(a, PARTITION_KEY2);
131+
}
132+
assert_eq!(b, i);
133+
assert_eq!(c, i);
134+
i += 1;
135+
}
136+
assert_eq!(i, 2 * ROWS_PER_PARTITION);
137+
}
138+
139+
#[tokio::test]
140+
async fn execute_single_page_should_only_iterate_over_rows_in_current_page() {
141+
let (session, ks) = initialize_cluster_two_partitions().await;
142+
143+
let mut prepared_query = session
144+
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
145+
.await
146+
.unwrap();
147+
prepared_query.set_page_size(PAGE_SIZE);
148+
149+
let paging_state = PagingState::start();
150+
let (rs_manual, paging_state_response) = session
151+
.execute_single_page(&prepared_query, (PARTITION_KEY1,), paging_state)
152+
.await
153+
.unwrap();
154+
let page_results = rs_manual
155+
.into_rows_result()
156+
.unwrap()
157+
.rows::<(String, i32, i32)>()
158+
.unwrap()
159+
.collect::<Result<Vec<_>, _>>()
160+
.unwrap();
161+
162+
assert!(page_results.len() <= PAGE_SIZE as usize);
163+
match paging_state_response {
164+
PagingStateResponse::HasMorePages { state: _ } => {}
165+
PagingStateResponse::NoMorePages => {
166+
panic!("Expected more pages");
167+
}
168+
}
169+
}
170+
171+
#[tokio::test]
172+
async fn execute_iter_should_iterate_over_all_pages_asynchronously_single_partition() {
173+
let (session, ks) = initialize_cluster_two_partitions().await;
174+
175+
let mut prepared_query = session
176+
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
177+
.await
178+
.unwrap();
179+
prepared_query.set_page_size(PAGE_SIZE);
180+
181+
let query_result = session
182+
.execute_iter(prepared_query, (PARTITION_KEY1,))
183+
.await
184+
.unwrap();
185+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
186+
187+
let mut i = 0;
188+
189+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
190+
assert_eq!(a, PARTITION_KEY1);
191+
assert_eq!(b, i);
192+
assert_eq!(c, i);
193+
i += 1;
194+
}
195+
assert_eq!(i, ROWS_PER_PARTITION);
196+
}
197+
198+
#[tokio::test]
199+
async fn execute_iter_should_iterate_over_all_pages_asynchronously_cross_partition() {
200+
let (session, ks) = initialize_cluster_two_partitions().await;
201+
202+
let mut prepared_query = session
203+
.prepare(format!("SELECT * FROM {}.t", ks))
204+
.await
205+
.unwrap();
206+
prepared_query.set_page_size(PAGE_SIZE);
207+
208+
let query_result = session.execute_iter(prepared_query, ()).await.unwrap();
209+
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();
210+
211+
let mut i = 0;
212+
while let Some((a, b, c)) = iter.try_next().await.unwrap() {
213+
if i < ROWS_PER_PARTITION {
214+
assert_eq!(a, PARTITION_KEY1);
215+
} else {
216+
assert_eq!(a, PARTITION_KEY2);
217+
}
218+
assert_eq!(b, i);
219+
assert_eq!(c, i);
220+
i += 1;
221+
}
222+
assert_eq!(i, 2 * ROWS_PER_PARTITION);
223+
}

0 commit comments

Comments
 (0)