Skip to content

Commit 8d2436f

Browse files
author
Dylan Wolff
committed
Adding TimeModels to Shuttle
1 parent da33d50 commit 8d2436f

File tree

12 files changed

+1563
-33
lines changed

12 files changed

+1563
-33
lines changed

shuttle/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ cfg-if = "1.0"
1515
generator = "0.8.1"
1616
hex = "0.4.2"
1717
owo-colors = "3.5.0"
18+
pin-project = "1.1.3"
1819
rand_core = "0.6.4"
1920
rand = "0.8.5"
2021
rand_pcg = "0.3.1"
@@ -38,7 +39,6 @@ tempfile = "3.2.0"
3839
test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
3940
tracing-subscriber = { version = "0.3.9", features = ["env-filter"] }
4041
trybuild = "1.0"
41-
pin-project = "1.1.3"
4242
# The following line is necessary to ensure vector-clocks are used for integration tests
4343
# To run performance tests without vector clocks using `cargo bench`, this line must be commented out
4444
shuttle = { path = ".", features = ["vector-clocks"] }
@@ -55,6 +55,7 @@ annotation = ["dep:serde", "dep:serde_json", "dep:regex"]
5555
# are otherwise always enabled via a dev-dependency to ensure all *test* assertions utilizing vector
5656
# clocks behave correctly during testing
5757
bench-no-vector-clocks = []
58+
advanced-time-models = []
5859

5960
[[bench]]
6061
name = "lock"

