Skip to content

Commit d6b9e09

Browse files
Move timestamp test to query.rs and add missing cases
1 parent f1b259d commit d6b9e09

File tree

3 files changed

+299
-194
lines changed

3 files changed

+299
-194
lines changed

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod hygiene;
1111
mod large_batch_statements;
1212
mod lwt_optimisation;
1313
mod new_session;
14+
mod query;
1415
mod query_result;
1516
mod retries;
1617
mod self_identity;

scylla/tests/integration/query.rs

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL};
2+
use scylla::{
3+
batch::{Batch, BatchType},
4+
query::Query,
5+
response::PagingState,
6+
};
7+
use std::collections::HashSet;
8+
use std::sync::{Arc, Mutex};
9+
10+
#[tokio::test]
11+
async fn test_timestamp() {
12+
setup_tracing();
13+
let session = create_new_session_builder().build().await.unwrap();
14+
let ks = unique_keyspace_name();
15+
16+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
17+
session
18+
.ddl(format!(
19+
"CREATE TABLE IF NOT EXISTS {}.t_timestamp (a text, b text, primary key (a))",
20+
ks
21+
))
22+
.await
23+
.unwrap();
24+
25+
session.await_schema_agreement().await.unwrap();
26+
27+
let query_str = format!("INSERT INTO {}.t_timestamp (a, b) VALUES (?, ?)", ks);
28+
29+
// test regular query timestamps
30+
31+
let mut regular_query = Query::new(query_str.to_string());
32+
33+
regular_query.set_timestamp(Some(420));
34+
session
35+
.query_unpaged(regular_query.clone(), ("regular query", "higher timestamp"))
36+
.await
37+
.unwrap();
38+
39+
regular_query.set_timestamp(Some(42));
40+
session
41+
.query_unpaged(regular_query.clone(), ("regular query", "lower timestamp"))
42+
.await
43+
.unwrap();
44+
45+
// test regular query iter timestamps
46+
47+
let mut regular_query = Query::new(query_str.to_string());
48+
49+
regular_query.set_timestamp(Some(420));
50+
session
51+
.query_iter(
52+
regular_query.clone(),
53+
("regular query iter", "higher timestamp"),
54+
)
55+
.await
56+
.unwrap();
57+
58+
regular_query.set_timestamp(Some(42));
59+
session
60+
.query_iter(
61+
regular_query.clone(),
62+
("regular query iter", "lower timestamp"),
63+
)
64+
.await
65+
.unwrap();
66+
67+
// test regular query single page timestamps
68+
69+
let mut regular_query = Query::new(query_str.to_string());
70+
71+
regular_query.set_timestamp(Some(420));
72+
session
73+
.query_single_page(
74+
regular_query.clone(),
75+
("regular query single page", "higher timestamp"),
76+
PagingState::start(),
77+
)
78+
.await
79+
.unwrap();
80+
81+
regular_query.set_timestamp(Some(42));
82+
session
83+
.query_single_page(
84+
regular_query.clone(),
85+
("regular query single page", "lower timestamp"),
86+
PagingState::start(),
87+
)
88+
.await
89+
.unwrap();
90+
91+
// test prepared statement timestamps
92+
93+
let mut prepared_statement = session.prepare(query_str.clone()).await.unwrap();
94+
95+
prepared_statement.set_timestamp(Some(420));
96+
session
97+
.execute_unpaged(&prepared_statement, ("prepared query", "higher timestamp"))
98+
.await
99+
.unwrap();
100+
101+
prepared_statement.set_timestamp(Some(42));
102+
session
103+
.execute_unpaged(&prepared_statement, ("prepared query", "lower timestamp"))
104+
.await
105+
.unwrap();
106+
107+
// test prepared statement iter timestamps
108+
109+
let mut prepared_statement = session.prepare(query_str.clone()).await.unwrap();
110+
111+
prepared_statement.set_timestamp(Some(420));
112+
session
113+
.execute_iter(
114+
prepared_statement.clone(),
115+
("prepared query iter", "higher timestamp"),
116+
)
117+
.await
118+
.unwrap();
119+
120+
prepared_statement.set_timestamp(Some(42));
121+
session
122+
.execute_iter(
123+
prepared_statement,
124+
("prepared query iter", "lower timestamp"),
125+
)
126+
.await
127+
.unwrap();
128+
129+
// test prepared statement single page timestamps
130+
131+
let mut prepared_statement = session.prepare(query_str).await.unwrap();
132+
133+
prepared_statement.set_timestamp(Some(420));
134+
session
135+
.execute_single_page(
136+
&prepared_statement,
137+
("prepared query single page", "higher timestamp"),
138+
PagingState::start(),
139+
)
140+
.await
141+
.unwrap();
142+
143+
prepared_statement.set_timestamp(Some(42));
144+
session
145+
.execute_single_page(
146+
&prepared_statement,
147+
("prepared query single page", "lower timestamp"),
148+
PagingState::start(),
149+
)
150+
.await
151+
.unwrap();
152+
153+
// test batch statement timestamps
154+
155+
let mut batch: Batch = Default::default();
156+
batch.append_statement(regular_query);
157+
batch.append_statement(prepared_statement);
158+
159+
batch.set_timestamp(Some(420));
160+
session
161+
.batch(
162+
&batch,
163+
(
164+
("first query in batch", "higher timestamp"),
165+
("second query in batch", "higher timestamp"),
166+
),
167+
)
168+
.await
169+
.unwrap();
170+
171+
batch.set_timestamp(Some(42));
172+
session
173+
.batch(
174+
&batch,
175+
(
176+
("first query in batch", "lower timestamp"),
177+
("second query in batch", "lower timestamp"),
178+
),
179+
)
180+
.await
181+
.unwrap();
182+
183+
let query_rows_result = session
184+
.query_unpaged(
185+
format!("SELECT a, b, WRITETIME(b) FROM {}.t_timestamp", ks),
186+
&[],
187+
)
188+
.await
189+
.unwrap()
190+
.into_rows_result()
191+
.unwrap();
192+
193+
let mut results = query_rows_result
194+
.rows::<(&str, &str, i64)>()
195+
.unwrap()
196+
.map(Result::unwrap)
197+
.collect::<Vec<_>>();
198+
results.sort();
199+
200+
let expected_results = [
201+
("first query in batch", "higher timestamp", 420),
202+
("prepared query", "higher timestamp", 420),
203+
("prepared query iter", "higher timestamp", 420),
204+
("prepared query single page", "higher timestamp", 420),
205+
("regular query", "higher timestamp", 420),
206+
("regular query iter", "higher timestamp", 420),
207+
("regular query single page", "higher timestamp", 420),
208+
("second query in batch", "higher timestamp", 420),
209+
]
210+
.into_iter()
211+
.collect::<Vec<_>>();
212+
213+
assert_eq!(results, expected_results);
214+
}
215+
216+
#[tokio::test]
217+
async fn test_timestamp_generator() {
218+
use rand::random;
219+
use scylla::policies::timestamp_generator::TimestampGenerator;
220+
221+
setup_tracing();
222+
struct LocalTimestampGenerator {
223+
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
224+
}
225+
226+
impl TimestampGenerator for LocalTimestampGenerator {
227+
fn next_timestamp(&self) -> i64 {
228+
let timestamp = random::<i64>().abs();
229+
self.generated_timestamps.lock().unwrap().insert(timestamp);
230+
timestamp
231+
}
232+
}
233+
234+
let timestamps = Arc::new(Mutex::new(HashSet::new()));
235+
let generator = LocalTimestampGenerator {
236+
generated_timestamps: timestamps.clone(),
237+
};
238+
239+
let session = create_new_session_builder()
240+
.timestamp_generator(Arc::new(generator))
241+
.build()
242+
.await
243+
.unwrap();
244+
let ks = unique_keyspace_name();
245+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
246+
session
247+
.ddl(format!(
248+
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
249+
ks
250+
))
251+
.await
252+
.unwrap();
253+
254+
let prepared = session
255+
.prepare(format!(
256+
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
257+
ks
258+
))
259+
.await
260+
.unwrap();
261+
session.execute_unpaged(&prepared, []).await.unwrap();
262+
263+
let unprepared = Query::new(format!(
264+
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
265+
ks
266+
));
267+
session.query_unpaged(unprepared, []).await.unwrap();
268+
269+
let mut batch = Batch::new(BatchType::Unlogged);
270+
let stmt = session
271+
.prepare(format!(
272+
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
273+
ks
274+
))
275+
.await
276+
.unwrap();
277+
batch.append_statement(stmt);
278+
session.batch(&batch, &((),)).await.unwrap();
279+
280+
let query_rows_result = session
281+
.query_unpaged(
282+
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
283+
&[],
284+
)
285+
.await
286+
.unwrap()
287+
.into_rows_result()
288+
.unwrap();
289+
290+
let timestamps_locked = timestamps.lock().unwrap();
291+
assert!(query_rows_result
292+
.rows::<(i32, i32, i64)>()
293+
.unwrap()
294+
.map(|row_result| row_result.unwrap())
295+
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
296+
}

0 commit comments

Comments
 (0)