Skip to content

Commit e2d9b67

Browse files
committed
zephyr: work: Simple Workqueue Work
Change the old Work implementation, which was really more of an early attempt at futures into a useful simple Work model for Zephyr Work queues. These do not use triggered work, but are simply re-submitted as needed to the work queues. Currently, they are always contained within an Arc. Signed-off-by: David Brown <[email protected]>
1 parent 28faaf8 commit e2d9b67

File tree

1 file changed

+88
-37
lines changed

1 file changed

+88
-37
lines changed

zephyr/src/work.rs

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -177,15 +177,8 @@
177177
178178
extern crate alloc;
179179

180-
use alloc::boxed::Box;
181180
use core::{
182-
convert::Infallible,
183-
ffi::{c_int, c_uint, CStr},
184-
future::Future,
185-
mem,
186-
pin::Pin,
187-
ptr,
188-
task::Poll,
181+
cell::UnsafeCell, convert::Infallible, ffi::{c_int, c_uint, CStr}, future::Future, mem, pin::Pin, ptr, task::Poll
189182
};
190183

191184
use zephyr_sys::{
@@ -196,6 +189,7 @@ use zephyr_sys::{
196189

197190
use crate::{
198191
error::to_result_void, kio::ContextExt, object::Fixed, simpletls::StaticTls,
192+
sync::Arc,
199193
sys::thread::ThreadStack, time::Timeout,
200194
};
201195

@@ -543,39 +537,57 @@ impl SubmitResult {
543537
///
544538
/// This is similar to a Future, except there is no concept of it completing. It manages its
545539
/// associated data however it wishes, and is responsible for re-queuing as needed.
546-
pub trait SimpleAction<T> {
540+
///
541+
/// Note, specifically, that the Act does not take a mutable reference. This is because the Work
542+
/// below uses an Arc, so this data can be shared.
543+
pub trait SimpleAction {
547544
/// Perform the action.
548-
fn act(self: Pin<&mut Self>);
545+
fn act(self: Pin<&Self>);
549546
}
550547

551548
/// A basic Zephyr work item.
552549
///
553550
/// Holds a `k_work`, along with the data associated with that work. When the work is queued, the
554551
/// `act` method will be called on the provided `SimpleAction`.
555-
#[repr(C)]
556552
pub struct Work<T> {
557-
work: k_work,
553+
work: UnsafeCell<k_work>,
558554
action: T,
559555
}
560556

561-
impl<T: SimpleAction<T>> Work<T> {
557+
/// SAFETY: Work queues can be sent as long as the action itself can be.
558+
unsafe impl<F> Send for Work<F>
559+
where
560+
F: SimpleAction,
561+
F: Send,
562+
{
563+
}
564+
565+
/// SAFETY: Work queues are Sync when the action is.
566+
unsafe impl<F> Sync for Work<F>
567+
where
568+
F: SimpleAction,
569+
F: Sync,
570+
{
571+
}
572+
573+
impl<T: SimpleAction + Send> Work<T> {
562574
/// Construct a new Work from the given action.
563575
///
564576
/// Note that the data will be moved into the pinned Work. The data is internal, and only
565577
/// accessible to the work thread (the `act` method). If shared data is needed, normal
566578
/// inter-thread sharing mechanisms are needed.
567579
///
568580
/// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
569-
pub fn new(action: T) -> Pin<Box<Self>> {
570-
let mut this = Box::pin(Self {
581+
pub fn new(action: T) -> Pin<Arc<Self>> {
582+
let this = Arc::pin(Self {
571583
// SAFETY: will be initialized below, after this is pinned.
572584
work: unsafe { mem::zeroed() },
573585
action,
574586
});
575-
let ptr = this.as_mut().as_k_work();
576-
// SAFETY: Initializes the zero allocated struct.
587+
588+
// SAFETY: Initializes above zero-initialized struct.
577589
unsafe {
578-
k_work_init(ptr, Some(Self::handler));
590+
k_work_init(this.work.get(), Some(Self::handler));
579591
}
580592

581593
this
@@ -585,41 +597,80 @@ impl<T: SimpleAction<T>> Work<T> {
585597
///
586598
/// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an
587599
/// explanation of them.
588-
pub fn submit(self: Pin<&mut Self>) -> crate::Result<SubmitResult> {
600+
pub fn submit(this: Pin<Arc<Self>>) -> crate::Result<SubmitResult> {
601+
// We "leak" the arc, so that when the handler runs, it can be safely turned back into an
602+
// Arc, and the drop on the arc will then run.
603+
let work = this.work.get();
604+
605+
// SAFETY: C the code does not perform moves on the data, and the `from_raw` below puts it
606+
// back into a Pin when it reconstructs the Arc.
607+
let this = unsafe { Pin::into_inner_unchecked(this) };
608+
let _ = Arc::into_raw(this);
609+
589610
// SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the
590611
// work item is no longer queued when the data is dropped.
591-
SubmitResult::to_result(unsafe { k_work_submit(self.as_k_work()) })
612+
SubmitResult::to_result(unsafe { k_work_submit(work) })
592613
}
593614

594615
/// Submit this work to a specified work queue.
595616
///
596617
/// TODO: Change when we have better wrappers for work queues.
597618
pub fn submit_to_queue(
598-
self: Pin<&mut Self>,
599-
queue: *mut k_work_q,
619+
this: Pin<Arc<Self>>,
620+
queue: &WorkQueue,
600621
) -> crate::Result<SubmitResult> {
622+
let work = this.work.get();
623+
624+
// "leak" the arc to give to C. We'll reconstruct it in the handler.
625+
// SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
626+
// back into a Pin when it reconstructs the Arc.
627+
let this = unsafe { Pin::into_inner_unchecked(this) };
628+
let _ = Arc::into_raw(this);
629+
601630
// SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the
602631
// work item is no longer queued when the data is dropped.
603-
SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue, self.as_k_work()) })
632+
SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) })
604633
}
605634

606-
/// Get the pointer to the underlying work queue.
607-
fn as_k_work(self: Pin<&mut Self>) -> *mut k_work {
608-
// SAFETY: This is private, and no code here will move the pinned item.
609-
unsafe { self.map_unchecked_mut(|s| &mut s.work).get_unchecked_mut() }
635+
/// Callback, through C, but bound by a specific type.
636+
extern "C" fn handler(work: *mut k_work) {
637+
// We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
638+
// necessarily at the beginning of the struct.
639+
// SAFETY: Converts raw pointer to work back into the box.
640+
let this = unsafe { Self::from_raw(work) };
641+
642+
// Access the action within, still pinned.
643+
// SAFETY: It is safe to keep the pin on the interior.
644+
let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };
645+
646+
action.act();
610647
}
611648

612-
/// Get a pointer into our action.
613-
fn as_action(self: Pin<&mut Self>) -> Pin<&mut T> {
614-
// SAFETY: We rely on the worker itself not moving the data.
615-
unsafe { self.map_unchecked_mut(|s| &mut s.action) }
649+
/*
650+
/// Consume this Arc, returning the internal pointer. Needs to have a complementary `from_raw`
651+
/// called to avoid leaking the item.
652+
fn into_raw(this: Pin<Arc<Self>>) -> *const Self {
653+
// SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't
654+
// generally use move.
655+
let this = unsafe { Pin::into_inner_unchecked(this) };
656+
Arc::into_raw(this)
657+
}
658+
*/
659+
660+
/// Given a pointer to the work_q burried within, recover the Pinned Box containing our data.
661+
unsafe fn from_raw(ptr: *const k_work) -> Pin<Arc<Self>> {
662+
// SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the
663+
// pointer is valid.
664+
let ptr = ptr
665+
.cast::<u8>()
666+
.sub(mem::offset_of!(Self, work))
667+
.cast::<Self>();
668+
let this = Arc::from_raw(ptr);
669+
Pin::new_unchecked(this)
616670
}
617671

618-
/// Callback, through C, but bound by a specific type.
619-
extern "C" fn handler(work: *mut k_work) {
620-
// SAFETY: We rely on repr(C) placing the first field of a struct at the same address as the
621-
// struct. This avoids needing a Rust equivalent to `CONTAINER_OF`.
622-
let this: Pin<&mut Self> = unsafe { Pin::new_unchecked(&mut *(work as *mut Self)) };
623-
this.as_action().act();
672+
/// Access the inner action.
673+
pub fn action(&self) -> &T {
674+
&self.action
624675
}
625676
}

0 commit comments

Comments
 (0)