Skip to content

Commit 4da5785

Browse files
Move timestamp test to query.rs and add missing cases
1 parent f1b259d commit 4da5785

File tree

3 files changed

+303
-194
lines changed

3 files changed

+303
-194
lines changed

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod hygiene;
1111
mod large_batch_statements;
1212
mod lwt_optimisation;
1313
mod new_session;
14+
mod query;
1415
mod query_result;
1516
mod retries;
1617
mod self_identity;

scylla/tests/integration/query.rs

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL};
2+
use scylla::{
3+
batch::{Batch, BatchType},
4+
client::session::Session,
5+
observability::tracing::TracingInfo,
6+
query::Query,
7+
response::{query_result::QueryResult, PagingState, PagingStateResponse},
8+
};
9+
use scylla_cql::Consistency;
10+
use std::collections::HashSet;
11+
use std::sync::{Arc, Mutex};
12+
use uuid::Uuid;
13+
14+
#[tokio::test]
15+
async fn test_timestamp() {
16+
setup_tracing();
17+
let session = create_new_session_builder().build().await.unwrap();
18+
let ks = unique_keyspace_name();
19+
20+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
21+
session
22+
.ddl(format!(
23+
"CREATE TABLE IF NOT EXISTS {}.t_timestamp (a text, b text, primary key (a))",
24+
ks
25+
))
26+
.await
27+
.unwrap();
28+
29+
session.await_schema_agreement().await.unwrap();
30+
31+
let query_str = format!("INSERT INTO {}.t_timestamp (a, b) VALUES (?, ?)", ks);
32+
33+
// test regular query timestamps
34+
35+
let mut regular_query = Query::new(query_str.to_string());
36+
37+
regular_query.set_timestamp(Some(420));
38+
session
39+
.query_unpaged(regular_query.clone(), ("regular query", "higher timestamp"))
40+
.await
41+
.unwrap();
42+
43+
regular_query.set_timestamp(Some(42));
44+
session
45+
.query_unpaged(regular_query.clone(), ("regular query", "lower timestamp"))
46+
.await
47+
.unwrap();
48+
49+
// test regular query iter timestamps
50+
51+
let mut regular_query = Query::new(query_str.to_string());
52+
53+
regular_query.set_timestamp(Some(420));
54+
session
55+
.query_iter(
56+
regular_query.clone(),
57+
("regular query iter", "higher timestamp"),
58+
)
59+
.await
60+
.unwrap();
61+
62+
regular_query.set_timestamp(Some(42));
63+
session
64+
.query_iter(
65+
regular_query.clone(),
66+
("regular query iter", "lower timestamp"),
67+
)
68+
.await
69+
.unwrap();
70+
71+
// test regular query single page timestamps
72+
73+
let mut regular_query = Query::new(query_str.to_string());
74+
75+
regular_query.set_timestamp(Some(420));
76+
session
77+
.query_single_page(
78+
regular_query.clone(),
79+
("regular query single page", "higher timestamp"),
80+
PagingState::start(),
81+
)
82+
.await
83+
.unwrap();
84+
85+
regular_query.set_timestamp(Some(42));
86+
session
87+
.query_single_page(
88+
regular_query.clone(),
89+
("regular query single page", "lower timestamp"),
90+
PagingState::start(),
91+
)
92+
.await
93+
.unwrap();
94+
95+
// test prepared statement timestamps
96+
97+
let mut prepared_statement = session.prepare(query_str.clone()).await.unwrap();
98+
99+
prepared_statement.set_timestamp(Some(420));
100+
session
101+
.execute_unpaged(&prepared_statement, ("prepared query", "higher timestamp"))
102+
.await
103+
.unwrap();
104+
105+
prepared_statement.set_timestamp(Some(42));
106+
session
107+
.execute_unpaged(&prepared_statement, ("prepared query", "lower timestamp"))
108+
.await
109+
.unwrap();
110+
111+
// test prepared statement iter timestamps
112+
113+
let mut prepared_statement = session.prepare(query_str.clone()).await.unwrap();
114+
115+
prepared_statement.set_timestamp(Some(420));
116+
session
117+
.execute_iter(
118+
prepared_statement.clone(),
119+
("prepared query iter", "higher timestamp"),
120+
)
121+
.await
122+
.unwrap();
123+
124+
prepared_statement.set_timestamp(Some(42));
125+
session
126+
.execute_iter(
127+
prepared_statement,
128+
("prepared query iter", "lower timestamp"),
129+
)
130+
.await
131+
.unwrap();
132+
133+
// test prepared statement single page timestamps
134+
135+
let mut prepared_statement = session.prepare(query_str).await.unwrap();
136+
137+
prepared_statement.set_timestamp(Some(420));
138+
session
139+
.execute_single_page(
140+
&prepared_statement,
141+
("prepared query single page", "higher timestamp"),
142+
PagingState::start(),
143+
)
144+
.await
145+
.unwrap();
146+
147+
prepared_statement.set_timestamp(Some(42));
148+
session
149+
.execute_single_page(
150+
&prepared_statement,
151+
("prepared query single page", "lower timestamp"),
152+
PagingState::start(),
153+
)
154+
.await
155+
.unwrap();
156+
157+
// test batch statement timestamps
158+
159+
let mut batch: Batch = Default::default();
160+
batch.append_statement(regular_query);
161+
batch.append_statement(prepared_statement);
162+
163+
batch.set_timestamp(Some(420));
164+
session
165+
.batch(
166+
&batch,
167+
(
168+
("first query in batch", "higher timestamp"),
169+
("second query in batch", "higher timestamp"),
170+
),
171+
)
172+
.await
173+
.unwrap();
174+
175+
batch.set_timestamp(Some(42));
176+
session
177+
.batch(
178+
&batch,
179+
(
180+
("first query in batch", "lower timestamp"),
181+
("second query in batch", "lower timestamp"),
182+
),
183+
)
184+
.await
185+
.unwrap();
186+
187+
let query_rows_result = session
188+
.query_unpaged(
189+
format!("SELECT a, b, WRITETIME(b) FROM {}.t_timestamp", ks),
190+
&[],
191+
)
192+
.await
193+
.unwrap()
194+
.into_rows_result()
195+
.unwrap();
196+
197+
let mut results = query_rows_result
198+
.rows::<(&str, &str, i64)>()
199+
.unwrap()
200+
.map(Result::unwrap)
201+
.collect::<Vec<_>>();
202+
results.sort();
203+
204+
let expected_results = [
205+
("first query in batch", "higher timestamp", 420),
206+
("prepared query", "higher timestamp", 420),
207+
("prepared query iter", "higher timestamp", 420),
208+
("prepared query single page", "higher timestamp", 420),
209+
("regular query", "higher timestamp", 420),
210+
("regular query iter", "higher timestamp", 420),
211+
("regular query single page", "higher timestamp", 420),
212+
("second query in batch", "higher timestamp", 420),
213+
]
214+
.into_iter()
215+
.collect::<Vec<_>>();
216+
217+
assert_eq!(results, expected_results);
218+
}
219+
220+
#[tokio::test]
221+
async fn test_timestamp_generator() {
222+
use rand::random;
223+
use scylla::policies::timestamp_generator::TimestampGenerator;
224+
225+
setup_tracing();
226+
struct LocalTimestampGenerator {
227+
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
228+
}
229+
230+
impl TimestampGenerator for LocalTimestampGenerator {
231+
fn next_timestamp(&self) -> i64 {
232+
let timestamp = random::<i64>().abs();
233+
self.generated_timestamps.lock().unwrap().insert(timestamp);
234+
timestamp
235+
}
236+
}
237+
238+
let timestamps = Arc::new(Mutex::new(HashSet::new()));
239+
let generator = LocalTimestampGenerator {
240+
generated_timestamps: timestamps.clone(),
241+
};
242+
243+
let session = create_new_session_builder()
244+
.timestamp_generator(Arc::new(generator))
245+
.build()
246+
.await
247+
.unwrap();
248+
let ks = unique_keyspace_name();
249+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
250+
session
251+
.ddl(format!(
252+
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
253+
ks
254+
))
255+
.await
256+
.unwrap();
257+
258+
let prepared = session
259+
.prepare(format!(
260+
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
261+
ks
262+
))
263+
.await
264+
.unwrap();
265+
session.execute_unpaged(&prepared, []).await.unwrap();
266+
267+
let unprepared = Query::new(format!(
268+
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
269+
ks
270+
));
271+
session.query_unpaged(unprepared, []).await.unwrap();
272+
273+
let mut batch = Batch::new(BatchType::Unlogged);
274+
let stmt = session
275+
.prepare(format!(
276+
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
277+
ks
278+
))
279+
.await
280+
.unwrap();
281+
batch.append_statement(stmt);
282+
session.batch(&batch, &((),)).await.unwrap();
283+
284+
let query_rows_result = session
285+
.query_unpaged(
286+
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
287+
&[],
288+
)
289+
.await
290+
.unwrap()
291+
.into_rows_result()
292+
.unwrap();
293+
294+
let timestamps_locked = timestamps.lock().unwrap();
295+
assert!(query_rows_result
296+
.rows::<(i32, i32, i64)>()
297+
.unwrap()
298+
.map(|row_result| row_result.unwrap())
299+
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
300+
}

0 commit comments

Comments
 (0)