Skip to content

Commit a2d8cfa

Browse files
committed
rtic-sync: explicitly send an awoken Sender the slot it can use
1 parent 6903d20 commit a2d8cfa

File tree

1 file changed

+71
-33
lines changed

1 file changed

+71
-33
lines changed

rtic-sync/src/channel.rs

Lines changed: 71 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ use core::{
1212
#[doc(hidden)]
1313
pub use critical_section;
1414
use heapless::Deque;
15-
use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
1615
use rtic_common::{
17-
dropper::OnDrop,
18-
wait_queue::{Link, WaitQueue},
16+
dropper::OnDrop, wait_queue::DoublyLinkedList, wait_queue::Link,
17+
waker_registration::CriticalSectionWakerRegistration as WakerRegistration,
1918
};
2019

2120
#[cfg(feature = "defmt-03")]
2221
use crate::defmt;
2322

23+
type WaitQueueData = (Waker, SlotPtr);
24+
type WaitQueue = DoublyLinkedList<WaitQueueData>;
25+
2426
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
2527
///
2628
/// This channel uses critical sections, however there are extremely small and all `memcpy`
@@ -192,11 +194,11 @@ unsafe impl<T, const N: usize> Send for Sender<'_, T, N> {}
192194
/// This is needed to make the async closure in `send` accept that we "share"
193195
/// the link possible between threads.
194196
#[derive(Clone)]
195-
struct LinkPtr(*mut Option<Link<Waker>>);
197+
struct LinkPtr(*mut Option<Link<WaitQueueData>>);
196198

197199
impl LinkPtr {
198200
/// This will dereference the pointer stored within and give out an `&mut`.
199-
unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
201+
unsafe fn get(&mut self) -> &mut Option<Link<WaitQueueData>> {
200202
&mut *self.0
201203
}
202204
}
@@ -205,6 +207,28 @@ unsafe impl Send for LinkPtr {}
205207

206208
unsafe impl Sync for LinkPtr {}
207209

210+
/// This is needed to make the async closure in `send` accept that we "share"
211+
/// the link possible between threads.
212+
#[derive(Clone)]
213+
struct SlotPtr(*mut Option<u8>);
214+
215+
impl SlotPtr {
216+
/// Replace the value of this slot with `new_value`, and return
217+
/// the old value.
218+
fn replace(
219+
&mut self,
220+
new_value: Option<u8>,
221+
_cs: critical_section::CriticalSection,
222+
) -> Option<u8> {
223+
// SAFETY: we are in a critical section.
224+
unsafe { core::ptr::replace(self.0, new_value) }
225+
}
226+
}
227+
228+
unsafe impl Send for SlotPtr {}
229+
230+
unsafe impl Sync for SlotPtr {}
231+
208232
impl<T, const N: usize> core::fmt::Debug for Sender<'_, T, N> {
209233
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
210234
write!(f, "Sender")
@@ -268,44 +292,59 @@ impl<T, const N: usize> Sender<'_, T, N> {
268292
/// Send a value. If there is no place left in the queue this will wait until there is.
269293
/// If the receiver does not exist this will return an error.
270294
pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
271-
let mut link_ptr: Option<Link<Waker>> = None;
295+
let mut free_slot_ptr: Option<u8> = None;
296+
let mut link_ptr: Option<Link<WaitQueueData>> = None;
272297

273298
// Make this future `Drop`-safe.
274299
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
275-
let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
300+
let mut link_ptr = LinkPtr(core::ptr::addr_of_mut!(link_ptr));
301+
// SAFETY(freed_slot): Shadow the original definition of `free_slot_ptr` so we can't abuse it.
302+
let mut free_slot_ptr = SlotPtr(core::ptr::addr_of_mut!(free_slot_ptr));
276303

277304
let mut link_ptr2 = link_ptr.clone();
305+
let mut free_slot_ptr2 = free_slot_ptr.clone();
278306
let dropper = OnDrop::new(|| {
279307
// SAFETY: We only run this closure and dereference the pointer if we have
280308
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
281309
// of this pointer is in the `poll_fn`.
282310
if let Some(link) = unsafe { link_ptr2.get() } {
283311
link.remove_from_list(&self.0.wait_queue);
284312
}
313+
314+
// Potentially unnecessary c-s because our link was already popped, so there
315+
// is no way for anything else to access the free slot ptr. Gotta think
316+
// about this a bit more...
317+
critical_section::with(|cs| {
318+
if let Some(freed_slot) = free_slot_ptr2.replace(None, cs) {
319+
debug_assert!(!self.0.access(cs).freeq.is_full());
320+
// SAFETY: freeq is not full.
321+
unsafe {
322+
self.0.access(cs).freeq.push_back_unchecked(freed_slot);
323+
}
324+
}
325+
});
285326
});
286327

287328
let idx = poll_fn(|cx| {
288-
if self.is_closed() {
289-
return Poll::Ready(Err(()));
290-
}
291-
292329
// Do all this in one critical section, else there can be race conditions
293-
let queue_idx = critical_section::with(|cs| {
330+
critical_section::with(|cs| {
294331
let wq_empty = self.0.wait_queue.is_empty();
295332
let fq_empty = self.0.access(cs).freeq.is_empty();
333+
296334
if !wq_empty || fq_empty {
297335
// SAFETY: This pointer is only dereferenced here and on drop of the future
298336
// which happens outside this `poll_fn`'s stack frame.
299337
let link = unsafe { link_ptr.get() };
300338
if let Some(link) = link {
301339
if !link.is_popped() {
302-
return None;
340+
return Poll::Pending;
303341
} else {
304342
// Fall through to dequeue
305343
}
306344
} else {
307345
// Place the link in the wait queue on first run.
308-
let link_ref = link.insert(Link::new(cx.waker().clone()));
346+
let link_ref =
347+
link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
309348

310349
// SAFETY(new_unchecked): The address to the link is stable as it is defined
311350
// outside this stack frame.
@@ -315,23 +354,21 @@ impl<T, const N: usize> Sender<'_, T, N> {
315354
// `link_ptr` lives until the end of the stack frame.
316355
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
317356

318-
return None;
357+
return Poll::Pending;
319358
}
320359
}
321360

322-
assert!(!self.0.access(cs).freeq.is_empty());
323-
// Get index as the queue is guaranteed not empty and the wait queue is empty
324-
let idx = unsafe { self.0.access(cs).freeq.pop_front_unchecked() };
325-
326-
Some(idx)
327-
});
361+
let slot = free_slot_ptr
362+
.replace(None, cs)
363+
.or_else(|| self.0.access(cs).freeq.pop_back());
328364

329-
if let Some(idx) = queue_idx {
330-
// Return the index
331-
Poll::Ready(Ok(idx))
332-
} else {
333-
Poll::Pending
334-
}
365+
if let Some(slot) = slot {
366+
Poll::Ready(Ok(slot))
367+
} else {
368+
debug_assert!(self.is_closed());
369+
Poll::Ready(Err(()))
370+
}
371+
})
335372
})
336373
.await;
337374

@@ -430,14 +467,15 @@ impl<T, const N: usize> Receiver<'_, T, N> {
430467

431468
// Return the index to the free queue after we've read the value.
432469
critical_section::with(|cs| {
433-
assert!(!self.0.access(cs).freeq.is_full());
434-
unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
435-
436470
fence(Ordering::SeqCst);
437471

438-
// If someone is waiting in the WaiterQueue, wake the first one up.
439-
if let Some(wait_head) = self.0.wait_queue.pop() {
472+
// If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot.
473+
if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() {
474+
freeq_slot.replace(Some(rs), cs);
440475
wait_head.wake();
476+
} else {
477+
assert!(!self.0.access(cs).freeq.is_full());
478+
unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
441479
}
442480

443481
Ok(r)
@@ -495,7 +533,7 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
495533
// Mark the receiver as dropped and wake all waiters
496534
critical_section::with(|cs| *self.0.access(cs).receiver_dropped = true);
497535

498-
while let Some(waker) = self.0.wait_queue.pop() {
536+
while let Some((waker, _)) = self.0.wait_queue.pop() {
499537
waker.wake();
500538
}
501539
}

0 commit comments

Comments
 (0)