@@ -5,6 +5,7 @@ use crate::runtime::task::labels::Labels;
55use crate :: runtime:: task:: { ChildLabelFn , Task , TaskId , TaskName , TaskSignature , DEFAULT_INLINE_TASKS } ;
66use crate :: runtime:: thread:: continuation:: PooledContinuation ;
77use crate :: scheduler:: { Schedule , Scheduler } ;
8+ use crate :: sync:: time:: TimeModel ;
89use crate :: sync:: { ResourceSignature , ResourceType } ;
910use crate :: thread:: thread_fn;
1011use crate :: { Config , MaxSteps } ;
@@ -48,15 +49,21 @@ thread_local! {
4849/// static variable, but clients get access to it by calling `ExecutionState::with`.
4950pub ( crate ) struct Execution {
5051 scheduler : Rc < RefCell < dyn Scheduler > > ,
52+ time_model : Rc < RefCell < dyn TimeModel > > ,
5153 initial_schedule : Schedule ,
5254}
5355
5456impl Execution {
5557 /// Construct a new execution that will use the given scheduler. The execution should then be
5658 /// invoked via its `run` method, which takes as input the closure for task 0.
57- pub ( crate ) fn new ( scheduler : Rc < RefCell < dyn Scheduler > > , initial_schedule : Schedule ) -> Self {
59+ pub ( crate ) fn new (
60+ scheduler : Rc < RefCell < dyn Scheduler > > ,
61+ initial_schedule : Schedule ,
62+ time_model : Rc < RefCell < dyn TimeModel > > ,
63+ ) -> Self {
5864 Self {
5965 scheduler,
66+ time_model,
6067 initial_schedule,
6168 }
6269 }
@@ -77,6 +84,7 @@ impl Execution {
7784 let state = RefCell :: new ( ExecutionState :: new (
7885 config. clone ( ) ,
7986 Rc :: clone ( & self . scheduler ) ,
87+ Rc :: clone ( & self . time_model ) ,
8088 self . initial_schedule . clone ( ) ,
8189 ) ) ;
8290
@@ -95,6 +103,7 @@ impl Execution {
95103
96104 // Cleanup the state before it goes out of `EXECUTION_STATE` scope
97105 ExecutionState :: cleanup ( ) ;
106+ self . time_model . borrow_mut ( ) . reset ( ) ;
98107 } ) ;
99108 }
100109
@@ -107,6 +116,13 @@ impl Execution {
107116 Finished ,
108117 }
109118
119+ // While there are no runnable tasks and tasks are able to be woken by the time model, continue waking tasks
120+ while ExecutionState :: num_runnable ( ) == 0
121+ && ExecutionState :: with ( |s| Rc :: clone ( & s. time_model ) )
122+ . borrow_mut ( )
123+ . wake_next ( )
124+ { }
125+
110126 let next_step = ExecutionState :: with ( |state| {
111127 if let Err ( msg) = state. schedule ( ) {
112128 return NextStep :: Failure ( msg, state. current_schedule . clone ( ) ) ;
@@ -281,6 +297,10 @@ pub(crate) struct ExecutionState {
281297 // Persistent Vec used as a bump allocator for references to runnable tasks to avoid slow allocation
282298 // on each scheduling decision. Should not be used outside of the `schedule` function
283299 runnable_tasks : Vec < * const Task > ,
300+
301+ // Counter for unique timing resource ids (Sleeps, Timeouts and Intervals)
302+ pub ( crate ) timer_id_counter : u64 ,
303+ pub ( crate ) time_model : Rc < RefCell < dyn TimeModel > > ,
284304}
285305
286306#[ derive( Debug , PartialEq , Eq , Clone , Copy ) ]
@@ -305,7 +325,12 @@ impl ScheduledTask {
305325}
306326
307327impl ExecutionState {
308- fn new ( config : Config , scheduler : Rc < RefCell < dyn Scheduler > > , initial_schedule : Schedule ) -> Self {
328+ fn new (
329+ config : Config ,
330+ scheduler : Rc < RefCell < dyn Scheduler > > ,
331+ time_model : Rc < RefCell < dyn TimeModel > > ,
332+ initial_schedule : Schedule ,
333+ ) -> Self {
309334 Self {
310335 config,
311336 tasks : SmallVec :: new ( ) ,
@@ -322,17 +347,21 @@ impl ExecutionState {
322347 has_cleaned_up : false ,
323348 top_level_span : tracing:: Span :: current ( ) ,
324349 runnable_tasks : Vec :: with_capacity ( DEFAULT_INLINE_TASKS ) ,
350+ time_model,
351+ timer_id_counter : 0 ,
325352 }
326353 }
327354
328355 /// Invoke a closure with access to the current execution state. Library code uses this to gain
329356 /// access to the state of the execution to influence scheduling (e.g. to register a task as
330357 /// blocked).
331358 #[ inline]
359+ #[ track_caller]
332360 pub ( crate ) fn with < F , T > ( f : F ) -> T
333361 where
334362 F : FnOnce ( & mut ExecutionState ) -> T ,
335363 {
364+ trace ! ( "ExecutionState::with from {}" , Location :: caller( ) ) ;
336365 Self :: try_with ( f) . expect ( "Shuttle internal error: cannot access ExecutionState. are you trying to access a Shuttle primitive from outside a Shuttle test?" )
337366 }
338367
@@ -550,6 +579,8 @@ impl ExecutionState {
550579 TASK_ID_TO_TAGS . with ( |cell| cell. borrow_mut ( ) . clear ( ) ) ;
551580 LABELS . with ( |cell| cell. borrow_mut ( ) . clear ( ) ) ;
552581
582+ Self :: with ( |s| s. timer_id_counter = 0 ) ;
583+
553584 #[ cfg( debug_assertions) ]
554585 Self :: with ( |state| state. has_cleaned_up = true ) ;
555586
@@ -565,6 +596,13 @@ impl ExecutionState {
565596 /// is different from the currently running task, indicating that the current task should yield
566597 /// its execution.
567598 pub ( crate ) fn maybe_yield ( ) -> bool {
599+ // While there are no runnable tasks and tasks are able to be woken by the time model, continue waking tasks
600+ while ExecutionState :: num_runnable ( ) == 0
601+ && ExecutionState :: with ( |s| Rc :: clone ( & s. time_model ) )
602+ . borrow_mut ( )
603+ . wake_next ( )
604+ { }
605+
568606 Self :: with ( |state| {
569607 debug_assert ! (
570608 matches!( state. current_task, ScheduledTask :: Some ( _) ) && state. next_task == ScheduledTask :: None ,
@@ -666,11 +704,19 @@ impl ExecutionState {
666704 Self :: with ( |state| state. context_switches )
667705 }
668706
707+ pub ( crate ) fn num_tasks ( & self ) -> usize {
708+ self . tasks . len ( )
709+ }
710+
669711 #[ track_caller]
670712 pub ( crate ) fn new_resource_signature ( resource_type : ResourceType ) -> ResourceSignature {
671713 ExecutionState :: with ( |s| s. current_mut ( ) . signature . new_resource ( resource_type) )
672714 }
673715
716+ pub ( crate ) fn num_runnable ( ) -> usize {
717+ Self :: with ( |state| state. tasks . iter ( ) . filter ( |t| t. runnable ( ) ) . count ( ) )
718+ }
719+
674720 pub ( crate ) fn get_storage < K : Into < StorageKey > , T : ' static > ( & self , key : K ) -> Option < & T > {
675721 self . storage
676722 . get ( key. into ( ) )
0 commit comments