From ab9367a53cccc636b339e7d38b640422e893bfbc Mon Sep 17 00:00:00 2001
From: David Brown
Date: Thu, 20 Feb 2025 14:53:37 -0700
Subject: [PATCH 01/11] zephyr: object: Implement new ZephyrObject wrapper

Implement a new wrapping system for Zephyr objects. Instead of trying to
use either a static or a pinned Box, use an atomic to detect
initialization, as well as improper moves. This allows objects to have
`const` constructors, allowing them to be declared as simple statics and
just shared.

Subsequent patches will implement this for various primitives.

Signed-off-by: David Brown
---
 zephyr/src/object.rs | 149 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 149 insertions(+)

diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs
index 6f40ea09..3a1fe250 100644
--- a/zephyr/src/object.rs
+++ b/zephyr/src/object.rs
@@ -92,6 +92,151 @@ use alloc::boxed::Box;
 
 use crate::sync::atomic::{AtomicUsize, Ordering};
 
+/// ## Init/move safe objects
+///
+/// In Rust code, many language features are designed around Rust's "move semantics". Because of
+/// the borrow checker, the Rust compiler has full knowledge of when it is safe to move an object
+/// in memory, as it will know that there are no references to it.
+///
+/// However, most kernel objects in Zephyr contain self-referential pointers to those objects. The
+/// traditional way to handle this in Rust is to use `Pin`. However, besides `Pin` being awkward
+/// to use, it is generally assumed that the underlying objects will be dynamically allocated. It
+/// is desirable to allow as much functionality of Zephyr as possible to be used without
+/// explicitly requiring alloc.
+///
+/// The original solution (`Wrapped`) used a type `Fixed` that either referenced a static, or
+/// contained a `Pin<Box<T>>` of the Zephyr kernel object. This introduces overhead for both the
+/// enum as well as the actual reference itself.
+///
+/// Some simple benchmarking has determined that it is just as efficient, or even slightly more
+/// so, to represent each object instead as an `UnsafeCell<T>` containing the Zephyr object, and
+/// an atomic pointer that can be used to determine the state of the object.
+///
+/// This type is not intended for general use, but for the implementation of wrappers for Zephyr
+/// types that require non-move semantics.
+///
+/// # Safety
+///
+/// The Zephyr APIs require that once objects have been initialized, they are not moved in memory.
+/// To avoid the inconvenience of managing `Pin` for most of these, we rely on run-time detection
+/// of both initialization and non-movement. It is fairly easy, as a user of an object in Rust, to
+/// avoid moving it. Generally, in an embedded system, objects will either live on the stack of a
+/// single persistent thread, or will be statically allocated. Both of these cases will result in
+/// objects that don't move. However, we want initialization to happen on first _use_, not when
+/// the constructor runs, because the semantics of most constructors involve a move (even if that
+/// is often optimized away).
+///
+/// Note that this does not solve the case of objects that must not be moved even while there is
+/// only a single Rust reference (threads, and work queues, notably, or timers with active
+/// callbacks).
+///
+/// To do this, each object is paired with an atomic pointer. The pointer can exist in three
+/// states:
+/// - null: The object has not been initialized. It is safe to move the object at this time.
+/// - A pointer that equals the address of the object itself: the object has been initialized,
+///   and can be used. It must not be moved.
+/// - A pointer that doesn't match the object: the object was moved after initialization, and is
+///   invalid. We treat this as a panic condition.
+pub struct ZephyrObject<T> {
+    state: AtomicUsize,
+    object: UnsafeCell<T>,
+}
+
+impl<T> ZephyrObject<T>
+where
+    ZephyrObject<T>: ObjectInit<T>,
+{
+    /// Construct a new Zephyr object.
+    ///
+    /// The `init` function will be given a pointer to the object. For objects that have
+    /// initialization parameters (specifically semaphores), the object's space can be used to
+    /// hold those parameters until the real init is called upon first use.
+    ///
+    /// Code running before `init` must not assume the address of the object will persist. The
+    /// object can be freely moved by Rust until `init` has been called, which happens on first
+    /// use.
+    pub const fn new_raw() -> Self {
+        Self {
+            state: AtomicUsize::new(0),
+            // SAFETY: It is safe to assume Zephyr objects can be zero initialized before calling
+            // their init. The atomic above will ensure that this is not used by any API other
+            // than the init call until it has been initialized.
+            object: UnsafeCell::new(unsafe { mem::zeroed() }),
+        }
+    }
+
+    /// Get a pointer to the item, _without initializing_ it.
+    ///
+    /// This is useful during a const constructor to be able to stash values in the item.
+    pub const fn get_uninit(&self) -> *mut T {
+        self.object.get()
+    }
+
+    /// Get a pointer to the underlying Zephyr object, ensuring it has been initialized properly.
+    /// The method is unsafe, because the caller must ensure that the lifetime of `&self` is long
+    /// enough for the use of the raw pointer.
+    ///
+    /// # Safety
+    ///
+    /// If the object has not been initialized, its `init` method will be called. If the object
+    /// has been moved since `init` was called, this will panic. Otherwise, the caller must ensure
+    /// that the use of the returned pointer does not outlive the `&self`.
+    ///
+    /// The `init` method will be called within a critical section, so it should be careful not to
+    /// block, or take extra time.
+    pub unsafe fn get(&self) -> *mut T {
+        let addr = self.object.get();
+
+        // First, try reading the atomic relaxed. This makes the common case of the object being
+        // initialized faster, and we can double check after.
+        match self.state.load(Ordering::Relaxed) {
+            // Uninitialized. Falls through to the slower init case.
+            0 => (),
+            // Initialized, and the object has not been moved.
+            ptr if ptr == addr as usize => return addr,
+            _ => {
+                // Object was moved after init.
+                panic!("Object moved after init");
+            }
+        }
+
+        // Perform the possible initialization within a critical section to avoid a race and
+        // double initialization.
+        critical_section::with(|_| {
+            // Reload, with Acquire ordering to see a determined value.
+            let state = self.state.load(Ordering::Acquire);
+
+            // If the address does match, an initialization got in before the critical section.
+            if state == addr as usize {
+                // Note, this returns from the closure, not the function, but this is ok, as the
+                // critical section result is the return result of the whole function.
+                return addr;
+            } else if state != 0 {
+                // Initialization got in, and then it was moved. This shouldn't happen without
+                // unsafe code, but it is easy to detect.
+                panic!("Object moved after init");
+            }
+
+            // Perform the initialization.
+            <Self as ObjectInit<T>>::init(addr);
+
+            self.state.store(addr as usize, Ordering::Release);
+
+            addr
+        })
+    }
+}
+
+/// All `ZephyrObject`s must implement `ObjectInit` in order for first use to be able to
+/// initialize the object.
+pub trait ObjectInit<T> {
+    /// Initialize the object.
+    ///
+    /// This is called upon first use. The address given may (and generally will) be different
+    /// than the initial address used with the [`ZephyrObject::new_raw`] constructor. After this
+    /// is called, all subsequent calls to [`ZephyrObject::get`] will return the same address, or
+    /// panic.
+    fn init(item: *mut T);
+}
+
 // The kernel object itself must be wrapped in `UnsafeCell` in Rust. This does several things, but
 // the primary feature that we want to declare to the Rust compiler is that this item has "interior
 // mutability". One impact will be that the default linker section will be writable, even though
@@ -100,6 +245,10 @@ use crate::sync::atomic::{AtomicUsize, Ordering};
 // the mutations happen from C code, so this is less important than the data being placed in the
 // proper section. Many will have the link section overridden by the `kobj_define` macro.
 
+/// ## Old Wrapped objects
+///
+/// The wrapped objects were the original approach to managing Zephyr objects.
+///
 /// Define the Wrapping of a kernel object.
 ///
 /// This trait defines the association between a static kernel object and the two associated Rust

From e6a92df8d9042f735896d2c3ae7e0407b9a6db01 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Thu, 20 Feb 2025 14:55:39 -0700
Subject: [PATCH 02/11] zephyr: sys: sync: Make Semaphore constructors `const`

Move Semaphores to the new `ZephyrObject` system, allowing the
constructor to be const.

Unfortunately, `Result` isn't generally useful with const, and as such,
this changes the API for `Semaphore::new()` to just return the semaphore
instead of a Result. Instead, if a semaphore constructor has
inconsistent arguments, it will panic. Given that semaphore constructors
are generally quite simple, this is a minor issue. However, it will
involve removing a lot of `.unwrap()` or `.expect(...)` calls on
semaphores.

The samples have been changed for the API change, but mostly not changed
to take advantage of the easier ways to use them. It is generally no
longer necessary to put semaphores in an Arc or Rc, but just to allocate
them in something like a StaticCell, and then pass them around by
reference.
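For illustration, a minimal sketch of the resulting usage pattern (the
names here are hypothetical, not part of this patch):

    use zephyr::sys::sync::Semaphore;
    use zephyr::time::Forever;

    // The `const` constructor means the semaphore can be a plain static.
    // Real initialization is deferred until first use.
    static READY: Semaphore = Semaphore::new(0, 1);

    fn producer() {
        // No Arc/Rc needed; the static is shared by reference.
        READY.give();
    }

    fn consumer() {
        // Blocks until the producer gives the semaphore.
        READY.take(Forever).unwrap();
    }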
Signed-off-by: David Brown
---
 samples/philosophers/src/dynsemsync.rs     |  2 +-
 samples/philosophers/src/semsync.rs        | 19 ++---
 samples/work-philosophers/src/async_sem.rs |  2 +-
 zephyr/src/kio/sync.rs                     |  2 +-
 zephyr/src/sys/sync.rs                     |  2 +-
 zephyr/src/sys/sync/semaphore.rs           | 89 +++++++++++-----------
 zephyr/src/work/futures.rs                 |  4 +-
 7 files changed, 55 insertions(+), 65 deletions(-)

diff --git a/samples/philosophers/src/dynsemsync.rs b/samples/philosophers/src/dynsemsync.rs
index 9f43c0b6..2d77bdcd 100644
--- a/samples/philosophers/src/dynsemsync.rs
+++ b/samples/philosophers/src/dynsemsync.rs
@@ -35,7 +35,7 @@ impl ForkSync for SemSync {
 pub fn dyn_semaphore_sync() -> Vec<Arc<dyn ForkSync>> {
     let forks = [(); NUM_PHIL]
         .each_ref()
-        .map(|()| Arc::new(Semaphore::new(1, 1).unwrap()));
+        .map(|()| Arc::new(Semaphore::new(1, 1)));
 
     (0..NUM_PHIL)
         .map(|_| {
diff --git a/samples/philosophers/src/semsync.rs b/samples/philosophers/src/semsync.rs
index c8896fe1..18323274 100644
--- a/samples/philosophers/src/semsync.rs
+++ b/samples/philosophers/src/semsync.rs
@@ -10,7 +10,7 @@ extern crate alloc;
 use alloc::boxed::Box;
 use alloc::vec::Vec;
 
-use zephyr::{kobj_define, sync::Arc, sys::sync::Semaphore, time::Forever};
+use zephyr::{sync::Arc, sys::sync::Semaphore, time::Forever};
 
 use crate::{ForkSync, NUM_PHIL};
 
@@ -18,7 +18,7 @@ use crate::{ForkSync, NUM_PHIL};
 pub struct SemSync {
     /// The forks for this philosopher. This is a bit excessive, as we really don't need all of
     /// them, but the ForkSync code uses the index here.
-    forks: [Arc<Semaphore>; NUM_PHIL],
+    forks: &'static [Semaphore; NUM_PHIL],
 }
 
 impl ForkSync for SemSync {
@@ -33,22 +33,15 @@ impl ForkSync for SemSync {
 
 #[allow(dead_code)]
 pub fn semaphore_sync() -> Vec<Arc<dyn ForkSync>> {
-    let forks = SEMS.each_ref().map(|m| {
-        // Each fork starts as taken.
-        Arc::new(m.init_once((1, 1)).unwrap())
-    });
-
     (0..NUM_PHIL)
         .map(|_| {
-            let syncer = SemSync {
-                forks: forks.clone(),
-            };
+            let syncer = SemSync { forks: &SEMS };
             let item = Box::new(syncer) as Box<dyn ForkSync>;
             Arc::from(item)
         })
         .collect()
 }
 
-kobj_define! {
-    static SEMS: [StaticSemaphore; NUM_PHIL];
-}
+// The state of const array initialization in Rust is dreadful. There is array-init, which isn't
+// const, and there is array-const-fn-init, which, for some reason, is a proc macro.
+static SEMS: [Semaphore; NUM_PHIL] = [const { Semaphore::new(1, 1) }; NUM_PHIL];
diff --git a/samples/work-philosophers/src/async_sem.rs b/samples/work-philosophers/src/async_sem.rs
index b4528a8d..5c593d15 100644
--- a/samples/work-philosophers/src/async_sem.rs
+++ b/samples/work-philosophers/src/async_sem.rs
@@ -43,7 +43,7 @@ async fn local_phil() -> Stats {
 
     // One fork for each philosopher.
     let forks: Vec<_> = (0..NUM_PHIL)
-        .map(|_| Rc::new(Semaphore::new(1, 1).unwrap()))
+        .map(|_| Rc::new(Semaphore::new(1, 1)))
         .collect();
 
     // Create all of the philosophers
diff --git a/zephyr/src/kio/sync.rs b/zephyr/src/kio/sync.rs
index 8ef6ba70..769a05ca 100644
--- a/zephyr/src/kio/sync.rs
+++ b/zephyr/src/kio/sync.rs
@@ -62,7 +62,7 @@ impl<T> Mutex<T> {
     /// Construct a new Mutex.
     pub fn new(t: T) -> Mutex<T> {
         Mutex {
-            inner: Semaphore::new(1, 1).unwrap(),
+            inner: Semaphore::new(1, 1),
             data: UnsafeCell::new(t),
         }
     }
diff --git a/zephyr/src/sys/sync.rs b/zephyr/src/sys/sync.rs
index ef88234b..80213066 100644
--- a/zephyr/src/sys/sync.rs
+++ b/zephyr/src/sys/sync.rs
@@ -34,4 +34,4 @@ pub mod mutex;
 pub mod semaphore;
 
 pub use mutex::{Condvar, Mutex, StaticCondvar, StaticMutex};
-pub use semaphore::{Semaphore, StaticSemaphore};
+pub use semaphore::Semaphore;
diff --git a/zephyr/src/sys/sync/semaphore.rs b/zephyr/src/sys/sync/semaphore.rs
index a6c9095d..4e1bcb24 100644
--- a/zephyr/src/sys/sync/semaphore.rs
+++ b/zephyr/src/sys/sync/semaphore.rs
@@ -16,8 +16,6 @@ use core::fmt;
 #[cfg(CONFIG_RUST_ALLOC)]
 use core::future::Future;
 #[cfg(CONFIG_RUST_ALLOC)]
-use core::mem;
-#[cfg(CONFIG_RUST_ALLOC)]
 use core::pin::Pin;
 #[cfg(CONFIG_RUST_ALLOC)]
 use core::task::{Context, Poll};
@@ -27,22 +25,19 @@ use zephyr_sys::ETIMEDOUT;
 
 #[cfg(CONFIG_RUST_ALLOC)]
 use crate::kio::ContextExt;
+use crate::object::{ObjectInit, ZephyrObject};
 #[cfg(CONFIG_RUST_ALLOC)]
 use crate::time::NoWait;
 use crate::{
     error::{to_result_void, Result},
-    object::{Fixed, StaticKernelObject, Wrapped},
     raw::{k_sem, k_sem_count_get, k_sem_give, k_sem_init, k_sem_reset, k_sem_take},
     time::Timeout,
 };
 
 pub use crate::raw::K_SEM_MAX_LIMIT;
 
-/// A zephyr `k_sem` usable from safe Rust code.
-pub struct Semaphore {
-    /// The raw Zephyr `k_sem`.
-    pub(crate) item: Fixed<k_sem>,
-}
+/// General Zephyr Semaphores
+pub struct Semaphore(pub(crate) ZephyrObject<k_sem>);
 
 /// By nature, Semaphores are both Sync and Send. Safety is handled by the underlying Zephyr
 /// implementation (which is why Clone is also implemented).
@@ -55,13 +50,27 @@ impl Semaphore {
     /// Create a new dynamically allocated Semaphore. This semaphore can only be used from system
     /// threads. The arguments are as described in [the
     /// docs](https://docs.zephyrproject.org/latest/kernel/services/synchronization/semaphores.html).
+    ///
+    /// Note that this API has changed, and it now doesn't return a Result, since the Result type
+    /// generally doesn't work (in stable Rust) with const.
     #[cfg(CONFIG_RUST_ALLOC)]
-    pub fn new(initial_count: c_uint, limit: c_uint) -> Result<Semaphore> {
-        let item: Fixed<k_sem> = Fixed::new(unsafe { mem::zeroed() });
+    pub const fn new(initial_count: c_uint, limit: c_uint) -> Semaphore {
+        // Due to delayed init, we need to replicate the object checks in the C `k_sem_init`.
+        if limit == 0 || initial_count > limit {
+            panic!("Invalid semaphore initialization");
+        }
+
+        let this = <ZephyrObject<k_sem>>::new_raw();
         unsafe {
-            to_result_void(k_sem_init(item.get(), initial_count, limit))?;
+            let addr = this.get_uninit();
+            (*addr).count = initial_count;
+            (*addr).limit = limit;
         }
-        Ok(Semaphore { item })
+
+        Semaphore(this)
     }
 
     /// Take a semaphore.
@@ -74,7 +83,7 @@ impl Semaphore {
         T: Into<Timeout>,
     {
         let timeout: Timeout = timeout.into();
-        let ret = unsafe { k_sem_take(self.item.get(), timeout.0) };
+        let ret = unsafe { k_sem_take(self.0.get(), timeout.0) };
         to_result_void(ret)
     }
 
@@ -98,7 +107,7 @@ impl Semaphore {
     /// This routine gives to the semaphore, unless the semaphore is already at its maximum
     /// permitted count.
     pub fn give(&self) {
-        unsafe { k_sem_give(self.item.get()) }
+        unsafe { k_sem_give(self.0.get()) }
     }
 
     /// Resets a semaphore's count to zero.
@@ -108,14 +117,32 @@ impl Semaphore {
     ///
     /// [`take`]: Self::take
     pub fn reset(&self) {
-        unsafe { k_sem_reset(self.item.get()) }
+        unsafe { k_sem_reset(self.0.get()) }
     }
 
     /// Get a semaphore's count.
     ///
     /// Returns the current count.
     pub fn count_get(&self) -> usize {
-        unsafe { k_sem_count_get(self.item.get()) as usize }
+        unsafe { k_sem_count_get(self.0.get()) as usize }
+    }
+}
+
+impl ObjectInit<k_sem> for ZephyrObject<k_sem> {
+    fn init(item: *mut k_sem) {
+        // SAFETY: Get the initial values used in new. The address may have changed, but only due
+        // to a move.
+        unsafe {
+            let count = (*item).count;
+            let limit = (*item).limit;
+
+            if k_sem_init(item, count, limit) != 0 {
+                // Note that with delayed init, we cannot do anything with invalid values. We've
+                // replicated the check in `new` above, so this would only catch semantic changes
+                // in the implementation of `k_sem_init`.
+                unreachable!();
+            }
+        }
     }
 }
@@ -153,36 +180,6 @@ impl<'a> Future for SemTake<'a> {
     }
 }
 
-/// A static Zephyr `k_sem`.
-///
-/// This is intended to be used from within the `kobj_define!` macro. It declares a static ksem
-/// that will be properly registered with the Zephyr kernel object system. Call [`init_once`] to
-/// get the [`Semaphore`] that it represents.
-///
-/// [`init_once`]: StaticKernelObject::init_once
-pub type StaticSemaphore = StaticKernelObject<k_sem>;
-
-unsafe impl Sync for StaticSemaphore {}
-
-impl Wrapped for StaticKernelObject<k_sem> {
-    type T = Semaphore;
-
-    /// The initializer for Semaphores is the initial count, and the count limit (which can be
-    /// K_SEM_MAX_LIMIT, re-exported here).
-    type I = (c_uint, c_uint);
-
-    // TODO: Thoughts about how to give parameters to the initialization.
-    fn get_wrapped(&self, arg: Self::I) -> Semaphore {
-        let ptr = self.value.get();
-        unsafe {
-            k_sem_init(ptr, arg.0, arg.1);
-        }
-        Semaphore {
-            item: Fixed::Static(ptr),
-        }
-    }
-}
-
 impl fmt::Debug for Semaphore {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "sys::Semaphore")
diff --git a/zephyr/src/work/futures.rs b/zephyr/src/work/futures.rs
index 841da7ed..f21f9d32 100644
--- a/zephyr/src/work/futures.rs
+++ b/zephyr/src/work/futures.rs
@@ -68,7 +68,7 @@ impl<T> Answer<T> {
     pub fn new() -> Self {
         Self {
             item: Mutex::new(None),
-            wake: Semaphore::new(0, 1).expect("Initialize semaphore"),
+            wake: Semaphore::new(0, 1),
         }
     }
@@ -307,7 +307,7 @@ impl WakeInfo {
                 ev.get(),
                 ZR_POLL_TYPE_SEM_AVAILABLE,
                 k_poll_modes_K_POLL_MODE_NOTIFY_ONLY as i32,
-                sem.item.get() as *mut c_void,
+                sem.0.get() as *mut c_void,
             );
         }
     }

From d59d4c0d586d6b9ad5155efec648736d031ef37d Mon Sep 17 00:00:00 2001
From: David Brown
Date: Fri, 21 Feb 2025 11:02:05 -0700
Subject: [PATCH 03/11] zephyr: Convert `Queue` to new atomic initializer

Convert to the new atomic initializer format. This makes the `new()`
constructor `const`, allowing queues to be used as statics.

Similar to the API change to Semaphore, the constructor no longer
returns a Result, just the Queue directly.

Signed-off-by: David Brown
---
 zephyr/src/sync/channel.rs |  6 ++--
 zephyr/src/sys/queue.rs    | 57 ++++++++------------------------------
 zephyr/src/work/futures.rs |  2 +-
 3 files changed, 16 insertions(+), 49 deletions(-)

diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs
index c39c090b..53cc6076 100644
--- a/zephyr/src/sync/channel.rs
+++ b/zephyr/src/sync/channel.rs
@@ -93,7 +93,7 @@ pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
 /// receivers will likely be blocked forever. Any data that has been queued and not received will
 /// be leaked when all receivers have been dropped.
 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
-    unbounded_from(Queue::new().unwrap())
+    unbounded_from(Queue::new())
 }
 
 /// Create a multi-producer multi-consumer channel with bounded capacity.
@@ -582,8 +582,8 @@ impl<T> Bounded<T> {
             .collect();
         let slots = Box::into_pin(slots);
 
-        let free = Queue::new().unwrap();
-        let chan = Queue::new().unwrap();
+        let free = Queue::new();
+        let chan = Queue::new();
 
         // Add each of the boxes to the free list.
         for slot in slots.as_ref().iter() {
diff --git a/zephyr/src/sys/queue.rs b/zephyr/src/sys/queue.rs
index c363f889..e546fad1 100644
--- a/zephyr/src/sys/queue.rs
+++ b/zephyr/src/sys/queue.rs
@@ -6,22 +6,14 @@
 use core::ffi::c_void;
 use core::fmt;
-#[cfg(CONFIG_RUST_ALLOC)]
-use core::mem;
 
 use zephyr_sys::{k_queue, k_queue_append, k_queue_get, k_queue_init};
 
-#[cfg(CONFIG_RUST_ALLOC)]
-use crate::error::Result;
-use crate::object::{Fixed, StaticKernelObject, Wrapped};
+use crate::object::{ObjectInit, ZephyrObject};
 use crate::time::Timeout;
 
 /// A wrapper around a Zephyr `k_queue` object.
-pub struct Queue {
-    pub(crate) item: Fixed<k_queue>,
-}
-
-unsafe impl Sync for StaticKernelObject<k_queue> {}
+pub struct Queue(pub(crate) ZephyrObject<k_queue>);
 
 unsafe impl Sync for Queue {}
 unsafe impl Send for Queue {}
@@ -33,13 +25,8 @@ impl Queue {
     ///
     /// **Note**: When a Queue is dropped, any messages that have been added to the queue will be
     /// leaked.
-    #[cfg(CONFIG_RUST_ALLOC)]
-    pub fn new() -> Result<Queue> {
-        let item: Fixed<k_queue> = Fixed::new(unsafe { mem::zeroed() });
-        unsafe {
-            k_queue_init(item.get());
-        }
-        Ok(Queue { item })
+    pub const fn new() -> Queue {
+        Queue(<ZephyrObject<k_queue>>::new_raw())
     }
 
     /// Append an element to the end of a queue.
@@ -61,7 +48,7 @@ impl Queue {
     /// These are easiest to satisfy by ensuring the message is Boxed, and owned by the queue
     /// system.
     pub unsafe fn send(&self, data: *mut c_void) {
-        k_queue_append(self.item.get(), data)
+        k_queue_append(self.0.get(), data)
     }
 
     /// Get an element from a queue.
@@ -84,42 +71,22 @@ impl Queue {
         T: Into<Timeout>,
     {
         let timeout: Timeout = timeout.into();
-        k_queue_get(self.item.get(), timeout.0)
+        k_queue_get(self.0.get(), timeout.0)
     }
 }
 
-impl Wrapped for StaticKernelObject<k_queue> {
-    type T = Queue;
-
-    type I = ();
-
-    fn get_wrapped(&self, _arg: Self::I) -> Queue {
-        let ptr = self.value.get();
+impl ObjectInit<k_queue> for ZephyrObject<k_queue> {
+    fn init(item: *mut k_queue) {
+        // SAFETY: ZephyrObject handles initialization and move prevention.
         unsafe {
-            k_queue_init(ptr);
-        }
-        Queue {
-            item: Fixed::Static(ptr),
+            k_queue_init(item);
         }
     }
 }
 
-/// A statically defined Zephyr `k_queue`.
-///
-/// This should be declared as follows:
-/// ```
-/// kobj_define! {
-///     static MY_QUEUE: StaticQueue;
-/// }
-///
-/// let my_queue = MY_QUEUE.init_once(());
-///
-/// my_queue.send(...);
-/// ```
-pub type StaticQueue = StaticKernelObject<k_queue>;
-
 impl fmt::Debug for Queue {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "sys::Queue {:?}", self.item.get())
+        // SAFETY: Just getting the address to print.
+        write!(f, "sys::Queue {:?}", unsafe { self.0.get() })
     }
 }
diff --git a/zephyr/src/work/futures.rs b/zephyr/src/work/futures.rs
index f21f9d32..be4e47d6 100644
--- a/zephyr/src/work/futures.rs
+++ b/zephyr/src/work/futures.rs
@@ -339,7 +339,7 @@ impl WakeInfo {
                 ev.get(),
                 ZR_POLL_TYPE_DATA_AVAILABLE,
                 k_poll_modes_K_POLL_MODE_NOTIFY_ONLY as i32,
-                queue.item.get() as *mut c_void,
+                queue.0.get() as *mut c_void,
             );
         }
     }

From 152bbd5f78f6d8f23c10d832bba87d2efd5e4357 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Fri, 21 Feb 2025 11:08:14 -0700
Subject: [PATCH 04/11] samples: bench: fix for API change

Adapt to the API change in both Semaphore and Queue. This still
allocates using Arc, though.

Signed-off-by: David Brown
---
 samples/bench/src/lib.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs
index 7e0f4ede..10e8a6ae 100644
--- a/samples/bench/src/lib.rs
+++ b/samples/bench/src/lib.rs
@@ -162,10 +162,10 @@ impl ThreadTests {
         let mut thread_commands = Vec::new();
 
         for _ in 0..count {
-            let sem = Arc::new(Semaphore::new(0, u32::MAX).unwrap());
+            let sem = Arc::new(Semaphore::new(0, u32::MAX));
             result.sems.push(sem.clone());
 
-            let sem = Arc::new(Semaphore::new(0, u32::MAX).unwrap());
+            let sem = Arc::new(Semaphore::new(0, u32::MAX));
             result.back_sems.push(sem);
 
             let chans = ChanPair::new_bounded(1);
@@ -902,7 +902,7 @@ impl SimpleMain {
     fn new(count: usize, workq: Arc<WorkQueue>) -> Self {
         Self {
             locked: SpinMutex::new(Locked::new(count)),
-            done: Semaphore::new(0, 1).unwrap(),
+            done: Semaphore::new(0, 1),
             workq,
         }
     }
@@ -953,7 +953,7 @@ fn spin_bench() {
 #[no_mangle]
 fn sem_bench() {
     let iters = 10_000;
-    let sem = Semaphore::new(iters as u32, iters as u32).unwrap();
+    let sem = Semaphore::new(iters as u32, iters as u32);
     let timer = BenchTimer::new("Semaphore take", iters);
     for _ in 0..iters {
         sem.take(Forever).unwrap();

From 234379ed96dc42338b133ed562d20bdf8a93486c Mon Sep 17 00:00:00 2001
From: David Brown
Date: Fri, 21 Feb 2025 14:16:52 -0700
Subject: [PATCH 05/11] zephyr: sys: sync: Mutex/Condvar

Update the sys Mutex/Condvar to use the new object initialization, and
update the uses. This requires a few fixes:

- The SimpleTls no longer needs the StaticTls helper, and we can just
  use a plain global static Mutex.
- Bounded channels add items to a free queue during init. To avoid
  moving this queue after items have been inserted, we place it in a
  Box. Given that the data of bounded queues is also boxed, this isn't
  really a concern.

Signed-off-by: David Brown
---
 zephyr/src/simpletls.rs      | 72 ------------------------------
 zephyr/src/sync/channel.rs   |  9 ++--
 zephyr/src/sync/mutex.rs     | 11 ++---
 zephyr/src/sys/sync/mutex.rs | 83 ++++++++++++------------------------
 zephyr/src/work.rs           |  8 ++--
 5 files changed, 44 insertions(+), 139 deletions(-)

diff --git a/zephyr/src/simpletls.rs b/zephyr/src/simpletls.rs
index 1e8f3cfa..9d920e51 100644
--- a/zephyr/src/simpletls.rs
+++ b/zephyr/src/simpletls.rs
@@ -5,14 +5,9 @@
 
 extern crate alloc;
 
-use core::{ptr, sync::atomic::Ordering};
-
-use alloc::boxed::Box;
 use alloc::vec::Vec;
 use zephyr_sys::{k_current_get, k_thread};
 
-use crate::sync::{atomic::AtomicPtr, Mutex};
-
 /// A container for simple thread local storage.
 ///
 /// This will maintain a mapping between Zephyr threads and a value of type T. Entries will have to
@@ -55,70 +50,3 @@ impl<T: Copy> SimpleTls<T> {
             .map(|pos| self.map[pos].1)
     }
 }
-
-/// A helper to safely use these with static.
-///
-/// The StaticTls type has a constant constructor, and the same insert and get methods as the
-/// underlying SimpleTls, with support for initializing the Mutex as needed.
-// TODO: This should eventually make it into a more general lazy mechanism.
-pub struct StaticTls<T> {
-    /// The container for the data.
-    ///
-    /// The AtomicPtr is either null, or contains a raw pointer to the underlying Mutex holding
-    /// the data.
-    data: AtomicPtr<Mutex<SimpleTls<T>>>,
-}
-
-impl<T: Copy> StaticTls<T> {
-    /// Create a new StaticTls that is empty.
-    pub const fn new() -> Self {
-        Self {
-            data: AtomicPtr::new(ptr::null_mut()),
-        }
-    }
-
-    /// Get the underlying Mutex out of the data, initializing it with an empty type if necessary.
-    fn get_inner(&self) -> &Mutex<SimpleTls<T>> {
-        let data = self.data.fetch_update(
-            // TODO: These orderings are likely stronger than necessary.
-            Ordering::SeqCst,
-            Ordering::SeqCst,
-            |ptr| {
-                if ptr.is_null() {
-                    // For null, we need to allocate a new one.
-                    let data = Box::new(Mutex::new(SimpleTls::new()));
-                    Some(Box::into_raw(data))
-                } else {
-                    // If there was already a value, just use it.
-                    None
-                }
-            },
-        );
-        let data = match data {
-            Ok(_) => {
-                // If the update stored something, it unhelpfully returns the old value, which was
-                // the null pointer. Since the pointer will only ever be updated once, it is safe
-                // to use a relaxed load here.
-                self.data.load(Ordering::Relaxed)
-            }
-            // If there was already a pointer, that is what we want.
-            Err(ptr) => ptr,
-        };
-
-        // SAFETY: The stored data was updated at most once, by the above code, and we now have a
-        // pointer to a valid leaked box holding the data.
-        unsafe { &*data }
-    }
-
-    /// Insert a new association into the StaticTls.
-    pub fn insert(&self, thread: *const k_thread, data: T) {
-        let inner = self.get_inner();
-        inner.lock().unwrap().insert(thread, data);
-    }
-
-    /// Lookup the data associated with a given thread.
-    pub fn get(&self) -> Option<T> {
-        let inner = self.get_inner();
-        inner.lock().unwrap().get()
-    }
-}
diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs
index 53cc6076..f76d6a4b 100644
--- a/zephyr/src/sync/channel.rs
+++ b/zephyr/src/sync/channel.rs
@@ -569,8 +569,11 @@ struct Bounded<T> {
     /// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware
     /// of. MaybeUninit allows us to create this without allocation.
     _slots: Pin<Box<[Slot<T>]>>,
-    /// The free queue, holds messages that aren't be used.
-    free: Queue,
+    /// The free queue, holds messages that aren't being used. The free queue has to be in a Box,
+    /// because it cannot move after the constructor runs. The chan queue is fine to wait until
+    /// first use, when the object has settled. As we are also boxing the messages, this isn't
+    /// really that costly.
+    free: Box<Queue>,
     /// The channel queue. These are messages that have been sent and are waiting to be received.
     chan: Queue,
 }
@@ -582,7 +585,7 @@ impl<T> Bounded<T> {
             .collect();
         let slots = Box::into_pin(slots);
 
-        let free = Queue::new();
+        let free = Box::new(Queue::new());
         let chan = Queue::new();
 
         // Add each of the boxes to the free list.
diff --git a/zephyr/src/sync/mutex.rs b/zephyr/src/sync/mutex.rs
index 2e5b2886..860c51c4 100644
--- a/zephyr/src/sync/mutex.rs
+++ b/zephyr/src/sync/mutex.rs
@@ -98,9 +98,8 @@ impl<T> Mutex<T> {
     }
 
     /// Construct a new Mutex, dynamically allocating the underlying sys Mutex.
-    #[cfg(CONFIG_RUST_ALLOC)]
-    pub fn new(t: T) -> Mutex<T> {
-        Mutex::new_from(t, sys::Mutex::new().unwrap())
+    pub const fn new(t: T) -> Mutex<T> {
+        Mutex::new_from(t, sys::Mutex::new())
     }
 }
@@ -164,7 +163,9 @@ impl<T> DerefMut for MutexGuard<'_, T> {
 impl<T> Drop for MutexGuard<'_, T> {
     #[inline]
     fn drop(&mut self) {
-        self.lock.inner.unlock().unwrap();
+        if let Err(e) = self.lock.inner.unlock() {
+            panic!("Problem unlocking MutexGuard in drop: {:?}", e);
+        }
     }
 }
@@ -195,7 +196,7 @@ impl Condvar {
     /// Construct a new Condvar, dynamically allocating the underlying Zephyr `k_condvar`.
     #[cfg(CONFIG_RUST_ALLOC)]
     pub fn new() -> Condvar {
-        Condvar::new_from(sys::Condvar::new().unwrap())
+        Condvar::new_from(sys::Condvar::new())
     }
 
     /// Blocks the current thread until this conditional variable receives a notification.
diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs
index ae19723b..0cfa7d06 100644
--- a/zephyr/src/sys/sync/mutex.rs
+++ b/zephyr/src/sys/sync/mutex.rs
@@ -8,7 +8,7 @@
 //!
 //! [`object`]: crate::object
 
-use crate::object::{Fixed, StaticKernelObject, Wrapped};
+use crate::object::{ObjectInit, StaticKernelObject, ZephyrObject};
 use crate::sys::K_FOREVER;
 use crate::{
     error::{to_result_void, Result},
@@ -19,8 +19,6 @@ use crate::{
     time::Timeout,
 };
 use core::fmt;
-#[cfg(CONFIG_RUST_ALLOC)]
-use core::mem;
 
 /// A Zephyr `k_mutex` usable from safe Rust code.
 ///
@@ -46,20 +44,15 @@ use core::mem;
 /// [`sync::Mutex`]: http://example.com/TODO
 pub struct Mutex {
     /// The raw Zephyr mutex.
-    item: Fixed<k_mutex>,
+    item: ZephyrObject<k_mutex>,
 }
 
 impl Mutex {
     /// Create a new Mutex in an unlocked state.
     ///
     /// Create a new dynamically allocated Mutex. The Mutex can only be used from system threads.
-    #[cfg(CONFIG_RUST_ALLOC)]
-    pub fn new() -> Result<Mutex> {
-        let item: Fixed<k_mutex> = Fixed::new(unsafe { mem::zeroed() });
-        unsafe {
-            to_result_void(k_mutex_init(item.get()))?;
-        }
-        Ok(Mutex { item })
+    pub const fn new() -> Mutex {
+        Mutex { item: <ZephyrObject<k_mutex>>::new_raw() }
     }
 
     /// Lock a Zephyr Mutex.
@@ -84,6 +77,15 @@ impl Mutex {
     }
 }
 
+impl ObjectInit<k_mutex> for ZephyrObject<k_mutex> {
+    fn init(item: *mut k_mutex) {
+        // SAFETY: ZephyrObject handles initialization and move prevention.
+        unsafe {
+            k_mutex_init(item);
+        }
+    }
+}
+
 /// A static Zephyr `k_mutex`
 ///
 /// This is intended to be used from within the `kobj_define!` macro. It declares a static
@@ -101,26 +103,10 @@ unsafe impl Send for Mutex {}
 unsafe impl Sync for StaticMutex {}
 unsafe impl Send for StaticMutex {}
 
-impl Wrapped for StaticKernelObject<k_mutex> {
-    type T = Mutex;
-
-    /// Mutex initializers take no argument.
-    type I = ();
-
-    fn get_wrapped(&self, _arg: Self::I) -> Mutex {
-        let ptr = self.value.get();
-        unsafe {
-            k_mutex_init(ptr);
-        }
-        Mutex {
-            item: Fixed::Static(ptr),
-        }
-    }
-}
-
 impl fmt::Debug for Mutex {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "sys::Mutex {:?}", self.item.get())
+        // SAFETY: Address is just gotten to print for diagnostics.
+        write!(f, "sys::Mutex {:?}", unsafe { self.item.get() })
     }
 }
 
@@ -128,8 +114,7 @@ impl fmt::Debug for Mutex {
 ///
 /// Lightweight wrappers for Zephyr's `k_condvar`.
 pub struct Condvar {
-    /// The underlying `k_condvar`.
-    item: Fixed<k_condvar>,
+    item: ZephyrObject<k_condvar>,
 }
 
 #[doc(hidden)]
@@ -144,13 +129,8 @@ impl Condvar {
     /// Create a new Condvar.
     ///
     /// Create a new dynamically allocated Condvar. The Condvar can only be used from system threads.
-    #[cfg(CONFIG_RUST_ALLOC)]
-    pub fn new() -> Result<Condvar> {
-        let item: Fixed<k_condvar> = Fixed::new(unsafe { mem::zeroed() });
-        unsafe {
-            to_result_void(k_condvar_init(item.get()))?;
-        }
-        Ok(Condvar { item })
+    pub const fn new() -> Condvar {
+        Condvar { item: <ZephyrObject<k_condvar>>::new_raw() }
     }
 
     /// Wait for someone else using this mutex/condvar pair to notify.
@@ -184,25 +164,18 @@ impl Condvar {
     }
 }
 
-impl fmt::Debug for Condvar {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "sys::Condvar {:?}", self.item.get())
+impl ObjectInit<k_condvar> for ZephyrObject<k_condvar> {
+    fn init(item: *mut k_condvar) {
+        // SAFETY: ZephyrObject handles initialization and move prevention.
+        unsafe {
+            k_condvar_init(item);
+        }
     }
 }
 
-impl Wrapped for StaticCondvar {
-    type T = Condvar;
-
-    /// Condvar initializers take no argument.
-    type I = ();
-
-    fn get_wrapped(&self, _arg: Self::I) -> Condvar {
-        let ptr = self.value.get();
-        unsafe {
-            k_condvar_init(ptr);
-        }
-        Condvar {
-            item: Fixed::Static(ptr),
-        }
+impl fmt::Debug for Condvar {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // SAFETY: Just getting the address to print.
+        write!(f, "sys::Condvar {:?}", unsafe { self.item.get() })
     }
 }
diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs
index 17e28e0d..cc7d6b83 100644
--- a/zephyr/src/work.rs
+++ b/zephyr/src/work.rs
@@ -194,7 +194,7 @@ use zephyr_sys::{
 };
 
 use crate::{
-    error::to_result_void, kio::ContextExt, object::Fixed, simpletls::StaticTls, sync::Arc,
+    error::to_result_void, kio::ContextExt, object::Fixed, simpletls::SimpleTls, sync::{Arc, Mutex},
     sys::thread::ThreadStack, time::Timeout,
 };
 
 pub mod futures;
@@ -275,7 +275,7 @@ impl WorkQueueBuilder {
             // SAFETY: This associates the workqueue with the thread ID that runs it. The thread is
             // a pointer into this work item, which will not move, because of the Fixed.
             let this = &mut *item.get();
-            WORK_QUEUES.insert(&this.thread, WorkQueueRef(item.get()));
+            WORK_QUEUES.lock().unwrap().insert(&this.thread, WorkQueueRef(item.get()));
 
             // SAFETY: Start work queue thread. The main issue here is that the work queue cannot
             // be deallocated once the thread has started. We enforce this by making Drop panic.
@@ -340,7 +340,7 @@ impl Drop for WorkQueue {
 ///
 /// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
 /// manually right now.
-static WORK_QUEUES: StaticTls<WorkQueueRef> = StaticTls::new();
+static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());
 
 /// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
 /// implement stop.
@@ -353,7 +353,7 @@ unsafe impl Sync for WorkQueueRef {}
 
 /// Retrieve the current work queue, if we are running within one.
 pub fn get_current_workq() -> Option<*mut k_work_q> {
-    WORK_QUEUES.get().map(|wq| wq.0)
+    WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
 }
 
 /// A Rust wrapper for `k_poll_signal`.

From 52cd270239cf20a27872c8b10eb5512dd4450090 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Mon, 24 Feb 2025 14:27:15 -0700
Subject: [PATCH 06/11] samples: bench: Remove the 'basesem' prototype

This was an experiment to see if using an atomic to manage semaphore
initialization was workable. Now that the main Semaphore implementation
works this way, this can just be removed.
Signed-off-by: David Brown
---
 samples/bench/src/basesem.rs | 117 -----------------------------------
 samples/bench/src/lib.rs     |  36 -----------
 2 files changed, 153 deletions(-)
 delete mode 100644 samples/bench/src/basesem.rs

diff --git a/samples/bench/src/basesem.rs b/samples/bench/src/basesem.rs
deleted file mode 100644
index 20350df9..00000000
--- a/samples/bench/src/basesem.rs
+++ /dev/null
@@ -1,117 +0,0 @@
-//! Base Semaphore
-//!
-//! This is an experiment into a different approach to Zephyr kernel objects.
-//!
-//! Currently, these kernel objects are directed through "Fixed", which is an enum referencing
-//! either a pointer to something statically declared, or a `Pin<Box<UnsafeCell<T>>>`. This was
-//! done in an attempt to keep things performant, but we actually always still end up with both an
-//! enum discriminant, as well as an extra indirection for the static one.
-//!
-//! The deep issue here is that Zephyr objects inherently cannot be moved. Zephyr uses a `dlist`
-//! structure in most objects that has a pointer back to itself to indicate the empty list.
-//!
-//! To work around this, we will implement objects as a pairing of an `AtomicUsize` and an
-//! `UnsafeCell<T>` (for whatever underlying type). The atomic will go through a small number
-//! of states:
-//!
-//! - 0: indicates that this object is uninitialized.
-//! - ptr: where ptr is the address of Self for an initialized object.
-//!
-//! On each use, the atomic value can be read (Relaxed is fine here), and if a 0 is seen, perform
-//! an initialization. The initialization will lock a simple critical section, checking the atomic
-//! again, to make sure it didn't get initialized by something intercepting it. If the check sees
-//! a 'ptr' value that is not the same as Self, it indicates the object has been moved after
-//! initialization, and will simply panic.
-
-// To measure performance, this module implements this for `k_sem` without abstractions around it.
-// The idea is to compare performance with the above `Fixed` implementation.
-
-use core::{cell::UnsafeCell, ffi::c_uint, mem, sync::atomic::Ordering};
-
-use zephyr::Result;
-use zephyr::{
-    error::to_result_void,
-    raw::{k_sem, k_sem_give, k_sem_init, k_sem_take},
-    sync::atomic::AtomicUsize,
-    time::Timeout,
-};
-
-pub struct Semaphore {
-    state: AtomicUsize,
-    item: UnsafeCell<k_sem>,
-}
-
-// SAFETY: These are both Send and Sync. The semaphore itself is safe, and the atomic+critical
-// section protects the state.
-unsafe impl Send for Semaphore {}
-unsafe impl Sync for Semaphore {}
-
-impl Semaphore {
-    /// Construct a new semaphore, with the given initial_count and limit. There is a bit of
-    /// trickery to pass the initial values through to the initializer, but otherwise this is just
-    /// a basic initialization.
-    pub fn new(initial_count: c_uint, limit: c_uint) -> Semaphore {
-        let this = Self {
-            state: AtomicUsize::new(0),
-            item: unsafe { UnsafeCell::new(mem::zeroed()) },
-        };
-
-        // Set the initial count and limit in the semaphore to use for later initialization.
-        unsafe {
-            let ptr = this.item.get();
-            (*ptr).count = initial_count;
-            (*ptr).limit = limit;
-        }
-
-        this
-    }
-
-    /// Get the raw pointer, initializing the `k_sem` if needed.
-    fn get(&self) -> *mut k_sem {
-        // First load can be relaxed, for performance reasons. If it is seen as uninitialized, the
-        // below Acquire load will see the correct value.
-        let state = self.state.load(Ordering::Relaxed);
-        if state == self as *const Self as usize {
-            return self.item.get();
-        } else if state != 0 {
-            panic!("Semaphore was moved after first use");
-        }
-
-        critical_section::with(|_| {
-            // Reload, with Acquire ordering to see a determined value.
-            let state = self.state.load(Ordering::Acquire);
-            if state == self as *const Self as usize {
-                return self.item.get();
-            } else if state != 0 {
-                panic!("Semaphore was moved after first use");
-            }
-
-            // Perform the initialization. We're within the critical section, and know that nobody
-            // could be using this.
-            unsafe {
-                let ptr = self.item.get();
-                let initial_count = (*ptr).count;
-                let limit = (*ptr).limit;
-
-                k_sem_init(ptr, initial_count, limit);
-            }
-
-            self.state
-                .store(self as *const Self as usize, Ordering::Release);
-            self.item.get()
-        })
-    }
-
-    /// Synchronous take.
-    pub fn take(&self, timeout: impl Into<Timeout>) -> Result<()> {
-        let timeout: Timeout = timeout.into();
-        let ptr = self.get();
-        let ret = unsafe { k_sem_take(ptr, timeout.0) };
-        to_result_void(ret)
-    }
-
-    pub fn give(&self) {
-        let ptr = self.get();
-        unsafe { k_sem_give(ptr) };
-    }
-}
diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs
index 10e8a6ae..dfc25426 100644
--- a/samples/bench/src/lib.rs
+++ b/samples/bench/src/lib.rs
@@ -33,8 +33,6 @@ use zephyr::{
     work::{WorkQueue, WorkQueueBuilder},
 };
 
-mod basesem;
-
 /// How many threads to run in the tests.
 const NUM_THREADS: usize = 6;
 
@@ -61,7 +59,6 @@ extern "C" fn rust_main() {
     }
 
     tester.run(Command::Empty);
-    tester.run(Command::BaseSemPingPong(NUM_THREADS, 10_000));
     tester.run(Command::SimpleSem(10_000));
     tester.run(Command::SimpleSemAsync(10_000));
     tester.run(Command::SimpleSemYield(10_000));
@@ -121,10 +118,6 @@ struct ThreadTests {
     /// The tests also all return their result to the main thread. The threads send, the main
     /// running receives.
     results: ChanPair<Result>,
-
-    /// For the base sem test, just use these, which are just shared by reference.
-    forward: basesem::Semaphore,
-    reverse: basesem::Semaphore,
 }
@@ -155,8 +148,6 @@ impl ThreadTests {
             low_command: low_send,
             high_command: high_send,
             workq,
-            forward: basesem::Semaphore::new(0, 1),
-            reverse: basesem::Semaphore::new(0, 1),
         };
 
         let mut thread_commands = Vec::new();
@@ -449,11 +440,6 @@ impl ThreadTests {
                         continue;
                     }
                 }
-
-                Command::BaseSemPingPong(_nthread, count) => {
-                    this.base_worker(count);
-                    total = count;
-                }
             }
 
             this.results
@@ -639,23 +625,6 @@ impl ThreadTests {
         let _ = this;
     }
 
-    fn base_worker(&self, count: usize) {
-        for _ in 0..count {
-            self.forward.take(Forever).unwrap();
-            self.reverse.give();
-        }
-    }
-
-    // In the low runner, does the ping-pong with each.
-    fn base_replier(&self, nthread: usize, count: usize) {
-        for _ in 0..count {
-            for _ in 0..nthread {
-                self.forward.give();
-                self.reverse.take(Forever).unwrap();
-            }
-        }
-    }
-
     /// And the low priority worker.
     fn low_runner(this: Arc<Self>, command: Receiver<Command>) {
        let _ = this;
@@ -685,9 +654,6 @@ impl ThreadTests {
             }
             Command::SemPingPongAsync(_) => (),
             Command::SemOnePingPongAsync(_, _) => (),
-            Command::BaseSemPingPong(nthread, count) => {
-                this.base_replier(nthread, count);
-            }
         }
 
         // printkln!("low command: {:?}", cmd);
@@ -720,7 +686,6 @@ impl ThreadTests {
             }
             Command::SemPingPongAsync(_) => (),
             Command::SemOnePingPongAsync(_, _) => (),
-            Command::BaseSemPingPong(_, _) => (),
         }
 
         // printkln!("high command: {:?}", cmd);
@@ -779,7 +744,6 @@ enum Command {
     SemOnePingPong(usize),
     /// Same as SemOnePingPong, but async. The first parameter is the number of async tasks.
     SemOnePingPongAsync(usize, usize),
-    BaseSemPingPong(usize, usize),
 }
 
 enum Result {

From 2bca1013c43d349d6eab20355a52ca3dd53a1487 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Mon, 24 Feb 2025 14:45:56 -0700
Subject: [PATCH 07/11] samples: bench: Reduce benchmark iterations

Drop the iteration count so that not as much time is spent in CI. This
still gives meaningful functional results for CI, and the increased
value can be used for actual benchmarking.

Signed-off-by: David Brown
---
 samples/bench/src/lib.rs | 51 ++++++++++++++++++++++------------------
 1 file changed, 28 insertions(+), 23 deletions(-)

diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs
index dfc25426..f5ef673d 100644
--- a/samples/bench/src/lib.rs
+++ b/samples/bench/src/lib.rs
@@ -42,6 +42,11 @@ const THREAD_STACK_SIZE: usize = 2048;
 /// Stack size to use for the work queue.
 const WORK_STACK_SIZE: usize = 2048;
 
+/// The global iteration count. Small numbers still test functionality within CI, and large
+/// numbers give more meaningful benchmark results.
+const TOTAL_ITERS: usize = 1_000;
+// const TOTAL_ITERS: usize = 10_000;
+
 #[no_mangle]
 extern "C" fn rust_main() {
     let tester = ThreadTests::new(NUM_THREADS);
@@ -54,33 +59,33 @@ extern "C" fn rust_main() {
     let simple = Simple::new(tester.workq.clone());
     let mut num = 6;
     while num < 500 {
-        simple.run(num, 10_000 / num);
+        simple.run(num, TOTAL_ITERS / num);
         num = num * 13 / 10;
     }
 
     tester.run(Command::Empty);
-    tester.run(Command::SimpleSem(10_000));
-    tester.run(Command::SimpleSemAsync(10_000));
-    tester.run(Command::SimpleSemYield(10_000));
-    tester.run(Command::SimpleSemYieldAsync(10_000));
-    tester.run(Command::SemWait(10_000));
-    tester.run(Command::SemWaitAsync(10_000));
-    tester.run(Command::SemWaitSameAsync(10_000));
-    tester.run(Command::SemHigh(10_000));
-    tester.run(Command::SemPingPong(10_000));
-    tester.run(Command::SemPingPongAsync(10_000));
-    tester.run(Command::SemOnePingPong(10_000));
+    tester.run(Command::SimpleSem(TOTAL_ITERS));
+    tester.run(Command::SimpleSemAsync(TOTAL_ITERS));
+    tester.run(Command::SimpleSemYield(TOTAL_ITERS));
+    tester.run(Command::SimpleSemYieldAsync(TOTAL_ITERS));
+    tester.run(Command::SemWait(TOTAL_ITERS));
+    tester.run(Command::SemWaitAsync(TOTAL_ITERS));
+    tester.run(Command::SemWaitSameAsync(TOTAL_ITERS));
+    tester.run(Command::SemHigh(TOTAL_ITERS));
+    tester.run(Command::SemPingPong(TOTAL_ITERS));
+    tester.run(Command::SemPingPongAsync(TOTAL_ITERS));
+    tester.run(Command::SemOnePingPong(TOTAL_ITERS));
     /*
-    tester.run(Command::SemOnePingPongAsync(NUM_THREADS, 10_000 / 6));
-    tester.run(Command::SemOnePingPongAsync(20, 10_000 / 20));
-    tester.run(Command::SemOnePingPongAsync(50, 10_000 / 50));
-    tester.run(Command::SemOnePingPongAsync(100, 10_000 / 100));
-    tester.run(Command::SemOnePingPongAsync(500, 10_000 / 500));
+    tester.run(Command::SemOnePingPongAsync(NUM_THREADS, TOTAL_ITERS / 6));
+    tester.run(Command::SemOnePingPongAsync(20, TOTAL_ITERS / 20));
+    tester.run(Command::SemOnePingPongAsync(50, TOTAL_ITERS / 50));
+    tester.run(Command::SemOnePingPongAsync(100, TOTAL_ITERS / 100));
+    tester.run(Command::SemOnePingPongAsync(500, TOTAL_ITERS / 500));
     tester.run(Command::Empty);
     */
     let mut num = 6;
-    while num < 500 {
-        tester.run(Command::SemOnePingPongAsync(num, 10_000 / num));
+    while num < 100 {
+        tester.run(Command::SemOnePingPongAsync(num, TOTAL_ITERS / num));
         num = num * 13 / 10;
     }
@@ -889,8 +894,8 @@ impl Locked {
 /// Benchmark the performance of Arc.
 fn arc_bench() {
     let thing = Arc::new(123);
-    let timer = BenchTimer::new("Arc clone+drop", 10_000);
-    for _ in 0..10_000 {
+    let timer = BenchTimer::new("Arc clone+drop", TOTAL_ITERS);
+    for _ in 0..TOTAL_ITERS {
         let _ = thing.clone();
     }
     timer.stop();
@@ -900,7 +905,7 @@ fn arc_bench() {
 #[inline(never)]
 #[no_mangle]
 fn spin_bench() {
-    let iters = 10_000;
+    let iters = TOTAL_ITERS;
     let thing = SpinMutex::new(123);
     let timer = BenchTimer::new("SpinMutex lock", iters);
     for _ in 0..iters {
@@ -916,7 +921,7 @@ fn spin_bench() {
 #[inline(never)]
 #[no_mangle]
 fn sem_bench() {
-    let iters = 10_000;
+    let iters = TOTAL_ITERS;
     let sem = Semaphore::new(iters as u32, iters as u32);
     let timer = BenchTimer::new("Semaphore take", iters);
     for _ in 0..iters {

From 70fe87c233f0fbc838a0b6cf337600752871f91b Mon Sep 17 00:00:00 2001
From: David Brown
Date: Mon, 24 Feb 2025 16:02:40 -0700
Subject: [PATCH 08/11] samples: bench: Make Semaphores static

Although there is still a lot of allocation left (the crossbeam API for
channels is kind of built around it), convert the Semaphores in the
benchmark to be static. This reduces overhead a little bit, because the
reference counts of Arc don't need to be maintained, and also
demonstrates how StaticCell can be used to allocate things like arrays
of objects statically.

Signed-off-by: David Brown
---
 samples/bench/Cargo.toml |  2 ++
 samples/bench/src/lib.rs | 71 ++++++++++++++++++++++------------------
 2 files changed, 41 insertions(+), 32 deletions(-)

diff --git a/samples/bench/Cargo.toml b/samples/bench/Cargo.toml
index 8ddbb884..8088507b 100644
--- a/samples/bench/Cargo.toml
+++ b/samples/bench/Cargo.toml
@@ -15,6 +15,8 @@ crate-type = ["staticlib"]
 [dependencies]
 zephyr = "0.1.0"
 critical-section = "1.1.2"
+heapless = "0.8"
+static_cell = "2.1"
 
 # Dependencies that are used by build.rs.
 [build-dependencies]
diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs
index f5ef673d..842f3a8b 100644
--- a/samples/bench/src/lib.rs
+++ b/samples/bench/src/lib.rs
@@ -15,6 +15,7 @@ use core::pin::Pin;
 use alloc::collections::vec_deque::VecDeque;
 use alloc::vec;
 use alloc::vec::Vec;
+use static_cell::StaticCell;
 use zephyr::sync::SpinMutex;
 use zephyr::time::NoWait;
 use zephyr::work::futures::work_size;
@@ -47,6 +48,9 @@ const WORK_STACK_SIZE: usize = 2048;
 const TOTAL_ITERS: usize = 1_000;
 // const TOTAL_ITERS: usize = 10_000;
 
+/// A heapless::Vec, with a maximum size of the number of threads chosen above.
+type HeaplessVec<T> = heapless::Vec<T, NUM_THREADS>;
+
 #[no_mangle]
 extern "C" fn rust_main() {
     let tester = ThreadTests::new(NUM_THREADS);
@@ -101,17 +105,17 @@ extern "C" fn rust_main() {
 /// low priority task is providing the data.
 struct ThreadTests {
     /// Each test thread gets a semaphore, to use as appropriate for that test.
-    sems: Vec<Arc<Semaphore>>,
+    sems: HeaplessVec<&'static Semaphore>,
 
     /// This semaphore is used to ping-pong back to another thread.
-    back_sems: Vec<Arc<Semaphore>>,
+    back_sems: HeaplessVec<&'static Semaphore>,
 
     /// Each test also has a message queue, for testing, that it has sender and receiver for.
-    chans: Vec<ChanPair<u32>>,
+    chans: HeaplessVec<ChanPair<u32>>,
 
     /// In addition, each test has a channel it owns the receiver for that listens for commands
     /// about what test to run.
-    commands: Vec<Sender<Command>>,
+    commands: HeaplessVec<Sender<Command>>,
 
     /// Low and high also take commands.
     low_command: Sender<Command>,
 
@@ -145,10 +149,10 @@ impl ThreadTests {
         let _ = Arc::into_raw(workq.clone());
 
         let mut result = Self {
-            sems: Vec::new(),
-            back_sems: Vec::new(),
-            chans: Vec::new(),
-            commands: Vec::new(),
+            sems: HeaplessVec::new(),
+            back_sems: HeaplessVec::new(),
+            chans: HeaplessVec::new(),
+            commands: HeaplessVec::new(),
             results: ChanPair::new_unbounded(),
             low_command: low_send,
             high_command: high_send,
             workq,
         };
 
         let mut thread_commands = Vec::new();
 
-        for _ in 0..count {
-            let sem = Arc::new(Semaphore::new(0, u32::MAX));
-            result.sems.push(sem.clone());
+        static SEMS: [StaticCell<Semaphore>; NUM_THREADS] = [const { StaticCell::new() }; NUM_THREADS];
+        static BACK_SEMS: [StaticCell<Semaphore>; NUM_THREADS] = [const { StaticCell::new() }; NUM_THREADS];
 
-            let sem = Arc::new(Semaphore::new(0, u32::MAX));
-            result.back_sems.push(sem);
+        for i in 0..count {
+            let sem = SEMS[i].init(Semaphore::new(0, u32::MAX));
+            result.sems.push(sem).unwrap();
+
+            let sem = BACK_SEMS[i].init(Semaphore::new(0, u32::MAX));
+            result.back_sems.push(sem).unwrap();
 
             let chans = ChanPair::new_bounded(1);
-            result.chans.push(chans.clone());
+            result.chans.push(chans.clone()).unwrap();
 
             let (csend, crecv) = bounded(1);
-            result.commands.push(csend);
+            result.commands.push(csend).unwrap();
             thread_commands.push(crecv);
         }
@@ -213,8 +220,8 @@ impl ThreadTests {
                 work_size(Self::ping_pong_worker_async(
                     result.clone(),
                     0,
-                    result.sems[0].clone(),
-                    result.back_sems[0].clone(),
+                    result.sems[0],
+                    result.back_sems[0],
                     6
                 ))
             );
@@ -348,7 +355,7 @@ impl ThreadTests {
                 // ourselves.
                 Command::SimpleSemAsync(count) => {
                     spawn(
-                        Self::simple_sem_async(this.clone(), id, this.sems[id].clone(), count),
+                        Self::simple_sem_async(this.clone(), id, this.sems[id], count),
                         &this.workq,
                         c"worker",
                     );
@@ -360,12 +367,12 @@ impl ThreadTests {
                 Command::SimpleSemYieldAsync(count) => {
                     spawn(
-                        Self::simple_sem_yield_async(this.clone(), id, this.sems[id].clone(), count),
+                        Self::simple_sem_yield_async(
+                            this.clone(),
+                            id,
+                            this.sems[id],
+                            count,
+                        ),
                         &this.workq,
                         c"worker",
                     );
@@ -371,7 +378,7 @@ impl ThreadTests {
                 Command::SemWaitAsync(count) => {
                     spawn(
-                        Self::sem_take_async(this.clone(), id, this.sems[id].clone(), count),
+                        Self::sem_take_async(this.clone(), id, this.sems[id], count),
                         &this.workq,
                         c"worker",
                     );
@@ -380,7 +387,7 @@ impl ThreadTests {
                 Command::SemWaitSameAsync(count) => {
                     spawn(
-                        Self::sem_take_async(this.clone(), id, this.sems[id].clone(), count),
+                        Self::sem_take_async(this.clone(), id, this.sems[id], count),
                         &this.workq,
                         c"worker",
                     );
@@ -399,8 +406,8 @@ impl ThreadTests {
                         Self::ping_pong_worker_async(
                             this.clone(),
                             id,
-                            this.sems[id].clone(),
-                            this.back_sems[id].clone(),
+                            this.sems[id],
+                            this.back_sems[id],
                             count,
                         ),
                         &this.workq,
@@ -424,8 +431,8 @@ impl ThreadTests {
                             Self::ping_pong_worker_async(
                                 this.clone(),
                                 th,
-                                this.sems[0].clone(),
-                                this.back_sems[0].clone(),
+                                this.sems[0],
+                                this.back_sems[0],
                                 count,
                             ),
                             &this.workq,
@@ -464,7 +471,7 @@ impl ThreadTests {
         }
     }
 
-    async fn simple_sem_async(this: Arc<Self>, id: usize, sem: Arc<Semaphore>, count: usize) {
+    async fn simple_sem_async(this: Arc<Self>, id: usize, sem: &'static Semaphore, count: usize) {
         for _ in 0..count {
             sem.give();
             sem.take_async(NoWait).await.unwrap();
@@ -477,7 +484,7 @@ impl ThreadTests {
             .unwrap();
     }
 
-    async fn simple_sem_yield_async(this: Arc<Self>, id: usize, sem: Arc<Semaphore>, count: usize) {
+    async fn simple_sem_yield_async(this: Arc<Self>, id: usize, sem: &'static Semaphore, count: usize) {
         for _ in 0..count {
             sem.give();
             sem.take_async(NoWait).await.unwrap();
@@ -509,7 +516,7 @@ impl ThreadTests {
         }
     }
 
-    async fn sem_take_async(this: Arc<Self>, id: usize, sem: Arc<Semaphore>, count: usize) {
+    async fn sem_take_async(this: Arc<Self>, id: usize, sem: &'static Semaphore, count: usize) {
         for _ in 0..count {
             // Enable this to verify that we are actually blocking.
             if false {
@@ -551,8 +558,8 @@ impl ThreadTests {
     async fn ping_pong_worker_async(
         this: Arc<Self>,
         id: usize,
-        sem: Arc<Semaphore>,
-        back_sem: Arc<Semaphore>,
+        sem: &'static Semaphore,
+        back_sem: &'static Semaphore,
         count: usize,
     ) {
         for i in 0..count {
@@ -615,7 +622,7 @@ impl ThreadTests {
         // No reply.
     }
 
-    async fn sem_giver_async(this: Arc<Self>, sems: Vec<Arc<Semaphore>>, count: usize) {
+    async fn sem_giver_async(this: Arc<Self>, sems: HeaplessVec<&'static Semaphore>, count: usize) {
         for _ in 0..count {
             for sem in &sems {
                 sem.give();
@@ -699,7 +706,7 @@ impl ThreadTests {
     }
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 struct ChanPair<T> {
     sender: Sender<T>,
     receiver: Receiver<T>,

From 96fac0ab75e4b64423a8e892925637fe45fdc942 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Mon, 24 Feb 2025 16:39:59 -0700
Subject: [PATCH 09/11] zephyr: Run rustfmt

Clean up formatting.

Signed-off-by: David Brown
---
 zephyr/src/sys/sync/mutex.rs |  8 ++++++--
 zephyr/src/work.rs           | 14 +++++++++++---
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs
index 0cfa7d06..bd14a06b 100644
--- a/zephyr/src/sys/sync/mutex.rs
+++ b/zephyr/src/sys/sync/mutex.rs
@@ -52,7 +52,9 @@ impl Mutex {
     ///
     /// Create a new dynamically allocated Mutex. The Mutex can only be used from system threads.
     pub const fn new() -> Mutex {
-        Mutex { item: <ZephyrObject<k_mutex>>::new_raw() }
+        Mutex {
+            item: <ZephyrObject<k_mutex>>::new_raw(),
+        }
     }
 
     /// Lock a Zephyr Mutex.
@@ -130,7 +132,9 @@ impl Condvar {
     ///
     /// Create a new dynamically allocated Condvar. The Condvar can only be used from system threads.
     pub const fn new() -> Condvar {
-        Condvar { item: <ZephyrObject<k_condvar>>::new_raw() }
+        Condvar {
+            item: <ZephyrObject<k_condvar>>::new_raw(),
+        }
     }
 
     /// Wait for someone else using this mutex/condvar pair to notify.
diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs
index cc7d6b83..f4d820d7 100644
--- a/zephyr/src/work.rs
+++ b/zephyr/src/work.rs
@@ -194,8 +194,13 @@ use zephyr_sys::{
 };
 
 use crate::{
-    error::to_result_void, kio::ContextExt, object::Fixed, simpletls::SimpleTls, sync::{Arc, Mutex},
-    sys::thread::ThreadStack, time::Timeout,
+    error::to_result_void,
+    kio::ContextExt,
+    object::Fixed,
+    simpletls::SimpleTls,
+    sync::{Arc, Mutex},
+    sys::thread::ThreadStack,
+    time::Timeout,
 };
 
 pub mod futures;
@@ -275,7 +280,10 @@ impl WorkQueueBuilder {
             // SAFETY: This associates the workqueue with the thread ID that runs it. The thread is
             // a pointer into this work item, which will not move, because of the Fixed.
             let this = &mut *item.get();
-            WORK_QUEUES.lock().unwrap().insert(&this.thread, WorkQueueRef(item.get()));
+            WORK_QUEUES
+                .lock()
+                .unwrap()
+                .insert(&this.thread, WorkQueueRef(item.get()));
 
             // SAFETY: Start work queue thread. The main issue here is that the work queue cannot
             // be deallocated once the thread has started. We enforce this by making Drop panic.

From 3a6f3a1c8c28bdaa4a4eeb03c4fc21db2ed698a5 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Mon, 24 Feb 2025 16:40:34 -0700
Subject: [PATCH 10/11] samples: bench: Clean up formatting

Run rustfmt to clean up the formatting of the code.
Signed-off-by: David Brown
---
 samples/bench/src/lib.rs | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs
index 842f3a8b..2e91148e 100644
--- a/samples/bench/src/lib.rs
+++ b/samples/bench/src/lib.rs
@@ -161,8 +161,10 @@ impl ThreadTests {
 
         let mut thread_commands = Vec::new();
 
-        static SEMS: [StaticCell<Semaphore>; NUM_THREADS] = [const { StaticCell::new() }; NUM_THREADS];
-        static BACK_SEMS: [StaticCell<Semaphore>; NUM_THREADS] = [const { StaticCell::new() }; NUM_THREADS];
+        static SEMS: [StaticCell<Semaphore>; NUM_THREADS] =
+            [const { StaticCell::new() }; NUM_THREADS];
+        static BACK_SEMS: [StaticCell<Semaphore>; NUM_THREADS] =
+            [const { StaticCell::new() }; NUM_THREADS];
 
         for i in 0..count {
             let sem = SEMS[i].init(Semaphore::new(0, u32::MAX));
@@ -364,12 +366,7 @@ impl ThreadTests {
 
                 Command::SimpleSemYieldAsync(count) => {
                     spawn(
-                        Self::simple_sem_yield_async(
-                            this.clone(),
-                            id,
-                            this.sems[id],
-                            count,
-                        ),
+                        Self::simple_sem_yield_async(this.clone(), id, this.sems[id], count),
                         &this.workq,
                         c"worker",
                     );
@@ -484,7 +481,12 @@ impl ThreadTests {
             .unwrap();
     }
 
-    async fn simple_sem_yield_async(this: Arc<Self>, id: usize, sem: &'static Semaphore, count: usize) {
+    async fn simple_sem_yield_async(
+        this: Arc<Self>,
+        id: usize,
+        sem: &'static Semaphore,
+        count: usize,
+    ) {
         for _ in 0..count {
             sem.give();
             sem.take_async(NoWait).await.unwrap();

From 8310bb947377b99343a9e91ecdbeef8286a1de11 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Sun, 9 Mar 2025 10:18:09 -0600
Subject: [PATCH 11/11] samples: philosophers: Migrate Mutex to const
 constructor

With Mutex having a const constructor, simplify the initialization, and
use a simple static for the Stats container.

Signed-off-by: David Brown
---
 samples/philosophers/src/lib.rs      | 25 ++++++++++++++++---------
 samples/philosophers/src/sysmutex.rs |  2 +-
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs
index 596c31d4..f602ec49 100644
--- a/samples/philosophers/src/lib.rs
+++ b/samples/philosophers/src/lib.rs
@@ -68,22 +68,18 @@ extern "C" fn rust_main() {
     printkln!("Hello world from Rust on {}", zephyr::kconfig::CONFIG_BOARD);
     printkln!("Time tick: {}", zephyr::time::SYS_FREQUENCY);
 
-    let stats = Arc::new(Mutex::new_from(
-        Stats::default(),
-        STAT_MUTEX.init_once(()).unwrap(),
-    ));
+    let stats = &STAT_MUTEX;
 
     let syncers = get_syncer();
 
     printkln!("Pre fork");
 
     for (i, syncer) in (0..NUM_PHIL).zip(syncers.into_iter()) {
-        let child_stat = stats.clone();
         let thread = PHIL_THREADS[i]
             .init_once(PHIL_STACKS[i].init_once(()).unwrap())
             .unwrap();
         thread.spawn(move || {
-            phil_thread(i, syncer, child_stat);
+            phil_thread(i, syncer, stats);
        });
     }
 
@@ -133,7 +129,7 @@ fn get_syncer() -> Vec<Arc<dyn ForkSync>> {
     get_channel_syncer()
 }
 
-fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>, stats: Arc<Mutex<Stats>>) {
+fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>, stats: &'static Mutex<Stats>) {
     printkln!("Child {} started: {:?}", n, syncer);
 
     // Determine our two forks.
@@ -191,6 +187,17 @@ struct Stats {
     thinking: [u64; NUM_PHIL],
 }
 
+impl Stats {
+    /// Const constructor to allow for static initialization.
+    pub const fn new() -> Self {
+        Self {
+            count: [0; NUM_PHIL],
+            eating: [0; NUM_PHIL],
+            thinking: [0; NUM_PHIL],
+        }
+    }
+}
+
 impl Stats {
     fn record_eat(&mut self, index: usize, time: Duration) {
         self.eating[index] += time.to_millis();
@@ -211,9 +218,9 @@ impl Stats {
     }
 }
 
+static STAT_MUTEX: Mutex<Stats> = Mutex::new(Stats::new());
+
 kobj_define! {
     static PHIL_THREADS: [StaticThread; NUM_PHIL];
     static PHIL_STACKS: [ThreadStack<PHIL_STACK_SIZE>; NUM_PHIL];
-
-    static STAT_MUTEX: StaticMutex;
 }
diff --git a/samples/philosophers/src/sysmutex.rs b/samples/philosophers/src/sysmutex.rs
index c89f862f..b34e1c5c 100644
--- a/samples/philosophers/src/sysmutex.rs
+++ b/samples/philosophers/src/sysmutex.rs
@@ -23,7 +23,7 @@ pub struct SysMutexSync {
 impl SysMutexSync {
     #[allow(dead_code)]
     pub fn new() -> SysMutexSync {
-        let locks = [(); NUM_PHIL].each_ref().map(|()| Mutex::new().unwrap());
+        let locks = [(); NUM_PHIL].each_ref().map(|()| Mutex::new());
         SysMutexSync { locks }
     }
 }