Skip to content

Commit b74d4a5

Browse files
author
Dylan Wolff
committed
Adding TimeModels to Shuttle
1 parent 668c1e0 commit b74d4a5

File tree

12 files changed

+1283
-32
lines changed

12 files changed

+1283
-32
lines changed

shuttle/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ cfg-if = "1.0"
1515
generator = "0.8.1"
1616
hex = "0.4.2"
1717
owo-colors = "3.5.0"
18+
pin-project = "1.1.3"
1819
rand_core = "0.6.4"
1920
rand = "0.8.5"
2021
rand_pcg = "0.3.1"
@@ -38,7 +39,6 @@ tempfile = "3.2.0"
3839
test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
3940
tracing-subscriber = { version = "0.3.9", features = ["env-filter"] }
4041
trybuild = "1.0"
41-
pin-project = "1.1.3"
4242
# The following line is necessary to ensure vector-clocks are used for integration tests
4343
# To run performance tests without vector clocks using `cargo bench`, this line must be commented out
4444
shuttle = { path = ".", features = ["vector-clocks"] }

shuttle/src/runtime/execution.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::runtime::task::labels::Labels;
55
use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS};
66
use crate::runtime::thread::continuation::PooledContinuation;
77
use crate::scheduler::{Schedule, Scheduler};
8+
use crate::sync::time::TimeModel;
89
use crate::sync::{ResourceSignature, ResourceType};
910
use crate::thread::thread_fn;
1011
use crate::{Config, MaxSteps};
@@ -48,15 +49,21 @@ thread_local! {
4849
/// static variable, but clients get access to it by calling `ExecutionState::with`.
4950
pub(crate) struct Execution {
5051
scheduler: Rc<RefCell<dyn Scheduler>>,
52+
time_model: Rc<RefCell<dyn TimeModel>>,
5153
initial_schedule: Schedule,
5254
}
5355

5456
impl Execution {
5557
/// Construct a new execution that will use the given scheduler. The execution should then be
5658
/// invoked via its `run` method, which takes as input the closure for task 0.
57-
pub(crate) fn new(scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self {
59+
pub(crate) fn new(
60+
scheduler: Rc<RefCell<dyn Scheduler>>,
61+
initial_schedule: Schedule,
62+
time_model: Rc<RefCell<dyn TimeModel>>,
63+
) -> Self {
5864
Self {
5965
scheduler,
66+
time_model,
6067
initial_schedule,
6168
}
6269
}
@@ -77,6 +84,7 @@ impl Execution {
7784
let state = RefCell::new(ExecutionState::new(
7885
config.clone(),
7986
Rc::clone(&self.scheduler),
87+
Rc::clone(&self.time_model),
8088
self.initial_schedule.clone(),
8189
));
8290

@@ -95,6 +103,7 @@ impl Execution {
95103

96104
// Cleanup the state before it goes out of `EXECUTION_STATE` scope
97105
ExecutionState::cleanup();
106+
self.time_model.borrow_mut().reset();
98107
});
99108
}
100109

@@ -281,6 +290,10 @@ pub(crate) struct ExecutionState {
281290
// Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation
282291
// on each scheduling decision. Should not be used outside of the `schedule` function
283292
runnable_tasks: Vec<*const Task>,
293+
294+
// Counter for unique timing resource ids (Sleeps, Timeouts and Intervals)
295+
pub(crate) timer_id_counter: u64,
296+
pub(crate) time_model: Rc<RefCell<dyn TimeModel>>,
284297
}
285298

286299
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -305,7 +318,12 @@ impl ScheduledTask {
305318
}
306319

307320
impl ExecutionState {
308-
fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self {
321+
fn new(
322+
config: Config,
323+
scheduler: Rc<RefCell<dyn Scheduler>>,
324+
time_model: Rc<RefCell<dyn TimeModel>>,
325+
initial_schedule: Schedule,
326+
) -> Self {
309327
Self {
310328
config,
311329
tasks: SmallVec::new(),
@@ -322,17 +340,21 @@ impl ExecutionState {
322340
has_cleaned_up: false,
323341
top_level_span: tracing::Span::current(),
324342
runnable_tasks: Vec::with_capacity(DEFAULT_INLINE_TASKS),
343+
time_model,
344+
timer_id_counter: 0,
325345
}
326346
}
327347

328348
/// Invoke a closure with access to the current execution state. Library code uses this to gain
329349
/// access to the state of the execution to influence scheduling (e.g. to register a task as
330350
/// blocked).
331351
#[inline]
352+
#[track_caller]
332353
pub(crate) fn with<F, T>(f: F) -> T
333354
where
334355
F: FnOnce(&mut ExecutionState) -> T,
335356
{
357+
trace!("ExecutionState::with from {}", Location::caller());
336358
Self::try_with(f).expect("Shuttle internal error: cannot access ExecutionState. are you trying to access a Shuttle primitive from outside a Shuttle test?")
337359
}
338360

@@ -550,6 +572,8 @@ impl ExecutionState {
550572
TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().clear());
551573
LABELS.with(|cell| cell.borrow_mut().clear());
552574

575+
Self::with(|s| s.timer_id_counter = 0);
576+
553577
#[cfg(debug_assertions)]
554578
Self::with(|state| state.has_cleaned_up = true);
555579

@@ -666,11 +690,19 @@ impl ExecutionState {
666690
Self::with(|state| state.context_switches)
667691
}
668692

693+
pub(crate) fn num_tasks(&self) -> usize {
694+
self.tasks.len()
695+
}
696+
669697
#[track_caller]
670698
pub(crate) fn new_resource_signature(resource_type: ResourceType) -> ResourceSignature {
671699
ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type))
672700
}
673701

702+
pub(crate) fn num_runnable() -> usize {
703+
Self::with(|state| state.tasks.iter().filter(|t| t.runnable()).count())
704+
}
705+
674706
pub(crate) fn get_storage<K: Into<StorageKey>, T: 'static>(&self, key: K) -> Option<&T> {
675707
self.storage
676708
.get(key.into())

shuttle/src/runtime/runner.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::runtime::task::{Task, TaskId};
33
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
44
use crate::scheduler::metrics::MetricsScheduler;
55
use crate::scheduler::{Schedule, Scheduler};
6+
use crate::sync::time::{frozen::FrozenTimeModel, TimeModel};
67
use crate::Config;
78
use std::cell::RefCell;
89
use std::fmt;
@@ -52,18 +53,33 @@ impl Drop for ResetSpanOnDrop {
5253
/// function as many times as dictated by the scheduler; each execution has its scheduling decisions
5354
/// resolved by the scheduler, which can make different choices for each execution.
5455
#[derive(Debug)]
55-
pub struct Runner<S: ?Sized + Scheduler> {
56+
pub struct Runner<S: ?Sized + Scheduler, T: TimeModel> {
5657
scheduler: Rc<RefCell<MetricsScheduler<S>>>,
58+
time_model: Rc<RefCell<T>>,
5759
config: Config,
5860
}
5961

60-
impl<S: Scheduler + 'static> Runner<S> {
62+
impl<S: Scheduler + 'static> Runner<S, FrozenTimeModel> {
6163
/// Construct a new `Runner` that will use the given `Scheduler` to control the test.
6264
pub fn new(scheduler: S, config: Config) -> Self {
6365
let metrics_scheduler = MetricsScheduler::new(scheduler);
6466

6567
Self {
6668
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
69+
time_model: Rc::new(RefCell::new(FrozenTimeModel::new())),
70+
config,
71+
}
72+
}
73+
}
74+
75+
impl<S: Scheduler + 'static, T: TimeModel + 'static> Runner<S, T> {
76+
/// Construct a new `Runner` that will use the given `Scheduler` and `TimeModel` to control the test.
77+
pub fn new_with_time_model(scheduler: S, time_model: T, config: Config) -> Self {
78+
let metrics_scheduler = MetricsScheduler::new(scheduler);
79+
80+
Self {
81+
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
82+
time_model: Rc::new(RefCell::new(time_model)),
6783
config,
6884
}
6985
}
@@ -96,7 +112,7 @@ impl<S: Scheduler + 'static> Runner<S> {
96112
Some(s) => s,
97113
};
98114

99-
let execution = Execution::new(self.scheduler.clone(), schedule);
115+
let execution = Execution::new(self.scheduler.clone(), schedule, self.time_model.clone());
100116
let f = Arc::clone(&f);
101117

102118
// This is a slightly lazy way to ensure that everything outside of the "execution" span gets
@@ -122,7 +138,6 @@ pub struct PortfolioRunner {
122138
stop_on_first_failure: bool,
123139
config: Config,
124140
}
125-
126141
impl PortfolioRunner {
127142
/// Construct a new `PortfolioRunner` with no schedulers. If `stop_on_first_failure` is true,
128143
/// all schedulers will be terminated as soon as any fails; if false, they will keep running

shuttle/src/runtime/thread/continuation.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,18 @@ unsafe impl Send for PooledContinuation {}
257257
/// Possibly yield back to the executor to perform a context switch.
258258
pub(crate) fn switch() {
259259
crate::annotations::record_tick();
260+
// While no task is runnable, ask the time model to wake the next waiter; stop as soon as a task is runnable or `wake_next` returns false
261+
while ExecutionState::num_runnable() == 0
262+
&& ExecutionState::with(|s| Rc::clone(&s.time_model))
263+
.borrow_mut()
264+
.wake_next()
265+
{}
266+
260267
if ExecutionState::maybe_yield() {
261268
let r = generator::yield_(ContinuationOutput::Yielded).unwrap();
262269
assert!(matches!(r, ContinuationInput::Resume));
263270
}
271+
ExecutionState::with(|s| Rc::clone(&s.time_model)).borrow_mut().step();
264272
}
265273

266274
#[cfg(test)]

shuttle/src/sync/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod mpsc;
77
mod mutex;
88
mod once;
99
mod rwlock;
10+
pub mod time;
1011

1112
pub use barrier::{Barrier, BarrierWaitResult};
1213
pub use condvar::{Condvar, WaitTimeoutResult};
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use std::{
2+
cmp::{max, Reverse},
3+
collections::{BinaryHeap, HashMap},
4+
task::Waker,
5+
};
6+
7+
use tracing::{debug, warn};
8+
9+
use crate::{current::TaskId, runtime::execution::ExecutionState};
10+
11+
use super::{Duration, Instant, TimeDistribution, TimeModel};
12+
13+
/// A time model where time advances by a constant amount for each scheduling step
14+
#[derive(Clone, Debug)]
15+
pub struct ConstantSteppedTimeModel {
16+
distribution: ConstantTimeDistribution,
17+
current_step_size: std::time::Duration,
18+
current_time_elapsed: std::time::Duration,
19+
waiters: BinaryHeap<Reverse<(std::time::Duration, TaskId, u64)>>,
20+
wakers: HashMap<u64, Waker>,
21+
}
22+
23+
unsafe impl Send for ConstantSteppedTimeModel {}
24+
25+
impl ConstantSteppedTimeModel {
26+
/// Create a `ConstantSteppedTimeModel` whose simulated clock starts at zero and advances by `step_size` on every step
27+
pub fn new(step_size: std::time::Duration) -> Self {
28+
let distribution = ConstantTimeDistribution::new(step_size);
29+
Self {
30+
distribution,
31+
current_step_size: distribution.sample(),
32+
current_time_elapsed: std::time::Duration::from_secs(0),
33+
waiters: BinaryHeap::new(),
34+
wakers: HashMap::new(),
35+
}
36+
}
37+
38+
fn unblock_expired(&mut self) {
39+
while let Some(waker_key) = self.waiters.peek().and_then(|Reverse((t, _, sleep_id))| {
40+
if *t <= self.current_time_elapsed {
41+
Some(*sleep_id)
42+
} else {
43+
None
44+
}
45+
}) {
46+
_ = self.waiters.pop();
47+
if let Some(waker) = self.wakers.remove(&waker_key) {
48+
waker.wake();
49+
}
50+
}
51+
}
52+
53+
/// Get the currently sleeping tasks and their deadlines, in the heap's internal order (not sorted by deadline). May contain duplicates
54+
pub fn get_waiters(&self) -> &[Reverse<(std::time::Duration, TaskId, u64)>] {
55+
self.waiters.as_slice()
56+
}
57+
58+
/// Manually wake a task without affecting the global clock
59+
pub fn wake_frozen(&mut self, sleep_id: u64) {
60+
if let Some(waker) = self.wakers.remove(&sleep_id) {
61+
waker.wake();
62+
}
63+
}
64+
}
65+
66+
impl TimeModel for ConstantSteppedTimeModel {
67+
fn pause(&mut self) {
68+
warn!("Pausing stepped model has no effect")
69+
}
70+
71+
fn resume(&mut self) {
72+
warn!("Resuming stepped model has no effect")
73+
}
74+
75+
fn step(&mut self) {
76+
debug!("step");
77+
self.current_time_elapsed += self.current_step_size;
78+
self.unblock_expired();
79+
}
80+
81+
fn reset(&mut self) {
82+
self.current_step_size = self.distribution.sample();
83+
self.current_time_elapsed = std::time::Duration::from_secs(0);
84+
self.waiters.clear();
85+
self.wakers.clear();
86+
}
87+
88+
fn instant(&self) -> Instant {
89+
Instant::Simulated(self.current_time_elapsed)
90+
}
91+
92+
fn wake_next(&mut self) -> bool {
93+
if self.waiters.is_empty() {
94+
return false;
95+
}
96+
if let Some(Reverse((time, _, _))) = self.waiters.peek() {
97+
self.current_time_elapsed = max(self.current_time_elapsed, *time);
98+
}
99+
self.unblock_expired();
100+
true
101+
}
102+
103+
fn advance(&mut self, dur: Duration) {
104+
self.current_time_elapsed += dur.unwrap_std();
105+
}
106+
107+
fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option<Waker>) -> bool {
108+
let deadline = deadline.unwrap_simulated();
109+
if deadline <= self.current_time_elapsed {
110+
return true;
111+
}
112+
113+
if let Some(waker) = waker {
114+
let task_id = ExecutionState::with(|s| s.current().id());
115+
let item = (deadline, task_id, sleep_id);
116+
self.waiters.push(Reverse(item));
117+
self.wakers.insert(sleep_id, waker);
118+
}
119+
false
120+
}
121+
122+
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
123+
self
124+
}
125+
}
126+
127+
/// A constant distribution; each sample returns the same time
128+
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
129+
pub struct ConstantTimeDistribution {
130+
/// The time that will be returned on sampling
131+
pub time: std::time::Duration,
132+
}
133+
134+
impl ConstantTimeDistribution {
135+
/// Create a new constant time distribution
136+
pub fn new(time: std::time::Duration) -> Self {
137+
Self { time }
138+
}
139+
}
140+
141+
impl TimeDistribution<std::time::Duration> for ConstantTimeDistribution {
142+
fn sample(&self) -> std::time::Duration {
143+
self.time
144+
}
145+
}

0 commit comments

Comments
 (0)