Skip to content

Commit 3b9b4b5

Browse files
committed
client/session_test: Added tests for MonotonicTimestampGenerator
1 parent e6868ad commit 3b9b4b5

File tree

2 files changed

+134
-2
lines changed

2 files changed

+134
-2
lines changed

scylla/src/client/session_test.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use super::execution_profile::ExecutionProfile;
33
use super::session::Session;
44
use super::session_builder::SessionBuilder;
55
use crate as scylla;
6-
use crate::batch::{Batch, BatchStatement};
6+
use crate::batch::{Batch, BatchStatement, BatchType};
77
use crate::cluster::metadata::Strategy::NetworkTopologyStrategy;
88
use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType, UserDefinedType};
99
use crate::deserialize::DeserializeOwnedValue;
@@ -32,7 +32,7 @@ use scylla_cql::types::serialize::value::SerializeValue;
3232
use std::collections::{BTreeMap, HashMap};
3333
use std::collections::{BTreeSet, HashSet};
3434
use std::sync::atomic::{AtomicBool, Ordering};
35-
use std::sync::Arc;
35+
use std::sync::{Arc, Mutex};
3636
use tokio::net::TcpListener;
3737
use uuid::Uuid;
3838

@@ -1327,6 +1327,85 @@ async fn test_timestamp() {
13271327
assert_eq!(results, expected_results);
13281328
}
13291329

1330+
#[tokio::test]
1331+
async fn test_timestamp_generator() {
1332+
use crate::policies::timestamp_generator::TimestampGenerator;
1333+
use rand::random;
1334+
1335+
setup_tracing();
1336+
struct LocalTimestampGenerator {
1337+
generated_timestamps: Arc<Mutex<HashSet<i64>>>,
1338+
}
1339+
1340+
impl TimestampGenerator for LocalTimestampGenerator {
1341+
fn next_timestamp(&self) -> i64 {
1342+
let timestamp = random::<i64>().abs();
1343+
self.generated_timestamps.lock().unwrap().insert(timestamp);
1344+
timestamp
1345+
}
1346+
}
1347+
1348+
let timestamps = Arc::new(Mutex::new(HashSet::new()));
1349+
let generator = LocalTimestampGenerator {
1350+
generated_timestamps: timestamps.clone(),
1351+
};
1352+
1353+
let session = create_new_session_builder()
1354+
.timestamp_generator(Arc::new(generator))
1355+
.build()
1356+
.await
1357+
.unwrap();
1358+
let ks = unique_keyspace_name();
1359+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
1360+
session
1361+
.ddl(format!(
1362+
"CREATE TABLE IF NOT EXISTS {}.t_generator (a int primary key, b int)",
1363+
ks
1364+
))
1365+
.await
1366+
.unwrap();
1367+
let prepared = session
1368+
.prepare(format!(
1369+
"INSERT INTO {}.t_generator (a, b) VALUES (1, 1)",
1370+
ks
1371+
))
1372+
.await
1373+
.unwrap();
1374+
session.execute_unpaged(&prepared, []).await.unwrap();
1375+
let unprepared = Query::new(format!(
1376+
"INSERT INTO {}.t_generator (a, b) VALUES (2, 2)",
1377+
ks
1378+
));
1379+
session.query_unpaged(unprepared, []).await.unwrap();
1380+
let mut batch = Batch::new(BatchType::Unlogged);
1381+
let stmt = session
1382+
.prepare(format!(
1383+
"INSERT INTO {}.t_generator (a, b) VALUES (3, 3)",
1384+
ks
1385+
))
1386+
.await
1387+
.unwrap();
1388+
batch.append_statement(stmt);
1389+
session.batch(&batch, &((),)).await.unwrap();
1390+
1391+
let query_rows_result = session
1392+
.query_unpaged(
1393+
format!("SELECT a, b, WRITETIME(b) FROM {}.t_generator", ks),
1394+
&[],
1395+
)
1396+
.await
1397+
.unwrap()
1398+
.into_rows_result()
1399+
.unwrap();
1400+
1401+
let timestamps_locked = timestamps.lock().unwrap();
1402+
assert!(query_rows_result
1403+
.rows::<(i32, i32, i64)>()
1404+
.unwrap()
1405+
.map(|row_result| row_result.unwrap())
1406+
.all(|(_a, _b, writetime)| timestamps_locked.contains(&writetime)));
1407+
}
1408+
13301409
#[ignore = "works on remote Scylla instances only (local ones are too fast)"]
13311410
#[tokio::test]
13321411
async fn test_request_timeout() {

scylla/src/policies/timestamp_generator.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,56 @@ impl TimestampGenerator for MonotonicTimestampGenerator {
145145
}
146146
}
147147
}
148+
149+
#[test]
150+
fn monotonic_timestamp_generator_is_monotonic() {
151+
const NUMBER_OF_ITERATIONS: u32 = 1000;
152+
153+
let mut prev = None;
154+
let mut cur;
155+
let generator = MonotonicTimestampGenerator::new();
156+
for _ in 0..NUMBER_OF_ITERATIONS {
157+
cur = generator.next_timestamp();
158+
if let Some(prev_val) = prev {
159+
assert!(cur > prev_val);
160+
}
161+
prev = Some(cur);
162+
}
163+
}
164+
165+
#[test]
166+
fn monotonic_timestamp_generator_is_monotonic_with_concurrency() {
167+
use std::collections::HashSet;
168+
use std::sync::Arc;
169+
170+
const NUMBER_OF_ITERATIONS: usize = 1000;
171+
const NUMBER_OF_THREADS: usize = 10;
172+
let generator = Arc::new(MonotonicTimestampGenerator::new());
173+
let timestamps_sets: Vec<_> = std::thread::scope(|s| {
174+
(0..NUMBER_OF_THREADS)
175+
.map(|_| {
176+
s.spawn(|| {
177+
let timestamps: Vec<i64> = (0..NUMBER_OF_ITERATIONS)
178+
.map(|_| generator.next_timestamp())
179+
.collect();
180+
assert!(timestamps.windows(2).all(|w| w[0] < w[1]));
181+
let timestamps_set: HashSet<i64> = HashSet::from_iter(timestamps);
182+
assert_eq!(
183+
timestamps_set.len(),
184+
NUMBER_OF_ITERATIONS,
185+
"Colliding values in a single thread"
186+
);
187+
timestamps_set
188+
})
189+
})
190+
.map(|handle| handle.join().unwrap())
191+
.collect()
192+
});
193+
194+
let full_set: HashSet<i64> = timestamps_sets.iter().flatten().copied().collect();
195+
assert_eq!(
196+
full_set.len(),
197+
NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS,
198+
"Colliding values between threads"
199+
);
200+
}

0 commit comments

Comments
 (0)