From d700b24c00454675c23cf188954ae12fdb9fb047 Mon Sep 17 00:00:00 2001 From: David Brown Date: Wed, 26 Feb 2025 15:48:31 -0700 Subject: [PATCH 01/11] zephyr: Add initial support for embassy Adds the Cargo.toml framework for initial support of using Embassy's executor on Zephyr. This implements a time driver for Embassy using a `k_timer` from Zephyr. Signed-off-by: David Brown --- zephyr/Cargo.toml | 22 +++++++ zephyr/src/embassy.rs | 88 ++++++++++++++++++++++++++++ zephyr/src/embassy/time_driver.rs | 97 +++++++++++++++++++++++++++++++ zephyr/src/lib.rs | 1 + 4 files changed, 208 insertions(+) create mode 100644 zephyr/src/embassy.rs create mode 100644 zephyr/src/embassy/time_driver.rs diff --git a/zephyr/Cargo.toml b/zephyr/Cargo.toml index 0b508568..ff272d8b 100644 --- a/zephyr/Cargo.toml +++ b/zephyr/Cargo.toml @@ -45,8 +45,30 @@ version = "0.2.2" # should be safe to build the crate even if the Rust code doesn't use it because of configs. features = ["alloc"] +[dependencies.embassy-time-driver] +version = "0.2" +# Someone needs to tell embassy what our clock frequency is. Until we have support from cmake for +# this, we'll have to leave this up to the app to provide, since it is heavily board specific. +optional = true + +[dependencies.embassy-time-queue-utils] +version = "0.1" +optional = true + +[dependencies.embassy-sync] +version = "0.6.2" +optional = true + # These are needed at build time. # Whether these need to be vendored is an open question. They are not # used by the core Zephyr tree, but are needed by zephyr applications. [build-dependencies] zephyr-build = { version = "0.1.0", path = "../zephyr-build" } + +[features] +# Provide an implementation of embassy-time-driver. +time-driver = [ + "dep:embassy-time-driver", + "dep:embassy-time-queue-utils", + "dep:embassy-sync", +] diff --git a/zephyr/src/embassy.rs b/zephyr/src/embassy.rs new file mode 100644 index 00000000..3361d77c --- /dev/null +++ b/zephyr/src/embassy.rs @@ -0,0 +1,88 @@ +//! Support for Embassy on Rust+Zephyr +//! +//! [Embassy](https://embassy.dev/) is: "The next-generation framework for embedded applications". +//! From a typical RTOS perspective it is perhaps a little difficult to explain what exactly it is, +//! and why it makes sense to discuss it in the context of supporting Rust on Zephyr. +//! +//! At a core level, Embassy is a set of crates that implement various functionality that is used +//! when writing bare metal applications in Rust. Combined, these provide most of the functionality +//! that is needed for an embedded application. However, the crates are largely independent, and as +//! such find use when combined with Zephyr. +//! +//! ## Executor +//! +//! A significant aspect of Embassy's functionality revolves around providing one or more executors +//! for coordinating async code in Rust. The Rust language transforms code annotated with +//! async/await into state machines that allow these operations to be run cooperatively. A bare +//! metal system with one or more of these executors managing async tasks can indeed solve many of +//! the types of scheduling solutions needed for embedded systems. +//! +//! Although Zephyr does have a thread scheduler, there are still some advantages to running an +//! executor on one or more Zephyr threads: +//! +//! - Because the async code is transformed into a state machine, this code only uses stack while +//! evaluating to the next stopping point. This allows a large number of async operations to +//! happen on a single thread, without requiring additional stack. The state machines themselves +//! do take memory, but this usage is known at compile time (with the stable Rust compiler, it is +//! allocated from a pool, and with the nightly compiler, can be completely compile time +//! determined). +//! - Context switches between async threads can be very fast. When running a single executor +//! thread, there is no need for locking for data that is entirely kept within that thread, and +//! these context switches have similar cost to a function call. Even with multiple threads +//! involved, many switches will happen on the same underlying Zephyr thread, reducing the need to +//! reschedule. +//! - Embassy provides a lot of mechanisms for coordinating between these tasks, all that work in +//! the context of async/await. Some may be thought of as redundant with Zephyr primitives, but +//! they serve a different purpose, and provide more streamlined coordination for things entirely +//! within the Rust world. +//! +//! ## Use +//! +//! To best use this module, it is best to look at the various examples under `samples/embassy*` in +//! this repo. Some of the embassy crates, especially embassy-executor have numerous features that +//! must be configured correctly for proper operation. To use the 'executor-thread' feature, it is +//! also necessary to configure embassy for the proper platform. Future versions of the Cmake files +//! for Rust on Zephyr may provide assistance with this, but for now, this does limit a given +//! application to running on a specific architecture. For using the `executor-zephyr` feature +//! provided by this module, it easier to allow the code to run on multiple platforms. +//! +//! The following features in the `zephyr` crate configure what is supported: +//! +//! - **`executor-zephyr`**: This implements an executor that uses Zephyr's thread primitives +//! (`k_thread_suspend` and `k_thread_resume`) to suspend the executor thread when there is no work +//! to perform. This feature is incompatible with either `embassy-thread` or `embassy-interrupt` +//! in the `embassy-executor` crate. +//! - **`embassy-time-driver`**: This feature causes the `zephyr` crate to provide a time driver to +//! Embassy. This driver uses a single `k_timer` in Zephyr to wake async operations that are +//! dependent on time. This enables the `embassy-time` crate's functionality to be used freely +//! within async tasks on Zephyr. +//! +//! Future versions of this support will provide async interfaces to various driver systems in +//! Zephyr, allowing the use of Zephyr drivers freely from async code. +//! +//! It is perfectly permissible to use the `executor-thread` feature from embassy-executor on +//! Zephyr, within the following guidelines: +//! +//! - The executor is incompatible with the async executor provided within [`crate::kio`], and +//! because there are no features to enable this, this functions will still be accessible. Be +//! careful. You should enable `no-kio` in the zephyr crate to hide these functions. +//! - This executor does not coordinate with the scheduler on Zephyr, but uses an +//! architecture-specific mechanmism when there is no work. On Cortex-M, this is the 'wfe' +//! instruction, on riscv32, the 'wfi' instruction. This means that no tasks of lower priority +//! will ever run, so this should only be started from the lowest priority task on the system. +//! - Because the 'idle' thread in Zephyr will never run, some platforms will not enter low power +//! mode, when the system is idle. This is very platform specific. +//! +//! ## Caveats +//! +//! The executor provided by Embassy is fundamentally incompatible with the executor provided by +//! this crate's [`crate::kio`] and [`crate::work::futures`]. Trying to use the functionality +//! provided by operations, such as [`Semaphore::take_async`], will generally result in a panic. +//! These routines are conditionally compiled out when `executor-zephyr` is enabled, but there is no +//! way for this crate to detect the use of embassy's `executor-threaded`. Combining these will +//! result in undefined behavior, likely difficult to debug crashes. +//! +//! [`Semaphore::take_async`]: crate::sys::sync::Semaphore::take_async + +#[cfg(feature = "time-driver")] +mod time_driver; diff --git a/zephyr/src/embassy/time_driver.rs b/zephyr/src/embassy/time_driver.rs new file mode 100644 index 00000000..d0d0fcbc --- /dev/null +++ b/zephyr/src/embassy/time_driver.rs @@ -0,0 +1,97 @@ +//! Embassy time driver for Zephyr. +//! +//! Implements the time driver for Embassy using a `k_timer` in Zephyr. + +use core::{cell::{RefCell, UnsafeCell}, mem}; + +use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex}; +use embassy_time_driver::Driver; +use embassy_time_queue_utils::Queue; + +use crate::raw::{ + k_timer, + k_timer_init, + k_timer_start, + k_timeout_t, +}; +use crate::sys::K_FOREVER; + +embassy_time_driver::time_driver_impl!(static DRIVER: ZephyrTimeDriver = ZephyrTimeDriver { + queue: Mutex::new(RefCell::new(Queue::new())), + timer: Mutex::new(RefCell::new(unsafe { mem::zeroed() })), +}); + +struct ZephyrTimeDriver { + queue: Mutex>, + timer: Mutex>, +} + +/// A wrapper around `k_timer`. In this case, the implementation is a little simpler than the one +/// in the timer module, as we are always called from within a critical section. +struct ZTimer { + item: UnsafeCell, + initialized: bool, +} + +impl ZTimer { + fn set_alarm(&mut self, next: u64, now: u64) -> bool { + if next <= now { + return false; + } + + // Otherwise, initialize our timer, and handle it. + if !self.initialized { + unsafe { k_timer_init(self.item.get(), Some(Self::timer_tick), None); } + self.initialized = true; + } + + // There is a +1 here as the `k_timer_start()` for historical reasons, subtracts one from + // the time, effectively rounding down, whereas we want to wait at least long enough. + let delta = k_timeout_t { ticks: (next - now + 1) as i64 }; + let period = K_FOREVER; + unsafe { k_timer_start(self.item.get(), delta, period); } + + true + } + + unsafe extern "C" fn timer_tick(_k_timer: *mut k_timer) { + DRIVER.check_alarm(); + } +} + +impl Driver for ZephyrTimeDriver { + fn now(&self) -> u64 { + crate::time::now().ticks() + } + + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + critical_section::with(|cs| { + let mut queue = self.queue.borrow(cs).borrow_mut(); + let mut timer = self.timer.borrow(cs).borrow_mut(); + + if queue.schedule_wake(at, waker) { + let mut next = queue.next_expiration(self.now()); + while !timer.set_alarm(next, self.now()) { + next = queue.next_expiration(self.now()); + } + } + }) + } +} + +impl ZephyrTimeDriver { + fn check_alarm(&self) { + critical_section::with(|cs| { + let mut queue = self.queue.borrow(cs).borrow_mut(); + let mut timer = self.timer.borrow(cs).borrow_mut(); + + let mut next = queue.next_expiration(self.now()); + while !timer.set_alarm(next, self.now()) { + next = queue.next_expiration(self.now()); + } + }) + } +} + +// SAFETY: The timer access is always coordinated through a critical section. +unsafe impl Send for ZTimer { } diff --git a/zephyr/src/lib.rs b/zephyr/src/lib.rs index 024a1136..3f596435 100644 --- a/zephyr/src/lib.rs +++ b/zephyr/src/lib.rs @@ -12,6 +12,7 @@ pub mod align; pub mod device; +pub mod embassy; pub mod error; #[cfg(CONFIG_RUST_ALLOC)] pub mod kio; From f1c28ceb8727deb6252ff07c5a9709e20c94435c Mon Sep 17 00:00:00 2001 From: David Brown Date: Wed, 26 Feb 2025 15:50:22 -0700 Subject: [PATCH 02/11] samples: embassy: Start of an embassy demo This demo shows shows a few async tasks that are coordinated using the Embassy embassy-thread executor. Signed-off-by: David Brown --- samples/embassy/CMakeLists.txt | 8 ++++ samples/embassy/Cargo.toml | 51 +++++++++++++++++++++++++ samples/embassy/pimoroni_tiny_2040.conf | 7 ++++ samples/embassy/prj.conf | 14 +++++++ samples/embassy/rpi_pico.conf | 7 ++++ samples/embassy/sample.yaml | 20 ++++++++++ samples/embassy/src/lib.rs | 51 +++++++++++++++++++++++++ 7 files changed, 158 insertions(+) create mode 100644 samples/embassy/CMakeLists.txt create mode 100644 samples/embassy/Cargo.toml create mode 100644 samples/embassy/pimoroni_tiny_2040.conf create mode 100644 samples/embassy/prj.conf create mode 100644 samples/embassy/rpi_pico.conf create mode 100644 samples/embassy/sample.yaml create mode 100644 samples/embassy/src/lib.rs diff --git a/samples/embassy/CMakeLists.txt b/samples/embassy/CMakeLists.txt new file mode 100644 index 00000000..a4a93f2f --- /dev/null +++ b/samples/embassy/CMakeLists.txt @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) + +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) + +project(embassy_on_zephyr) +rust_cargo_application() diff --git a/samples/embassy/Cargo.toml b/samples/embassy/Cargo.toml new file mode 100644 index 00000000..53fb3fa9 --- /dev/null +++ b/samples/embassy/Cargo.toml @@ -0,0 +1,51 @@ +# Copyright (c) 2024 Linaro LTD +# SPDX-License-Identifier: Apache-2.0 + +[package] +# This must be rustapp for now. +name = "rustapp" +version = "0.1.0" +edition = "2021" +description = "A sample hello world application in Rust" +license = "Apache-2.0 or MIT" + +[lib] +crate-type = ["staticlib"] + +[dependencies] +zephyr = { version = "0.1.0", features = ["time-driver"] } +log = "0.4.22" +static_cell = "2.1" + +[dependencies.embassy-executor] +version = "0.7.0" +# path = "../../embassy/embassy-executor" +features = [ + "log", + "task-arena-size-1024", + "arch-cortex-m", + "executor-thread", +] + +[dependencies.embassy-futures] +version = "0.1.1" +# path = "../../embassy/embassy-futures" + +[dependencies.embassy-sync] +version = "0.6.2" +# path = "../../embassy/embassy-sync" + +[dependencies.embassy-time] +version = "0.4.0" +# path = "../../embassy/embassy-time" +# This is board specific. +features = ["tick-hz-10_000"] + +[dependencies.critical-section] +version = "1.2" + +[profile.dev] +opt-level = 1 + +[profile.release] +debug = true diff --git a/samples/embassy/pimoroni_tiny_2040.conf b/samples/embassy/pimoroni_tiny_2040.conf new file mode 100644 index 00000000..94c0843a --- /dev/null +++ b/samples/embassy/pimoroni_tiny_2040.conf @@ -0,0 +1,7 @@ +# Copyright (c) 2024 Linaro LTD +# SPDX-License-Identifier: Apache-2.0 + +# This board doesn't have a serial console, so use RTT. +CONFIG_UART_CONSOLE=n +CONFIG_RTT_CONSOLE=y +CONFIG_USE_SEGGER_RTT=y diff --git a/samples/embassy/prj.conf b/samples/embassy/prj.conf new file mode 100644 index 00000000..10dd0b37 --- /dev/null +++ b/samples/embassy/prj.conf @@ -0,0 +1,14 @@ +# Copyright (c) 2024 Linaro LTD +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_DEBUG=y + +# The default 1k stack isn't large enough for rust string formatting with logging. +CONFIG_MAIN_STACK_SIZE=4096 + +CONFIG_RUST=y + +CONFIG_RUST_ALLOC=y +# CONFIG_LOG=y + +CONFIG_LOG_BACKEND_RTT=n diff --git a/samples/embassy/rpi_pico.conf b/samples/embassy/rpi_pico.conf new file mode 100644 index 00000000..94c0843a --- /dev/null +++ b/samples/embassy/rpi_pico.conf @@ -0,0 +1,7 @@ +# Copyright (c) 2024 Linaro LTD +# SPDX-License-Identifier: Apache-2.0 + +# This board doesn't have a serial console, so use RTT. +CONFIG_UART_CONSOLE=n +CONFIG_RTT_CONSOLE=y +CONFIG_USE_SEGGER_RTT=y diff --git a/samples/embassy/sample.yaml b/samples/embassy/sample.yaml new file mode 100644 index 00000000..d598029d --- /dev/null +++ b/samples/embassy/sample.yaml @@ -0,0 +1,20 @@ +sample: + description: Embassy Hello + name: hello and basics from Embassy +common: + harness: console + harness_config: + type: one_line + regex: + - "Hello world from Rust on (.*)" + tags: rust + filter: CONFIG_RUST_SUPPORTED + platform_allow: + - qemu_cortex_m0 + - qemu_cortex_m3 + - qemu_riscv32 + - qemu_riscv64 + - nrf52840dk/nrf52840 +tests: + sample.rust.helloworld: + tags: introduction diff --git a/samples/embassy/src/lib.rs b/samples/embassy/src/lib.rs new file mode 100644 index 00000000..9a7fcedb --- /dev/null +++ b/samples/embassy/src/lib.rs @@ -0,0 +1,51 @@ +// Copyright (c) 2024 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +#![no_std] + +use embassy_executor::{Executor, Spawner}; +use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel}; +use embassy_time::{Duration, Timer}; +use log::info; +use static_cell::StaticCell; + +static EXECUTOR_LOW: StaticCell = StaticCell::new(); + +#[no_mangle] +extern "C" fn rust_main() { + unsafe { + zephyr::set_logger().unwrap(); + } + + info!("Hello world from Rust on {}", zephyr::kconfig::CONFIG_BOARD); + + let executor = EXECUTOR_LOW.init(Executor::new()); + executor.run(|spawner| { + spawner.spawn(sample_task(spawner)).unwrap(); + }) +} + +static CHAN: Channel = Channel::new(); + +#[embassy_executor::task] +async fn sample_task(spawner: Spawner) { + info!("Started once"); + spawner.spawn(other_task(spawner)).unwrap(); + loop { + // Wait for a message. + let msg = CHAN.receive().await; + info!("main task got: {}", msg); + } +} + +#[embassy_executor::task] +async fn other_task(_spawner: Spawner) { + info!("The other task"); + let mut count = 0; + loop { + CHAN.send(count).await; + count = count.wrapping_add(1); + + Timer::after(Duration::from_secs(1)).await; + } +} From b580a3f657888a88376cec85ffd23afe8f1ad67c Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 27 Feb 2025 16:20:58 -0700 Subject: [PATCH 03/11] zephyr: Implement `executor-zephyr` to use with Embassy This implements an executor for Embassy that runs a set of async tasks on a single Zephyr thread. The executor will suspend the thread when there is no work to do, resuming when there is work. It is permissible to run multiple executors, on different threads, to support a hybrid async cooperative and priority-based scheduling. Signed-off-by: David Brown --- zephyr/Cargo.toml | 9 +++++ zephyr/src/embassy.rs | 5 +++ zephyr/src/embassy/executor.rs | 74 ++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 zephyr/src/embassy/executor.rs diff --git a/zephyr/Cargo.toml b/zephyr/Cargo.toml index ff272d8b..2e340907 100644 --- a/zephyr/Cargo.toml +++ b/zephyr/Cargo.toml @@ -59,6 +59,10 @@ optional = true version = "0.6.2" optional = true +[dependencies.embassy-executor] +version = "0.7.0" +optional = true + # These are needed at build time. # Whether these need to be vendored is an open question. They are not # used by the core Zephyr tree, but are needed by zephyr applications. @@ -72,3 +76,8 @@ time-driver = [ "dep:embassy-time-queue-utils", "dep:embassy-sync", ] + +# Provide an embassy-based executor that runs on individual Zephyr threads. +executor-zephyr = [ + "dep:embassy-executor", +] diff --git a/zephyr/src/embassy.rs b/zephyr/src/embassy.rs index 3361d77c..fa16400b 100644 --- a/zephyr/src/embassy.rs +++ b/zephyr/src/embassy.rs @@ -86,3 +86,8 @@ #[cfg(feature = "time-driver")] mod time_driver; + +#[cfg(feature = "executor-zephyr")] +pub use executor::Executor; +#[cfg(feature = "executor-zephyr")] +mod executor; diff --git a/zephyr/src/embassy/executor.rs b/zephyr/src/embassy/executor.rs new file mode 100644 index 00000000..936435b7 --- /dev/null +++ b/zephyr/src/embassy/executor.rs @@ -0,0 +1,74 @@ +//! An embassy executor tailored for Zephyr + +use core::{marker::PhantomData, sync::atomic::Ordering}; + +use embassy_executor::{raw, Spawner}; +use zephyr_sys::{k_current_get, k_thread_resume, k_thread_suspend, k_tid_t}; + +use crate::{printkln, sync::atomic::AtomicBool}; + +/// Zephyr-thread based executor. +pub struct Executor { + inner: Option, + id: k_tid_t, + pend: AtomicBool, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let id = unsafe { k_current_get() }; + + Self { + inner: None, + pend: AtomicBool::new(false), + id, + not_send: PhantomData, + } + } + + /// Run the executor. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + let context = self as *mut _ as *mut (); + self.inner.replace(raw::Executor::new(context)); + let inner = self.inner.as_mut().unwrap(); + init(inner.spawner()); + + loop { + unsafe { + // The raw executor's poll only runs things that were queued _before_ this poll + // itself is actually run. This means, specifically, that if the polled execution + // causes this, or other threads to enqueue, this will return without running them. + // `__pender` _will_ be called, but it isn't "sticky" like `wfe/sev` are. To + // simulate this, we will use the 'pend' atomic to count + inner.poll(); + if !self.pend.swap(false, Ordering::SeqCst) { + // printkln!("_suspend"); + k_thread_suspend(k_current_get()); + } + } + } + } +} + +#[export_name = "__pender"] +fn __pender(context: *mut ()) { + unsafe { + let myself = k_current_get(); + + let this = context as *const Executor; + let other = (*this).id; + + // If the other is a different thread, resume it. + if other != myself { + // printkln!("_resume"); + k_thread_resume(other); + } + // Otherwise, we need to make sure our own next suspend doesn't happen. + // We need to also prevent a suspend from happening in the case where the only running + // thread causes other work to become pending. The resume below will do nothing, as we + // are just running. + (*this).pend.store(true, Ordering::SeqCst); + } +} From 6d13ea58b68e69d74e89cfd05ee8ca682a605683 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 27 Feb 2025 16:23:07 -0700 Subject: [PATCH 04/11] samples: embassy: Update Embassy Demo for Zephyr executor Update the embassy demo to use the Zephyr executor. This performs a ping-ping test using two executors, with one end of the responder running on another thread. On the rp2040, the round trip time with `executor-threaded` is about 12us, with `executor-zephyr`, all on a single thread, it is about 15us, and with pairs of task running across differrent threads, the time is about 26us. Signed-off-by: David Brown --- samples/embassy/Cargo.toml | 16 +- samples/embassy/src/lib.rs | 294 ++++++++++++++++++++++++++++++++++--- 2 files changed, 288 insertions(+), 22 deletions(-) diff --git a/samples/embassy/Cargo.toml b/samples/embassy/Cargo.toml index 53fb3fa9..bc6ea6f9 100644 --- a/samples/embassy/Cargo.toml +++ b/samples/embassy/Cargo.toml @@ -16,6 +16,7 @@ crate-type = ["staticlib"] zephyr = { version = "0.1.0", features = ["time-driver"] } log = "0.4.22" static_cell = "2.1" +heapless = "0.8" [dependencies.embassy-executor] version = "0.7.0" @@ -23,8 +24,6 @@ version = "0.7.0" features = [ "log", "task-arena-size-1024", - "arch-cortex-m", - "executor-thread", ] [dependencies.embassy-futures] @@ -44,6 +43,19 @@ features = ["tick-hz-10_000"] [dependencies.critical-section] version = "1.2" +[features] +# default = ["executor-thread"] +default = ["executor-zephyr"] + +executor-thread = [ + "embassy-executor/arch-cortex-m", + "embassy-executor/executor-thread", +] + +executor-zephyr = [ + "zephyr/executor-zephyr", +] + [profile.dev] opt-level = 1 diff --git a/samples/embassy/src/lib.rs b/samples/embassy/src/lib.rs index 9a7fcedb..5e965dda 100644 --- a/samples/embassy/src/lib.rs +++ b/samples/embassy/src/lib.rs @@ -3,13 +3,37 @@ #![no_std] -use embassy_executor::{Executor, Spawner}; +extern crate alloc; + +use core::ffi::c_int; + +#[cfg(feature = "executor-thread")] +use embassy_executor::Executor; + +#[cfg(feature = "executor-zephyr")] +use zephyr::embassy::Executor; + +use alloc::format; +use embassy_executor::{SendSpawner, Spawner}; use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel}; -use embassy_time::{Duration, Timer}; use log::info; use static_cell::StaticCell; +use zephyr::{kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, kobj_define, printkln, raw::k_cycle_get_64}; +use zephyr::raw; + +/// Maximum number of threads to spawn. As this is async, these do not each need a stack. +const NUM_THREADS: usize = 6; + +const THREAD_STACK_SIZE: usize = 2048; static EXECUTOR_LOW: StaticCell = StaticCell::new(); +static EXECUTOR_MAIN: StaticCell = StaticCell::new(); + +static LOW_SPAWNER: Channel = Channel::new() ; + +// The main thread priority. +const MAIN_PRIO: c_int = 2; +const LOW_PRIO: c_int = 5; #[no_mangle] extern "C" fn rust_main() { @@ -17,35 +41,265 @@ extern "C" fn rust_main() { zephyr::set_logger().unwrap(); } - info!("Hello world from Rust on {}", zephyr::kconfig::CONFIG_BOARD); + // Set our own priority. + unsafe { + raw::k_thread_priority_set(raw::k_current_get(), MAIN_PRIO); + } - let executor = EXECUTOR_LOW.init(Executor::new()); + // Start up the low priority thread. + let mut thread = LOW_THREAD + .init_once(LOW_STACK.init_once(()).unwrap()) + .unwrap(); + thread.set_priority(LOW_PRIO); + thread.spawn(move || { + low_executor(); + }); + + info!("Starting Embassy executor on {}", zephyr::kconfig::CONFIG_BOARD); + + let executor = EXECUTOR_MAIN.init(Executor::new()); executor.run(|spawner| { - spawner.spawn(sample_task(spawner)).unwrap(); + spawner.spawn(main(spawner)).unwrap(); }) } -static CHAN: Channel = Channel::new(); +/// The low priority executor. +fn low_executor() -> ! { + let executor = EXECUTOR_LOW.init(Executor::new()); + executor.run(|spawner| { + LOW_SPAWNER.try_send(spawner.make_send()).ok().unwrap(); + }) +} #[embassy_executor::task] -async fn sample_task(spawner: Spawner) { - info!("Started once"); - spawner.spawn(other_task(spawner)).unwrap(); - loop { - // Wait for a message. - let msg = CHAN.receive().await; - info!("main task got: {}", msg); +async fn main(spawner: Spawner) { + info!("Benchmark begin"); + + let low_spawner = LOW_SPAWNER.receive().await; + + let tester = ThreadTests::new(NUM_THREADS); + + tester.run(spawner, low_spawner, Command::Empty).await; + tester.run(spawner, low_spawner, Command::Empty).await; + tester.run(spawner, low_spawner, Command::PingPong(10_000)).await; +} + +/// Async task tests. +/// +/// For each test, we have a set of threads that do work, a "high priority" thread higher than those +/// and a low priority thread, lower than any of those. This is used to test operations in both a +/// fast-path (message or semaphore always available), and slow path (thread must block and be woken +/// by message coming in). Generally, this is determined by whether high or low priority tasks are +/// providing the data. +struct ThreadTests { + /// How many threads were actually asked for. + count: usize, + + /// Forward channels, acts as semaphores forward. + forward: heapless::Vec, NUM_THREADS>, + + back: Channel, + + /// Each worker sends results back through this. + answers: Channel, +} + +impl ThreadTests { + /// Construct the tests. + /// + /// Note that this uses a single StaticCell, and therefore can only be called once. + fn new(count: usize) -> &'static Self { + static THIS: StaticCell = StaticCell::new(); + let this = THIS.init(Self { + count, + forward: heapless::Vec::new(), + back: Channel::new(), + answers: Channel::new(), + }); + + for _ in 0..count { + this.forward.push(Channel::new()).ok().unwrap(); + } + + this } + + async fn run(&'static self, spawner: Spawner, low_spawner: SendSpawner, command: Command) { + let desc = format!("{:?}", command); + let timer = BenchTimer::new(&desc, self.count * command.get_count()); + + let mut answers: heapless::Vec, NUM_THREADS> = heapless::Vec::new(); + for _ in 0..self.count { + answers.push(None).unwrap(); + } + let mut low = false; + let mut msg_count = (1 + self.count) as isize; + + // Fire off all of the workers. + for id in 0..self.count { + spawner.spawn(worker(self, id, command)).unwrap(); + } + + // And the "low" priority thread (which isn't lower at this time). + low_spawner.spawn(low_task(self, command)).unwrap(); + //let _ = low_spawner; + //spawner.spawn(low_task(self, command)).unwrap(); + + // Now wait for all of the responses. + loop { + match self.answers.receive().await { + Answer::Worker { id, count } => { + if answers[id].replace(count).is_some() { + panic!("Multiple results from worker {}", id); + } + msg_count -= 1; + if msg_count <= 0 { + break; + } + } + + Answer::Low => { + if low { + panic!("Multiple result from 'low' worker"); + } + low = true; + + msg_count -= 1; + if msg_count <= 0 { + break; + } + } + } + } + + if msg_count != 0 { + panic!("Invalid number of replies\n"); + } + + timer.stop(); + } +} + +/// An individual work thread. This performs the specified operation, returning the result. +#[embassy_executor::task(pool_size = NUM_THREADS)] +async fn worker(this: &'static ThreadTests, id: usize, command: Command) { + let mut total = 0; + + match command { + Command::Empty => { + // Nothing to do. + } + Command::PingPong(count) => { + // The ping pong test, reads messages from in indexed channel (one for each worker), and + // replies to a shared channel. + for _ in 0..count { + this.forward[id].receive().await; + this.back.send(()).await; + total += 1; + } + } + } + + this.answers.send(Answer::Worker { id, count: total }).await; } +/// The low priority worker for the given command. Exits when finished. #[embassy_executor::task] -async fn other_task(_spawner: Spawner) { - info!("The other task"); - let mut count = 0; - loop { - CHAN.send(count).await; - count = count.wrapping_add(1); +async fn low_task(this: &'static ThreadTests, command: Command) { + match command { + Command::Empty => { + // Nothing to do. + } + Command::PingPong(count) => { + // Each worker expects a message to tell it to work, and will reply with its answer. + for _ in 0..count { + for forw in &this.forward { + forw.send(()).await; + this.back.receive().await; + } + } + } + } + + this.answers.send(Answer::Low).await; +} - Timer::after(Duration::from_secs(1)).await; +#[derive(Copy, Clone, Debug)] +enum Command { + /// The empty test. Does nothing, but invokes everything. Useful to determine overhead. + Empty, + /// Pong test. Each thread waits for a message on its own channel, and then replies on a shared + /// channel to a common worker that is performing these operations. + PingPong(usize), +} + +impl Command { + /// Return how many operations this particular command invokes. + fn get_count(self) -> usize { + match self { + Self::Empty => 0, + Self::PingPong(count) => count, + } } } + +#[derive(Debug)] +enum Answer { + /// A worker has finished it's processing. + Worker { + /// What is the id of this worker. + id: usize, + /// Operation count. + count: usize, + }, + /// The low priority task has completed. + Low, +} + +// TODO: Put this benchmarking stuff somewhere useful. +fn now() -> u64 { + unsafe { k_cycle_get_64() } +} + +/// Timing some operations. +/// +/// To use: +/// ``` +/// /// 500 is the number of iterations happening. +/// let timer = BenchTimer::new("My thing", 500); +/// // operations +/// timer.stop("Thing being timed"); +/// ``` +pub struct BenchTimer<'a> { + what: &'a str, + start: u64, + count: usize, +} + +impl<'a> BenchTimer<'a> { + pub fn new(what: &'a str, count: usize) -> Self { + Self { + what, + start: now(), + count, + } + } + + pub fn stop(self) { + let stop = now(); + let time = + (stop - self.start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1000.0; + let time = if self.count > 0 { + time / (self.count as f64) * 1000.0 + } else { + 0.0 + }; + + printkln!(" {:8.3} us, {} of {}", time, self.count, self.what); + } +} + +kobj_define! { + static LOW_THREAD: StaticThread; + static LOW_STACK: ThreadStack; +} From c2d5ce4136fdba0a0548182aa360a7ae95e05232 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 27 Feb 2025 16:26:28 -0700 Subject: [PATCH 05/11] zephyr: Update rustfmt Update the formatting. Signed-off-by: David Brown --- zephyr/src/embassy/executor.rs | 2 +- zephyr/src/embassy/time_driver.rs | 26 +++++++++++++++----------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/zephyr/src/embassy/executor.rs b/zephyr/src/embassy/executor.rs index 936435b7..349eaf50 100644 --- a/zephyr/src/embassy/executor.rs +++ b/zephyr/src/embassy/executor.rs @@ -41,7 +41,7 @@ impl Executor { // itself is actually run. This means, specifically, that if the polled execution // causes this, or other threads to enqueue, this will return without running them. // `__pender` _will_ be called, but it isn't "sticky" like `wfe/sev` are. To - // simulate this, we will use the 'pend' atomic to count + // simulate this, we will use the 'pend' atomic to count inner.poll(); if !self.pend.swap(false, Ordering::SeqCst) { // printkln!("_suspend"); diff --git a/zephyr/src/embassy/time_driver.rs b/zephyr/src/embassy/time_driver.rs index d0d0fcbc..c9e73247 100644 --- a/zephyr/src/embassy/time_driver.rs +++ b/zephyr/src/embassy/time_driver.rs @@ -2,18 +2,16 @@ //! //! Implements the time driver for Embassy using a `k_timer` in Zephyr. -use core::{cell::{RefCell, UnsafeCell}, mem}; +use core::{ + cell::{RefCell, UnsafeCell}, + mem, +}; use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex}; use embassy_time_driver::Driver; use embassy_time_queue_utils::Queue; -use crate::raw::{ - k_timer, - k_timer_init, - k_timer_start, - k_timeout_t, -}; +use crate::raw::{k_timeout_t, k_timer, k_timer_init, k_timer_start}; use crate::sys::K_FOREVER; embassy_time_driver::time_driver_impl!(static DRIVER: ZephyrTimeDriver = ZephyrTimeDriver { @@ -41,15 +39,21 @@ impl ZTimer { // Otherwise, initialize our timer, and handle it. if !self.initialized { - unsafe { k_timer_init(self.item.get(), Some(Self::timer_tick), None); } + unsafe { + k_timer_init(self.item.get(), Some(Self::timer_tick), None); + } self.initialized = true; } // There is a +1 here as the `k_timer_start()` for historical reasons, subtracts one from // the time, effectively rounding down, whereas we want to wait at least long enough. - let delta = k_timeout_t { ticks: (next - now + 1) as i64 }; + let delta = k_timeout_t { + ticks: (next - now + 1) as i64, + }; let period = K_FOREVER; - unsafe { k_timer_start(self.item.get(), delta, period); } + unsafe { + k_timer_start(self.item.get(), delta, period); + } true } @@ -94,4 +98,4 @@ impl ZephyrTimeDriver { } // SAFETY: The timer access is always coordinated through a critical section. -unsafe impl Send for ZTimer { } +unsafe impl Send for ZTimer {} From c3bd91243645aa7d0a1fc873c3358e4eea5d6900 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 27 Feb 2025 16:26:46 -0700 Subject: [PATCH 06/11] samples: embassy: Update formatting Run `cargo fmt`. Signed-off-by: David Brown --- samples/embassy/src/lib.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/samples/embassy/src/lib.rs b/samples/embassy/src/lib.rs index 5e965dda..d44e0b07 100644 --- a/samples/embassy/src/lib.rs +++ b/samples/embassy/src/lib.rs @@ -18,8 +18,10 @@ use embassy_executor::{SendSpawner, Spawner}; use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel}; use log::info; use static_cell::StaticCell; -use zephyr::{kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, kobj_define, printkln, raw::k_cycle_get_64}; use zephyr::raw; +use zephyr::{ + kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, kobj_define, printkln, raw::k_cycle_get_64, +}; /// Maximum number of threads to spawn. As this is async, these do not each need a stack. const NUM_THREADS: usize = 6; @@ -29,7 +31,7 @@ const THREAD_STACK_SIZE: usize = 2048; static EXECUTOR_LOW: StaticCell = StaticCell::new(); static EXECUTOR_MAIN: StaticCell = StaticCell::new(); -static LOW_SPAWNER: Channel = Channel::new() ; +static LOW_SPAWNER: Channel = Channel::new(); // The main thread priority. const MAIN_PRIO: c_int = 2; @@ -55,7 +57,10 @@ extern "C" fn rust_main() { low_executor(); }); - info!("Starting Embassy executor on {}", zephyr::kconfig::CONFIG_BOARD); + info!( + "Starting Embassy executor on {}", + zephyr::kconfig::CONFIG_BOARD + ); let executor = EXECUTOR_MAIN.init(Executor::new()); executor.run(|spawner| { @@ -81,7 +86,9 @@ async fn main(spawner: Spawner) { tester.run(spawner, low_spawner, Command::Empty).await; tester.run(spawner, low_spawner, Command::Empty).await; - tester.run(spawner, low_spawner, Command::PingPong(10_000)).await; + tester + .run(spawner, low_spawner, Command::PingPong(10_000)) + .await; } /// Async task tests. From 4ebf17b3afbb89ca9720d2f174d3d3a25b837eb3 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 27 Feb 2025 22:20:56 -0700 Subject: [PATCH 07/11] samples: embassy: Fix test name Signed-off-by: David Brown --- samples/embassy/sample.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/embassy/sample.yaml b/samples/embassy/sample.yaml index d598029d..272706cc 100644 --- a/samples/embassy/sample.yaml +++ b/samples/embassy/sample.yaml @@ -16,5 +16,5 @@ common: - qemu_riscv64 - nrf52840dk/nrf52840 tests: - sample.rust.helloworld: + sample.rust.embassyhello: tags: introduction From 72d126b47f41d640dbde393d0856b021ca56cfaf Mon Sep 17 00:00:00 2001 From: David Brown Date: Mon, 3 Mar 2025 13:24:09 -0700 Subject: [PATCH 08/11] samples: embassy: Increase task pool size On 64-bit targets, the task Futures are larger, and the chosen pool size is not adequate. Increase to 2k, which still should fit in the memory constraints on all targets, but not just fail immediately on 64-bit targets. Signed-off-by: David Brown --- samples/embassy/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/embassy/Cargo.toml b/samples/embassy/Cargo.toml index bc6ea6f9..6301463e 100644 --- a/samples/embassy/Cargo.toml +++ b/samples/embassy/Cargo.toml @@ -23,7 +23,7 @@ version = "0.7.0" # path = "../../embassy/embassy-executor" features = [ "log", - "task-arena-size-1024", + "task-arena-size-2048", ] [dependencies.embassy-futures] From 06bf85f6bb41ebac64e0dbb68a26fbf2270fe1c1 Mon Sep 17 00:00:00 2001 From: David Brown Date: Mon, 3 Mar 2025 13:25:04 -0700 Subject: [PATCH 09/11] samples: embassy: Detect proper end to the test Print a message after the test finish, and use this to terminate the tests with success. Signed-off-by: David Brown --- samples/embassy/sample.yaml | 2 +- samples/embassy/src/lib.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/samples/embassy/sample.yaml b/samples/embassy/sample.yaml index 272706cc..c4a24cf5 100644 --- a/samples/embassy/sample.yaml +++ b/samples/embassy/sample.yaml @@ -6,7 +6,7 @@ common: harness_config: type: one_line regex: - - "Hello world from Rust on (.*)" + - "Embassy tests passed" tags: rust filter: CONFIG_RUST_SUPPORTED platform_allow: diff --git a/samples/embassy/src/lib.rs b/samples/embassy/src/lib.rs index d44e0b07..8bfd14be 100644 --- a/samples/embassy/src/lib.rs +++ b/samples/embassy/src/lib.rs @@ -89,6 +89,8 @@ async fn main(spawner: Spawner) { tester .run(spawner, low_spawner, Command::PingPong(10_000)) .await; + + info!("Embassy tests passed"); } /// Async task tests. From 7cde08f6bc7bd7f738263f672a275c5d0918cea4 Mon Sep 17 00:00:00 2001 From: David Brown Date: Mon, 3 Mar 2025 13:25:43 -0700 Subject: [PATCH 10/11] zephyr: embassy: Eliminate unused printkln import With the directives themselves removed, remove the import to avoid a warning about unused imports. Signed-off-by: David Brown --- zephyr/src/embassy/executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zephyr/src/embassy/executor.rs b/zephyr/src/embassy/executor.rs index 349eaf50..b6ca1393 100644 --- a/zephyr/src/embassy/executor.rs +++ b/zephyr/src/embassy/executor.rs @@ -5,7 +5,7 @@ use core::{marker::PhantomData, sync::atomic::Ordering}; use embassy_executor::{raw, Spawner}; use zephyr_sys::{k_current_get, k_thread_resume, k_thread_suspend, k_tid_t}; -use crate::{printkln, sync::atomic::AtomicBool}; +use crate::sync::atomic::AtomicBool; /// Zephyr-thread based executor. pub struct Executor { From 7212600c0a0164a0fced560d901f3a299aaf2ad7 Mon Sep 17 00:00:00 2001 From: David Brown Date: Mon, 3 Mar 2025 13:28:57 -0700 Subject: [PATCH 11/11] zephyr: embassy: Add Default implementation Clippy complains about a no-argument `new()` method without an accompanying Default. It doesn't hurt anything to add, and could make this potentially slightly more useful. Signed-off-by: David Brown --- zephyr/src/embassy/executor.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/zephyr/src/embassy/executor.rs b/zephyr/src/embassy/executor.rs index b6ca1393..22dfa29a 100644 --- a/zephyr/src/embassy/executor.rs +++ b/zephyr/src/embassy/executor.rs @@ -52,6 +52,12 @@ impl Executor { } } +impl Default for Executor { + fn default() -> Self { + Self::new() + } +} + #[export_name = "__pender"] fn __pender(context: *mut ()) { unsafe {