Skip to content

Commit 3c0da90

Browse files
committed
transport/timestamp_generator: Added MonotonicTimestampGenerator
Added TimestampGenerator trait and MonotonicTimestampGenerator based on c++ driver's implementation
1 parent be14812 commit 3c0da90

File tree

2 files changed

+94
-0
lines changed

2 files changed

+94
-0
lines changed

scylla/src/transport/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod retry_policy;
1818
pub mod session;
1919
pub mod session_builder;
2020
pub mod speculative_execution;
21+
pub mod timestamp_generator;
2122
pub mod topology;
2223

2324
pub use crate::frame::{Authenticator, Compression};
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::{
2+
ops::DerefMut,
3+
sync::atomic::AtomicI64,
4+
time::{SystemTime, UNIX_EPOCH},
5+
};
6+
7+
use futures::lock::Mutex;
8+
use std::sync::atomic::Ordering;
9+
use tokio::time::{Duration, Instant};
10+
use tracing::warn;
11+
12+
pub(crate) trait TimestampGenerator {
13+
async fn next(&self) -> i64;
14+
}
15+
16+
pub struct MonotonicTimestampGenerator {
17+
last: AtomicI64,
18+
last_warning: Mutex<Instant>,
19+
warning_threshold_us: i64,
20+
warning_interval_ms: i64,
21+
}
22+
23+
impl MonotonicTimestampGenerator {
24+
pub fn new_with_settings(warning_threshold_us: i64, warning_interval_ms: i64) -> Self {
25+
MonotonicTimestampGenerator {
26+
last: AtomicI64::new(0),
27+
last_warning: Mutex::new(Instant::now()),
28+
warning_threshold_us,
29+
warning_interval_ms,
30+
}
31+
}
32+
pub fn new() -> Self {
33+
MonotonicTimestampGenerator::new_with_settings(1000000, 1000)
34+
}
35+
36+
// This is guaranteed to return a monotonic timestamp. If clock skew is detected
37+
// then this method will increment the last timestamp.
38+
async fn compute_next(&self, last: i64) -> i64 {
39+
let current = SystemTime::now().duration_since(UNIX_EPOCH);
40+
if let Ok(cur_time) = current {
41+
let u_cur = cur_time.as_micros() as i64;
42+
if u_cur > last {
43+
return u_cur;
44+
} else if self.warning_threshold_us >= 0 && last - u_cur > self.warning_threshold_us {
45+
let mut last_warn = self.last_warning.lock().await;
46+
let now = Instant::now();
47+
if now
48+
>= last_warn
49+
.checked_add(Duration::from_millis(self.warning_interval_ms as u64))
50+
.unwrap()
51+
{
52+
*last_warn.deref_mut() = now;
53+
drop(last_warn);
54+
warn!(
55+
"Clock skew detected. The current time ({}) was {} \
56+
microseconds behind the last generated timestamp ({}). \
57+
The next generated timestamp will be artificially incremented \
58+
to guarantee monotonicity.",
59+
u_cur,
60+
last - u_cur,
61+
last
62+
)
63+
}
64+
}
65+
} else {
66+
warn!("Clock skew detected. The current time was behind UNIX epoch.");
67+
}
68+
69+
last + 1
70+
}
71+
}
72+
73+
impl Default for MonotonicTimestampGenerator {
74+
fn default() -> Self {
75+
Self::new()
76+
}
77+
}
78+
79+
impl TimestampGenerator for MonotonicTimestampGenerator {
80+
async fn next(&self) -> i64 {
81+
loop {
82+
let last = self.last.load(Ordering::SeqCst);
83+
let cur = self.compute_next(last).await;
84+
if self
85+
.last
86+
.compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
87+
.is_ok()
88+
{
89+
return cur;
90+
}
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)