Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion shuttle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bitvec = "1.0.1"
cfg-if = "1.0"
hex = "0.4.2"
owo-colors = "3.5.0"
pin-project = "1.1.3"
rand_core = "0.6.4"
rand = "0.8.5"
rand_pcg = "0.3.1"
Expand All @@ -38,7 +39,6 @@ tempfile = "3.2.0"
test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
tracing-subscriber = { version = "0.3.9", features = ["env-filter"] }
trybuild = "1.0"
pin-project = "1.1.3"
# The following line is necessary to ensure vector-clocks are used for integration tests
# To run performance tests without vector clocks using `cargo bench`, this line must be commented out
shuttle = { path = ".", features = ["vector-clocks"] }
Expand All @@ -55,6 +55,7 @@ annotation = ["dep:serde", "dep:serde_json", "dep:regex"]
# are otherwise always enabled via a dev-dependency to ensure all *test* assertions utilizing vector
# clocks behave correctly during testing
bench-no-vector-clocks = []
advanced-time-models = []

[[bench]]
name = "lock"
Expand Down
1 change: 1 addition & 0 deletions shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ pub mod lazy_static;
pub mod rand;
pub mod sync;
pub mod thread;
pub mod time;

pub mod current;
pub mod scheduler;
Expand Down
41 changes: 38 additions & 3 deletions shuttle/src/runtime/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::runtime::thread::continuation::PooledContinuation;
use crate::scheduler::{Schedule, Scheduler};
use crate::sync::{ResourceSignature, ResourceType};
use crate::thread::thread_fn;
use crate::time::{get_time_model, TimeModel};
use crate::{backtrace_enabled, Config, MaxSteps, UNGRACEFUL_SHUTDOWN_CONFIG};
use scoped_tls::scoped_thread_local;
use smallvec::SmallVec;
Expand Down Expand Up @@ -91,15 +92,21 @@ thread_local! {
/// static variable, but clients get access to it by calling `ExecutionState::with`.
pub(crate) struct Execution {
scheduler: Rc<RefCell<dyn Scheduler>>,
time_model: Rc<RefCell<dyn TimeModel>>,
initial_schedule: Schedule,
}

impl Execution {
/// Construct a new execution that will use the given scheduler. The execution should then be
/// invoked via its `run` method, which takes as input the closure for task 0.
pub(crate) fn new(scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self {
pub(crate) fn new(
scheduler: Rc<RefCell<dyn Scheduler>>,
initial_schedule: Schedule,
time_model: Rc<RefCell<dyn TimeModel>>,
) -> Self {
Self {
scheduler,
time_model,
initial_schedule,
}
}
Expand Down Expand Up @@ -130,6 +137,15 @@ impl StepError {
}
}

/// Drives the time model while the scheduler is starved: as long as no task
/// is runnable, keep asking the time model to wake its next sleeper. The loop
/// (rather than a single wake) is needed because a stale waker may fire
/// without actually producing a newly runnable task. Requires access to the
/// ExecutionState to obtain a reference to the time model.
fn wake_sleepers_until_runnable() {
    let time_model = get_time_model();
    loop {
        if ExecutionState::num_runnable() > 0 {
            break;
        }
        // `wake_next` returns false once the model has no sleepers left.
        if !time_model.borrow_mut().wake_next() {
            break;
        }
    }
}

impl Execution {
/// Run a function to be tested, taking control of scheduling it and any tasks it might spawn.
/// This function runs until `f` and all tasks spawned by `f` have terminated, or until the
Expand All @@ -138,7 +154,11 @@ impl Execution {
where
F: FnOnce() + Send + 'static,
{
let state = RefCell::new(ExecutionState::new(config.clone(), Rc::clone(&self.scheduler)));
let state = RefCell::new(ExecutionState::new(
config.clone(),
Rc::clone(&self.scheduler),
Rc::clone(&self.time_model),
));

init_panic_hook(config.clone());
CurrentSchedule::init(self.initial_schedule.clone());
Expand Down Expand Up @@ -245,6 +265,7 @@ impl Execution {
#[inline]
fn run_to_competion(&mut self, immediately_return_on_panic: bool) -> Result<(), StepError> {
loop {
wake_sleepers_until_runnable();
let next_step: Option<Rc<RefCell<PooledContinuation>>> = ExecutionState::with(|state| {
state.schedule()?;
state.advance_to_next_task();
Expand Down Expand Up @@ -340,6 +361,10 @@ pub(crate) struct ExecutionState {
// Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation
// on each scheduling decision. Should not be used outside of the `schedule` function
runnable_tasks: Vec<*const Task>,

// Counter for unique timing resource ids (Sleeps, Timeouts and Intervals)
pub(crate) timer_id_counter: u64,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for? It's never used no?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's used in shuttle/src/sync/time/mod.rs to differentiate sleepers with the same deadlines (see uses of increment_timer_counter())

pub(crate) time_model: Rc<RefCell<dyn TimeModel>>,
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -373,7 +398,7 @@ pub enum ExecutionStateBorrowError {
}

impl ExecutionState {
fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>) -> Self {
fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>, time_model: Rc<RefCell<dyn TimeModel>>) -> Self {
Self {
config,
tasks: SmallVec::new(),
Expand All @@ -389,6 +414,8 @@ impl ExecutionState {
has_cleaned_up: false,
top_level_span: tracing::Span::current(),
runnable_tasks: Vec::with_capacity(DEFAULT_INLINE_TASKS),
time_model,
timer_id_counter: 0,
}
}

Expand Down Expand Up @@ -665,6 +692,8 @@ impl ExecutionState {
TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().clear());
LABELS.with(|cell| cell.borrow_mut().clear());

Self::with(|s| s.timer_id_counter = 0);

#[cfg(debug_assertions)]
Self::with(|state| state.has_cleaned_up = true);

Expand All @@ -680,6 +709,8 @@ impl ExecutionState {
/// is different from the currently running task, indicating that the current task should yield
/// its execution.
pub(crate) fn maybe_yield() -> bool {
wake_sleepers_until_runnable();

Self::with(|state| {
if std::thread::panicking() && !state.in_cleanup {
return true;
Expand Down Expand Up @@ -791,6 +822,10 @@ impl ExecutionState {
ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type))
}

/// Count of tasks that are currently runnable. Note this is an O(n) scan over
/// all tasks on every call.
pub(crate) fn num_runnable() -> usize {
    Self::with(|state| {
        state
            .tasks
            .iter()
            .fold(0usize, |acc, task| if task.runnable() { acc + 1 } else { acc })
    })
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should improve the tracking of runnable so that this becomes O(1)

Copy link
Copy Markdown
Contributor Author

@dylanjwolff dylanjwolff Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, but I don't know if we want to block this PR on that change. if so, I should probably open another PR to do that separately.

this would be strongly related to building the set of runnable tasks incrementally, which I have a prototype of on this branch:

https://github.com/dylanjwolff/shuttle/tree/incremental-persistent-vec

but I haven't PR'ed it because I actually think the right thing to do is to change the Shuttle scheduler API so that the individual schedulers manage their own runnable task sets in whatever data structure is best for that algorithm (for example, priority queue for RP).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah no don't block the PR on this.

And yeah, lets chat about potential scheduler API changes on Monday


pub(crate) fn get_storage<K: Into<StorageKey>, T: 'static>(&self, key: K) -> Option<&T> {
self.storage
.get(key.into())
Expand Down
24 changes: 20 additions & 4 deletions shuttle/src/runtime/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::runtime::task::{Task, TaskId};
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
use crate::scheduler::metrics::MetricsScheduler;
use crate::scheduler::{Schedule, Scheduler};
use crate::time::{frozen::FrozenTimeModel, TimeModel};
use crate::Config;
use std::cell::RefCell;
use std::fmt;
Expand Down Expand Up @@ -52,18 +53,33 @@ impl Drop for ResetSpanOnDrop {
/// function as many times as dictated by the scheduler; each execution has its scheduling decisions
/// resolved by the scheduler, which can make different choices for each execution.
#[derive(Debug)]
pub struct Runner<S: ?Sized + Scheduler> {
pub struct Runner<S: ?Sized + Scheduler, T: TimeModel> {
scheduler: Rc<RefCell<MetricsScheduler<S>>>,
time_model: Rc<RefCell<T>>,
config: Config,
}

impl<S: Scheduler + 'static> Runner<S> {
impl<S: Scheduler + 'static> Runner<S, FrozenTimeModel> {
/// Construct a new `Runner` that will use the given `Scheduler` to control the test.
pub fn new(scheduler: S, config: Config) -> Self {
let metrics_scheduler = MetricsScheduler::new(scheduler);

Self {
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
time_model: Rc::new(RefCell::new(FrozenTimeModel::new())),
config,
}
}
}

impl<S: Scheduler + 'static, T: TimeModel + 'static> Runner<S, T> {
/// Construct a new `Runner` that will use the given `Scheduler` to control the test.
pub fn new_with_time_model(scheduler: S, time_model: T, config: Config) -> Self {
let metrics_scheduler = MetricsScheduler::new(scheduler);

Self {
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
time_model: Rc::new(RefCell::new(time_model)),
config,
}
}
Expand Down Expand Up @@ -95,8 +111,9 @@ impl<S: Scheduler + 'static> Runner<S> {
None => break,
Some(s) => s,
};
self.time_model.borrow_mut().new_execution();

let execution = Execution::new(self.scheduler.clone(), schedule);
let execution = Execution::new(self.scheduler.clone(), schedule, self.time_model.clone());
let f = Arc::clone(&f);

// This is a slightly lazy way to ensure that everything outside of the "execution" span gets
Expand All @@ -122,7 +139,6 @@ pub struct PortfolioRunner {
stop_on_first_failure: bool,
config: Config,
}

impl PortfolioRunner {
/// Construct a new `PortfolioRunner` with no schedulers. If `stop_on_first_failure` is true,
/// all schedulers will be terminated as soon as any fails; if false, they will keep running
Expand Down
2 changes: 2 additions & 0 deletions shuttle/src/runtime/thread/continuation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::runtime::execution::ExecutionState;
use crate::time::get_time_model;
use crate::{ContinuationFunctionBehavior, UNGRACEFUL_SHUTDOWN_CONFIG};
use corosensei::Yielder;
use corosensei::{stack::DefaultStack, Coroutine, CoroutineResult};
Expand Down Expand Up @@ -342,6 +343,7 @@ pub(crate) fn switch() {
ContinuationInput::Resume => {}
};
}
get_time_model().borrow_mut().step()
}

#[cfg(test)]
Expand Down
10 changes: 3 additions & 7 deletions shuttle/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
use crate::runtime::execution::ExecutionState;
use crate::runtime::task::TaskId;
use crate::runtime::thread;
use crate::time::Duration;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::panic::Location;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

pub use crate::time::sleep;
pub use std::thread::{panicking, Result};

/// A unique identifier for a running thread
Expand Down Expand Up @@ -348,12 +350,6 @@ pub fn yield_now() {
thread::switch();
}

/// Puts the current thread to sleep for at least the specified amount of time.
// Note that Shuttle does not model time, so this behaves just like a context switch.
pub fn sleep(_dur: Duration) {
thread::switch();
}

/// Get a handle to the thread that invokes it
pub fn current() -> Thread {
let (task_id, name) = ExecutionState::with(|s| {
Expand Down
107 changes: 107 additions & 0 deletions shuttle/src/time/constant_stepped.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::{
cmp::{max, Reverse},
collections::{BinaryHeap, HashMap},
task::Waker,
};

use tracing::{trace, warn};

use crate::{current::TaskId, runtime::execution::ExecutionState, time::WakerRegistered};

use super::{Duration, Instant, TimeModel};

/// A time model where time advances by a constant amount for each scheduling step
#[derive(Clone, Debug)]
pub struct ConstantSteppedTimeModel {
step_size: std::time::Duration,
current_time_elapsed: std::time::Duration,
waiters: BinaryHeap<Reverse<(std::time::Duration, TaskId, u64)>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a super fan of the untypedness of this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(i.e., what are the fields — in particular, what is the u64?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's just the sleep ID

wakers: HashMap<u64, Waker>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have we separated the waiters and the wakers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember, but I think it is because of type restrictions for what you can put on a binary heap in Rust (PartialOrd).

}

unsafe impl Send for ConstantSteppedTimeModel {}

impl ConstantSteppedTimeModel {
    /// Construct a model whose simulated clock advances by `step_size` on
    /// every scheduling step, starting from zero elapsed time with no
    /// registered sleepers.
    pub fn new(step_size: std::time::Duration) -> Self {
        Self {
            step_size,
            current_time_elapsed: std::time::Duration::ZERO,
            waiters: BinaryHeap::new(),
            wakers: HashMap::new(),
        }
    }

    // Pops every waiter whose deadline is at or before the current elapsed
    // time and fires its registered waker, if any. Returns `true` if at least
    // one expired waiter was popped (even when its waker had already been
    // removed).
    fn unblock_expired(&mut self) -> bool {
        let mut any_expired = false;
        loop {
            // The heap is a min-heap over (deadline, task, sleep_id) via
            // `Reverse`, so `peek` yields the earliest deadline.
            let expired_id = match self.waiters.peek() {
                Some(Reverse((deadline, _, sleep_id))) if *deadline <= self.current_time_elapsed => {
                    *sleep_id
                }
                _ => break,
            };
            self.waiters.pop();
            if let Some(waker) = self.wakers.remove(&expired_id) {
                waker.wake();
            }
            any_expired = true;
        }
        any_expired
    }
}

impl TimeModel for ConstantSteppedTimeModel {
fn pause(&mut self) {
warn!("Pausing stepped model has no effect")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely pausing ConstantSteppedTimeModel means that it becomes the FrozenTimeModel ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it probably should. Maybe there's an argument that you shouldn't be manually pausing/unpausing the non-frozen models because it's error prone?

}

fn resume(&mut self) {
warn!("Resuming stepped model has no effect")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment as for pause

}

fn step(&mut self) {
self.current_time_elapsed += self.step_size;
trace!("time step to {:?}", self.current_time_elapsed);
self.unblock_expired();
}

fn new_execution(&mut self) {
self.current_time_elapsed = std::time::Duration::from_secs(0);
self.waiters.clear();
self.wakers.clear();
}

fn instant(&self) -> Instant {
Instant::Simulated(self.current_time_elapsed)
}

fn wake_next(&mut self) -> bool {
if let Some(Reverse((time, _, _))) = self.waiters.peek() {
self.current_time_elapsed = max(self.current_time_elapsed, *time);
self.unblock_expired();
true
} else {
false
}
}

fn advance(&mut self, dur: Duration) {
self.current_time_elapsed += dur;
}

fn register_sleep(&mut self, deadline: Instant, sleep_id: u64, waker: Waker) -> WakerRegistered {
let deadline = deadline.unwrap_simulated();
if deadline <= self.current_time_elapsed {
return WakerRegistered::NotRegistered;
}

let task_id = ExecutionState::with(|s| s.current().id());
let item = (deadline, task_id, sleep_id);
self.waiters.push(Reverse(item));
self.wakers.insert(sleep_id, waker);

WakerRegistered::Registered
}
}
Loading