Skip to content

Commit b5b59fe

Browse files
authored
rtos-driver: Add CompatQueue (#4371)
* Add CompatQueue * Remove incorrect and unused from_isr implementations * Fix queue count confusion * Implement send_to_front
1 parent 7ca52fa commit b5b59fe

File tree

4 files changed

+366
-12
lines changed

4 files changed

+366
-12
lines changed

esp-radio-rtos-driver/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- `queue::CompatQueue` to simplify OS integration. (#4371)
1213

1314
### Changed
1415

esp-radio-rtos-driver/src/queue.rs

Lines changed: 313 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
//!
1111
//! See the [`QueueImplementation`] documentation for more information.
1212
//!
13+
//! You may also choose to use the [`CompatQueue`] implementation provided by this crate.
14+
//!
1315
//! ## Usage
1416
//!
1517
//! Users should use [`QueueHandle`] to interact with queues created by the driver implementation.
1618
//!
1719
//! > Note that the only expected user of this crate is esp-radio.
1820
19-
use core::ptr::NonNull;
21+
use core::{cell::UnsafeCell, ptr::NonNull};
2022

2123
/// Pointer to an opaque queue created by the driver implementation.
2224
pub type QueuePtr = NonNull<()>;
@@ -464,3 +466,313 @@ impl Drop for QueueHandle {
464466
unsafe { esp_rtos_queue_delete(self.0) };
465467
}
466468
}
469+
470+
use alloc::{boxed::Box, vec};
471+
472+
use crate::semaphore::{SemaphoreHandle, SemaphoreKind};
473+
474+
struct QueueInner {
475+
storage: Box<[u8]>,
476+
item_size: usize,
477+
capacity: usize,
478+
count: usize,
479+
current_read: usize,
480+
current_write: usize,
481+
}
482+
483+
impl QueueInner {
484+
fn get(&self, index: usize) -> &[u8] {
485+
let item_start = self.item_size * index;
486+
&self.storage[item_start..][..self.item_size]
487+
}
488+
489+
fn get_mut(&mut self, index: usize) -> &mut [u8] {
490+
let item_start = self.item_size * index;
491+
&mut self.storage[item_start..][..self.item_size]
492+
}
493+
494+
fn len(&self) -> usize {
495+
self.count
496+
}
497+
498+
fn send_to_back(&mut self, item: *const u8) {
499+
let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };
500+
501+
let dst = self.get_mut(self.current_write);
502+
dst.copy_from_slice(item);
503+
504+
self.current_write = (self.current_write + 1) % self.capacity;
505+
self.count += 1;
506+
}
507+
508+
fn send_to_front(&mut self, item: *const u8) {
509+
let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };
510+
511+
self.current_read = (self.current_read + self.capacity - 1) % self.capacity;
512+
513+
let dst = self.get_mut(self.current_read);
514+
dst.copy_from_slice(item);
515+
516+
self.count += 1;
517+
}
518+
519+
fn read_from_front(&mut self, dst: *mut u8) {
520+
let dst = unsafe { core::slice::from_raw_parts_mut(dst, self.item_size) };
521+
522+
let src = self.get(self.current_read);
523+
dst.copy_from_slice(src);
524+
525+
self.current_read = (self.current_read + 1) % self.capacity;
526+
self.count -= 1;
527+
}
528+
529+
fn remove(&mut self, item: *const u8) -> bool {
530+
let count = self.len();
531+
532+
if count == 0 {
533+
return false;
534+
}
535+
536+
let mut tmp_item = vec![0; self.item_size];
537+
538+
let mut found = false;
539+
let item_slice = unsafe { core::slice::from_raw_parts(item, self.item_size) };
540+
for _ in 0..count {
541+
self.read_from_front(tmp_item.as_mut_ptr().cast());
542+
543+
if found || &tmp_item[..] != item_slice {
544+
self.send_to_back(tmp_item.as_mut_ptr().cast());
545+
} else {
546+
found = true;
547+
}
548+
549+
// Note that even if we find our item, we'll need to keep cycling through everything to
550+
// keep insertion order.
551+
}
552+
553+
found
554+
}
555+
}
556+
557+
/// A suitable queue implementation that only requires semaphores from the OS.
558+
///
559+
/// Register in your OS implementation by adding the following code:
560+
///
561+
/// ```rust
562+
/// use esp_radio_rtos_driver::{queue::CompatQueue, register_queue_implementation};
563+
///
564+
/// register_queue_implementation!(CompatQueue);
565+
/// ```
566+
pub struct CompatQueue {
567+
/// Allows interior mutability for the queue's inner state, when the mutex is held.
568+
inner: UnsafeCell<QueueInner>,
569+
570+
semaphore_empty: SemaphoreHandle,
571+
semaphore_full: SemaphoreHandle,
572+
mutex: SemaphoreHandle,
573+
}
574+
575+
impl CompatQueue {
576+
fn new(capacity: usize, item_size: usize) -> Self {
577+
let storage = vec![0; capacity * item_size].into_boxed_slice();
578+
let semaphore_empty = SemaphoreHandle::new(SemaphoreKind::Counting {
579+
max: capacity as u32,
580+
initial: capacity as u32,
581+
});
582+
let semaphore_full = SemaphoreHandle::new(SemaphoreKind::Counting {
583+
max: capacity as u32,
584+
initial: 0,
585+
});
586+
let mutex = SemaphoreHandle::new(SemaphoreKind::Mutex);
587+
Self {
588+
inner: UnsafeCell::new(QueueInner {
589+
storage,
590+
item_size,
591+
capacity,
592+
count: 0,
593+
current_read: 0,
594+
current_write: 0,
595+
}),
596+
semaphore_empty,
597+
semaphore_full,
598+
mutex,
599+
}
600+
}
601+
602+
unsafe fn from_ptr<'a>(ptr: QueuePtr) -> &'a Self {
603+
unsafe { ptr.cast::<Self>().as_ref() }
604+
}
605+
}
606+
607+
impl QueueImplementation for CompatQueue {
608+
fn create(capacity: usize, item_size: usize) -> QueuePtr {
609+
let q = Box::new(CompatQueue::new(capacity, item_size));
610+
NonNull::from(Box::leak(q)).cast()
611+
}
612+
613+
unsafe fn delete(queue: QueuePtr) {
614+
let q = unsafe { Box::from_raw(queue.cast::<CompatQueue>().as_ptr()) };
615+
core::mem::drop(q);
616+
}
617+
618+
unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
619+
let queue = unsafe { CompatQueue::from_ptr(queue) };
620+
621+
if queue.semaphore_empty.take(timeout_us) {
622+
// The inner mutex shouldn't be held for a long time, but we still shouldn't block
623+
// indefinitely.
624+
if queue.mutex.take(timeout_us) {
625+
let inner = unsafe { &mut *queue.inner.get() };
626+
inner.send_to_front(item);
627+
628+
queue.mutex.give();
629+
queue.semaphore_full.give();
630+
true
631+
} else {
632+
queue.semaphore_empty.give();
633+
false
634+
}
635+
} else {
636+
false
637+
}
638+
}
639+
640+
unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
641+
let queue = unsafe { CompatQueue::from_ptr(queue) };
642+
643+
if queue.semaphore_empty.take(timeout_us) {
644+
// The inner mutex shouldn't be held for a long time, but we still shouldn't block
645+
// indefinitely.
646+
if queue.mutex.take(timeout_us) {
647+
let inner = unsafe { &mut *queue.inner.get() };
648+
inner.send_to_back(item);
649+
650+
queue.mutex.give();
651+
queue.semaphore_full.give();
652+
true
653+
} else {
654+
queue.semaphore_empty.give();
655+
false
656+
}
657+
} else {
658+
false
659+
}
660+
}
661+
662+
unsafe fn try_send_to_back_from_isr(
663+
queue: QueuePtr,
664+
item: *const u8,
665+
mut higher_prio_task_waken: Option<&mut bool>,
666+
) -> bool {
667+
let queue = unsafe { CompatQueue::from_ptr(queue) };
668+
669+
if queue
670+
.semaphore_empty
671+
.try_take_from_isr(higher_prio_task_waken.as_deref_mut())
672+
{
673+
if queue
674+
.mutex
675+
.try_take_from_isr(higher_prio_task_waken.as_deref_mut())
676+
{
677+
let inner = unsafe { &mut *queue.inner.get() };
678+
inner.send_to_back(item);
679+
680+
queue
681+
.mutex
682+
.try_give_from_isr(higher_prio_task_waken.as_deref_mut());
683+
queue
684+
.semaphore_full
685+
.try_give_from_isr(higher_prio_task_waken);
686+
true
687+
} else {
688+
queue
689+
.semaphore_empty
690+
.try_give_from_isr(higher_prio_task_waken);
691+
false
692+
}
693+
} else {
694+
false
695+
}
696+
}
697+
698+
unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
699+
let queue = unsafe { CompatQueue::from_ptr(queue) };
700+
701+
if queue.semaphore_full.take(timeout_us) {
702+
if queue.mutex.take(timeout_us) {
703+
let inner = unsafe { &mut *queue.inner.get() };
704+
inner.read_from_front(item);
705+
706+
queue.mutex.give();
707+
queue.semaphore_empty.give();
708+
true
709+
} else {
710+
queue.semaphore_full.give();
711+
false
712+
}
713+
} else {
714+
false
715+
}
716+
}
717+
718+
unsafe fn try_receive_from_isr(
719+
queue: QueuePtr,
720+
item: *mut u8,
721+
mut higher_prio_task_waken: Option<&mut bool>,
722+
) -> bool {
723+
let queue = unsafe { CompatQueue::from_ptr(queue) };
724+
725+
if queue
726+
.semaphore_full
727+
.try_take_from_isr(higher_prio_task_waken.as_deref_mut())
728+
{
729+
if queue
730+
.mutex
731+
.try_take_from_isr(higher_prio_task_waken.as_deref_mut())
732+
{
733+
let inner = unsafe { &mut *queue.inner.get() };
734+
inner.read_from_front(item);
735+
736+
queue
737+
.mutex
738+
.try_give_from_isr(higher_prio_task_waken.as_deref_mut());
739+
queue
740+
.semaphore_empty
741+
.try_give_from_isr(higher_prio_task_waken);
742+
true
743+
} else {
744+
queue
745+
.semaphore_full
746+
.try_give_from_isr(higher_prio_task_waken);
747+
false
748+
}
749+
} else {
750+
false
751+
}
752+
}
753+
754+
unsafe fn remove(queue: QueuePtr, item: *const u8) {
755+
let queue = unsafe { CompatQueue::from_ptr(queue) };
756+
757+
if queue.semaphore_full.take(Some(0)) {
758+
queue.mutex.take(None);
759+
760+
let inner = unsafe { &mut *queue.inner.get() };
761+
let item_removed = inner.remove(item);
762+
763+
queue.mutex.give();
764+
765+
if item_removed {
766+
queue.semaphore_empty.give();
767+
} else {
768+
queue.semaphore_full.give();
769+
}
770+
}
771+
}
772+
773+
fn messages_waiting(queue: QueuePtr) -> usize {
774+
let queue = unsafe { CompatQueue::from_ptr(queue) };
775+
776+
queue.semaphore_full.current_count() as usize
777+
}
778+
}

esp-rtos/src/esp_radio/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,12 @@ impl SemaphoreImplementation for Semaphore {
159159
semaphore.try_take()
160160
}
161161

162-
unsafe fn try_give_from_isr(semaphore: SemaphorePtr, _hptw: Option<&mut bool>) -> bool {
163-
unsafe { <Self as SemaphoreImplementation>::give(semaphore) }
162+
unsafe fn try_give_from_isr(_semaphore: SemaphorePtr, _hptw: Option<&mut bool>) -> bool {
163+
unimplemented!()
164164
}
165165

166-
unsafe fn try_take_from_isr(semaphore: SemaphorePtr, _hptw: Option<&mut bool>) -> bool {
167-
unsafe { <Self as SemaphoreImplementation>::try_take(semaphore) }
166+
unsafe fn try_take_from_isr(_semaphore: SemaphorePtr, _hptw: Option<&mut bool>) -> bool {
167+
unimplemented!()
168168
}
169169
}
170170

0 commit comments

Comments
 (0)