@@ -94,18 +94,47 @@ impl TimerQueue {
9494}
9595
9696#[ cfg( integrated_timers) ]
97- mod queue_impl {
98- use core:: { cell:: Cell , cmp:: min, ptr, task:: Waker } ;
99-
100- use embassy_executor:: raw:: TaskRef ;
97+ pub ( crate ) mod queue_impl {
98+ use core:: {
99+ cell:: Cell ,
100+ cmp:: min,
101+ ptr:: { self , NonNull } ,
102+ task:: Waker ,
103+ } ;
104+
105+ use embassy_executor:: raw:: Executor as RawExecutor ;
106+ use embassy_executor_timer_queue:: TimerQueueItem ;
101107 use portable_atomic:: { AtomicPtr , Ordering } ;
102108
109+ /// An item in the timer queue.
110+ #[ derive( Default ) ]
111+ pub ( crate ) struct QueueItem {
112+ /// The next item in the queue.
113+ ///
114+ /// If this field contains `Some`, the item is in the queue. The last item in the queue has
115+ /// a value of `Some(dangling_pointer)`
116+ pub next : Cell < Option < NonNull < QueueItem > > > ,
117+
118+ /// The time at which this item expires.
119+ pub expires_at : u64 ,
120+
121+ /// The registered waker. If Some, the item is enqueued in the timer queue.
122+ pub waker : Option < Waker > ,
123+
124+ pub owner : AtomicPtr < RawExecutor > ,
125+ }
126+
127+ unsafe impl Sync for QueueItem { }
128+
103129 /// Copy of the embassy integrated timer queue, that clears the owner upon
104130 /// dequeueing.
105131 pub ( super ) struct RawQueue {
106- head : Cell < Option < TaskRef > > ,
132+ head : Cell < Option < NonNull < QueueItem > > > ,
107133 }
108134
135+ unsafe impl Send for RawQueue { }
136+ unsafe impl Sync for RawQueue { }
137+
109138 impl RawQueue {
110139 /// Creates a new timer queue.
111140 pub const fn new ( ) -> Self {
@@ -119,25 +148,43 @@ mod queue_impl {
119148 /// If this function returns `true`, the called should find the next
120149 /// expiration time and set a new alarm for that time.
121150 pub fn schedule_wake ( & mut self , at : u64 , waker : & Waker ) -> bool {
122- let task = embassy_executor:: raw:: task_from_waker ( waker) ;
123- let item = task. timer_queue_item ( ) ;
124- if item. next . get ( ) . is_none ( ) {
125- // If not in the queue, add it and update.
126- let prev = self . head . replace ( Some ( task) ) ;
127- item. next . set ( if prev. is_none ( ) {
128- Some ( unsafe { TaskRef :: dangling ( ) } )
129- } else {
130- prev
131- } ) ;
132- item. expires_at . set ( at) ;
133- true
134- } else if at <= item. expires_at . get ( ) {
135- // If expiration is sooner than previously set, update.
136- item. expires_at . set ( at) ;
137- true
138- } else {
139- // Task does not need to be updated.
140- false
151+ let item = unsafe {
152+ // Safety: the `&mut self`, along with the Safety note of the Queue, are sufficient
153+ // to ensure that this function creates the only mutable reference
154+ // to the queue item.
155+ TimerQueueItem :: from_embassy_waker ( waker)
156+ } ;
157+ let item = unsafe { item. as_mut :: < QueueItem > ( ) } ;
158+ match item. waker . as_ref ( ) {
159+ Some ( _) if at <= item. expires_at => {
160+ // If expiration is sooner than previously set, update.
161+ item. expires_at = at;
162+ // The waker is always stored in its own queue item, so we don't need to update
163+ // it.
164+
165+ // Trigger a queue update in case this item can be immediately dequeued.
166+ true
167+ }
168+ Some ( _) => {
169+ // Queue item does not need to be updated, the task will be scheduled to be
170+ // woken before the new expiration.
171+ false
172+ }
173+ None => {
174+ // If not in the queue, add it and update.
175+ let mut item_ptr = NonNull :: from ( item) ;
176+ let prev = self . head . replace ( Some ( item_ptr) ) ;
177+
178+ let item = unsafe { item_ptr. as_mut ( ) } ;
179+
180+ item. expires_at = at;
181+ item. waker = Some ( waker. clone ( ) ) ;
182+ item. next . set ( prev) ;
183+ // The default implementation doesn't care about the
184+ // opaque payload, leave it unchanged.
185+
186+ true
187+ }
141188 }
142189 }
143190
@@ -149,45 +196,36 @@ mod queue_impl {
149196 pub fn next_expiration ( & mut self , now : u64 ) -> u64 {
150197 let mut next_expiration = u64:: MAX ;
151198
152- self . retain ( |p| {
153- let item = p. timer_queue_item ( ) ;
154- let expires = item. expires_at . get ( ) ;
155-
156- if expires <= now {
199+ self . retain ( |item| {
200+ if item. expires_at <= now {
157201 // Timer expired, process task.
158- embassy_executor:: raw:: wake_task ( p) ;
202+ if let Some ( waker) = item. waker . take ( ) {
203+ waker. wake ( ) ;
204+ }
159205 false
160206 } else {
161207 // Timer didn't yet expire, or never expires.
162- next_expiration = min ( next_expiration, expires ) ;
163- expires != u64:: MAX
208+ next_expiration = min ( next_expiration, item . expires_at ) ;
209+ item . expires_at != u64:: MAX
164210 }
165211 } ) ;
166212
167213 next_expiration
168214 }
169215
170- fn retain ( & self , mut f : impl FnMut ( TaskRef ) -> bool ) {
216+ fn retain ( & self , mut f : impl FnMut ( & mut QueueItem ) -> bool ) {
171217 let mut prev = & self . head ;
172- while let Some ( p) = prev. get ( ) {
173- if unsafe { p == TaskRef :: dangling ( ) } {
174- // prev was the last item, stop
175- break ;
176- }
177- let item = p. timer_queue_item ( ) ;
178- if f ( p) {
218+ while let Some ( mut p) = prev. get ( ) {
219+ let item = unsafe { p. as_mut ( ) } ;
220+
221+ if f ( item) {
179222 // Skip to next
180223 prev = & item. next ;
181224 } else {
182225 // Remove it
183226 prev. set ( item. next . get ( ) ) ;
184227 // Clear owner
185- unsafe {
186- // SAFETY: our payload is an AtomicPtr.
187- item. payload
188- . as_ref :: < AtomicPtr < ( ) > > ( )
189- . store ( ptr:: null_mut ( ) , Ordering :: Relaxed ) ;
190- }
228+ item. owner . store ( ptr:: null_mut ( ) , Ordering :: Relaxed ) ;
191229 item. next . set ( None ) ;
192230 }
193231 }
0 commit comments