Skip to content

Commit 2111d6f

Browse files
committed
rtic-sync: Channel: Sender: rewriter send logic to be easier to validate
1 parent e9e81fb commit 2111d6f

File tree

1 file changed

+39
-32
lines changed

1 file changed

+39
-32
lines changed

rtic-sync/src/channel.rs

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -333,44 +333,51 @@ impl<T, const N: usize> Sender<'_, T, N> {
333333
// Do all this in one critical section, else there can be race conditions
334334
critical_section::with(|cs| {
335335
let wq_empty = self.0.wait_queue.is_empty();
336-
let fq_empty = self.0.access(cs).freeq.is_empty();
337-
338-
if !wq_empty || fq_empty {
339-
// SAFETY: This pointer is only dereferenced here and on drop of the future
340-
// which happens outside this `poll_fn`'s stack frame.
341-
let link = unsafe { link_ptr.get() };
342-
if let Some(link) = link {
343-
if !link.is_popped() {
344-
return Poll::Pending;
336+
let freeq_empty = self.0.access(cs).freeq.is_empty();
337+
338+
// SAFETY: This pointer is only dereferenced here and on drop of the future
339+
// which happens outside this `poll_fn`'s stack frame.
340+
let link = unsafe { link_ptr.get() };
341+
342+
// We are already in the wait queue.
343+
if let Some(link) = link {
344+
if link.is_popped() {
345+
// If our link is popped, then:
346+
// 1. We were popped by `try_recv` and it provided us with a slot.
347+
// 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed.
348+
let slot = unsafe { free_slot_ptr.replace(None, cs) };
349+
350+
if let Some(slot) = slot {
351+
Poll::Ready(Ok(slot))
345352
} else {
346-
// Fall through to dequeue
353+
Poll::Ready(Err(()))
347354
}
348355
} else {
349-
// Place the link in the wait queue on first run.
350-
let link_ref =
351-
link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
352-
353-
// SAFETY(new_unchecked): The address to the link is stable as it is defined
354-
// outside this stack frame.
355-
// SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed,
356-
// and we make sure in `dropper` that the link is removed from the queue
357-
// before dropping `link_ptr` AND `dropper` makes sure that the shadowed
358-
// `link_ptr` lives until the end of the stack frame.
359-
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
360-
361-
return Poll::Pending;
356+
Poll::Pending
362357
}
363358
}
364-
365-
// SAFETY: `free_slot_ptr` is valid for writes, as `free_slot_ptr` is still alive.
366-
let slot = unsafe { free_slot_ptr.replace(None, cs) }
367-
.or_else(|| self.0.access(cs).freeq.pop_back());
368-
369-
if let Some(slot) = slot {
359+
// We are not in the wait queue, but others are, or there is currently no free
360+
// slot available.
361+
else if !wq_empty || freeq_empty {
362+
// Place the link in the wait queue.
363+
let link_ref =
364+
link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
365+
366+
// SAFETY(new_unchecked): The address to the link is stable as it is defined
367+
// outside this stack frame.
368+
// SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed,
369+
// and we make sure in `dropper` that the link is removed from the queue
370+
// before dropping `link_ptr` AND `dropper` makes sure that the shadowed
371+
// `link_ptr` lives until the end of the stack frame.
372+
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
373+
374+
Poll::Pending
375+
}
376+
// We are not in the wait queue, no one else is waiting, and there is a free slot available.
377+
else {
378+
assert!(!self.0.access(cs).freeq.is_empty());
379+
let slot = unsafe { self.0.access(cs).freeq.pop_back_unchecked() };
370380
Poll::Ready(Ok(slot))
371-
} else {
372-
debug_assert!(self.is_closed());
373-
Poll::Ready(Err(()))
374381
}
375382
})
376383
})

0 commit comments

Comments
 (0)