@@ -5,16 +5,19 @@ use esp_hal::time::{Duration, Instant};
55use esp_radio_preempt_driver:: {
66 queue:: { QueueImplementation , QueuePtr } ,
77 register_queue_implementation,
8- yield_task,
98} ;
109use esp_sync:: NonReentrantMutex ;
1110
11+ use crate :: wait_queue:: WaitQueue ;
12+
1213struct QueueInner {
1314 storage : Box < [ u8 ] > ,
1415 item_size : usize ,
1516 capacity : usize ,
1617 current_read : usize ,
1718 current_write : usize ,
19+ waiting_for_space : WaitQueue ,
20+ waiting_for_item : WaitQueue ,
1821}
1922
2023impl QueueInner {
@@ -25,6 +28,8 @@ impl QueueInner {
2528 current_read : 0 ,
2629 current_write : 0 ,
2730 storage : vec ! [ 0 ; capacity * item_size] . into_boxed_slice ( ) ,
31+ waiting_for_space : WaitQueue :: new ( ) ,
32+ waiting_for_item : WaitQueue :: new ( ) ,
2833 }
2934 }
3035
@@ -39,7 +44,7 @@ impl QueueInner {
3944 }
4045
4146 unsafe fn try_enqueue ( & mut self , item : * const u8 ) -> bool {
42- if self . len ( ) == self . capacity {
47+ if self . full ( ) {
4348 return false ;
4449 }
4550
@@ -54,7 +59,7 @@ impl QueueInner {
5459 }
5560
5661 unsafe fn try_dequeue ( & mut self , dst : * mut u8 ) -> bool {
57- if self . len ( ) == 0 {
62+ if self . empty ( ) {
5863 return false ;
5964 }
6065
@@ -69,15 +74,15 @@ impl QueueInner {
6974 }
7075
7176 unsafe fn remove ( & mut self , item : * const u8 ) {
77+ if self . empty ( ) {
78+ return ;
79+ }
80+
7281 // do what the ESP-IDF implementations does...
7382 // just remove all elements and add them back except the one we need to remove -
7483 // good enough for now
7584 let count = self . len ( ) ;
7685
77- if count == 0 {
78- return ;
79- }
80-
8186 let mut tmp_item = vec ! [ 0 ; self . item_size] ;
8287
8388 let item_slice = unsafe { core:: slice:: from_raw_parts ( item, self . item_size ) } ;
@@ -100,6 +105,14 @@ impl QueueInner {
100105 self . capacity - self . current_read + self . current_write
101106 }
102107 }
108+
109+ fn empty ( & self ) -> bool {
110+ self . len ( ) == 0
111+ }
112+
113+ fn full ( & self ) -> bool {
114+ self . len ( ) == self . capacity
115+ }
103116}
104117
105118pub struct Queue {
@@ -117,48 +130,109 @@ impl Queue {
117130 unsafe { ptr. cast :: < Self > ( ) . as_ref ( ) }
118131 }
119132
120- fn yield_loop_with_timeout ( timeout_us : Option < u32 > , mut cb : impl FnMut ( ) -> bool ) -> bool {
121- let start = if timeout_us. is_some ( ) {
122- Instant :: now ( )
123- } else {
124- Instant :: EPOCH
125- } ;
126-
127- let timeout = timeout_us
128- . map ( |us| Duration :: from_micros ( us as u64 ) )
129- . unwrap_or ( Duration :: MAX ) ;
133+ unsafe fn send_to_back ( & self , item : * const u8 , timeout_us : Option < u32 > ) -> bool {
134+ let deadline = timeout_us. map ( |us| Instant :: now ( ) + Duration :: from_micros ( us as u64 ) ) ;
130135
131136 loop {
132- if cb ( ) {
137+ let enqueued = self . inner . with ( |queue| {
138+ if unsafe { queue. try_enqueue ( item) } {
139+ queue. waiting_for_item . notify ( ) ;
140+ true
141+ } else {
142+ // The task will go to sleep when the above critical section is released.
143+ queue. waiting_for_space . wait_with_deadline ( deadline) ;
144+ false
145+ }
146+ } ) ;
147+
148+ if enqueued {
133149 return true ;
134150 }
135151
136- if timeout_us. is_some ( ) && start. elapsed ( ) > timeout {
152+ // We are here because the queue was full. Now we've either timed out, or an item has
153+ // been removed from the queue. However, any higher priority task can wake up
154+ // and preempt us still. Let's just check for the timeout, and try the whole process
155+ // again.
156+
157+ if let Some ( deadline) = deadline
158+ && deadline < Instant :: now ( )
159+ {
160+ // We have a deadline and we've timed out.
137161 return false ;
138162 }
139-
140- yield_task ( ) ;
163+ // We can block more, so let's attempt to enqueue again.
141164 }
142165 }
143166
144- unsafe fn send_to_back ( & self , item : * const u8 , timeout_us : Option < u32 > ) -> bool {
145- Self :: yield_loop_with_timeout ( timeout_us, || unsafe { self . try_send_to_back ( item) } )
146- }
147-
148167 unsafe fn try_send_to_back ( & self , item : * const u8 ) -> bool {
149- self . inner . with ( |queue| unsafe { queue. try_enqueue ( item) } )
168+ self . inner . with ( |queue| {
169+ if unsafe { queue. try_enqueue ( item) } {
170+ queue. waiting_for_item . notify ( ) ;
171+ true
172+ } else {
173+ false
174+ }
175+ } )
150176 }
151177
152178 unsafe fn receive ( & self , item : * mut u8 , timeout_us : Option < u32 > ) -> bool {
153- Self :: yield_loop_with_timeout ( timeout_us, || unsafe { self . try_receive ( item) } )
179+ let deadline = timeout_us. map ( |us| Instant :: now ( ) + Duration :: from_micros ( us as u64 ) ) ;
180+
181+ loop {
182+ // Attempt to dequeue an item from the queue
183+ let dequeued = self . inner . with ( |queue| {
184+ if unsafe { queue. try_dequeue ( item) } {
185+ queue. waiting_for_space . notify ( ) ;
186+ true
187+ } else {
188+ // The task will go to sleep when the above critical section is released.
189+ queue. waiting_for_item . wait_with_deadline ( deadline) ;
190+ false
191+ }
192+ } ) ;
193+
194+ if dequeued {
195+ return true ;
196+ }
197+
198+ // We are here because we weren't able to dequeue from the queue previously. We've
199+ // either timed out, or the queue has an item. However, any higher priority
200+ // task can wake up and preempt us still. Let's just check for the timeout,
201+ // and try the whole process again.
202+
203+ if let Some ( deadline) = deadline
204+ && deadline < Instant :: now ( )
205+ {
206+ // We have a deadline and we've timed out.
207+ return false ;
208+ }
209+ // We can block more, so let's attempt to dequeue again.
210+ }
154211 }
155212
156213 unsafe fn try_receive ( & self , item : * mut u8 ) -> bool {
157- self . inner . with ( |queue| unsafe { queue. try_dequeue ( item) } )
214+ self . inner . with ( |queue| {
215+ if unsafe { queue. try_dequeue ( item) } {
216+ queue. waiting_for_space . notify ( ) ;
217+ true
218+ } else {
219+ false
220+ }
221+ } )
158222 }
159223
160224 unsafe fn remove ( & self , item : * const u8 ) {
161- self . inner . with ( |queue| unsafe { queue. remove ( item) } )
225+ self . inner . with ( |queue| {
226+ let was_full = queue. full ( ) ;
227+
228+ unsafe {
229+ queue. remove ( item) ;
230+ }
231+
232+ if was_full && !queue. full ( ) {
233+ queue. waiting_for_space . notify ( ) ;
234+ }
235+ } )
162236 }
163237
164238 fn messages_waiting ( & self ) -> usize {
0 commit comments