Skip to content

Commit 811f0bf

Browse files
committed
transport/timestamp_generator: Added MonotonicTimestampGenerator
Added TimestampGenerator trait and MonotonicTimestampGenerator based on c++ driver's implementation
1 parent be14812 commit 811f0bf

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

scylla/src/transport/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod retry_policy;
1818
pub mod session;
1919
pub mod session_builder;
2020
pub mod speculative_execution;
21+
pub mod timestamp_generator;
2122
pub mod topology;
2223

2324
pub use crate::frame::{Authenticator, Compression};
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use std::{
2+
sync::atomic::AtomicI64,
3+
time::{SystemTime, UNIX_EPOCH},
4+
};
5+
6+
use async_trait::async_trait;
7+
use futures::lock::Mutex;
8+
use std::sync::atomic::Ordering;
9+
use tokio::time::{Duration, Instant};
10+
use tracing::warn;
11+
12+
#[async_trait]
13+
pub(crate) trait TimestampGenerator {
14+
async fn next_timestamp(&self) -> i64;
15+
}
16+
17+
pub struct MonotonicTimestampGenerator {
18+
last: AtomicI64,
19+
last_warning: Mutex<Instant>,
20+
warning_threshold_us: i64,
21+
warning_interval_ms: i64,
22+
}
23+
24+
impl MonotonicTimestampGenerator {
25+
pub fn new_with_settings(warning_threshold_us: i64, warning_interval_ms: i64) -> Self {
26+
MonotonicTimestampGenerator {
27+
last: AtomicI64::new(0),
28+
last_warning: Mutex::new(Instant::now()),
29+
warning_threshold_us,
30+
warning_interval_ms,
31+
}
32+
}
33+
pub fn new() -> Self {
34+
MonotonicTimestampGenerator::new_with_settings(1000000, 1000)
35+
}
36+
37+
// This is guaranteed to return a monotonic timestamp. If clock skew is detected
38+
// then this method will increment the last timestamp.
39+
async fn compute_next(&self, last: i64) -> i64 {
40+
let current = SystemTime::now().duration_since(UNIX_EPOCH);
41+
if let Ok(cur_time) = current {
42+
let u_cur = cur_time.as_micros() as i64;
43+
if u_cur > last {
44+
return u_cur;
45+
} else if self.warning_threshold_us >= 0 && last - u_cur > self.warning_threshold_us {
46+
let mut last_warn = self.last_warning.lock().await;
47+
let now = Instant::now();
48+
if now
49+
>= last_warn
50+
.checked_add(Duration::from_millis(self.warning_interval_ms as u64))
51+
.unwrap()
52+
{
53+
*last_warn = now;
54+
drop(last_warn);
55+
warn!(
56+
"Clock skew detected. The current time ({}) was {} \
57+
microseconds behind the last generated timestamp ({}). \
58+
The next generated timestamp will be artificially incremented \
59+
to guarantee monotonicity.",
60+
u_cur,
61+
last - u_cur,
62+
last
63+
)
64+
}
65+
}
66+
} else {
67+
warn!("Clock skew detected. The current time was behind UNIX epoch.");
68+
}
69+
70+
last + 1
71+
}
72+
}
73+
74+
impl Default for MonotonicTimestampGenerator {
75+
fn default() -> Self {
76+
Self::new()
77+
}
78+
}
79+
80+
#[async_trait]
81+
impl TimestampGenerator for MonotonicTimestampGenerator {
82+
async fn next_timestamp(&self) -> i64 {
83+
loop {
84+
let last = self.last.load(Ordering::SeqCst);
85+
let cur = self.compute_next(last).await;
86+
if self
87+
.last
88+
.compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
89+
.is_ok()
90+
{
91+
return cur;
92+
}
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)