-
Notifications
You must be signed in to change notification settings - Fork 46
Adding Time Models to Shuttle #217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
f072dc5
0a42ef9
a06a0ea
a89ccbd
5790385
89317af
dde6347
1868371
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ use crate::runtime::thread::continuation::PooledContinuation; | |
| use crate::scheduler::{Schedule, Scheduler}; | ||
| use crate::sync::{ResourceSignature, ResourceType}; | ||
| use crate::thread::thread_fn; | ||
| use crate::time::{get_time_model, TimeModel}; | ||
| use crate::{backtrace_enabled, Config, MaxSteps, UNGRACEFUL_SHUTDOWN_CONFIG}; | ||
| use scoped_tls::scoped_thread_local; | ||
| use smallvec::SmallVec; | ||
|
|
@@ -91,15 +92,21 @@ thread_local! { | |
| /// static variable, but clients get access to it by calling `ExecutionState::with`. | ||
| pub(crate) struct Execution { | ||
| scheduler: Rc<RefCell<dyn Scheduler>>, | ||
| time_model: Rc<RefCell<dyn TimeModel>>, | ||
| initial_schedule: Schedule, | ||
| } | ||
|
|
||
| impl Execution { | ||
| /// Construct a new execution that will use the given scheduler. The execution should then be | ||
| /// invoked via its `run` method, which takes as input the closure for task 0. | ||
| pub(crate) fn new(scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self { | ||
| pub(crate) fn new( | ||
| scheduler: Rc<RefCell<dyn Scheduler>>, | ||
| initial_schedule: Schedule, | ||
| time_model: Rc<RefCell<dyn TimeModel>>, | ||
| ) -> Self { | ||
| Self { | ||
| scheduler, | ||
| time_model, | ||
| initial_schedule, | ||
| } | ||
| } | ||
|
|
@@ -130,6 +137,15 @@ impl StepError { | |
| } | ||
| } | ||
|
|
||
| /// While there are no runnable tasks and tasks are able to be woken by the time model, continually wakes tasks. | ||
| /// The TimeModel's `wake_next` method is called in a loop in case there are stale wakers which do not actually | ||
| /// result in a newly runnable task when woken. Requires access to the ExecutionState to obtain a reference to the | ||
| /// time model. | ||
| fn wake_sleepers_until_runnable() { | ||
| let tm = get_time_model(); | ||
| while ExecutionState::num_runnable() == 0 && tm.borrow_mut().wake_next() {} | ||
| } | ||
|
|
||
| impl Execution { | ||
| /// Run a function to be tested, taking control of scheduling it and any tasks it might spawn. | ||
| /// This function runs until `f` and all tasks spawned by `f` have terminated, or until the | ||
|
|
@@ -138,7 +154,11 @@ impl Execution { | |
| where | ||
| F: FnOnce() + Send + 'static, | ||
| { | ||
| let state = RefCell::new(ExecutionState::new(config.clone(), Rc::clone(&self.scheduler))); | ||
| let state = RefCell::new(ExecutionState::new( | ||
| config.clone(), | ||
| Rc::clone(&self.scheduler), | ||
| Rc::clone(&self.time_model), | ||
| )); | ||
|
|
||
| init_panic_hook(config.clone()); | ||
| CurrentSchedule::init(self.initial_schedule.clone()); | ||
|
|
@@ -245,6 +265,7 @@ impl Execution { | |
| #[inline] | ||
| fn run_to_competion(&mut self, immediately_return_on_panic: bool) -> Result<(), StepError> { | ||
| loop { | ||
| wake_sleepers_until_runnable(); | ||
| let next_step: Option<Rc<RefCell<PooledContinuation>>> = ExecutionState::with(|state| { | ||
| state.schedule()?; | ||
| state.advance_to_next_task(); | ||
|
|
@@ -340,6 +361,10 @@ pub(crate) struct ExecutionState { | |
| // Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation | ||
| // on each scheduling decision. Should not be used outside of the `schedule` function | ||
| runnable_tasks: Vec<*const Task>, | ||
|
|
||
| // Counter for unique timing resource ids (Sleeps, Timeouts and Intervals) | ||
| pub(crate) timer_id_counter: u64, | ||
| pub(crate) time_model: Rc<RefCell<dyn TimeModel>>, | ||
| } | ||
|
|
||
| #[derive(Debug, PartialEq, Eq, Clone, Copy)] | ||
|
|
@@ -373,7 +398,7 @@ pub enum ExecutionStateBorrowError { | |
| } | ||
|
|
||
| impl ExecutionState { | ||
| fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>) -> Self { | ||
| fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>, time_model: Rc<RefCell<dyn TimeModel>>) -> Self { | ||
| Self { | ||
| config, | ||
| tasks: SmallVec::new(), | ||
|
|
@@ -389,6 +414,8 @@ impl ExecutionState { | |
| has_cleaned_up: false, | ||
| top_level_span: tracing::Span::current(), | ||
| runnable_tasks: Vec::with_capacity(DEFAULT_INLINE_TASKS), | ||
| time_model, | ||
| timer_id_counter: 0, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -665,6 +692,8 @@ impl ExecutionState { | |
| TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().clear()); | ||
| LABELS.with(|cell| cell.borrow_mut().clear()); | ||
|
|
||
| Self::with(|s| s.timer_id_counter = 0); | ||
|
|
||
| #[cfg(debug_assertions)] | ||
| Self::with(|state| state.has_cleaned_up = true); | ||
|
|
||
|
|
@@ -680,6 +709,8 @@ impl ExecutionState { | |
| /// is different from the currently running task, indicating that the current task should yield | ||
| /// its execution. | ||
| pub(crate) fn maybe_yield() -> bool { | ||
| wake_sleepers_until_runnable(); | ||
|
|
||
| Self::with(|state| { | ||
| if std::thread::panicking() && !state.in_cleanup { | ||
| return true; | ||
|
|
@@ -786,11 +817,19 @@ impl ExecutionState { | |
| Self::with(|state| state.context_switches) | ||
| } | ||
|
|
||
| pub(crate) fn num_tasks(&self) -> usize { | ||
| self.tasks.len() | ||
| } | ||
|
|
||
| #[track_caller] | ||
| pub(crate) fn new_resource_signature(resource_type: ResourceType) -> ResourceSignature { | ||
| ExecutionState::with(|s| s.current_mut().signature.new_resource(resource_type)) | ||
| } | ||
|
|
||
| pub(crate) fn num_runnable() -> usize { | ||
| Self::with(|state| state.tasks.iter().filter(|t| t.runnable()).count()) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should improve the tracking of runnable so that this becomes O(1)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed, but I don't know if we want to block this PR on that change. if so, I should probably open another PR to do that separately. this would be strongly related to building the set of runnable tasks incrementally, which I have a prototype of on this branch: https://github.com/dylanjwolff/shuttle/tree/incremental-persistent-vec but I haven't PR'ed it because I actually think the right thing to do is to change the Shuttle scheduler API so that the individual schedulers manage their own runnable task sets in whatever data structure is best for that algorithm (for example, priority queue for RP).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yeah no don't block the PR on this. And yeah, lets chat about potential scheduler API changes on Monday |
||
|
|
||
| pub(crate) fn get_storage<K: Into<StorageKey>, T: 'static>(&self, key: K) -> Option<&T> { | ||
| self.storage | ||
| .get(key.into()) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this for? It's never used no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's used in
shuttle/src/sync/time/mod.rsto differentiate sleepers with the same deadlines (see uses ofincrement_timer_counter())