shuttle/src/runtime/execution.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::runtime::task::labels::Labels;
55
use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS};
66
use crate::runtime::thread::continuation::PooledContinuation;
77
use crate::scheduler::{Schedule, Scheduler};
8+
use crate::sync::time::TimeModel;
89
use crate::sync::{ResourceSignature, ResourceType};
910
use crate::thread::thread_fn;
1011
use crate::{backtrace_enabled, Config, MaxSteps};
@@ -90,15 +91,21 @@ thread_local! {
9091
/// static variable, but clients get access to it by calling `ExecutionState::with`.
9192
pub(crate) struct Execution {
9293
scheduler: Rc<RefCell<dyn Scheduler>>,
94+
time_model: Rc<RefCell<dyn TimeModel>>,
9395
initial_schedule: Schedule,
9496
}
9597

9698
impl Execution {
9799
/// Construct a new execution that will use the given scheduler. The execution should then be
98100
/// invoked via its `run` method, which takes as input the closure for task 0.
99-
pub(crate) fn new(scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self {
101+
pub(crate) fn new(
102+
scheduler: Rc<RefCell<dyn Scheduler>>,
103+
initial_schedule: Schedule,
104+
time_model: Rc<RefCell<dyn TimeModel>>,
105+
) -> Self {
100106
Self {
101107
scheduler,
108+
time_model,
102109
initial_schedule,
103110
}
104111
}
@@ -137,7 +144,11 @@ impl Execution {
137144
where
138145
F: FnOnce() + Send + 'static,
139146
{
140-
let state = RefCell::new(ExecutionState::new(config.clone(), Rc::clone(&self.scheduler)));
147+
let state = RefCell::new(ExecutionState::new(
148+
config.clone(),
149+
Rc::clone(&self.scheduler),
150+
Rc::clone(&self.time_model),
151+
));
141152

142153
init_panic_hook(config.clone());
143154
CurrentSchedule::init(self.initial_schedule.clone());
@@ -243,6 +254,12 @@ impl Execution {
243254
#[inline]
244255
fn run_to_competion(&mut self, immediately_return_on_panic: bool) -> Result<(), StepError> {
245256
loop {
257+
// While there are no runnable tasks and tasks are able to be woken by the time model, continue waking tasks
258+
while ExecutionState::num_runnable() == 0
259+
&& ExecutionState::with(|s| Rc::clone(&s.time_model))
260+
.borrow_mut()
261+
.wake_next()
262+
{}
246263
let next_step: Option<Rc<RefCell<PooledContinuation>>> = ExecutionState::with(|state| {
247264
state.schedule()?;
248265
state.advance_to_next_task();
@@ -338,6 +355,10 @@ pub(crate) struct ExecutionState {
338355
// Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation
339356
// on each scheduling decision. Should not be used outside of the `schedule` function
340357
runnable_tasks: Vec<*const Task>,
358+
359+
// Counter for unique timing resource ids (Sleeps, Timeouts and Intervals)
360+
pub(crate) timer_id_counter: u64,
361+
pub(crate) time_model: Rc<RefCell<dyn TimeModel>>,
341362
}
342363

343364
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -362,7 +383,11 @@ impl ScheduledTask {
362383
}
363384

364385
impl ExecutionState {
365-
fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>) -> Self {
386+
fn new(
387+
config: Config,
388+
scheduler: Rc<RefCell<dyn Scheduler>>,
389+
time_model: Rc<RefCell<dyn TimeModel>>,
390+
) -> Self {
366391
Self {
367392
config,
368393
tasks: SmallVec::new(),
@@ -378,17 +403,21 @@ impl ExecutionState {
378403
has_cleaned_up: false,
379404
top_level_span: tracing::Span::current(),
380405
runnable_tasks: Vec::with_capacity(DEFAULT_INLINE_TASKS),
406+
time_model,
407+
timer_id_counter: 0,
381408
}
382409
}
383410

384411
/// Invoke a closure with access to the current execution state. Library code uses this to gain
385412
/// access to the state of the execution to influence scheduling (e.g. to register a task as
386413
/// blocked).
387414
#[inline]
415+
#[track_caller]
388416
pub(crate) fn with<F, T>(f: F) -> T
389417
where
390418
F: FnOnce(&mut ExecutionState) -> T,
391419
{
420+
trace!("ExecutionState::with from {}", Location::caller());
392421
Self::try_with(f).expect("Shuttle internal error: cannot access ExecutionState. are you trying to access a Shuttle primitive from outside a Shuttle test?")
393422
}
394423

@@ -604,6 +633,8 @@ impl ExecutionState {
604633
TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().clear());
605634
LABELS.with(|cell| cell.borrow_mut().clear());
606635

636+
Self::with(|s| s.timer_id_counter = 0);
637+
607638
#[cfg(debug_assertions)]
608639
Self::with(|state| state.has_cleaned_up = true);
609640

@@ -619,6 +650,13 @@ impl ExecutionState {
619650
/// is different from the currently running task, indicating that the current task should yield
620651
/// its execution.
621652
pub(crate) fn maybe_yield() -> bool {
653+
// While there are no runnable tasks and tasks are able to be woken by the time model, continue waking tasks
654+
while ExecutionState::num_runnable() == 0
655+
&& ExecutionState::with(|s| Rc::clone(&s.time_model))
656+
.borrow_mut()
657+
.wake_next()
658+
{}
659+
622660
Self::with(|state| {
623661
if std::thread::panicking() {
624662
return true;
@@ -723,11 +761,19 @@ impl ExecutionState {
723761
Self::with(|state| state.context_switches)
724762
}
725763

764+
pub(crate) fn num_tasks(&self) -> usize {
765+
self.tasks.len()
766+
}
767+
726768
#[track_caller]
727769
pub(crate) fn new_resource_signature(resource_type: ResourceType) -> ResourceSignature {
728770
ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type))
729771
}
730772

773+
pub(crate) fn num_runnable() -> usize {
774+
Self::with(|state| state.tasks.iter().filter(|t| t.runnable()).count())
775+
}
776+
731777
pub(crate) fn get_storage<K: Into<StorageKey>, T: 'static>(&self, key: K) -> Option<&T> {
732778
self.storage
733779
.get(key.into())

shuttle/src/runtime/runner.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::runtime::task::{Task, TaskId};
33
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
44
use crate::scheduler::metrics::MetricsScheduler;
55
use crate::scheduler::{Schedule, Scheduler};
6+
use crate::sync::time::{frozen::FrozenTimeModel, TimeModel};
67
use crate::Config;
78
use std::cell::RefCell;
89
use std::fmt;
@@ -52,18 +53,33 @@ impl Drop for ResetSpanOnDrop {
5253
/// function as many times as dictated by the scheduler; each execution has its scheduling decisions
5354
/// resolved by the scheduler, which can make different choices for each execution.
5455
#[derive(Debug)]
55-
pub struct Runner<S: ?Sized + Scheduler> {
56+
pub struct Runner<S: ?Sized + Scheduler, T: TimeModel> {
5657
scheduler: Rc<RefCell<MetricsScheduler<S>>>,
58+
time_model: Rc<RefCell<T>>,
5759
config: Config,
5860
}
5961

60-
impl<S: Scheduler + 'static> Runner<S> {
62+
impl<S: Scheduler + 'static> Runner<S, FrozenTimeModel> {
6163
/// Construct a new `Runner` that will use the given `Scheduler` to control the test.
6264
pub fn new(scheduler: S, config: Config) -> Self {
6365
let metrics_scheduler = MetricsScheduler::new(scheduler);
6466

6567
Self {
6668
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
69+
time_model: Rc::new(RefCell::new(FrozenTimeModel::new())),
70+
config,
71+
}
72+
}
73+
}
74+
75+
impl<S: Scheduler + 'static, T: TimeModel + 'static> Runner<S, T> {
76+
/// Construct a new `Runner` that will use the given `Scheduler` to control the test.
77+
pub fn new_with_time_model(scheduler: S, time_model: T, config: Config) -> Self {
78+
let metrics_scheduler = MetricsScheduler::new(scheduler);
79+
80+
Self {
81+
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
82+
time_model: Rc::new(RefCell::new(time_model)),
6783
config,
6884
}
6985
}
@@ -96,7 +112,7 @@ impl<S: Scheduler + 'static> Runner<S> {
96112
Some(s) => s,
97113
};
98114

99-
let execution = Execution::new(self.scheduler.clone(), schedule);
115+
let execution = Execution::new(self.scheduler.clone(), schedule, self.time_model.clone());
100116
let f = Arc::clone(&f);
101117

102118
// This is a slightly lazy way to ensure that everything outside of the "execution" span gets
@@ -122,7 +138,6 @@ pub struct PortfolioRunner {
122138
stop_on_first_failure: bool,
123139
config: Config,
124140
}
125-
126141
impl PortfolioRunner {
127142
/// Construct a new `PortfolioRunner` with no schedulers. If `stop_on_first_failure` is true,
128143
/// all schedulers will be terminated as soon as any fails; if false, they will keep running

shuttle/src/runtime/thread/continuation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,12 @@ unsafe impl Send for PooledContinuation {}
257257
/// Possibly yield back to the executor to perform a context switch.
258258
pub(crate) fn switch() {
259259
crate::annotations::record_tick();
260+
260261
if ExecutionState::maybe_yield() {
261262
let r = generator::yield_(ContinuationOutput::Yielded).unwrap();
262263
assert!(matches!(r, ContinuationInput::Resume));
263264
}
265+
ExecutionState::with(|s| Rc::clone(&s.time_model)).borrow_mut().step();
264266
}
265267

266268
#[cfg(test)]

shuttle/src/sync/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod mpsc;
77
mod mutex;
88
mod once;
99
mod rwlock;
10+
pub mod time;
1011

1112
pub use barrier::{Barrier, BarrierWaitResult};
1213
pub use condvar::{Condvar, WaitTimeoutResult};
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
use std::{
2+
cmp::{max, Reverse},
3+
collections::{BinaryHeap, HashMap},
4+
task::Waker,
5+
};
6+
7+
use tracing::{debug, warn};
8+
9+
use crate::{current::TaskId, runtime::execution::ExecutionState};
10+
11+
use super::{Duration, Instant, TimeDistribution, TimeModel};
12+
13+
/// A time model where time advances by a constant amount for each scheduling step
14+
#[derive(Clone, Debug)]
15+
pub struct ConstantSteppedTimeModel {
16+
distribution: ConstantTimeDistribution,
17+
current_step_size: std::time::Duration,
18+
current_time_elapsed: std::time::Duration,
19+
waiters: BinaryHeap<Reverse<(std::time::Duration, TaskId, u64)>>,
20+
wakers: HashMap<u64, Waker>,
21+
}
22+
23+
unsafe impl Send for ConstantSteppedTimeModel {}
24+
25+
impl ConstantSteppedTimeModel {
26+
/// Create a ConstantSteppedTimeModel
27+
pub fn new(step_size: std::time::Duration) -> Self {
28+
let distribution = ConstantTimeDistribution::new(step_size);
29+
Self {
30+
distribution,
31+
current_step_size: distribution.sample(),
32+
current_time_elapsed: std::time::Duration::from_secs(0),
33+
waiters: BinaryHeap::new(),
34+
wakers: HashMap::new(),
35+
}
36+
}
37+
38+
fn unblock_expired(&mut self) {
39+
while let Some(waker_key) = self.waiters.peek().and_then(|Reverse((t, _, sleep_id))| {
40+
if *t <= self.current_time_elapsed {
41+
Some(*sleep_id)
42+
} else {
43+
None
44+
}
45+
}) {
46+
_ = self.waiters.pop();
47+
if let Some(waker) = self.wakers.remove(&waker_key) {
48+
waker.wake();
49+
}
50+
}
51+
}
52+
53+
/// Get the currently sleeping tasks and deadlines. May contain duplicates
54+
pub fn get_waiters(&self) -> &[Reverse<(std::time::Duration, TaskId, u64)>] {
55+
self.waiters.as_slice()
56+
}
57+
58+
/// Manually wake a task without affecting the global clock
59+
pub fn wake_frozen(&mut self, sleep_id: u64) {
60+
if let Some(waker) = self.wakers.remove(&sleep_id) {
61+
waker.wake();
62+
}
63+
}
64+
}
65+
66+
impl TimeModel for ConstantSteppedTimeModel {
67+
fn pause(&mut self) {
68+
warn!("Pausing stepped model has no effect")
69+
}
70+
71+
fn resume(&mut self) {
72+
warn!("Resuming stepped model has no effect")
73+
}
74+
75+
fn step(&mut self) {
76+
debug!("step");
77+
self.current_time_elapsed += self.current_step_size;
78+
self.unblock_expired();
79+
}
80+
81+
fn reset(&mut self) {
82+
self.current_step_size = self.distribution.sample();
83+
self.current_time_elapsed = std::time::Duration::from_secs(0);
84+
self.waiters.clear();
85+
self.wakers.clear();
86+
}
87+
88+
fn instant(&self) -> Instant {
89+
Instant::Simulated(self.current_time_elapsed)
90+
}
91+
92+
fn wake_next(&mut self) -> bool {
93+
if self.waiters.is_empty() {
94+
return false;
95+
}
96+
if let Some(Reverse((time, _, _))) = self.waiters.peek() {
97+
self.current_time_elapsed = max(self.current_time_elapsed, *time);
98+
}
99+
self.unblock_expired();
100+
true
101+
}
102+
103+
#[allow(clippy::useless_conversion)]
104+
fn advance(&mut self, dur: Duration) {
105+
self.current_time_elapsed += dur.into();
106+
}
107+
108+
fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Option<Waker>) -> bool {
109+
let deadline = deadline.unwrap_simulated();
110+
if deadline <= self.current_time_elapsed {
111+
return true;
112+
}
113+
114+
if let Some(waker) = waker {
115+
let task_id = ExecutionState::with(|s| s.current().id());
116+
let item = (deadline, task_id, sleep_id);
117+
self.waiters.push(Reverse(item));
118+
self.wakers.insert(sleep_id, waker);
119+
}
120+
false
121+
}
122+
123+
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
124+
self
125+
}
126+
}
127+
128+
/// A constant distribution; each sample returns the same time
129+
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
130+
pub struct ConstantTimeDistribution {
131+
/// The time that will be returned on sampling
132+
pub time: std::time::Duration,
133+
}
134+
135+
impl ConstantTimeDistribution {
136+
/// Create a new constant time distribution
137+
pub fn new(time: std::time::Duration) -> Self {
138+
Self { time }
139+
}
140+
}
141+
142+
impl TimeDistribution<std::time::Duration> for ConstantTimeDistribution {
143+
fn sample(&self) -> std::time::Duration {
144+
self.time
145+
}
146+
}

0 commit comments

Comments
 (0)