|
| 1 | +use std::{ |
| 2 | + sync::atomic::AtomicI64, |
| 3 | + time::{SystemTime, UNIX_EPOCH}, |
| 4 | +}; |
| 5 | + |
| 6 | +use std::sync::atomic::Ordering; |
| 7 | +use std::sync::Mutex; |
| 8 | +use tokio::time::{Duration, Instant}; |
| 9 | +use tracing::warn; |
| 10 | + |
| 11 | +/// Trait used to represent a timestamp generator |
| 12 | +pub trait TimestampGenerator: Send + Sync { |
| 13 | + /// This generates a new timestamp |
| 14 | + fn next_timestamp(&self) -> i64; |
| 15 | +} |
| 16 | + |
| 17 | +/// Basic timestamp generator. Provides no guarantees, if system clock returns |
| 18 | +/// time before UNIX epoch it panics. |
| 19 | +#[derive(Default)] |
| 20 | +pub struct SimpleTimestampGenerator {} |
| 21 | + |
| 22 | +impl SimpleTimestampGenerator { |
| 23 | + pub fn new() -> Self { |
| 24 | + SimpleTimestampGenerator {} |
| 25 | + } |
| 26 | +} |
| 27 | + |
| 28 | +impl TimestampGenerator for SimpleTimestampGenerator { |
| 29 | + fn next_timestamp(&self) -> i64 { |
| 30 | + SystemTime::now() |
| 31 | + .duration_since(UNIX_EPOCH) |
| 32 | + .unwrap() |
| 33 | + .as_micros() as i64 |
| 34 | + } |
| 35 | +} |
| 36 | + |
| 37 | +/// Warning configuration for MonotonicTimestampGenerator |
| 38 | +struct MonotonicTimestampGeneratorWarningsCfg { |
| 39 | + warning_threshold: Duration, |
| 40 | + warning_interval: Duration, |
| 41 | +} |
| 42 | + |
| 43 | +/// Monotonic timestamp generator. Guarantees monotonicity of timestamps. |
| 44 | +/// If system clock will not provide an increased timestamp, then the timestamp will |
| 45 | +/// be artificially increased. If the config is provided and the clock skew is bigger than |
| 46 | +/// warning_threshold (by default 1 second), then the user will be warned about |
| 47 | +/// the skew repeatedly, with warning_interval provided in the settings (by default 1 second). |
| 48 | +/// Remember that this generator only guarantees monotonicity within one instance of this struct! |
| 49 | +/// If you create multiple instances the monotonicity guarantee becomes void. |
| 50 | +pub struct MonotonicTimestampGenerator { |
| 51 | + last: AtomicI64, |
| 52 | + last_warning: Mutex<Instant>, |
| 53 | + config: Option<MonotonicTimestampGeneratorWarningsCfg>, |
| 54 | +} |
| 55 | + |
| 56 | +impl MonotonicTimestampGenerator { |
| 57 | + /// Creates a new monotonic timestamp generator with default settings |
| 58 | + pub fn new() -> Self { |
| 59 | + MonotonicTimestampGenerator { |
| 60 | + last: AtomicI64::new(0), |
| 61 | + last_warning: Mutex::new(Instant::now()), |
| 62 | + config: Some(MonotonicTimestampGeneratorWarningsCfg { |
| 63 | + warning_threshold: Duration::from_secs(1), |
| 64 | + warning_interval: Duration::from_secs(1), |
| 65 | + }), |
| 66 | + } |
| 67 | + } |
| 68 | + |
| 69 | + pub fn with_warning_times( |
| 70 | + mut self, |
| 71 | + warning_threshold: Duration, |
| 72 | + warning_interval: Duration, |
| 73 | + ) -> Self { |
| 74 | + self.config = Some(MonotonicTimestampGeneratorWarningsCfg { |
| 75 | + warning_threshold, |
| 76 | + warning_interval, |
| 77 | + }); |
| 78 | + self |
| 79 | + } |
| 80 | + |
| 81 | + pub fn without_warnings(mut self) -> Self { |
| 82 | + self.config = None; |
| 83 | + self |
| 84 | + } |
| 85 | + |
| 86 | + // This is guaranteed to return a monotonic timestamp. If clock skew is detected |
| 87 | + // then this method will increment the last timestamp. |
| 88 | + fn compute_next(&self, last: i64) -> i64 { |
| 89 | + let current = SystemTime::now().duration_since(UNIX_EPOCH); |
| 90 | + if let Ok(cur_time) = current { |
| 91 | + // We have generated a valid timestamp |
| 92 | + let u_cur = cur_time.as_micros() as i64; |
| 93 | + if u_cur > last { |
| 94 | + // We have generated a valid, monotonic timestamp |
| 95 | + return u_cur; |
| 96 | + } else if let Some(cfg) = self.config.as_ref() { |
| 97 | + // We have detected clock skew, we will increment the last timestamp, and check if we should warn the user |
| 98 | + if last - u_cur > cfg.warning_threshold.as_micros() as i64 { |
| 99 | + // We have detected a clock skew bigger than the threshold, we check if we warned the user recently |
| 100 | + let mut last_warn = self.last_warning.lock().unwrap(); |
| 101 | + let now = Instant::now(); |
| 102 | + if now >= last_warn.checked_add(cfg.warning_interval).unwrap() { |
| 103 | + // We have not warned the user recently, we will warn the user |
| 104 | + *last_warn = now; |
| 105 | + drop(last_warn); |
| 106 | + warn!( |
| 107 | + "Clock skew detected. The current time ({}) was {} \ |
| 108 | + microseconds behind the last generated timestamp ({}). \ |
| 109 | + The next generated timestamp will be artificially incremented \ |
| 110 | + to guarantee monotonicity.", |
| 111 | + u_cur, |
| 112 | + last - u_cur, |
| 113 | + last |
| 114 | + ) |
| 115 | + } |
| 116 | + } |
| 117 | + } |
| 118 | + } else { |
| 119 | + // We have generated a timestamp before UNIX epoch, we will warn the user and increment the last timestamp |
| 120 | + warn!("Clock skew detected. The current time was behind UNIX epoch."); |
| 121 | + } |
| 122 | + |
| 123 | + last + 1 |
| 124 | + } |
| 125 | +} |
| 126 | + |
| 127 | +impl Default for MonotonicTimestampGenerator { |
| 128 | + fn default() -> Self { |
| 129 | + Self::new() |
| 130 | + } |
| 131 | +} |
| 132 | + |
| 133 | +impl TimestampGenerator for MonotonicTimestampGenerator { |
| 134 | + fn next_timestamp(&self) -> i64 { |
| 135 | + loop { |
| 136 | + let last = self.last.load(Ordering::SeqCst); |
| 137 | + let cur = self.compute_next(last); |
| 138 | + if self |
| 139 | + .last |
| 140 | + .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst) |
| 141 | + .is_ok() |
| 142 | + { |
| 143 | + return cur; |
| 144 | + } |
| 145 | + } |
| 146 | + } |
| 147 | +} |
0 commit comments