Skip to content

Commit 33527be

Browse files
committed
rtic-sync: always wake wait_queue when attempting to return an item
to freeq
1 parent eb636db commit 33527be

File tree

1 file changed

+33
-23
lines changed

1 file changed

+33
-23
lines changed

rtic-sync/src/channel.rs

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@ impl<T, const N: usize> Channel<T, N> {
108108
}
109109
}
110110
}
111+
112+
/// Return free slot `slot` to the channel.
113+
///
114+
/// This will do one of two things:
115+
/// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
116+
/// 2. else, insert `slot` into `self.freeq`.
117+
///
118+
/// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
119+
unsafe fn return_free_slot(&self, slot: u8) {
120+
critical_section::with(|cs| {
121+
fence(Ordering::SeqCst);
122+
123+
// If someone is waiting in the `wait_queue`, wake the first one up & hand it the free slot.
124+
if let Some((wait_head, mut freeq_slot)) = self.wait_queue.pop() {
125+
// SAFETY: `freeq_slot` is valid for writes: we are in a critical
126+
// section & the `SlotPtr` lives for at least the duration of the wait queue link.
127+
unsafe { freeq_slot.replace(Some(slot), cs) };
128+
wait_head.wake();
129+
} else {
130+
assert!(!self.access(cs).freeq.is_full());
131+
unsafe { self.access(cs).freeq.push_back_unchecked(slot) }
132+
}
133+
})
134+
}
111135
}
112136

113137
/// Creates a split channel with `'static` lifetime.
@@ -313,18 +337,16 @@ impl<T, const N: usize> Sender<'_, T, N> {
313337
link.remove_from_list(&self.0.wait_queue);
314338
}
315339

340+
// Return our potentially-unused free slot.
316341
// Potentially unnecessary c-s because our link was already popped, so there
317342
// is no way for anything else to access the free slot ptr. Gotta think
318343
// about this a bit more...
319-
//
320-
// SAFETY(replace): `free_slot_ptr2` is valid for writes.
321344
critical_section::with(|cs| {
322345
if let Some(freed_slot) = unsafe { free_slot_ptr2.replace(None, cs) } {
323-
debug_assert!(!self.0.access(cs).freeq.is_full());
324-
// SAFETY: freeq is not full.
325-
unsafe {
326-
self.0.access(cs).freeq.push_back_unchecked(freed_slot);
327-
}
346+
// SAFETY: freed slot is passed to us from `return_free_slot`, which either
347+
// directly (through `try_recv`), or indirectly (through another `return_free_slot`)
348+
// comes from `readyq`.
349+
unsafe { self.0.return_free_slot(freed_slot) };
328350
}
329351
});
330352
});
@@ -350,7 +372,7 @@ impl<T, const N: usize> Sender<'_, T, N> {
350372
let slot = unsafe { free_slot_ptr.replace(None, cs) };
351373

352374
// If our link is popped, then:
353-
// 1. We were popped by `try_recv` and it provided us with a slot.
375+
// 1. We were popped by `return_free_lot` and provided us with a slot.
354376
// 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed.
355377
if let Some(slot) = slot {
356378
Poll::Ready(Ok(slot))
@@ -482,22 +504,10 @@ impl<T, const N: usize> Receiver<'_, T, N> {
482504
let r = unsafe { ptr::read(self.0.slots.get_unchecked(rs as usize).get() as *const T) };
483505

484506
// Return the index to the free queue after we've read the value.
485-
critical_section::with(|cs| {
486-
fence(Ordering::SeqCst);
487-
488-
// If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot.
489-
if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() {
490-
// SAFETY: `freeq_slot` is valid for writes: we are in a critical
491-
// section & the `SlotPtr` lives for at least the duration of the wait queue link.
492-
unsafe { freeq_slot.replace(Some(rs), cs) };
493-
wait_head.wake();
494-
} else {
495-
assert!(!self.0.access(cs).freeq.is_full());
496-
unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
497-
}
507+
// SAFETY: `rs` comes directly from `readyq`.
508+
unsafe { self.0.return_free_slot(rs) };
498509

499-
Ok(r)
500-
})
510+
Ok(r)
501511
} else if self.is_closed() {
502512
Err(ReceiveError::NoSender)
503513
} else {

0 commit comments

Comments
 (0)