Skip to content

Commit be5ed74

Browse files
authored
Support configurable timestamp resolution in rust simulation (#404)
* sim-rs: add config settings for timestamp resolution * sim-rs: use u64 instead of duration in timestamp * sim-rs: support coarse-grained timestamps in network layer * sim-rs: support configurable timestamp resolution
1 parent 28d2364 commit be5ed74

File tree

12 files changed

+172
-91
lines changed

12 files changed

+172
-91
lines changed

data/simulation/config.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ export interface Config {
2222
"treat-blocks-as-full": boolean;
2323
/** Only supported by Haskell simulation. */
2424
"cleanup-policies": CleanupPolicies;
25+
/**
26+
* The smallest unit of time to simulate, in fractions of a millisecond.
27+
* Only supported by Rust simulation. */
28+
"timestamp-resolution-ms": number;
2529

2630
// Leios Protocol Configuration
2731
"leios-variant": LeiosVariant;

data/simulation/config.default.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ multiplex-mini-protocols: true
1717
simulate-transactions: true
1818
treat-blocks-as-full: false
1919
cleanup-policies: ["cleanup-expired-vote"]
20+
timestamp-resolution-ms: 0.000001
2021

2122
################################################################################
2223
# Leios Protocol Configuration

data/simulation/config.schema.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,10 @@
347347
"description": "Only supported by Haskell simulation.",
348348
"type": "boolean"
349349
},
350+
"timestamp-resolution-ms": {
351+
"description": "The smallest unit of time to simulate, in fractions of a millisecond.\nOnly supported by Rust simulation.",
352+
"type": "number"
353+
},
350354
"treat-blocks-as-full": {
351355
"description": "When `true`, any delays and message sizes are calculated as if\neach block contained as much data as the expected average, rounded up.\nIn particular, for the sake of the above, we consider that:\n - Each RB includes a certificate.\n - Certificates contain votes from `vote-threshold` nodes.\n - Vote bundles vote for `ceil eb-generation-probability` EBs.\n - EBs reference `ceil (ib-generation-probability * leios-stage-length-slots)` IBs.\nOnly supported by Haskell simulation.",
352356
"type": "boolean"

sim-rs/sim-cli/src/events/aggregate.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
collections::{BTreeMap, BTreeSet},
3-
time::Duration,
4-
};
1+
use std::collections::{BTreeMap, BTreeSet};
52

63
use serde::Serialize;
74
use sim_core::{
@@ -236,7 +233,7 @@ impl TraceAggregator {
236233
self.current_time = event.time_s;
237234
if current_chunk != new_chunk {
238235
if new_chunk % 4 == 0 {
239-
let timestamp = Duration::from_secs((new_chunk / 4) as u64).into();
236+
let timestamp = Timestamp::from_secs((new_chunk / 4) as u64);
240237
self.tx_counts.push(self.produce_tx_counts(timestamp));
241238
}
242239
Some(self.produce_message())

sim-rs/sim-cli/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ async fn main() -> Result<()> {
140140
let monitor = tokio::spawn(EventMonitor::new(&config, events_source, args.output).run());
141141
pin!(monitor);
142142

143-
let clock_coordinator = ClockCoordinator::new();
143+
let clock_coordinator = ClockCoordinator::new(config.timestamp_resolution);
144144
let clock = clock_coordinator.clock();
145145
let tracker = EventTracker::new(events_sink, clock.clone(), &config.nodes);
146146
let mut simulation = Simulation::new(config, tracker, clock_coordinator).await?;

sim-rs/sim-core/src/clock.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
pin::Pin,
55
sync::{atomic::AtomicUsize, Arc},
66
task::{Context, Poll},
7+
time::Duration,
78
};
89

910
pub use coordinator::ClockCoordinator;
@@ -48,6 +49,7 @@ impl<T> Ord for FutureEvent<T> {
4849

4950
#[derive(Clone)]
5051
pub struct Clock {
52+
timestamp_resolution: Duration,
5153
time: Arc<AtomicTimestamp>,
5254
waiters: Arc<AtomicUsize>,
5355
tasks: Arc<AtomicUsize>,
@@ -56,12 +58,14 @@ pub struct Clock {
5658

5759
impl Clock {
5860
fn new(
61+
timestamp_resolution: Duration,
5962
time: Arc<AtomicTimestamp>,
6063
waiters: Arc<AtomicUsize>,
6164
tasks: Arc<AtomicUsize>,
6265
tx: mpsc::UnboundedSender<ClockEvent>,
6366
) -> Self {
6467
Self {
68+
timestamp_resolution,
6569
time,
6670
waiters,
6771
tasks,
@@ -79,6 +83,7 @@ impl Clock {
7983
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
8084
ClockBarrier {
8185
id,
86+
timestamp_resolution: self.timestamp_resolution,
8287
time: self.time.clone(),
8388
tasks: self.tasks.clone(),
8489
tx: self.tx.clone(),
@@ -88,6 +93,7 @@ impl Clock {
8893

8994
pub struct ClockBarrier {
9095
id: usize,
96+
timestamp_resolution: Duration,
9197
time: Arc<AtomicTimestamp>,
9298
tasks: Arc<AtomicUsize>,
9399
tx: mpsc::UnboundedSender<ClockEvent>,
@@ -107,7 +113,7 @@ impl ClockBarrier {
107113
}
108114

109115
pub fn wait_until(&mut self, timestamp: Timestamp) -> Waiter {
110-
self.wait(Some(timestamp))
116+
self.wait(Some(timestamp.with_resolution(self.timestamp_resolution)))
111117
}
112118

113119
pub fn wait_forever(&mut self) -> Waiter {

sim-rs/sim-core/src/clock/coordinator.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,30 @@ use std::{
44
atomic::{AtomicUsize, Ordering},
55
Arc,
66
},
7+
time::Duration,
78
};
89

910
use tokio::sync::{mpsc, oneshot};
1011

1112
use super::{timestamp::AtomicTimestamp, Clock, Timestamp};
1213

1314
pub struct ClockCoordinator {
15+
timestamp_resolution: Duration,
1416
time: Arc<AtomicTimestamp>,
1517
tx: mpsc::UnboundedSender<ClockEvent>,
1618
rx: mpsc::UnboundedReceiver<ClockEvent>,
1719
waiter_count: Arc<AtomicUsize>,
1820
tasks: Arc<AtomicUsize>,
1921
}
2022

21-
impl Default for ClockCoordinator {
22-
fn default() -> Self {
23-
Self::new()
24-
}
25-
}
26-
2723
impl ClockCoordinator {
28-
pub fn new() -> Self {
24+
pub fn new(timestamp_resolution: Duration) -> Self {
2925
let time = Arc::new(AtomicTimestamp::new(Timestamp::zero()));
3026
let (tx, rx) = mpsc::unbounded_channel();
3127
let waiter_count = Arc::new(AtomicUsize::new(0));
3228
let tasks = Arc::new(AtomicUsize::new(0));
3329
Self {
30+
timestamp_resolution,
3431
time,
3532
tx,
3633
rx,
@@ -41,6 +38,7 @@ impl ClockCoordinator {
4138

4239
pub fn clock(&self) -> Clock {
4340
Clock::new(
41+
self.timestamp_resolution,
4442
self.time.clone(),
4543
self.waiter_count.clone(),
4644
self.tasks.clone(),
@@ -129,9 +127,11 @@ mod tests {
129127

130128
use super::ClockCoordinator;
131129

130+
const TIMESTAMP_RESOLUTION: Duration = Duration::from_nanos(1);
131+
132132
#[tokio::test]
133133
async fn should_wait_once_all_workers_are_waiting() {
134-
let mut coordinator = ClockCoordinator::new();
134+
let mut coordinator = ClockCoordinator::new(TIMESTAMP_RESOLUTION);
135135
let clock = coordinator.clock();
136136
let t0 = clock.now();
137137
let t1 = t0 + Duration::from_millis(5);
@@ -158,7 +158,7 @@ mod tests {
158158

159159
#[tokio::test]
160160
async fn should_cancel_wait_when_wait_future_is_dropped() {
161-
let mut coordinator = ClockCoordinator::new();
161+
let mut coordinator = ClockCoordinator::new(TIMESTAMP_RESOLUTION);
162162
let clock = coordinator.clock();
163163
let t0 = clock.now();
164164
let t1 = t0 + Duration::from_millis(5);
@@ -186,7 +186,7 @@ mod tests {
186186

187187
#[tokio::test]
188188
async fn should_avoid_race_condition() {
189-
let mut coordinator = ClockCoordinator::new();
189+
let mut coordinator = ClockCoordinator::new(TIMESTAMP_RESOLUTION);
190190
let clock = coordinator.clock();
191191
let t0 = clock.now();
192192
let t1 = t0 + Duration::from_millis(5);
@@ -219,7 +219,7 @@ mod tests {
219219

220220
#[tokio::test]
221221
async fn should_allow_time_to_stand_still() {
222-
let mut coordinator = ClockCoordinator::new();
222+
let mut coordinator = ClockCoordinator::new(TIMESTAMP_RESOLUTION);
223223
let clock = coordinator.clock();
224224
let t0 = clock.now();
225225
let t1 = t0 + Duration::from_millis(5);
@@ -253,7 +253,7 @@ mod tests {
253253

254254
#[tokio::test]
255255
async fn should_allow_waiting_forever() {
256-
let mut coordinator = ClockCoordinator::new();
256+
let mut coordinator = ClockCoordinator::new(TIMESTAMP_RESOLUTION);
257257
let clock = coordinator.clock();
258258
let t0 = clock.now();
259259
let t1 = t0 + Duration::from_millis(5);

sim-rs/sim-core/src/clock/timestamp.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,57 +6,57 @@ use std::{
66

77
use serde::Serialize;
88

9+
const NANOS_PER_SEC: u64 = 1_000_000_000;
10+
911
/// A timestamp tracks the time from the start of the simulation.
1012
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
11-
pub struct Timestamp(Duration);
13+
pub struct Timestamp(u64);
1214

1315
impl Timestamp {
1416
pub fn zero() -> Self {
15-
Self(Duration::from_secs(0))
17+
Self(0)
1618
}
1719

1820
pub fn from_secs(secs: u64) -> Self {
19-
Self(Duration::from_secs(secs))
21+
Self(secs * NANOS_PER_SEC)
2022
}
2123

2224
pub fn checked_sub_duration(self, rhs: Duration) -> Option<Self> {
23-
Some(Self(self.0.checked_sub(rhs)?))
25+
Some(Self(self.0.checked_sub(rhs.as_nanos() as u64)?))
2426
}
25-
}
2627

27-
impl From<Duration> for Timestamp {
28-
fn from(value: Duration) -> Self {
29-
Self(value)
28+
pub fn with_resolution(self, delta: Duration) -> Self {
29+
Self(self.0.next_multiple_of(delta.as_nanos() as u64))
3030
}
3131
}
3232

3333
impl Add<Duration> for Timestamp {
3434
type Output = Timestamp;
3535

3636
fn add(self, rhs: Duration) -> Self::Output {
37-
Timestamp(self.0 + rhs)
37+
Self(self.0 + rhs.as_nanos() as u64)
3838
}
3939
}
4040

4141
impl AddAssign<Duration> for Timestamp {
4242
fn add_assign(&mut self, rhs: Duration) {
43-
self.0 += rhs;
43+
self.0 += rhs.as_nanos() as u64;
4444
}
4545
}
4646

4747
impl Sub<Timestamp> for Timestamp {
4848
type Output = Duration;
4949

5050
fn sub(self, rhs: Timestamp) -> Self::Output {
51-
self.0 - rhs.0
51+
Duration::from_nanos(self.0 - rhs.0)
5252
}
5353
}
5454

5555
impl Sub<Duration> for Timestamp {
5656
type Output = Timestamp;
5757

5858
fn sub(self, rhs: Duration) -> Self::Output {
59-
Self(self.0 - rhs)
59+
Self(self.0 - rhs.as_nanos() as u64)
6060
}
6161
}
6262

@@ -65,30 +65,20 @@ impl Serialize for Timestamp {
6565
where
6666
S: serde::Serializer,
6767
{
68-
serializer.serialize_f32(self.0.as_secs_f32())
68+
serializer.serialize_f64(self.0 as f64 / NANOS_PER_SEC as f64)
6969
}
7070
}
7171

7272
pub struct AtomicTimestamp(AtomicU64);
7373
impl AtomicTimestamp {
7474
pub fn new(val: Timestamp) -> Self {
75-
Self(AtomicU64::new(duration_to_u64(val.0)))
75+
Self(AtomicU64::new(val.0))
7676
}
7777
pub fn load(&self, order: Ordering) -> Timestamp {
78-
let val = self.0.load(order);
79-
Timestamp(u64_to_duration(val))
78+
Timestamp(self.0.load(order))
8079
}
8180

8281
pub fn store(&self, val: Timestamp, order: Ordering) {
83-
let val = duration_to_u64(val.0);
84-
self.0.store(val, order);
82+
self.0.store(val.0, order);
8583
}
8684
}
87-
88-
fn u64_to_duration(val: u64) -> Duration {
89-
Duration::from_nanos(val)
90-
}
91-
92-
fn duration_to_u64(val: Duration) -> u64 {
93-
val.as_nanos() as u64
94-
}

sim-rs/sim-core/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub struct RawParameters {
6060
pub leios_variant: LeiosVariant,
6161
pub relay_strategy: RelayStrategy,
6262
pub simulate_transactions: bool,
63+
pub timestamp_resolution_ms: f64,
6364

6465
// Leios protocol configuration
6566
pub leios_stage_length_slots: u64,
@@ -447,6 +448,7 @@ impl MockTransactionConfig {
447448
#[derive(Debug, Clone)]
448449
pub struct SimConfiguration {
449450
pub seed: u64,
451+
pub timestamp_resolution: Duration,
450452
pub slots: Option<u64>,
451453
pub emit_conformance_events: bool,
452454
pub aggregate_events: bool,
@@ -492,6 +494,7 @@ impl SimConfiguration {
492494
}
493495
Ok(Self {
494496
seed: 0,
497+
timestamp_resolution: duration_ms(params.timestamp_resolution_ms),
495498
slots: None,
496499
emit_conformance_events: false,
497500
aggregate_events: false,

0 commit comments

Comments
 (0)