diff --git a/samples/work-philosophers/CMakeLists.txt b/samples/async-philosophers/CMakeLists.txt similarity index 85% rename from samples/work-philosophers/CMakeLists.txt rename to samples/async-philosophers/CMakeLists.txt index e118b2c3..7716a8ea 100644 --- a/samples/work-philosophers/CMakeLists.txt +++ b/samples/async-philosophers/CMakeLists.txt @@ -3,6 +3,6 @@ cmake_minimum_required(VERSION 3.20.0) find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) -project(work_philosophers) +project(async_philosophers) rust_cargo_application() diff --git a/samples/async-philosophers/Cargo.toml b/samples/async-philosophers/Cargo.toml new file mode 100644 index 00000000..d26b7cba --- /dev/null +++ b/samples/async-philosophers/Cargo.toml @@ -0,0 +1,36 @@ +# Copyright (c) 2024 Linaro LTD +# SPDX-License-Identifier: Apache-2.0 + +[package] +# This must be rustapp for now. +name = "rustapp" +version = "0.1.0" +edition = "2021" +description = "A sample hello world application in Rust" +license = "Apache-2.0 or MIT" + +[lib] +crate-type = ["staticlib"] + +[dependencies] +zephyr = { version = "0.1.0", features = ["time-driver", "executor-zephyr"] } +static_cell = "2.1" + +embassy-executor = { version = "0.7.0", features = ["log", "task-arena-size-2048"] } +embassy-sync = "0.6.2" + +# For real builds, you should figure out your target's tick rate and set the appropriate feature, +# like in these examples. Without this, embassy-time will assume a 1Mhz tick rate, and every time +# operation will involve a conversion. +embassy-time = "0.4.0" +# embassy-time = { version = "0.4.0", features = ["tick-hz-10_000"] } +# embassy-time = { version = "0.4.0", features = ["tick-hz-100"] } + +# Dependencies that are used by build.rs. 
+[build-dependencies] +zephyr-build = "0.1.0" + +[profile.release] +debug-assertions = true +overflow-checks = true +debug = true diff --git a/samples/work-philosophers/Kconfig b/samples/async-philosophers/Kconfig similarity index 100% rename from samples/work-philosophers/Kconfig rename to samples/async-philosophers/Kconfig diff --git a/samples/work-philosophers/boards/rpi_pico.conf b/samples/async-philosophers/boards/rpi_pico.conf similarity index 100% rename from samples/work-philosophers/boards/rpi_pico.conf rename to samples/async-philosophers/boards/rpi_pico.conf diff --git a/samples/work-philosophers/build.rs b/samples/async-philosophers/build.rs similarity index 100% rename from samples/work-philosophers/build.rs rename to samples/async-philosophers/build.rs diff --git a/samples/work-philosophers/prj.conf b/samples/async-philosophers/prj.conf similarity index 94% rename from samples/work-philosophers/prj.conf rename to samples/async-philosophers/prj.conf index c53536c4..2232da7d 100644 --- a/samples/work-philosophers/prj.conf +++ b/samples/async-philosophers/prj.conf @@ -7,6 +7,7 @@ CONFIG_MAIN_STACK_SIZE=8192 CONFIG_SYSTEM_WORKQUEUE_STACK_SIZE=4096 CONFIG_POLL=y +CONFIG_STACK_CANARIES=y # CONFIG_DEBUG=y diff --git a/samples/work-philosophers/sample.yaml b/samples/async-philosophers/sample.yaml similarity index 82% rename from samples/work-philosophers/sample.yaml rename to samples/async-philosophers/sample.yaml index be3698fd..1b960183 100644 --- a/samples/work-philosophers/sample.yaml +++ b/samples/async-philosophers/sample.yaml @@ -1,6 +1,6 @@ sample: - description: Philosphers, in Rust - name: workq philosophers rust + description: Async Philosphers, in Rust + name: async philosophers rust common: harness: console harness_config: diff --git a/samples/async-philosophers/src/async_sem.rs b/samples/async-philosophers/src/async_sem.rs new file mode 100644 index 00000000..0e9a47bf --- /dev/null +++ b/samples/async-philosophers/src/async_sem.rs @@ -0,0 +1,98 
@@ +//! Async Semaphore based demo +//! +//! This implementation of the dining philosopher problem uses embassy-sync semaphores to represent +//! the forks. Each philosopher dines as per the algorithm a number of times, and when they are all +//! finished, the test is considered successful. Deadlock will result in the primary thread not +//! completing. +//! +//! Notably, the shared statistics are kept in an Arc around an async Mutex, and every philosopher +//! is an embassy task spawned onto the same executor. The lock is only held for the duration of +//! a single record call. + +use embassy_executor::Spawner; +use embassy_sync::{ + blocking_mutex::raw::CriticalSectionRawMutex, + mutex::Mutex, + semaphore::{FairSemaphore, Semaphore}, +}; +use embassy_time::Timer; +use zephyr::{printkln, sync::Arc}; + +use crate::{get_random_delay, ResultSignal, Stats, NUM_PHIL}; + +type ESemaphore = FairSemaphore; + +/// The semaphores for the forks. +static FORKS: [ESemaphore; NUM_PHIL] = [const { ESemaphore::new(1) }; NUM_PHIL]; + +/// The semaphore to wait for them all to finish. +static DONE_SEM: ESemaphore = ESemaphore::new(0); + +/// Number of iterations of each philosopher. +/// +/// Should be long enough to exercise the test, but too +/// long and the test will time out. The delay calculated will randomly be between 25 and 800 ms, and +/// there are two waits, so typically, each "eat" will take about a second. +const EAT_COUNT: usize = 10; + +#[embassy_executor::task] +pub async fn phil(spawner: Spawner, stats_sig: &'static ResultSignal) { + // Our overall stats. + let stats = Arc::new(Mutex::new(Stats::default())); + + // Spawn off each philosopher. + for i in 0..NUM_PHIL { + let forks = if i == NUM_PHIL - 1 { + [&FORKS[0], &FORKS[i]] + } else { + [&FORKS[i], &FORKS[i + 1]] + }; + + spawner.spawn(one_phil(forks, i, stats.clone())).unwrap(); + } + + // Wait for them all to finish. + DONE_SEM.acquire(NUM_PHIL).await.unwrap(); + + // Send the stats back.
+ stats_sig.signal(stats); +} + +/// Simulate a single philosopher. +/// +/// The forks must be ordered with the first fork having the lowest number, otherwise this will +/// likely deadlock. +/// +/// This will run EAT_COUNT times, and then return. +#[embassy_executor::task(pool_size = NUM_PHIL)] +async fn one_phil( + forks: [&'static ESemaphore; 2], + n: usize, + stats: Arc>, +) { + for i in 0..EAT_COUNT { + // Acquire the forks. + // printkln!("Child {n} take left fork"); + forks[0].acquire(1).await.unwrap().disarm(); + // printkln!("Child {n} take right fork"); + forks[1].acquire(1).await.unwrap().disarm(); + + // printkln!("Child {n} eating"); + let delay = get_random_delay(n, 25); + Timer::after(delay).await; + stats.lock().await.record_eat(n, delay); + + // Release the forks. + // printkln!("Child {n} giving up forks"); + forks[1].release(1); + forks[0].release(1); + + let delay = get_random_delay(n, 25); + Timer::after(delay).await; + stats.lock().await.record_think(n, delay); + + printkln!("Philospher {n} finished eating time {i}"); + } + + DONE_SEM.release(1); +} diff --git a/samples/work-philosophers/src/lib.rs b/samples/async-philosophers/src/lib.rs similarity index 57% rename from samples/work-philosophers/src/lib.rs rename to samples/async-philosophers/src/lib.rs index 8037c89e..0cf4b3b7 100644 --- a/samples/work-philosophers/src/lib.rs +++ b/samples/async-philosophers/src/lib.rs @@ -9,76 +9,59 @@ extern crate alloc; -use zephyr::{ - kio::spawn, - kobj_define, printkln, - sync::Arc, - sys::uptime_get, - time::{Duration, Tick}, - work::WorkQueueBuilder, -}; +use embassy_executor::Spawner; +use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex, signal::Signal}; +use embassy_time::Duration; +use static_cell::StaticCell; +use zephyr::{embassy::Executor, printkln, sync::Arc, sys::uptime_get}; mod async_sem; /// How many philosophers. There will be the same number of forks.
const NUM_PHIL: usize = 6; -/// Size of the stack for the work queue. -const WORK_STACK_SIZE: usize = 2048; - // The dining philosophers problem is a simple example of cooperation between multiple threads. // This implementation demonstrates a few ways that Zephyr's work-queues can be used to simulate // this problem. #[no_mangle] extern "C" fn rust_main() { - printkln!( - "Async/work-queue dining philosophers{}", - zephyr::kconfig::CONFIG_BOARD - ); + printkln!("Async dining philosophers{}", zephyr::kconfig::CONFIG_BOARD); printkln!("Time tick: {}", zephyr::time::SYS_FREQUENCY); - // Create the work queue to run this. - let worker = Arc::new( - WorkQueueBuilder::new() - .set_priority(1) - .start(WORK_STACK.init_once(()).unwrap()), - ); - - // In addition, create a lower priority worker. - let lower_worker = Arc::new( - WorkQueueBuilder::new() - .set_priority(5) - .start(LOWER_WORK_STACK.init_once(()).unwrap()), - ); - - // It is important that work queues are not dropped, as they are persistent objects in the - // Zephyr world. - let _ = Arc::into_raw(lower_worker.clone()); - let _ = Arc::into_raw(worker.clone()); + let executor = EXECUTOR.init(Executor::new()); + executor.run(|spawner| { + spawner.spawn(main(spawner)).unwrap(); + }) +} + +static EXECUTOR: StaticCell = StaticCell::new(); + +type ResultSignal = Signal>>; +static RESULT_SIGNAL: ResultSignal = Signal::new(); +#[embassy_executor::task] +async fn main(spawner: Spawner) -> () { // First run the async semaphore based one. printkln!("Running 'async-sem' test"); - let handle = spawn(async_sem::phil(), &worker, c"async-sem"); - let stats = handle.join(); + spawner + .spawn(async_sem::phil(spawner, &RESULT_SIGNAL)) + .unwrap(); + + let stats = RESULT_SIGNAL.wait().await; printkln!("Done with 'async-sem' test"); - stats.show(); + stats.lock().await.show(); printkln!("All threads done"); } -kobj_define! 
{ - static WORK_STACK: ThreadStack; - static LOWER_WORK_STACK: ThreadStack; -} - /// Get a random delay, based on the ID of this user, and the current uptime. fn get_random_delay(id: usize, period: usize) -> Duration { - let tick = (uptime_get() & (usize::MAX as i64)) as usize; - let delay = (tick / 100 * (id + 1)) & 0x1f; + let tick = (uptime_get() & (usize::MAX as i64)) as u64; + let delay = (tick / 100 * (id as u64 + 1)) & 0x1f; // Use one greater to be sure to never get a delay of zero. - Duration::millis_at_least(((delay + 1) * period) as Tick) + Duration::from_millis((delay + 1) * (period as u64)) } /// Instead of just printint out so much information that the data just scolls by, gather @@ -95,11 +78,11 @@ struct Stats { impl Stats { fn record_eat(&mut self, index: usize, time: Duration) { - self.eating[index] += time.to_millis(); + self.eating[index] += time.as_millis(); } fn record_think(&mut self, index: usize, time: Duration) { - self.thinking[index] += time.to_millis(); + self.thinking[index] += time.as_millis(); self.count[index] += 1; } diff --git a/samples/bench/Cargo.toml b/samples/bench/Cargo.toml index 8088507b..dbf64ef6 100644 --- a/samples/bench/Cargo.toml +++ b/samples/bench/Cargo.toml @@ -13,11 +13,18 @@ license = "Apache-2.0 or MIT" crate-type = ["staticlib"] [dependencies] -zephyr = "0.1.0" +zephyr = { version = "0.1.0", features = ["time-driver", "executor-zephyr"] } critical-section = "1.1.2" heapless = "0.8" static_cell = "2.1" +embassy-executor = { version = "0.7.0", features = ["log", "task-arena-size-2048"] } +embassy-sync = "0.6.2" +embassy-futures = "0.1.1" + +# Hard code the tick rate. +embassy-time = { version = "0.4.0", features = ["tick-hz-10_000"] } + # Dependencies that are used by build.rs. 
[build-dependencies] zephyr-build = "0.1.0" diff --git a/samples/bench/prj.conf b/samples/bench/prj.conf index db5ad6b3..b6c47c37 100644 --- a/samples/bench/prj.conf +++ b/samples/bench/prj.conf @@ -8,10 +8,16 @@ CONFIG_MAIN_STACK_SIZE=8192 CONFIG_POLL=y # CONFIG_USERSPACE=y +# CONFIG_DEBUG=y +# CONFIG_ASSERT=y +# CONFIG_STACK_SENTINEL=y +# CONFIG_STACK_USAGE=y +# CONFIG_STACK_CANARIES=y # Some debugging CONFIG_THREAD_MONITOR=y CONFIG_THREAD_ANALYZER=y CONFIG_THREAD_ANALYZER_USE_PRINTK=y CONFIG_THREAD_ANALYZER_AUTO=n +CONFIG_DEBUG_THREAD_INFO=y # CONFIG_THREAD_ANALYZER_AUTO_INTERVAL=15 diff --git a/samples/bench/src/executor.rs b/samples/bench/src/executor.rs new file mode 100644 index 00000000..5597c18c --- /dev/null +++ b/samples/bench/src/executor.rs @@ -0,0 +1,349 @@ +//! Executor-based tests +//! +//! These tests try to be as similar as possible to the thread-based tests, but instead are built +//! around async. + +use core::{ffi::c_int, sync::atomic::Ordering}; + +use alloc::format; +use embassy_executor::SendSpawner; +use embassy_futures::yield_now; +#[allow(unused_imports)] +use embassy_sync::semaphore::{FairSemaphore, GreedySemaphore}; +use embassy_sync::{ + blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel, semaphore::Semaphore, + signal::Signal, +}; +use static_cell::StaticCell; +use zephyr::{embassy::Executor, sync::atomic::AtomicBool}; + +use crate::{BenchTimer, Command, TestResult, NUM_THREADS, THREAD_STACK_SIZE}; + +/// As the tests do exercise different executors at different priorities, use the critical section +/// variant. +// type ASemaphore = GreedySemaphore; +type ASemaphore = FairSemaphore; + +/// A Signal for reporting back spawners. +type SpawnerSignal = Signal; + +/// Async-based test runner. +pub struct AsyncTests { + /// Each test thread gets a semaphore, to use as appropriate for that test. + sems: &'static [ASemaphore], + + /// A back sem for the reverse direction. 
+ back_sem: &'static ASemaphore, + + /// The spawners, of the three priorities. + spawners: [SendSpawner; 3], + + /// Test results are communicated back via this channel. + results: Channel, + + /// Main started, allows main to be started lazily, after we've become static. + main_started: AtomicBool, + + /// Indicates back to the main thread that the given test is finished. + end_wait: zephyr::sys::sync::Semaphore, +} + +/// The executors for the tests. The values are low, medium, and high priority. +static EXECUTORS: [StaticCell; 3] = [const { StaticCell::new() }; 3]; + +/// Each executor returns a SendSpawner to spawn tasks on that executor. +static SPAWNERS: [Signal; 3] = [const { Signal::new() }; 3]; + +/// Static semaphores for the above. +static SEMS: [ASemaphore; NUM_THREADS] = [const { ASemaphore::new(0) }; NUM_THREADS]; + +static BACK_SEM: ASemaphore = ASemaphore::new(0); + +/// Main command +static MAIN_CMD: Channel = Channel::new(); + +impl AsyncTests { + /// Construct a new set of tests, firing up the threads for the different priority executors. + pub fn new(count: usize) -> Self { + // printkln!("Starting executors"); + let spawners = [0, 1, 2].map(|id| { + let thread = executor_thread(&EXECUTORS[id], &SPAWNERS[id]); + thread.set_priority(id as c_int + 5); + thread.start(); + + zephyr::time::sleep(zephyr::time::Duration::millis_at_least(1)); + if let Some(sp) = SPAWNERS[id].try_take() { + sp + } else { + panic!("Executor thread did not initialize properly"); + } + }); + + Self { + sems: &SEMS[0..count], + back_sem: &BACK_SEM, + spawners, + results: Channel::new(), + main_started: AtomicBool::new(false), + end_wait: zephyr::sys::sync::Semaphore::new(0, 1), + } + } + + /// Run one of the given tests. + /// + /// Fires off the appropriate workers for the given test. Tests follow a basic pattern: + /// There are NUM_THREADS + 2 tasks spawned.
These communicate as appropriate for the given + /// test, and the results are then collected, waiting for everything to finish, and reported. + pub fn run(&'static self, command: Command) { + if !self.main_started.load(Ordering::Relaxed) { + self.spawners[1].spawn(main_run(self)).unwrap(); + self.main_started.store(true, Ordering::Relaxed); + } + + if MAIN_CMD.try_send(command).is_err() { + panic!("Main queue filled up"); + } + + // Wait for the Zephyr semaphore indicating the test is finished. + self.end_wait.take(zephyr::time::Forever).unwrap(); + } + + /// A simple semaphore test. Tests the time to use the semaphore with no blocking. + async fn simple_sem(&self, id: usize, count: usize) -> usize { + let sem = &self.sems[id]; + + for _ in 0..count { + sem.release(1); + let rel = sem.acquire(1).await.unwrap(); + rel.disarm(); + } + + count + } + + /// A simple semaphore test, with yield. Tests the time to use the semaphore with no blocking. + async fn simple_sem_yield(&self, id: usize, count: usize) -> usize { + let sem = &self.sems[id]; + + for _ in 0..count { + sem.release(1); + let rel = sem.acquire(1).await.unwrap(); + rel.disarm(); + yield_now().await; + } + + count + } + + /// The taker side of the SemWait test. + async fn sem_wait_taker(&self, id: usize, count: usize) -> usize { + let sem = &self.sems[id]; + for _ in 0..count { + let rel = sem.acquire(1).await.unwrap(); + rel.disarm(); + } + count + } + + /// The giver side of the SemWait test. + async fn sem_wait_giver(&self, count: usize) { + for _ in 0..count { + for sem in self.sems { + sem.release(1); + } + } + } + + /// The taker side of the SemPingPong test. 
+ async fn sem_ping_pong_taker(&self, count: usize) -> usize { + let sem = &self.sems[0]; + let back_sem = self.back_sem; + + // zephyr::printkln!("Taking {count} sems"); + for _ in 0..count { + //zephyr::printkln!("acquire1"); + let rel = sem.acquire(1).await.unwrap(); + //zephyr::printkln!("acquired1"); + rel.disarm(); + //zephyr::printkln!("release2"); + back_sem.release(1); + } + // zephyr::printkln!("Taking sems done"); + + count + } + + /// The giver side of the SemPingPong test. This uses the first sem and back sem to ping pong + /// across all of the workers. + async fn sem_ping_pong_giver(&self, count: usize) { + let sem = &self.sems[0]; + let back_sem = self.back_sem; + + // zephyr::printkln!("Giving {},{} sems", count, self.sems.len()); + for _ in 0..count { + for _ in 0..self.sems.len() { + //zephyr::printkln!("release1"); + sem.release(1); + //zephyr::printkln!("acquire2"); + let rel = back_sem.acquire(1).await.unwrap(); + //zephyr::printkln!("acquired2"); + rel.disarm(); + } + } + // zephyr::printkln!("Giving sems done"); + } +} + +/// The low priority worker, depending on the test, performs some operations at a lower priority +/// than the main threads. +#[embassy_executor::task] +async fn low_worker(this: &'static AsyncTests, command: Command) { + match command { + Command::Empty => (), + Command::SimpleSem(_) => (), + Command::SimpleSemYield(_) => (), + Command::SemWait(count) => this.sem_wait_giver(count).await, + Command::SemWaitSame(count) => this.sem_wait_giver(count).await, + Command::SemPingPong(count) => this.sem_ping_pong_giver(count).await, + Command::SemOnePingPong(count) => this.sem_ping_pong_giver(count).await, + command => panic!("Not implemented: {:?}", command), + } + + this.results.send(TestResult::Low).await; +} + +/// The high priority worker, performs some operations at a higher priority than the main threads. 
+#[embassy_executor::task] +async fn high_worker(this: &'static AsyncTests, command: Command) { + // printkln!("high_worker started"); + match command { + Command::Empty => (), + Command::SimpleSem(_) => (), + Command::SimpleSemYield(_) => (), + Command::SemWait(_) => (), + Command::SemWaitSame(_) => (), + Command::SemPingPong(_) => (), + Command::SemOnePingPong(_) => (), + command => panic!("Not implemented: {:?}", command), + } + + this.results.send(TestResult::High).await; +} + +/// The main worker threads. +/// +/// These perform the main work of the test, generally communicating with either the main test task, +/// which is in the same executor, or with the higher or lower priority worker. +#[embassy_executor::task(pool_size = NUM_THREADS)] +async fn worker(this: &'static AsyncTests, command: Command, id: usize) { + let total; + match command { + Command::Empty => total = 1, + Command::SimpleSem(count) => total = this.simple_sem(id, count).await, + Command::SimpleSemYield(count) => total = this.simple_sem_yield(id, count).await, + Command::SemWait(count) => total = this.sem_wait_taker(id, count).await, + Command::SemWaitSame(count) => total = this.sem_wait_taker(id, count).await, + Command::SemPingPong(count) => total = this.sem_ping_pong_taker(count).await, + Command::SemOnePingPong(count) => total = this.sem_ping_pong_taker(count).await, + command => panic!("Not implemented: {:?}", command), + } + + this.results + .send(TestResult::Worker { id, count: total }) + .await; +} + +/// Main worker. +/// +/// This task performs the main work. +#[embassy_executor::task] +async fn main_run(this: &'static AsyncTests) { + loop { + let command = MAIN_CMD.receive().await; + let msg = format!("async {:?}", command); + + let mut timer = BenchTimer::new(&msg, 1); + + let ntasks = this.sems.len(); + + // Before starting, reset the semaphores. 
+ for sem in this.sems { + sem.set(0); + } + this.back_sem.set(0); + + if true { + this.spawners[2].spawn(high_worker(this, command)).unwrap(); + if command.is_same_priority() { + this.spawners[1].spawn(low_worker(this, command)).unwrap(); + } else { + this.spawners[0].spawn(low_worker(this, command)).unwrap(); + } + } else { + this.spawners[1].spawn(high_worker(this, command)).unwrap(); + this.spawners[1].spawn(low_worker(this, command)).unwrap(); + } + + for id in 0..ntasks { + this.spawners[1].spawn(worker(this, command, id)).unwrap(); + } + + let mut results: heapless::Vec, NUM_THREADS> = heapless::Vec::new(); + let mut low = false; + let mut high = false; + let mut msg_count = (2 + ntasks) as isize; + + for _ in 0..ntasks { + results.push(None).unwrap(); + } + + loop { + match this.results.receive().await { + TestResult::Worker { id, count } => { + if results[id].replace(count).is_some() { + panic!("Multiple results from worker {}", id); + } + } + TestResult::Low => { + if low { + panic!("Multiple results from 'low' worker"); + } + low = true; + } + TestResult::High => { + if high { + panic!("Multiple results from 'high' worker"); + } + high = true; + } + } + msg_count -= 1; + if msg_count <= 0 { + break; + } + } + + let count: usize = results.iter().map(|x| x.unwrap_or(0)).sum(); + timer.adjust_count(count); + + timer.stop(); + this.end_wait.give(); + } +} + +/// Main for each executor. +/// +/// This thread starts up an executor, and then places that executor into a 'signal' so the +/// non-async code can get the value. +#[zephyr::thread(stack_size = THREAD_STACK_SIZE, pool_size = 3)] +fn executor_thread(exec: &'static StaticCell, spawner_sig: &'static SpawnerSignal) -> ! { + let exec = exec.init(Executor::new()); + exec.run(|spawner| { + // TODO: Should we try to have a local spawner as well? 
+ spawner_sig.signal(spawner.make_send()); + }) +} + +// For debugging +#[unsafe(no_mangle)] +extern "C" fn invalid_spinlock(_l: *mut zephyr::raw::k_spinlock, _thread: u32, _id: u32) {} diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs index 2e91148e..432384fe 100644 --- a/samples/bench/src/lib.rs +++ b/samples/bench/src/lib.rs @@ -14,37 +14,45 @@ use core::pin::Pin; use alloc::collections::vec_deque::VecDeque; use alloc::vec; -use alloc::vec::Vec; +use executor::AsyncTests; use static_cell::StaticCell; -use zephyr::sync::SpinMutex; +use zephyr::kobj_define; +use zephyr::raw::k_yield; +use zephyr::sync::{PinWeak, SpinMutex}; use zephyr::time::NoWait; -use zephyr::work::futures::work_size; -use zephyr::work::{SimpleAction, Work}; +use zephyr::work::{SimpleAction, Work, WorkQueueBuilder}; use zephyr::{ kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, - kio::{spawn, yield_now}, - kobj_define, printkln, - raw::{k_cycle_get_64, k_yield}, + printkln, + raw::k_cycle_get_64, sync::{ channel::{bounded, unbounded, Receiver, Sender}, Arc, }, sys::sync::Semaphore, time::Forever, - work::{WorkQueue, WorkQueueBuilder}, + work::WorkQueue, }; +mod executor; + /// How many threads to run in the tests. const NUM_THREADS: usize = 6; /// Stack size to use for the threads. -const THREAD_STACK_SIZE: usize = 2048; +#[cfg(target_pointer_width = "32")] +const THREAD_STACK_SIZE: usize = 4 * 1024; + +/// Stack size to use for the threads. +#[cfg(target_pointer_width = "64")] +const THREAD_STACK_SIZE: usize = 5120; /// Stack size to use for the work queue. -const WORK_STACK_SIZE: usize = 2048; +const WORK_STACK_SIZE: usize = 4096; /// This is a global iteration. Small numbers still test functionality within CI, and large numbers /// give more meaningful benchmark results. 
+// const TOTAL_ITERS: usize = 10; const TOTAL_ITERS: usize = 1_000; // const TOTAL_ITERS: usize = 10_000; @@ -55,6 +63,18 @@ type HeaplessVec = heapless::Vec; extern "C" fn rust_main() { let tester = ThreadTests::new(NUM_THREADS); + static ATESTER: StaticCell = StaticCell::new(); + let atester = ATESTER.init(AsyncTests::new(NUM_THREADS)); + + if false { + atester.run(Command::SemPingPong(TOTAL_ITERS)); + printkln!("Stopping"); + return; + } + + atester.run(Command::SimpleSem(TOTAL_ITERS)); + atester.run(Command::SimpleSem(TOTAL_ITERS)); + // Some basic benchmarks arc_bench(); spin_bench(); @@ -62,36 +82,41 @@ extern "C" fn rust_main() { let simple = Simple::new(tester.workq.clone()); let mut num = 6; - while num < 500 { + while num < 250 { simple.run(num, TOTAL_ITERS / num); num = num * 13 / 10; } tester.run(Command::Empty); + atester.run(Command::Empty); tester.run(Command::SimpleSem(TOTAL_ITERS)); - tester.run(Command::SimpleSemAsync(TOTAL_ITERS)); + atester.run(Command::SimpleSem(TOTAL_ITERS)); tester.run(Command::SimpleSemYield(TOTAL_ITERS)); - tester.run(Command::SimpleSemYieldAsync(TOTAL_ITERS)); + atester.run(Command::SimpleSemYield(TOTAL_ITERS)); tester.run(Command::SemWait(TOTAL_ITERS)); - tester.run(Command::SemWaitAsync(TOTAL_ITERS)); - tester.run(Command::SemWaitSameAsync(TOTAL_ITERS)); + atester.run(Command::SemWait(TOTAL_ITERS)); + atester.run(Command::SemWaitSame(TOTAL_ITERS)); tester.run(Command::SemHigh(TOTAL_ITERS)); + // atester.run(Command::SemHigh(TOTAL_ITERS)); + atester.run(Command::SemOnePingPong(TOTAL_ITERS)); tester.run(Command::SemPingPong(TOTAL_ITERS)); - tester.run(Command::SemPingPongAsync(TOTAL_ITERS)); + atester.run(Command::SemPingPong(TOTAL_ITERS)); tester.run(Command::SemOnePingPong(TOTAL_ITERS)); - /* - tester.run(Command::SemOnePingPongAsync(NUM_THREADS, TOTAL_ITERS / 6)); - tester.run(Command::SemOnePingPongAsync(20, TOTAL_ITERS / 20)); - tester.run(Command::SemOnePingPongAsync(50, TOTAL_ITERS / 50)); - 
tester.run(Command::SemOnePingPongAsync(100, TOTAL_ITERS / 100)); - tester.run(Command::SemOnePingPongAsync(500, TOTAL_ITERS / 500)); + atester.run(Command::SemOnePingPong(TOTAL_ITERS)); + // tester.run(command::semonepingpongasync(num_threads, total_iters / 6)); + // tester.run(command::semonepingpongasync(20, total_iters / 20)); + // tester.run(command::semonepingpongasync(50, total_iters / 50)); + // tester.run(command::semonepingpongasync(100, total_iters / 100)); + // tester.run(command::semonepingpongasync(500, total_iters / 500)); tester.run(Command::Empty); - */ + + /* let mut num = 6; while num < 100 { tester.run(Command::SemOnePingPongAsync(num, TOTAL_ITERS / num)); num = num * 13 / 10; } + */ printkln!("Done with all tests\n"); } @@ -105,10 +130,10 @@ extern "C" fn rust_main() { /// low priority task is providing the data. struct ThreadTests { /// Each test thread gets a semaphore, to use as appropriate for that test. - sems: HeaplessVec<&'static Semaphore>, + sems: &'static [Semaphore; NUM_THREADS], /// This semaphore is used to ping-ping back to another thread. - back_sems: HeaplessVec<&'static Semaphore>, + back_sems: &'static [Semaphore; NUM_THREADS], /// Each test also has a message queue, for testing, that it has sender and receiver for. chans: HeaplessVec>, @@ -126,7 +151,7 @@ struct ThreadTests { /// The test also all return their result to the main. The threads Send, the main running /// receives. 
- results: ChanPair, + results: ChanPair, } impl ThreadTests { @@ -149,73 +174,52 @@ impl ThreadTests { let _ = Arc::into_raw(workq.clone()); let mut result = Self { - sems: HeaplessVec::new(), - back_sems: HeaplessVec::new(), + sems: &SEMS, + back_sems: &BACK_SEMS, chans: HeaplessVec::new(), commands: HeaplessVec::new(), - results: ChanPair::new_unbounded(), low_command: low_send, high_command: high_send, + results: ChanPair::new_unbounded(), workq, }; - let mut thread_commands = Vec::new(); - - static SEMS: [StaticCell; NUM_THREADS] = - [const { StaticCell::new() }; NUM_THREADS]; - static BACK_SEMS: [StaticCell; NUM_THREADS] = - [const { StaticCell::new() }; NUM_THREADS]; - - for i in 0..count { - let sem = SEMS[i].init(Semaphore::new(0, u32::MAX)); - result.sems.push(sem).unwrap(); - - let sem = BACK_SEMS[i].init(Semaphore::new(0, u32::MAX)); - result.back_sems.push(sem).unwrap(); + let mut thread_commands = HeaplessVec::new(); + for _ in 0..count { let chans = ChanPair::new_bounded(1); result.chans.push(chans.clone()).unwrap(); let (csend, crecv) = bounded(1); result.commands.push(csend).unwrap(); - thread_commands.push(crecv); + thread_commands.push(crecv).unwrap(); } - // Result is initialized, move it into the Arc. let result = Arc::new(result); + // Spawn the worker threads. for i in 0..count { let result2 = result.clone(); let cmd2 = thread_commands[i].clone(); - let mut thread = TEST_THREADS[i] - .init_once(TEST_STACKS[i].init_once(()).unwrap()) - .unwrap(); - // Main test threads run at priority 0. + let thread = test_worker(result2, cmd2, i); thread.set_priority(5); - thread.spawn(move || { - Self::worker(result2, cmd2, i); - }); + thread.start(); } // And fire up the low and high priority workers. 
let result2 = result.clone(); - let mut thread = LOW_THREAD - .init_once(LOW_STACK.init_once(()).unwrap()) - .unwrap(); + let thread = test_low_runner(result2, low_recv); thread.set_priority(6); - thread.spawn(move || { - Self::low_runner(result2, low_recv); - }); + thread.start(); let result2 = result.clone(); - let mut thread = HIGH_THREAD - .init_once(HIGH_STACK.init_once(()).unwrap()) - .unwrap(); + let thread = test_high_runner(result2, high_recv); thread.set_priority(4); - thread.spawn(move || { - Self::high_runner(result2, high_recv); - }); + thread.start(); + + result + /* // Calculate a size to show. printkln!( "worker size: {} bytes", @@ -229,6 +233,7 @@ impl ThreadTests { ); result + */ } fn run(&self, command: Command) { @@ -236,13 +241,13 @@ impl ThreadTests { // In case previous runs left the semaphore non-zero, reset all of them. This is safe due // to nothing using the semaphores right now. - for sem in &self.sems { + for sem in self.sems { if sem.count_get() > 0 { printkln!("Warning: previous test left count: {}", sem.count_get()); sem.reset(); } } - for sem in &self.back_sems { + for sem in self.back_sems { if sem.count_get() > 0 { printkln!("Warning: previous test left count: {}", sem.count_get()); sem.reset(); @@ -265,7 +270,7 @@ impl ThreadTests { while let Ok(cmd) = self.results.receiver.recv() { match cmd { - Result::Worker { id, count } => { + TestResult::Worker { id, count } => { if results[id].replace(count).is_some() { panic!("Multiple result from worker {}", id); } @@ -274,7 +279,7 @@ impl ThreadTests { break; } } - Result::Low => { + TestResult::Low => { if low { panic!("Multiple results from 'low' worker"); } @@ -285,7 +290,7 @@ impl ThreadTests { break; } } - Result::High => { + TestResult::High => { if high { panic!("Multiple results from 'high' worker"); } @@ -317,7 +322,7 @@ impl ThreadTests { printkln!(" {:8.3} us, {} of {:?}", time, total, command); } - /// Run the thread worker itself. + /// The worker threads themselves. 
fn worker(this: Arc, command: Receiver, id: usize) { while let Ok(cmd) = command.recv() { let mut total = 0; @@ -353,107 +358,14 @@ impl ThreadTests { this.ping_pong_worker(id, &this.sems[0], &this.back_sems[0], count, &mut total); } - // For the async commands, spawn this on the worker thread and don't reply - // ourselves. - Command::SimpleSemAsync(count) => { - spawn( - Self::simple_sem_async(this.clone(), id, this.sems[id], count), - &this.workq, - c"worker", - ); - continue; - } - - Command::SimpleSemYieldAsync(count) => { - spawn( - Self::simple_sem_yield_async(this.clone(), id, this.sems[id], count), - &this.workq, - c"worker", - ); - continue; - } - - Command::SemWaitAsync(count) => { - spawn( - Self::sem_take_async(this.clone(), id, this.sems[id], count), - &this.workq, - c"worker", - ); - continue; - } - - Command::SemWaitSameAsync(count) => { - spawn( - Self::sem_take_async(this.clone(), id, this.sems[id], count), - &this.workq, - c"worker", - ); - if id == 0 { - spawn( - Self::sem_giver_async(this.clone(), this.sems.clone(), count), - &this.workq, - c"giver", - ); - } - continue; - } - - Command::SemPingPongAsync(count) => { - spawn( - Self::ping_pong_worker_async( - this.clone(), - id, - this.sems[id], - this.back_sems[id], - count, - ), - &this.workq, - c"worker", - ); - if id == 0 { - spawn( - Self::ping_pong_replier_async(this.clone(), count), - &this.workq, - c"giver", - ); - } - - continue; - } - - Command::SemOnePingPongAsync(nthread, count) => { - if id == 0 { - for th in 0..nthread { - spawn( - Self::ping_pong_worker_async( - this.clone(), - th, - this.sems[0], - this.back_sems[0], - count, - ), - &this.workq, - c"worker", - ); - } - spawn( - Self::one_ping_pong_replier_async(this.clone(), nthread, count), - &this.workq, - c"giver", - ); - } - - // Avoid the reply for the number of workers that are within the range. This - // does assume that nthread will always be >= the number configured. 
- if id < this.sems.len() { - continue; - } + Command::SemWaitSame(count) => { + this.sem_take(&this.sems[id], count, &mut total); } } this.results .sender - .send(Result::Worker { id, count: total }) + .send(TestResult::Worker { id, count: total }) .unwrap(); } } @@ -468,38 +380,6 @@ impl ThreadTests { } } - async fn simple_sem_async(this: Arc, id: usize, sem: &'static Semaphore, count: usize) { - for _ in 0..count { - sem.give(); - sem.take_async(NoWait).await.unwrap(); - } - - this.results - .sender - .send_async(Result::Worker { id, count }) - .await - .unwrap(); - } - - async fn simple_sem_yield_async( - this: Arc, - id: usize, - sem: &'static Semaphore, - count: usize, - ) { - for _ in 0..count { - sem.give(); - sem.take_async(NoWait).await.unwrap(); - yield_now().await; - } - - this.results - .sender - .send_async(Result::Worker { id, count }) - .await - .unwrap(); - } - /// A simple semaphore test, worker thread version, with a yield fn simple_sem_yield(&self, sem: &Semaphore, count: usize, total: &mut usize) { for _ in 0..count { @@ -518,24 +398,6 @@ impl ThreadTests { } } - async fn sem_take_async(this: Arc, id: usize, sem: &'static Semaphore, count: usize) { - for _ in 0..count { - // Enable this to verify that we are actually blocking. - if false { - if let Ok(_) = sem.take(NoWait) { - panic!("Semaphore was already available"); - } - } - sem.take_async(Forever).await.unwrap(); - } - - this.results - .sender - .send_async(Result::Worker { id, count }) - .await - .unwrap(); - } - /// Worker side of the ping pong sem, takes the 'sem' and gives to the back_sem. 
fn ping_pong_worker( &self, @@ -557,36 +419,9 @@ impl ThreadTests { } } - async fn ping_pong_worker_async( - this: Arc, - id: usize, - sem: &'static Semaphore, - back_sem: &'static Semaphore, - count: usize, - ) { - for i in 0..count { - if false { - if let Ok(_) = sem.take(NoWait) { - panic!("Semaphore was already available: {} loop:{}", id, i); - } - } - sem.take_async(Forever).await.unwrap(); - back_sem.give(); - } - - // Only send for an ID in range. - if id < this.sems.len() { - this.results - .sender - .send_async(Result::Worker { id, count }) - .await - .unwrap(); - } - } - fn ping_pong_replier(&self, count: usize) { for _ in 0..count { - for (sem, back) in self.sems.iter().zip(&self.back_sems) { + for (sem, back) in self.sems.iter().zip(self.back_sems) { sem.give(); back.take(Forever).unwrap(); } @@ -602,43 +437,6 @@ impl ThreadTests { } } - async fn ping_pong_replier_async(this: Arc, count: usize) { - for _ in 0..count { - for (sem, back) in this.sems.iter().zip(&this.back_sems) { - sem.give(); - back.take_async(Forever).await.unwrap(); - } - } - - // No reply. - } - - async fn one_ping_pong_replier_async(this: Arc, nthread: usize, count: usize) { - for _ in 0..count { - for _ in 0..nthread { - this.sems[0].give(); - this.back_sems[0].take_async(Forever).await.unwrap(); - } - } - - // No reply. - } - - async fn sem_giver_async(this: Arc, sems: HeaplessVec<&'static Semaphore>, count: usize) { - for _ in 0..count { - for sem in &sems { - sem.give(); - - // Yield after each loop. This should only force a reschedule each task's operation, - // just enough to make sure everything still blocks. - yield_now().await; - } - } - - // No report. - let _ = this; - } - /// And the low priority worker. 
fn low_runner(this: Arc, command: Receiver) { let _ = this; @@ -646,19 +444,16 @@ impl ThreadTests { match cmd { Command::Empty => (), Command::SimpleSem(_) => (), - Command::SimpleSemAsync(_) => (), Command::SimpleSemYield(_) => (), - Command::SimpleSemYieldAsync(_) => (), - Command::SemWait(count) | Command::SemWaitAsync(count) => { + Command::SemWait(count) | Command::SemWaitSame(count) => { // The low-priority thread does all of the gives, this should cause every single // semaphore operation to wait. for _ in 0..count { - for sem in &this.sems { + for sem in this.sems { sem.give(); } } } - Command::SemWaitSameAsync(_) => (), Command::SemHigh(_) => (), Command::SemPingPong(count) => { this.ping_pong_replier(count); @@ -666,12 +461,10 @@ impl ThreadTests { Command::SemOnePingPong(count) => { this.one_ping_pong_replier(count); } - Command::SemPingPongAsync(_) => (), - Command::SemOnePingPongAsync(_, _) => (), } // printkln!("low command: {:?}", cmd); - this.results.sender.send(Result::Low).unwrap(); + this.results.sender.send(TestResult::Low).unwrap(); } } @@ -681,33 +474,46 @@ impl ThreadTests { match cmd { Command::Empty => (), Command::SimpleSem(_) => (), - Command::SimpleSemAsync(_) => (), Command::SimpleSemYield(_) => (), - Command::SimpleSemYieldAsync(_) => (), Command::SemWait(_) => (), - Command::SemWaitAsync(_) => (), - Command::SemWaitSameAsync(_) => (), + Command::SemWaitSame(_) => (), Command::SemPingPong(_) => (), Command::SemOnePingPong(_) => (), Command::SemHigh(count) => { // The high-priority thread does all of the gives, this should cause every single // semaphore operation to be ready. 
for _ in 0..count { - for sem in &this.sems { + for sem in this.sems { sem.give(); } } } - Command::SemPingPongAsync(_) => (), - Command::SemOnePingPongAsync(_, _) => (), } // printkln!("high command: {:?}", cmd); - this.results.sender.send(Result::High).unwrap(); + this.results.sender.send(TestResult::High).unwrap(); } } } +/// Top level Zephyr thread for the workers. +#[zephyr::thread(stack_size = THREAD_STACK_SIZE, pool_size = NUM_THREADS)] +fn test_worker(this: Arc, command: Receiver, id: usize) { + ThreadTests::worker(this, command, id) +} + +/// The low priority worker. +#[zephyr::thread(stack_size = THREAD_STACK_SIZE)] +fn test_low_runner(this: Arc, command: Receiver) { + ThreadTests::low_runner(this, command) +} + +/// The high priority worker. +#[zephyr::thread(stack_size = THREAD_STACK_SIZE)] +fn test_high_runner(this: Arc, command: Receiver) { + ThreadTests::high_runner(this, command) +} + #[derive(Clone, Debug)] struct ChanPair { sender: Sender, @@ -726,41 +532,41 @@ impl ChanPair { } } -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] enum Command { /// The empty test. Does nothing, but invoke everything. Useful to determine overhead. Empty, /// A simple semaphore, we give and take the semaphore ourselves. SimpleSem(usize), - /// Simple semaphore, with async - SimpleSemAsync(usize), /// A simple semaphore, we give and take the semaphore ourselves. With a yield between each to /// force context switches. SimpleSemYield(usize), /// SimpleSemYield by async with yield (non context switch yield, just work-queue reschedule - SimpleSemYieldAsync(usize), - /// Semaphore test where the low priority thread does the 'give', so every wait should - /// block. SemWait(usize), - /// Same as SemWait, but async. - SemWaitAsync(usize), /// SemWaitAsync but with the 'give' task also on the same work queue. - SemWaitSameAsync(usize), + SemWaitSame(usize), /// Semaphore tests where the high priority thread does the 'give', so every wait should be /// read. 
SemHigh(usize), /// Semaphores ping-ponging between worker threads and a low priority thread. SemPingPong(usize), /// SemPingPong, but async - SemPingPongAsync(usize), - /// PingPong but with a single shared semaphore. Demonstrates multiple threads queued on the - /// same object. SemOnePingPong(usize), - /// Same as SemOnePingPong, but async. The first parameter is the number of async tasks. - SemOnePingPongAsync(usize, usize), } -enum Result { +impl Command { + /// Determine if this command intends for the "low" worker to be at the same priority as the + /// main worker. + pub fn is_same_priority(self) -> bool { + match self { + Self::SemWaitSame(_) => true, + Self::SemOnePingPong(_) => true, + _ => false, + } + } +} + +enum TestResult { Worker { /// What is the id of this worker. id: usize, @@ -819,12 +625,23 @@ impl Simple { workers, iterations ); + + // Before we go away, make sure that there aren't any leaked workers. + /* + let mut locked = main.action().locked.lock().unwrap(); + while let Some(other) = locked.works.pop_front() { + // Portable atomic's Arc seems to be a problem here. + let other = unsafe { Pin::into_inner_unchecked(other) }; + assert_eq!(Arc::strong_count(&other), 1); + // printkln!("Child: {} refs", Arc::strong_count(&other)); + } + */ } } /// A simple worker. When run, it submits the main worker to do the next work. struct SimpleWorker { - main: Pin>>, + main: PinWeak>, workq: Arc, _id: usize, } @@ -832,7 +649,7 @@ struct SimpleWorker { impl SimpleWorker { fn new(main: Pin>>, workq: Arc, id: usize) -> Self { Self { - main, + main: PinWeak::downgrade(main), workq, _id: id, } @@ -842,7 +659,8 @@ impl SimpleWorker { impl SimpleAction for SimpleWorker { fn act(self: Pin<&Self>) { // Each time we are run, fire the main worker back up. 
- Work::submit_to_queue(self.main.clone(), &self.workq).unwrap(); + let main = self.main.upgrade().unwrap(); + Work::submit_to_queue(main.clone(), &self.workq).unwrap(); } } @@ -968,8 +786,14 @@ impl<'a> BenchTimer<'a> { } } + /// Adjust the count, for cases where we don't know the count until later. + pub fn adjust_count(&mut self, count: usize) { + self.count = count; + } + pub fn stop(self) { let stop = now(); + let raw = (stop - self.start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1.0e6; let time = (stop - self.start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1000.0; let time = if self.count > 0 { @@ -978,19 +802,19 @@ impl<'a> BenchTimer<'a> { 0.0 }; - printkln!(" {:8.3} us, {} of {}", time, self.count, self.what); + printkln!( + " {:8.3} us, {} of {} {:8.3}us raw", + time, + self.count, + self.what, + raw, + ); } } kobj_define! { - static TEST_THREADS: [StaticThread; NUM_THREADS]; - static TEST_STACKS: [ThreadStack; NUM_THREADS]; - - static LOW_THREAD: StaticThread; - static LOW_STACK: ThreadStack; - - static HIGH_THREAD: StaticThread; - static HIGH_STACK: ThreadStack; - static WORK_STACK: ThreadStack; } + +static SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS]; +static BACK_SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS]; diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs index f602ec49..473e84b5 100644 --- a/samples/philosophers/src/lib.rs +++ b/samples/philosophers/src/lib.rs @@ -14,7 +14,7 @@ use alloc::boxed::Box; use alloc::vec::Vec; use zephyr::time::{sleep, Duration, Tick}; use zephyr::{ - kobj_define, printkln, + printkln, sync::{Arc, Mutex}, sys::uptime_get, }; @@ -75,12 +75,7 @@ extern "C" fn rust_main() { printkln!("Pre fork"); for (i, syncer) in (0..NUM_PHIL).zip(syncers.into_iter()) { - let thread = PHIL_THREADS[i] - .init_once(PHIL_STACKS[i].init_once(()).unwrap()) - .unwrap(); - thread.spawn(move || { - 
phil_thread(i, syncer, stats); - }); + phil_thread(i, syncer, stats).start(); } let delay = Duration::secs_at_least(10); @@ -129,6 +124,7 @@ fn get_syncer() -> Vec> { get_channel_syncer() } +#[zephyr::thread(stack_size = PHIL_STACK_SIZE, pool_size = NUM_PHIL)] fn phil_thread(n: usize, syncer: Arc, stats: &'static Mutex) { printkln!("Child {} started: {:?}", n, syncer); @@ -219,8 +215,3 @@ impl Stats { } static STAT_MUTEX: Mutex = Mutex::new(Stats::new()); - -kobj_define! { - static PHIL_THREADS: [StaticThread; NUM_PHIL]; - static PHIL_STACKS: [ThreadStack; NUM_PHIL]; -} diff --git a/samples/work-philosophers/Cargo.toml b/samples/work-philosophers/Cargo.toml deleted file mode 100644 index af2dcd45..00000000 --- a/samples/work-philosophers/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright (c) 2024 Linaro LTD -# SPDX-License-Identifier: Apache-2.0 - -[package] -# This must be rustapp for now. -name = "rustapp" -version = "0.1.0" -edition = "2021" -description = "A sample hello world application in Rust" -license = "Apache-2.0 or MIT" - -[lib] -crate-type = ["staticlib"] - -[dependencies] -zephyr = "0.1.0" - -# Dependencies that are used by build.rs. -[build-dependencies] -zephyr-build = "0.1.0" - -[profile.release] -debug-assertions = true -overflow-checks = true -debug = true diff --git a/samples/work-philosophers/src/async_sem.rs b/samples/work-philosophers/src/async_sem.rs deleted file mode 100644 index 5c593d15..00000000 --- a/samples/work-philosophers/src/async_sem.rs +++ /dev/null @@ -1,113 +0,0 @@ -//! Async Semaphore based demo -//! -//! This implementation on the dining philosopher problem uses Zephyr semaphores to represent the -//! forks. Each philosopher dines as per the algorithm a number of times, and when the are all -//! finished, the test is considered successful. Deadlock will result in the primary thread not -//! completing. -//! -//! Notably, this uses Rc and RefCell along with spawn_local to demonstrate that multiple async -//! 
tasks run on the same worker do not need Send. It is just important that write operations on -//! the RefCell do not `.await` or a panic is likely. - -use core::cell::RefCell; - -use alloc::{rc::Rc, vec::Vec}; -use zephyr::{ - kio::{sleep, spawn_local}, - printkln, - sys::sync::Semaphore, - time::Forever, -}; - -use crate::{get_random_delay, Stats, NUM_PHIL}; - -/// Number of iterations of each philospher. -/// -/// Should be long enough to exercise the test, but too -/// long and the test will timeout. The delay calculated will randomly be between 25 and 775, and -/// there are two waits, so typically, each "eat" will take about a second. -const EAT_COUNT: usize = 10; - -pub async fn phil() -> Stats { - // It is a little tricky to be able to use local workers. We have to have this nested thread - // that waits. This is because the Future from `local_phil()` does not implement Send, since it - // waits for the philosophers, which are not Send. However, this outer async function does not - // hold onto any data that is not send, and therefore will be Send. Fortunately, this extra - // Future is very lightweight. - spawn_local(local_phil(), c"phil_wrap").join_async().await -} - -async fn local_phil() -> Stats { - // Our overall stats. - let stats = Rc::new(RefCell::new(Stats::default())); - - // One fork for each philospher. - let forks: Vec<_> = (0..NUM_PHIL) - .map(|_| Rc::new(Semaphore::new(1, 1))) - .collect(); - - // Create all of the philosphers - let phils: Vec<_> = (0..NUM_PHIL) - .map(|i| { - // Determine the two forks. The forks are paired with each philosopher taking the fork of - // their number, and the next on, module the size of the ring. However, for the last case, - // we need to swap the forks used, it is necessary to obey a strict ordering of the locks to - // avoid deadlocks. 
- let forks = if i == NUM_PHIL - 1 { - [forks[0].clone(), forks[i].clone()] - } else { - [forks[i].clone(), forks[i + 1].clone()] - }; - - spawn_local(one_phil(forks, i, stats.clone()), c"phil") - }) - .collect(); - - // Wait for them all to finish. - for p in phils { - p.join_async().await; - } - - // Leak the stats as a test. - // Uncomment this to test that the expect below does truly detect a missed drop. - // let _ = Rc::into_raw(stats.clone()); - - // At this point, all of the philosphers should have dropped their stats ref, and we should be - // able to turn stats back into it's value. - // This tests that completed work does drop the future. - Rc::into_inner(stats) - .expect("Failure: a philospher didn't drop it's future") - .into_inner() -} - -/// Simulate a single philospher. -/// -/// The forks must be ordered with the first fork having th lowest number, otherwise this will -/// likely deadlock. -/// -/// This will run for EAT_COUNT times, and then return. -async fn one_phil(forks: [Rc; 2], n: usize, stats: Rc>) { - for i in 0..EAT_COUNT { - // Acquire the forks. - // printkln!("Child {n} take left fork"); - forks[0].take_async(Forever).await.unwrap(); - // printkln!("Child {n} take right fork"); - forks[1].take_async(Forever).await.unwrap(); - - // printkln!("Child {n} eating"); - let delay = get_random_delay(n, 25); - sleep(delay).await; - stats.borrow_mut().record_eat(n, delay); - - // Release the forks. 
- // printkln!("Child {n} giving up forks"); - forks[1].give(); - forks[0].give(); - - let delay = get_random_delay(n, 25); - sleep(delay).await; - stats.borrow_mut().record_think(n, delay); - - printkln!("Philospher {n} finished eating time {i}"); - } -} diff --git a/zephyr-macros/Cargo.toml b/zephyr-macros/Cargo.toml new file mode 100644 index 00000000..a9c386b0 --- /dev/null +++ b/zephyr-macros/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "zephyr-macros" +version = "0.1.0" +edition = "2024" +license = "MIT OR Apache-2.0" +descriptions = "Macros for managing tasks and work queues in Zephyr" + +[lib] +proc-macro = true + +[dependencies] +syn = { version = "2.0.85", features = ["full", "visit"] } +quote = "1.0.37" +proc-macro2 = "1.0.86" +darling = "0.20.1" diff --git a/zephyr-macros/src/lib.rs b/zephyr-macros/src/lib.rs new file mode 100644 index 00000000..1806821a --- /dev/null +++ b/zephyr-macros/src/lib.rs @@ -0,0 +1,36 @@ +//! Zephyr macros + +use proc_macro::TokenStream; + +mod task; + +/// Declares a Zephyr thread (or pool of threads) that can be spawned. +/// +/// There are some restrictions on this: +/// - All arguments to the function must be Send. +/// - The function must not use generics. +/// - The optional `pool_size` attribute must be 1 or greater. +/// - The `stack_size` must be specified, and will set the size of the pre-defined stack for _each_ +/// task in the pool. +/// +/// ## Examples +/// +/// Declaring a task with a simple argument: +/// +/// ```rust +/// #[zephyr::thread(stack_size = 1024)] +/// fn mytask(arg: u32) { +/// // Function body. +/// } +/// ``` +/// +/// The result will be a function `mytask` that takes this argument, and returns a `ReadyThread`. A +/// simple use case is to call `.start()` on this, to start the Zephyr thread. +/// +/// Threads can be reused after they have exited. Calling the `mytask` function before the thread +/// has exited will result in a panic. 
The `RunningThread`'s `join` method can be used to wait for +/// thread termination. +#[proc_macro_attribute] +pub fn thread(args: TokenStream, item: TokenStream) -> TokenStream { + task::run(args.into(), item.into()).into() +} diff --git a/zephyr-macros/src/task.rs b/zephyr-macros/src/task.rs new file mode 100644 index 00000000..32f029a7 --- /dev/null +++ b/zephyr-macros/src/task.rs @@ -0,0 +1,274 @@ +//! Expansion of `#[zephyr::thread(...)]`. + +use std::{ffi::CString, fmt::Display}; + +use darling::FromMeta; +use darling::export::NestedMeta; +use proc_macro2::{Literal, Span, TokenStream}; +use quote::{ToTokens, format_ident, quote}; +use syn::{ + Expr, ExprLit, ItemFn, Lit, LitInt, ReturnType, Type, + visit::{self, Visit}, +}; + +#[derive(Debug, FromMeta, Default)] +struct Args { + #[darling(default)] + pool_size: Option, + #[darling(default)] + stack_size: Option, +} + +pub fn run(args: TokenStream, item: TokenStream) -> TokenStream { + let mut errors = TokenStream::new(); + + // If any of the steps for this macro fail, we still want to expand to an item that is as close + // to the expected output as possible. This helps out IDEs such that completions and other + // related features keep working. + let f: ItemFn = match syn::parse2(item.clone()) { + Ok(x) => x, + Err(e) => return token_stream_with_error(item, e), + }; + + let args = match NestedMeta::parse_meta_list(args) { + Ok(x) => x, + Err(e) => return token_stream_with_error(item, e), + }; + + let args = match Args::from_list(&args) { + Ok(x) => x, + Err(e) => { + errors.extend(e.write_errors()); + Args::default() + } + }; + + let pool_size = args.pool_size.unwrap_or(Expr::Lit(ExprLit { + attrs: vec![], + lit: Lit::Int(LitInt::new("1", Span::call_site())), + })); + + let stack_size = args.stack_size.unwrap_or(Expr::Lit(ExprLit { + attrs: vec![], + // TODO: Instead of a default, require this. 
+ lit: Lit::Int(LitInt::new("2048", Span::call_site())), + })); + + if !f.sig.asyncness.is_none() { + error(&mut errors, &f.sig, "thread function must not be async"); + } + + if !f.sig.generics.params.is_empty() { + error(&mut errors, &f.sig, "thread function must not be generic"); + } + + if !f.sig.generics.where_clause.is_none() { + error( + &mut errors, + &f.sig, + "thread function must not have `where` clauses", + ); + } + + if !f.sig.abi.is_none() { + error( + &mut errors, + &f.sig, + "thread function must not have an ABI qualifier", + ); + } + + if !f.sig.variadic.is_none() { + error(&mut errors, &f.sig, "thread function must not be variadic"); + } + + match &f.sig.output { + ReturnType::Default => {} + ReturnType::Type(_, ty) => match &**ty { + Type::Tuple(tuple) if tuple.elems.is_empty() => {} + Type::Never(_) => {} + _ => error( + &mut errors, + &f.sig, + "thread functions must either not return a value, return (), or return `!`", + ), + }, + } + + let mut args = Vec::new(); + let mut fargs = f.sig.inputs.clone(); + let mut inner_calling = Vec::new(); + let mut inner_args = Vec::new(); + + for arg in fargs.iter_mut() { + match arg { + syn::FnArg::Receiver(_) => { + error( + &mut errors, + arg, + "thread functions must not have `self` arguments", + ); + } + syn::FnArg::Typed(t) => { + check_arg_ty(&mut errors, &t.ty); + match t.pat.as_mut() { + syn::Pat::Ident(id) => { + id.mutability = None; + args.push((id.clone(), t.attrs.clone())); + inner_calling.push(quote! { + data.#id, + }); + inner_args.push(quote! 
{#id,}); + } + _ => { + error( + &mut errors, + arg, + "pattern matching in task arguments is not yet supported", + ); + } + } + } + } + } + + let thread_ident = f.sig.ident.clone(); + let thread_inner_ident = format_ident!("__{}_thread", thread_ident); + + let mut thread_inner = f.clone(); + let visibility = thread_inner.vis.clone(); + thread_inner.vis = syn::Visibility::Inherited; + thread_inner.sig.ident = thread_inner_ident.clone(); + + // Assemble the original input arguments. + let mut full_args = Vec::new(); + for (arg, cfgs) in &args { + full_args.push(quote! { + #(#cfgs)* + #arg + }); + } + + let thread_name = Literal::c_string(&CString::new(thread_ident.to_string()).unwrap()); + + let mut thread_outer_body = quote! { + const _ZEPHYR_INTERNAL_STACK_SIZE: usize = zephyr::thread::stack_len(#stack_size); + const _ZEPHYR_INTERNAL_POOL_SIZE: usize = #pool_size; + struct _ZephyrInternalArgs { + // This depends on the argument syntax being valid as a struct definition, which should + // be the case with the above constraints. 
+ #fargs + } + + static THREAD: [zephyr::thread::ThreadData<_ZephyrInternalArgs>; _ZEPHYR_INTERNAL_POOL_SIZE] + = [const { zephyr::thread::ThreadData::new() }; _ZEPHYR_INTERNAL_POOL_SIZE]; + #[unsafe(link_section = ".noinit.TODO_STACK")] + static STACK: [zephyr::thread::ThreadStack<_ZEPHYR_INTERNAL_STACK_SIZE>; _ZEPHYR_INTERNAL_POOL_SIZE] + = [const { zephyr::thread::ThreadStack::new() }; _ZEPHYR_INTERNAL_POOL_SIZE]; + + extern "C" fn startup( + arg0: *mut ::core::ffi::c_void, + _: *mut ::core::ffi::c_void, + _: *mut ::core::ffi::c_void, + ) { + let init = unsafe { &mut *(arg0 as *mut ::zephyr::thread::InitData<_ZephyrInternalArgs>) }; + let init = init.0.get(); + match unsafe { init.replace(None) } { + None => { + ::core::panic!("Incorrect thread initialization"); + } + Some(data) => { + #thread_inner_ident(#(#inner_calling)*); + } + } + } + + zephyr::thread::ThreadData::acquire( + &THREAD, + &STACK, + _ZephyrInternalArgs { #(#inner_args)* }, + Some(startup), + 0, + #thread_name, + ) + }; + + let thread_outer_attrs = thread_inner.attrs.clone(); + + if !errors.is_empty() { + thread_outer_body = quote! { + #[allow(unused_variables, unreachable_code)] + let _x: ::zephyr::thread::ReadyThread = ::core::todo!(); + _x + }; + } + + // Copy the generics + where clause to avoid more spurious errors. + let generics = &f.sig.generics; + let where_clause = &f.sig.generics.where_clause; + + quote! { + // This is the user's thread function, renamed. + #[doc(hidden)] + #thread_inner + + #(#thread_outer_attrs)* + #visibility fn #thread_ident #generics (#fargs) -> ::zephyr::thread::ReadyThread #where_clause { + #thread_outer_body + } + + #errors + } +} + +// Taken from embassy-executor-macros. +fn check_arg_ty(errors: &mut TokenStream, ty: &Type) { + struct Visitor<'a> { + errors: &'a mut TokenStream, + } + + impl<'a, 'ast> Visit<'ast> for Visitor<'a> { + fn visit_type_reference(&mut self, i: &'ast syn::TypeReference) { + // only check for elided lifetime here. 
If not elided, it is checked by + // `visit_lifetime`. + if i.lifetime.is_none() { + error( + self.errors, + i.and_token, + "Arguments for threads must live forever. Try using the `'static` lifetime.", + ); + } + visit::visit_type_reference(self, i); + } + + fn visit_lifetime(&mut self, i: &'ast syn::Lifetime) { + if i.ident.to_string() != "static" { + error( + self.errors, + i, + "Arguments for threads must live forever. Try using the `'static` lifetime.", + ); + } + } + + fn visit_type_impl_trait(&mut self, i: &'ast syn::TypeImplTrait) { + error( + self.errors, + i, + "`impl Trait` is not allowed in thread arguments. It is syntax sugar for generics, and threads cannot be generic.", + ); + } + } + + Visit::visit_type(&mut Visitor { errors }, ty); +} + +// Utility borrowed from embassy-executor-macros. +pub fn token_stream_with_error(mut tokens: TokenStream, error: syn::Error) -> TokenStream { + tokens.extend(error.into_compile_error()); + tokens +} + +pub fn error(s: &mut TokenStream, obj: A, msg: T) { + s.extend(syn::Error::new_spanned(obj.into_token_stream(), msg).into_compile_error()) +} diff --git a/zephyr/Cargo.toml b/zephyr/Cargo.toml index bb70549c..b5551db1 100644 --- a/zephyr/Cargo.toml +++ b/zephyr/Cargo.toml @@ -11,6 +11,7 @@ Functionality for Rust-based applications that run on Zephyr. [dependencies] zephyr-sys = { version = "0.1.0", path = "../zephyr-sys" } +zephyr-macros = { version = "0.1.0", path = "../zephyr-macros" } # Although paste is brought in, it is a compile-time macro, and is not linked into the application. paste = "1.0" diff --git a/zephyr/src/kio.rs b/zephyr/src/kio.rs deleted file mode 100644 index 8e839e4c..00000000 --- a/zephyr/src/kio.rs +++ /dev/null @@ -1,183 +0,0 @@ -//! Async IO for Zephyr -//! -//! This implements the basics of using Zephyr's work queues to implement async code on Zephyr. -//! -//! Most of the work happens in [`work`] and in [`futures`] -//! -//! [`work`]: crate::work -//! 
[`futures`]: crate::work::futures - -use core::ffi::CStr; -use core::task::{Context, Poll}; -use core::{future::Future, pin::Pin}; - -use crate::sys::queue::Queue; -use crate::sys::sync::Semaphore; -use crate::time::{NoWait, Timeout}; -use crate::work::futures::WakeInfo; -use crate::work::Signal; -use crate::work::{futures::JoinHandle, futures::WorkBuilder, WorkQueue}; - -pub mod sync; - -pub use crate::work::futures::sleep; - -/// Run an async future on the given worker thread. -/// -/// Arrange to have the given future run on the given worker thread. The resulting `JoinHandle` has -/// `join` and `join_async` methods that can be used to wait for the given thread. -pub fn spawn(future: F, worker: &WorkQueue, name: &'static CStr) -> JoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - WorkBuilder::new() - .set_worker(worker) - .set_name(name) - .start(future) -} - -/// Run an async future on the current worker thread. -/// -/// Arrange to have the given future run on the current worker thread. The resulting `JoinHandle` -/// has `join` and `join_async` methods that can be used to wait for the given thread. -/// -/// By constraining the spawn to the current worker, this function is able to remove the Send -/// constraint from the future (and its return type), allowing tasks to share data using -/// lighter-weight mechanimsms, such as `Rc` and `Rc>`, or `&'static RefCell`. -/// -/// To be able to use tasks running on different workers, sharing must be done with types such as -/// `Arc`, and `Arc>`, or `&'static Mutex`. -/// -/// It is important, when using RefCell, that a borrow from the cell not be carried across an await -/// boundary, or RefCell's runtime multi-borrow check can cause a panic. -/// -/// # Panics -/// If this is called other than from a worker task running on a work thread, it will panic. 
-pub fn spawn_local(future: F, name: &'static CStr) -> JoinHandle -where - F: Future + 'static, - F::Output: Send + 'static, -{ - WorkBuilder::new().set_name(name).start_local(future) -} - -/// Yield the current thread, returning it to the work queue to be run after other work on that -/// queue. (This has to be called `yield_now` in Rust, because `yield` is a keyword.) -pub fn yield_now() -> impl Future { - YieldNow { waited: false } -} - -struct YieldNow { - waited: bool, -} - -impl Future for YieldNow { - type Output = (); - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll { - if self.waited { - Poll::Ready(()) - } else { - // Enqueue outselves with no wait and no events. - let info = unsafe { WakeInfo::from_context(cx) }; - - // Unsafely check if the work queue running us is empty. We only check explicitly - // specified workers (TODO access the system work queue). The check is racy, but should - // always fail indicating that the queue is not empty when it could be. Checking this - // avoids re-scheduling the only worker back into the queue. - // SAFETY: The check is racy, but will fail with us yielding when we didn't need to. - if let Some(wq) = info.queue { - let wq = unsafe { wq.as_ref() }; - if wq.pending.head == wq.pending.tail { - return Poll::Ready(()); - } - } - - info.timeout = NoWait.into(); - self.waited = true; - - Poll::Pending - } - } -} - -/// Extensions on [`Context`] to support scheduling via Zephyr's workqueue system. -/// -/// All of these are called from within the context of running work, and indicate what _next_ -/// should cause this work to be run again. If none of these methods are called before the work -/// exits, the work will be scheduled to run after `Forever`, which is not useful. There may be -/// later support for having a `Waker` that can schedule work from another context. 
-/// -/// Note that the events to wait on, such as Semaphores or channels, if there are multiple threads -/// that can wait for them, might cause this worker to run, but not actually be available. As such, -/// to maintain the non-blocking requirements of Work, [`Semaphore::take`], and the blocking `send` -/// and `recv` operations on channels should not be used, even after being woken. -/// -/// For the timeout [`Forever`] is useful to indicate there is no timeout. If called with -/// [`NoWait`], the work will be immediately scheduled. In general, it is better to query the -/// underlying object directly rather than have the overhead of being rescheduled. -/// -/// # Safety -/// -/// The lifetime bounds on the items waited for ensure that these items live at least as long as the -/// work queue. Practically, this can only be satisfied by using something with 'static' lifetime, -/// or embedding the value in the Future itself. -/// -/// With the Zephyr executor, the `Context` is embedded within a `WakeInfo` struct, which this makes -/// use of. If a different executor were to be used, these calls would result in undefined -/// behavior. -/// -/// This could be checked at runtime, but it would have runtime cost. -/// -/// [`Forever`]: crate::time::Forever -pub trait ContextExt { - /// Indicate the work should next be scheduled based on a semaphore being available for "take". - /// - /// The work will be scheduled either when the given semaphore becomes available to 'take', or - /// after the timeout. - fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore, timeout: impl Into); - - /// Indicate that the work should be scheduled after receiving the given [`Signal`], or the - /// timeout occurs. - fn add_signal<'a>(&'a mut self, signal: &'a Signal, timeout: impl Into); - - /// Indicate that the work should be scheduled when the given [`Queue`] has data available to - /// recv, or the timeout occurs. 
- fn add_queue<'a>(&'a mut self, queue: &'a Queue, timeout: impl Into); - - /// Indicate that the work should just be scheduled after the given timeout. - /// - /// Note that this only works if none of the other wake methods are called, as those also set - /// the timeout. - fn add_timeout(&mut self, timeout: impl Into); -} - -/// Implementation of ContextExt for the Rust [`Context`] type. -impl<'b> ContextExt for Context<'b> { - fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore, timeout: impl Into) { - let info = unsafe { WakeInfo::from_context(self) }; - info.add_semaphore(sem); - info.timeout = timeout.into(); - } - - fn add_signal<'a>(&'a mut self, signal: &'a Signal, timeout: impl Into) { - let info = unsafe { WakeInfo::from_context(self) }; - info.add_signal(signal); - info.timeout = timeout.into(); - } - - fn add_queue<'a>(&'a mut self, queue: &'a Queue, timeout: impl Into) { - let info = unsafe { WakeInfo::from_context(self) }; - info.add_queue(queue); - info.timeout = timeout.into(); - } - - fn add_timeout(&mut self, timeout: impl Into) { - let info = unsafe { WakeInfo::from_context(self) }; - info.timeout = timeout.into(); - } -} diff --git a/zephyr/src/kio/sync.rs b/zephyr/src/kio/sync.rs deleted file mode 100644 index 769a05ca..00000000 --- a/zephyr/src/kio/sync.rs +++ /dev/null @@ -1,125 +0,0 @@ -//! Synchronization mechanisms that work with async. -//! -//! Notably, Zephyr's `k_mutex` type isn't supported as a type that can be waited for -//! asynchronously. -//! -//! The main problem with `k_mutex` (meaning [`crate::sync::Mutex`]) is that the `lock` operation -//! can block, and since multiple tasks may be scheduled for the same work queue, the system can -//! deadlock, as the scheduler may not run to allow the task that actually holds the mutex to run. -//! -//! As an initial stopgap. We provide a [`Mutex`] type that is usable within an async context. We -//! do not currently implement an associated `Condvar`. -//! -//! 
Note that using Semaphores for locking means that this mechanism doesn't handle priority -//! inversion issues. Be careful with workers that run at different priorities. - -// Use the same error types from the regular sync version. - -use core::{ - cell::UnsafeCell, - fmt, - marker::PhantomData, - ops::{Deref, DerefMut}, -}; - -use crate::{ - sync::{LockResult, TryLockError, TryLockResult}, - sys::sync::Semaphore, - time::{Forever, NoWait}, -}; - -/// A mutual exclusion primitive useful for protecting shared data. Async version. -/// -/// This mutex will block a task waiting for the lock to become available. -pub struct Mutex { - /// The semaphore indicating ownership of the data. When it is "0" the task that did the 'take' - /// on it owns the data, and will use `give` when it is unlocked. This mechanism works for - /// simple Mutex that protects the data without needing a condition variable. - inner: Semaphore, - data: UnsafeCell, -} - -// SAFETY: The semaphore, with the semantics provided here, provide Send and Sync. -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -impl fmt::Debug for Mutex { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Mutex {:?}", self.inner) - } -} - -/// An RAII implementation of a held lock. -pub struct MutexGuard<'a, T: ?Sized + 'a> { - lock: &'a Mutex, - // Mark !Send explicitly until support is added to Rust for this. - _nosend: PhantomData>, -} - -unsafe impl Sync for MutexGuard<'_, T> {} - -impl Mutex { - /// Construct a new Mutex. - pub fn new(t: T) -> Mutex { - Mutex { - inner: Semaphore::new(1, 1), - data: UnsafeCell::new(t), - } - } -} - -impl Mutex { - /// Acquire the mutex, blocking the current thread until it is able to do so. - /// - /// This is a sync version, and calling it from an async task will possibly block the async work - /// thread, potentially causing deadlock. 
- pub fn lock(&self) -> LockResult> { - self.inner.take(Forever).unwrap(); - unsafe { Ok(MutexGuard::new(self)) } - } - - /// Aquire the mutex, async version. - pub async fn lock_async(&self) -> LockResult> { - self.inner.take_async(Forever).await.unwrap(); - unsafe { Ok(MutexGuard::new(self)) } - } - - /// Attempt to aquire the lock. - pub fn try_lock(&self) -> TryLockResult> { - match self.inner.take(NoWait) { - Ok(()) => unsafe { Ok(MutexGuard::new(self)) }, - // TODO: Distinguish timeout from other errors. - Err(_) => Err(TryLockError::WouldBlock), - } - } -} - -impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> { - unsafe fn new(lock: &'mutex Mutex) -> MutexGuard<'mutex, T> { - MutexGuard { - lock, - _nosend: PhantomData, - } - } -} - -impl Deref for MutexGuard<'_, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.lock.data.get() } - } -} - -impl DerefMut for MutexGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.lock.data.get() } - } -} - -impl Drop for MutexGuard<'_, T> { - #[inline] - fn drop(&mut self) { - self.lock.inner.give(); - } -} diff --git a/zephyr/src/lib.rs b/zephyr/src/lib.rs index 7ccb7675..4eb4c250 100644 --- a/zephyr/src/lib.rs +++ b/zephyr/src/lib.rs @@ -80,14 +80,13 @@ pub mod align; pub mod device; pub mod embassy; pub mod error; -#[cfg(CONFIG_RUST_ALLOC)] -pub mod kio; pub mod logging; pub mod object; #[cfg(CONFIG_RUST_ALLOC)] pub mod simpletls; pub mod sync; pub mod sys; +pub mod thread; pub mod time; #[cfg(CONFIG_RUST_ALLOC)] pub mod timer; @@ -101,6 +100,9 @@ pub use logging::set_logger; /// Re-exported for local macro use. pub use paste::paste; +/// Re-export the proc macros. +pub use zephyr_macros::thread; + // Bring in the generated kconfig module pub mod kconfig { //! Zephyr Kconfig values. 
diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index 6e2e5882..8b8820a1 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -26,9 +26,42 @@ pub mod atomic { } #[cfg(CONFIG_RUST_ALLOC)] -pub use portable_atomic_util::Arc; +mod pinweak { + use core::pin::Pin; + pub use portable_atomic_util::Arc; + pub use portable_atomic_util::Weak; + + /// Safe Pinned Weak references. + /// + /// Pin> can't be converted to/from Weak safely, because there is know way to know if a given + /// weak reference came from a pinned Arc. This wraps the weak reference in a new type so we know + /// that it came from a pinned Arc. + /// + /// There is a pin-weak crate that provides this for `std::sync::Arc`, but not for the one in the + /// portable-atomic-utils crate. + pub struct PinWeak(Weak); + + impl PinWeak { + /// Downgrade an `Pin>` into a `PinWeak`. + /// + /// This would be easier to use if it could be added to Arc. + pub fn downgrade(this: Pin>) -> Self { + // SAFETY: we will never return anything other than a Pin>. + Self(Arc::downgrade(&unsafe { Pin::into_inner_unchecked(this) })) + } + + /// Upgrade back to a `Pin>`. + pub fn upgrade(&self) -> Option>> { + // SAFETY: The weak was only constructed from a `Pin>`. 
+ self.0 + .upgrade() + .map(|arc| unsafe { Pin::new_unchecked(arc) }) + } + } +} + #[cfg(CONFIG_RUST_ALLOC)] -pub use portable_atomic_util::Weak; +pub use pinweak::*; mod mutex; diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index f76d6a4b..872495fe 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -45,15 +45,12 @@ use alloc::boxed::Box; use core::cell::UnsafeCell; use core::ffi::c_void; use core::fmt; -use core::future::Future; use core::marker::PhantomData; use core::mem::MaybeUninit; use core::pin::Pin; -use core::task::Poll; -use crate::kio::ContextExt; use crate::sys::queue::Queue; -use crate::time::{Duration, Forever, NoWait, Timeout}; +use crate::time::{Forever, NoWait, Timeout}; mod counter; @@ -206,90 +203,6 @@ impl Sender { } } -// A little note about the Unpin constraint here. Because Futures are pinned in Rust Async code, -// and the future stores the messages, we can only send and receive messages that aren't pinned. -impl Sender { - /// Waits for a message to be sent into the channel, but only for a limited time. Async - /// version. - /// - /// This has the same behavior as [`send_timeout`], but as an Async function. - /// - /// [`send_timeout`]: Sender::send_timeout - pub fn send_timeout_async<'a>( - &'a self, - msg: T, - timeout: impl Into, - ) -> impl Future>> + 'a { - SendFuture { - sender: self, - msg: Some(msg), - timeout: timeout.into(), - waited: false, - } - } - - /// Sends a message over the given channel, waiting if necessary. Async version. - pub async fn send_async(&self, msg: T) -> Result<(), SendError> { - self.send_timeout_async(msg, Forever).await - } - - // Note that there is no async version of `try_send`. -} - -/// The implementation of Future for Sender::send_timeout_async. 
-struct SendFuture<'a, T: Unpin> { - sender: &'a Sender, - msg: Option, - timeout: Timeout, - waited: bool, -} - -impl<'a, T: Unpin> Future for SendFuture<'a, T> { - type Output = Result<(), SendError>; - - fn poll( - self: Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll { - /* - let this = unsafe { - Pin::get_unchecked_mut(self) - }; - */ - let this = Pin::get_mut(self); - - // Take the message out in preparation to try sending it. It is a logic error if the unwrap - // fails. - let msg = this.msg.take().unwrap(); - - // Try sending the message, with no timeout. - let msg = match this.sender.try_send(msg) { - Ok(()) => return Poll::Ready(Ok(())), - Err(SendError(msg)) => msg, - }; - - if this.waited { - // We already waited, and no message, so give the messagre back, indiciating a timeout. - return Poll::Ready(Err(SendError(msg))); - } - - // Send didn't happen, put the message back to have for the next call. - this.msg = Some(msg); - - // Otherwise, schedule to wake up on receipt or timeout. - match &this.sender.flavor { - SenderFlavor::Unbounded { .. } => { - panic!("Implementation error: unbounded queues should never fail"); - } - SenderFlavor::Bounded(chan) => { - cx.add_queue(&chan.free, this.timeout); - } - } - - Poll::Pending - } -} - impl Drop for Sender { fn drop(&mut self) { match &self.flavor { @@ -423,43 +336,6 @@ impl Receiver { } } -// Note that receive doesn't need the Unpin constraint, as we aren't storing any message. -impl Receiver { - /// Waits for a message to be received from the channel, but only for a limited time. - /// Async version. - /// - /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed or the operation times out. - /// wake up and return an error. 
- pub fn recv_timeout_async<'a>( - &'a self, - timeout: impl Into, - ) -> impl Future> + 'a { - RecvFuture { - receiver: self, - timeout: timeout.into(), - waited: false, - } - } - - /// Blocks the current thread until a message is received or the channel is empty and - /// disconnected. Async version. - /// - /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed. - pub async fn recv_async(&self) -> Result { - self.recv_timeout_async(Forever).await - } - - /// Return a reference to the inner queue. - fn as_queue(&self) -> &Queue { - match &self.flavor { - ReceiverFlavor::Unbounded { queue, .. } => queue, - ReceiverFlavor::Bounded(chan) => &chan.chan, - } - } -} - impl Drop for Receiver { fn drop(&mut self) { match &self.flavor { @@ -505,34 +381,6 @@ impl fmt::Debug for Receiver { } } -struct RecvFuture<'a, T> { - receiver: &'a Receiver, - timeout: Timeout, - waited: bool, -} - -impl<'a, T> Future for RecvFuture<'a, T> { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { - // Try to receive a message. - if let Ok(msg) = self.receiver.try_recv() { - return Poll::Ready(Ok(msg)); - } - - if self.waited { - // Wait already happened, so this is a timeout. - return Poll::Ready(Err(RecvError)); - } - - // Otherwise, schedule to wakeup on receipt or timeout. - cx.add_queue(self.receiver.as_queue(), self.timeout); - self.waited = true; - - Poll::Pending - } -} - /// The "flavor" of a receiver. This maps to the type of the channel. enum ReceiverFlavor { /// An unbounded queue. Messages were allocated with Box, and will be freed upon receipt. @@ -629,92 +477,3 @@ impl fmt::Debug for SendError { /// [`recv`]: Receiver::recv #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub struct RecvError; - -/// Wait loop -/// -/// A common scenario for async work tasks is to wait for, and process messages off of a queue, but -/// to also wake periodically to perform some task. 
-/// -/// This performs this periodic loop. It has some support for handling the case where the -/// processing takes longer than the loop duration, but it merely re-schedules for the period past -/// the current time. This means the phase of the period will change upon dropped ticks. -/// -/// Each time an event is received, 'handle' is called with `Some(ev)`. In addition, periodically -/// (based on `period`) `handle` will be called with None. -/// -/// **Note**: It needs to be a single handler, because this closure will frequently be in a move -/// closure, and this would force shared data to be shared in Sync types of wrappers. The main -/// purpose of combining the event handling and the periodic is to avoid that. -/// -/// Note that also, if the timer is just barely able to run, it will still be scheduled "shortly" in -/// the future. -/// -/// T is the type of the messages expected to be received. -/// -/// TODO: This function, in general, is completely worthless without Rust support for [async -/// closures](https://rust-lang.github.io/rfcs/3668-async-closures.html). -pub async fn event_loop_useless( - events: Receiver, - period: Duration, - mut handle: EF, -) -> ! -where - EF: FnMut(Option) -> EFF, - EFF: Future, -{ - // Start with a deadline 'period' out in the future. - let mut next = crate::time::now() + period; - loop { - if let Ok(ev) = events.recv_timeout_async(next).await { - handle(Some(ev)).await; - continue; - } - - // We either reached, or exceeded our timeout. - handle(None).await; - - // Calculate the next time. - next += period; - - // If this is passed, just reschedule after our Duration from "now". - let now = crate::time::now(); - if next <= now { - next = now + period; - } - } -} - -/// Wait loop, as a macro. -/// -/// This is the `event loop` above, implemented as a macro, which becomes more useful as the async -/// closures aren't needed. -#[macro_export] -macro_rules! 
event_loop { - ($events:expr, $period:expr, - Some($eventvar:ident) => $event_body:block, - None => $periodic_body: block $(,)?) => - { - let events = $events; - let period = $period; - let mut next = $crate::time::now() + period; - loop { - if let Ok($eventvar) = events.recv_timeout_async(next).await { - $event_body - } else { - // Note that ':block' above requires the braces, so this body can't introduce - // bindings that shadow our local variables. - $periodic_body - next += period; - - // If this is passed, just reschedule after our Duration from "now". - let now = $crate::time::now(); - if next <= now { - ::log::warn!("periodic overflow: {} ticks, {}:{}", - (now - next).ticks(), - core::file!(), core::line!()); - next = now + period; - } - } - } - }; -} diff --git a/zephyr/src/sys/sync/semaphore.rs b/zephyr/src/sys/sync/semaphore.rs index 4e1bcb24..a5e69717 100644 --- a/zephyr/src/sys/sync/semaphore.rs +++ b/zephyr/src/sys/sync/semaphore.rs @@ -13,21 +13,8 @@ use core::ffi::c_uint; use core::fmt; -#[cfg(CONFIG_RUST_ALLOC)] -use core::future::Future; -#[cfg(CONFIG_RUST_ALLOC)] -use core::pin::Pin; -#[cfg(CONFIG_RUST_ALLOC)] -use core::task::{Context, Poll}; - -#[cfg(CONFIG_RUST_ALLOC)] -use zephyr_sys::ETIMEDOUT; - -#[cfg(CONFIG_RUST_ALLOC)] -use crate::kio::ContextExt; + use crate::object::{ObjectInit, ZephyrObject}; -#[cfg(CONFIG_RUST_ALLOC)] -use crate::time::NoWait; use crate::{ error::{to_result_void, Result}, raw::{k_sem, k_sem_count_get, k_sem_give, k_sem_init, k_sem_reset, k_sem_take}, @@ -53,7 +40,6 @@ impl Semaphore { /// /// Note that this API has changed, and it now doesn't return a Result, since the Result time /// generally doesn't work (in stable rust) with const. - #[cfg(CONFIG_RUST_ALLOC)] pub const fn new(initial_count: c_uint, limit: c_uint) -> Semaphore { // Due to delayed init, we need to replicate the object checks in the C `k_sem_init`. @@ -87,21 +73,6 @@ impl Semaphore { to_result_void(ret) } - /// Take a semaphore, async version. 
- /// - /// Returns a future that either waits for the semaphore, or returns status. - #[cfg(CONFIG_RUST_ALLOC)] - pub fn take_async<'a>( - &'a self, - timeout: impl Into, - ) -> impl Future> + 'a { - SemTake { - sem: self, - timeout: timeout.into(), - ran: false, - } - } - /// Give a semaphore. /// /// This routine gives to the semaphore, unless the semaphore is already at its maximum @@ -146,40 +117,6 @@ impl ObjectInit for ZephyrObject { } } -/// The async 'take' Future -#[cfg(CONFIG_RUST_ALLOC)] -struct SemTake<'a> { - /// The semaphore we're waiting on. - sem: &'a Semaphore, - /// The timeout to use. - timeout: Timeout, - /// Set after we've waited once. - ran: bool, -} - -#[cfg(CONFIG_RUST_ALLOC)] -impl<'a> Future for SemTake<'a> { - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Always check if data is available. - if let Ok(()) = self.sem.take(NoWait) { - return Poll::Ready(Ok(())); - } - - if self.ran { - // If we ran once, and still don't have any data, indicate this as a timeout. - return Poll::Ready(Err(crate::Error(ETIMEDOUT))); - } - - // TODO: Clean this up. - cx.add_semaphore(self.sem, self.timeout); - self.ran = true; - - Poll::Pending - } -} - impl fmt::Debug for Semaphore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "sys::Semaphore") diff --git a/zephyr/src/thread.rs b/zephyr/src/thread.rs new file mode 100644 index 00000000..8af186d0 --- /dev/null +++ b/zephyr/src/thread.rs @@ -0,0 +1,333 @@ +//! Thread support. +//! +//! Implement the friendly Thread types used by the `zephyr::thread` proc macro to declare new +//! threads. +//! +//! This is intended to be completely usable without alloc, while still allow threads to be +//! started with any arbitrary Send arguments. Threads can be joined, and reused after they have +//! exited. The model intentionally tries to be similar to how async tasks work in something like +//! 
Embassy, but some changes due to the different semantics of Zephyr threads. + +use core::{ + cell::UnsafeCell, + ffi::{c_int, c_void, CStr}, + mem, + ptr::null_mut, + sync::atomic::Ordering, +}; + +use portable_atomic::AtomicU8; +use zephyr_sys::{ + k_thread, k_thread_create, k_thread_entry_t, k_thread_join, k_thread_name_set, + k_thread_priority_set, k_wakeup, z_thread_stack_element, ZR_STACK_ALIGN, ZR_STACK_RESERVED, +}; + +use crate::{ + align::AlignAs, + sys::{K_FOREVER, K_NO_WAIT}, +}; + +/// Adjust a given requested stack size up for the alignment. This is just the stack, and the +/// reservation is explicitly included in the stack declaration below. +pub const fn stack_len(size: usize) -> usize { + size.next_multiple_of(ZR_STACK_ALIGN) +} + +/// States a Zephyr thread can be in. +#[repr(u8)] +pub enum ThreadState { + /// A non running thread, that is free. + Init, + /// An allocated thread. There is a ThreadHandle for this thread, but it has not been started. + Allocated, + /// A thread that is running, as far as we know. Termination is not checked unless demanded. + Running, +} + +/// The holder of data that is to be shared with the target thread. +/// +/// # Safety +/// +/// The Option is kept in an UnsafeCell, and it's use governed by an atomic in the `TaskData` +/// below. When the task is not initialized/not running, this should be set to None. It will be +/// set to Some in a critical section during startup, where the critical section provides the +/// barrier. Once the atomic is set to true, the thread owns this data. +/// +/// The Send constraint force arguments passed to threads to be Send. +pub struct InitData(pub UnsafeCell>); + +impl InitData { + /// Construct new Shared init state. + pub const fn new() -> Self { + Self(UnsafeCell::new(None)) + } +} + +unsafe impl Sync for InitData {} + +/// The static data associated with each thread. The stack is kept separate, as it is intended to +/// go into an uninitialized linker section. 
+pub struct ThreadData { + init: InitData, + state: AtomicU8, + thread: Thread, +} + +impl ThreadData { + /// Construct new ThreadData. + pub const fn new() -> Self { + Self { + init: InitData::new(), + state: AtomicU8::new(ThreadState::Init as u8), + thread: unsafe { Thread::new() }, + } + } + + /// Acquire the thread, in preparation to run it. + pub fn acquire_old( + &'static self, + args: T, + stack: &'static ThreadStack, + entry: k_thread_entry_t, + ) { + critical_section::with(|_| { + // Relaxed is sufficient, as the critical section provides both synchronization and + // a memory barrier. + let old = self.state.load(Ordering::Relaxed); + if old != ThreadState::Init as u8 { + // TODO: This is where we should check for termination. + panic!("Attempt to use a thread that is already in use"); + } + self.state + .store(ThreadState::Allocated as u8, Ordering::Relaxed); + + let init = self.init.0.get(); + unsafe { + *init = Some(args); + } + }); + + // For now, just directly start the thread. We'll want to delay this so that parameters + // (priority and/or flags) can be passed, as well as to have a handle to be able to join and + // restart threads. + let _tid = unsafe { + k_thread_create( + self.thread.0.get(), + stack.data.get() as *mut z_thread_stack_element, + stack.size(), + entry, + self.init.0.get() as *mut c_void, + null_mut(), + null_mut(), + 0, + 0, + K_NO_WAIT, + ) + }; + } + + /// Acquire a thread from the pool of threads, panicing if the pool is exhausted. + pub fn acquire( + pool: &'static [Self], + stacks: &'static [ThreadStack], + args: T, + entry: k_thread_entry_t, + priority: c_int, + name: &'static CStr, + ) -> ReadyThread { + let id = Self::find_thread(pool); + + let init = pool[id].init.0.get(); + unsafe { + *init = Some(args); + } + + // Create the thread in Zephyr, in a non-running state. 
+ let tid = unsafe { + k_thread_create( + pool[id].thread.0.get(), + stacks[id].data.get() as *mut z_thread_stack_element, + SIZE, + entry, + pool[id].init.0.get() as *mut c_void, + null_mut(), + null_mut(), + priority, + 0, + K_FOREVER, + ) + }; + + // Set the name. + unsafe { + k_thread_name_set(tid, name.as_ptr()); + } + + ReadyThread { id: tid } + } + + /// Scan the pool of threads, looking for an available thread. + /// + /// Returns the index of a newly allocated thread. The thread will be marked 'Allocated' after + /// this. + fn find_thread(pool: &'static [Self]) -> usize { + let id = critical_section::with(|_| { + for (id, thread) in pool.iter().enumerate() { + // Relaxed is sufficient, due to the critical section. + let old = thread.state.load(Ordering::Relaxed); + + match old { + v if v == ThreadState::Init as u8 => { + // This is available. Mark as allocated and return from the closure. + thread + .state + .store(ThreadState::Allocated as u8, Ordering::Relaxed); + return Some(id); + } + v if v == ThreadState::Allocated as u8 => { + // Allocate threads haven't started, so aren't available. + } + v if v == ThreadState::Running as u8 => { + // A running thread might be available if it has terminated. We could + // improve performance here by not checking these until after the pool has + // been checked for Init threads. + if unsafe { k_thread_join(thread.thread.0.get(), K_NO_WAIT) } == 0 { + thread + .state + .store(ThreadState::Allocated as u8, Ordering::Relaxed); + return Some(id); + } + } + _ => unreachable!(), + } + } + + None + }); + + if let Some(id) = id { + id + } else { + panic!("Attempt to use more threads than declared pool size"); + } + } +} + +/// A thread that has been set up and is ready to start. +/// +/// Represents a thread that has been created, but not yet started. +pub struct ReadyThread { + id: *mut k_thread, +} + +impl ReadyThread { + /// Change the priority of this thread before starting it. 
The initial default priority was + /// determined by the declaration of the thread. + pub fn set_priority(&self, priority: c_int) { + // SAFETY: ReadyThread should only exist for valid created threads. + unsafe { + k_thread_priority_set(self.id, priority); + } + } + + /// Start this thread. + pub fn start(self) -> RunningThread { + // SAFETY: ReadyThread should only exist for valid created threads. + unsafe { + // As per the docs, this should no longer be `k_thread_start`, but `k_wakeup` is fine + // these days. + k_wakeup(self.id); + } + + RunningThread { id: self.id } + } +} + +/// A thread that has been started. +pub struct RunningThread { + id: *mut k_thread, +} + +impl RunningThread { + /// Wait for this thread to finish executing. + /// + /// Will block until the thread has terminated. + /// + /// TODO: Allow a timeout? + /// TODO: Should we try to return a value? + pub fn join(&self) { + unsafe { + // TODO: Can we do something meaningful with the result? + k_thread_join(self.id, K_FOREVER); + + // TODO: Ideally, we could put the thread state back to avoid the need for another join + // check when re-allocating the thread. + } + } +} + +/// A Zephyr stack declaration. +/// +/// This isn't intended to be used directly, as it needs additional decoration about linker sections +/// and such. Unlike the C declaration, the reservation is a separate field. As long as the SIZE +/// is properly aligned, this should work without padding between the fields. +/// +/// Generally, this should be placed in a noinit linker section to avoid having to initialize the +/// memory. +#[repr(C)] +pub struct ThreadStack { + /// Align the stack itself according to the Kconfig determined alignment. + #[allow(dead_code)] + align: AlignAs, + /// The data of the stack itself. + #[allow(dead_code)] + pub data: UnsafeCell<[z_thread_stack_element; SIZE]>, + /// Extra data, used by Zephyr. 
+ #[allow(dead_code)] + extra: [z_thread_stack_element; ZR_STACK_RESERVED], +} + +unsafe impl Sync for ThreadStack {} + +impl ThreadStack { + /// Construct a new ThreadStack + /// + /// # Safety + /// + /// This is unsafe as the memory remains uninitialized, and it is the responsibility of the + /// caller to use the stack correctly. The stack should be associated with a single thread. + pub const fn new() -> Self { + // SAFETY: Although this is declared as zeroed, the linker section actually used on the + // stack can be used to place it in no-init memory. + unsafe { mem::zeroed() } + } + + /// Retrieve the size of this stack. + pub const fn size(&self) -> usize { + SIZE + } +} + +/// A zephyr thread. +/// +/// This declares a single k_thread in Zephyr. +pub struct Thread(pub UnsafeCell); + +// Threads are "sort of" thread safe. But, this declaration is needed to be able to declare these +// statically, and all generated versions will protect the thread with a critical section. +unsafe impl Sync for Thread {} + +impl Thread { + /// Static allocation of a thread + /// + /// This makes the zero-initialized memory that can later be used as a thread. + /// + /// # Safety + /// + /// The caller is responsible for using operations such as `create` to construct the thread, + /// according to the underlying semantics of the Zephyr operations. + pub const unsafe fn new() -> Self { + // SAFETY: Zero initialized to match thread declarations in the C macros. 
+ unsafe { mem::zeroed() } + } +} diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index f4d820d7..2da6786e 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -180,31 +180,25 @@ extern crate alloc; use core::{ cell::UnsafeCell, ffi::{c_int, c_uint, CStr}, - future::Future, mem, pin::Pin, ptr, - task::Poll, }; use zephyr_sys::{ k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise, k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init, - k_work_queue_start, k_work_submit, k_work_submit_to_queue, ETIMEDOUT, + k_work_queue_start, k_work_submit, k_work_submit_to_queue, }; use crate::{ error::to_result_void, - kio::ContextExt, object::Fixed, simpletls::SimpleTls, sync::{Arc, Mutex}, sys::thread::ThreadStack, - time::Timeout, }; -pub mod futures; - /// A builder for work queues themselves. /// /// A work queue is a Zephyr thread that instead of directly running a piece of code, manages a work @@ -454,24 +448,6 @@ impl Signal { pub fn raise(&self, result: c_int) -> crate::Result<()> { to_result_void(unsafe { k_poll_signal_raise(self.item.get(), result) }) } - - /// Asynchronously wait for a signal to be signaled. - /// - /// If the signal has not been raised, will wait until it has been. If the signal has been - /// raised, the Future will immediately return that value without waiting. - /// - /// **Note**: there is no sync wait, as Zephyr does not provide a convenient mechanmism for - /// this. It could be implemented with `k_poll` if needed. - pub fn wait_async<'a>( - &'a self, - timeout: impl Into, - ) -> impl Future> + 'a { - SignalWait { - signal: self, - timeout: timeout.into(), - ran: false, - } - } } impl Default for Signal { @@ -480,43 +456,6 @@ impl Default for Signal { } } -/// The Future for Signal::wait_async. -struct SignalWait<'a> { - /// The signal we are waiting on. - signal: &'a Signal, - /// The timeout to use. 
- timeout: Timeout, - /// Set after we've waited once, - ran: bool, -} - -impl<'a> Future for SignalWait<'a> { - type Output = crate::Result; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll { - // We can check if the even happened immediately, and avoid blocking if we were already - // signaled. - if let Some(result) = self.signal.check() { - return Poll::Ready(Ok(result)); - } - - if self.ran { - // If it is not ready, assuming a timeout. Note that if a thread other than this work - // thread resets the signal, it is possible to see a timeout even if `Forever` was given - // as the timeout. - return Poll::Ready(Err(crate::Error(ETIMEDOUT))); - } - - cx.add_signal(self.signal, self.timeout); - self.ran = true; - - Poll::Pending - } -} - /// Possible returns from work queue submission. #[derive(Debug, Clone, Copy)] pub enum SubmitResult { diff --git a/zephyr/src/work/futures.rs b/zephyr/src/work/futures.rs deleted file mode 100644 index be4e47d6..00000000 --- a/zephyr/src/work/futures.rs +++ /dev/null @@ -1,620 +0,0 @@ -//! Zephyr work wrappers targeted for the `Future` type. -//! -//! The future is similar to our [`SimpleAction`], with a few additional features: -//! - The poll function returns an enum indicating that either it can be suspended, or that it -//! is finished and has a result. -//! - The poll function takes a `Waker` which is used to "wake" the work item. -//! -//! However, there is a bit of a semantic mismatch between work queues and Futures. Futures are -//! effectively built with the assumption that the the waking will happen, by Rust code, at the -//! time the event is ready. However, work queues expect the work to be queued immediately, -//! with a "poll" indicating what kind of even the work. Work will be scheduled either based on -//! one of these events, or a timeout. -//! -//! 
[`SimpleAction`]: super::SimpleAction - -extern crate alloc; - -use alloc::boxed::Box; - -use core::{ - cell::UnsafeCell, - ffi::{c_int, c_void, CStr}, - future::Future, - mem, - pin::Pin, - ptr::{self, NonNull}, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, -}; - -use arrayvec::ArrayVec; -use zephyr_sys::{ - k_poll_event, k_poll_event_init, k_poll_modes_K_POLL_MODE_NOTIFY_ONLY, k_work, k_work_poll, - k_work_poll_init, k_work_poll_submit, k_work_poll_submit_to_queue, k_work_q, - ZR_POLL_TYPE_DATA_AVAILABLE, ZR_POLL_TYPE_SEM_AVAILABLE, ZR_POLL_TYPE_SIGNAL, -}; - -use crate::{ - printkln, - sync::{Arc, Mutex, Weak}, - sys::{queue::Queue, sync::Semaphore}, - time::{Duration, Forever, NoWait, Tick, Timeout}, -}; - -use super::{get_current_workq, Signal, SubmitResult, WorkQueue}; - -/// An answer to a completed Future. -/// -/// The are two times we need to wait on a future running to completion: the outer initial executor -/// invocation from the main thread, and running an async thread which will have a join method. -/// -/// For both cases, we will use a Semaphore to indicate when the data is available. -/// -/// The main issue is that this type is intended to be one shot. Trying to load a second value will -/// invalidate the data structure (the item will be replaced, but there is a race with the -/// semaphore). -/// -/// TODO: Currently, the data is stored inside of a Mutex. This isn't actually necessary (the -/// semaphore already manages the coordination), and only a memory barrier would be needed, which -/// would be provided by the semaphore. So, this should be changed to just unsafely share the data, -/// similar to how a mutex is implemented. -pub struct Answer { - item: Mutex>, - wake: Semaphore, -} - -impl Answer { - /// Construct a new Answer that does not have the result. - pub fn new() -> Self { - Self { - item: Mutex::new(None), - wake: Semaphore::new(0, 1), - } - } - - /// Place the item into the Answer. 
- /// - /// # Panic - /// - /// If the answer already contains an item, this will panic. - /// - /// # TODO - /// - /// We could check that the Answer has ever been used, not just that it has an answer in it. - pub fn place(&self, item: T) { - let mut inner = self.item.lock().expect("Get Mutex"); - if inner.is_some() { - panic!("Answer already contains a value"); - } - *inner = Some(item); - self.wake.give(); - } - - /// Synchronously wait for an Answer. - /// - /// Blocks the current thread until an answer is available, returning it. - pub fn take(&self) -> T { - self.wake.take(Forever).expect("Forever returned early"); - self.item - .lock() - .expect("Get Mutex") - .take() - .expect("Answer should contain value") - } - - /// Asynchronously wait for an answer. - pub async fn take_async(&self) -> T { - self.wake - .take_async(Forever) - .await - .expect("Forever returnd early"); - self.item - .lock() - .expect("Get Mutex") - .take() - .expect("Answer should contain value") - } -} - -/// Build a combiner for Future and a Zephyr work queue. This encapsulates the idea of starting -/// a new thread of work, and is the basis of both the main `run` for work queues, as well as -/// any calls to spawn that happen within the Future world. -pub struct WorkBuilder { - queue: Option>, - // A name for this task, used by debugging and such. - name: Option<&'static CStr>, -} - -impl WorkBuilder { - /// Construct a new builder for work. - /// - /// The builder will default to running on the system workqueue. - pub fn new() -> Self { - Self { - queue: None, - name: None, - } - } - - /// Set the work queue for this worker to run on. - /// - /// By default, A Worker will run on the system work-queue. - pub fn set_worker(&mut self, worker: &WorkQueue) -> &mut Self { - self.queue = Some(NonNull::new(worker.item.get()).expect("work must not be null")); - self - } - - /// Set a name for this worker, for debugging. 
- pub fn set_name(&mut self, name: &'static CStr) -> &mut Self { - self.name = Some(name); - self - } - - /// Start this working, consuming the given Future to do the work. - /// - /// The work queue is in a pinned Arc to meet requirements of how Futures are used. The Arc - /// maintains lifetime while the worker is running. See notes below for issues of lifetimes - /// and canceled work. - pub fn start(&self, future: F) -> JoinHandle { - JoinHandle::new(self, future) - } - - /// Start this work, locally running on the current worker thread. - /// - /// This is the same as `start`, but the work will always be started on the current work queue - /// thread. This relaxes the `Send` requirement, as the data will always be contained in a - /// single thread. - /// - /// # Panics - /// - /// If called from other than a Future running on a work queue, will panic. The System work - /// queue is not yet supported. - pub fn start_local(&self, future: F) -> JoinHandle { - JoinHandle::new_local(self, future) - } -} - -/// Calculate the memory needed by scheduled work. This does not include the size of the Answer -/// (which can be dropped). -pub fn work_size(f: F) -> usize { - WorkData::size_of(f) -} - -/// A potentially running Work. -/// -/// This encapsulates a Future that is potentially running in the Zephyr work queue system. -/// -/// # Safety -/// -/// Once the worker has been started (meaning once WorkBuilder::start returns this `Work`), all -/// but one field here is owned by the worker itself (it runs on the worker thread, hence the -/// Send constraint). The exception is the 'answer' field which can be used by the caller to -/// wait for the Work to finish. -pub struct JoinHandle { - /// The answer will be placed here. This Arc holds a strong reference, and if the spawning - /// thread doesn't hold the `Work`, it will be dropped. - answer: Arc>, -} - -// SAFETY: The join handle can be Send as long as the Output is send. 
It does not depend on the -// Future being send. -unsafe impl Send for JoinHandle -where - F: Future, - F::Output: Send, -{ -} - -impl JoinHandle { - /// Construct new [`JoinHandle`] that runs on a specified [`WorkQueue`]. - fn new(builder: &WorkBuilder, future: F) -> Self { - // Answer holds the result when the work finishes. - let answer = Arc::new(Answer::new()); - - let work = WorkData::new(future, Arc::downgrade(&answer), builder.queue, builder.name); - WorkData::submit(work).expect("Unable to enqueue worker"); - - Self { answer } - } -} - -impl JoinHandle { - /// Construct a new [`JoinHandle`] that runs on the current [`WorkQueue`]. - /// - /// # Panics - /// - /// If `new_local` is called from a context other than running within a worker defined in this - /// crate, it will panic. - /// - /// Note that currently, the system workq is not considered a worked defined in this crate. - fn new_local(builder: &WorkBuilder, future: F) -> Self { - let workq = get_current_workq().expect("Called new_local not from worker"); - let answer = Arc::new(Answer::new()); - - let work = WorkData::new( - future, - Arc::downgrade(&answer), - Some(NonNull::new(workq).unwrap()), - builder.name, - ); - WorkData::submit(work).expect("Unable to enqueue worker"); - - Self { answer } - } -} - -impl JoinHandle { - /// Synchronously wait for this future to have an answer. - pub fn join(&self) -> F::Output { - self.answer.take() - } - - /// Asynchronously wait for this future to have an answer. - pub async fn join_async(&self) -> F::Output { - self.answer.take_async().await - } -} - -/// Futures will need to be able to set the events and timeout of this waker. Because the Waker is -/// parameterized, they will not have access to the whole WorkWaker, but only this WakeInfo. -pub struct WakeInfo { - /// The work queue to submit this work to. None indicates the system workq. - pub(crate) queue: Option>, - /// Events to use for our next wakeup. 
Currently cleared before calling the future (although - /// this discards the wakeup reason, so needs to be fixed). - pub events: EventArray, - /// Timeout to use for the next wakeup. Will be set to Forever before calling the Future's - /// poll. - pub timeout: Timeout, - /// A Context to use for invoking workers. This `WakeInfo` can be recovered from this context. - /// Note that our contexts are `'static` as they are maintained inside of the worker. - pub context: Context<'static>, -} - -impl WakeInfo { - /// Recover the WakeInfo from a given context. - /// - /// # Safety - /// - /// Although the lifetime of Context is `'static`, the generic type passed to `Future` does not - /// specify a lifetime. As such, it is not possible for the future to store the Context, and - /// rescheduling must be specified before this Future invocation returns. - /// - /// This does assume we are only using the Zephyr scheduler. The Context does have an any-based - /// data pointer mechanism, but it is nightly. This recovery would be easier using that - /// mechanism. - pub unsafe fn from_context<'b>(context: &'b mut Context) -> &'b mut Self { - // SAFETY: We're doing pointer arithmetic to recover Self from a reference to the embedded - // context. The 'mut' is preserved to keep the rules of mut in Rust. - unsafe { - let this: *mut Context = context; - let this = this - .cast::() - .sub(mem::offset_of!(Self, context)) - .cast::(); - &mut *this - } - } - - /// Add an event that represents waiting for a semaphore to be available for "take". - pub fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore) { - // SAFETY: Fill with zeroed memory, initializatuon happens in the init function next. 
- self.events.push(unsafe { mem::zeroed() }); - let ev = self.events.last().unwrap(); - - unsafe { - k_poll_event_init( - ev.get(), - ZR_POLL_TYPE_SEM_AVAILABLE, - k_poll_modes_K_POLL_MODE_NOTIFY_ONLY as i32, - sem.0.get() as *mut c_void, - ); - } - } - - /// Add an event that represents waiting for a signal. - pub fn add_signal<'a>(&'a mut self, signal: &'a Signal) { - // SAFETY: Fill with zeroed memory, initializatuon happens in the init function next. - self.events.push(unsafe { mem::zeroed() }); - let ev = self.events.last().unwrap(); - - unsafe { - k_poll_event_init( - ev.get(), - ZR_POLL_TYPE_SIGNAL, - k_poll_modes_K_POLL_MODE_NOTIFY_ONLY as i32, - signal.item.get() as *mut c_void, - ); - } - } - - /// Add an event that represents waiting for a queue to have a message. - pub fn add_queue<'a>(&'a mut self, queue: &'a Queue) { - // SAFETY: Fill with zeroed memory, initializatuon happens in the init function next. - self.events.push(unsafe { mem::zeroed() }); - let ev = self.events.last().unwrap(); - - unsafe { - k_poll_event_init( - ev.get(), - ZR_POLL_TYPE_DATA_AVAILABLE, - k_poll_modes_K_POLL_MODE_NOTIFY_ONLY as i32, - queue.0.get() as *mut c_void, - ); - } - } -} - -/// The worker-owned information about that worker. -/// -/// This holds a single worker, and will be owned by that worker itself. -struct WorkData { - /// Info needed to reschedule the work. - info: WakeInfo, - /// The Zephyr worker. This struct is allocated in a Box, and only used by the worker thread, - /// so it is easy to recover. The UnsafeCell is to indicate that Zephyr is free to mutate the - /// work. - work: UnsafeCell, - /// Where the answer is placed. This is weak because the spawning thread may not be interested - /// in the result, which will drop the only reference to the Arc, breaking the weak reference. - answer: Weak>, - /// The future that is running this work. - future: F, -} - -// SAFETY: The worker struct is explicitly safe to send by the Zephyr docs. 
-// unsafe impl Send for WorkData {} - -impl WorkData { - /// Build a new WorkWaker around the given future. The weak reference to the answer is where - /// the answer is stored if the task spawner is still interested in the answer. - fn new( - future: F, - answer: Weak>, - queue: Option>, - name: Option<&'static CStr>, - ) -> Pin> { - // name is only used for SystemView debugging, so prevent a warning when that is not - // enabled. - let _ = name; - - let this = Box::pin(Self { - // SAFETY: This will be initialized below, once the Box allocates and the memory won't - // move. - work: unsafe { mem::zeroed() }, - future, - answer, - info: WakeInfo { - queue, - events: EventArray::new(), - // Initial timeout is NoWait so work starts as soon as submitted. - timeout: NoWait.into(), - context: Context::from_waker(&VOID_WAKER), - }, - }); - - unsafe { - // SAFETY: The above Arc allocates the worker. The code here is careful to not move it. - k_work_poll_init(this.work.get(), Some(Self::handler)); - - // If we have a name, send it to Segger. - #[cfg(CONFIG_SEGGER_SYSTEMVIEW)] - { - let ww = &(&*this.work.get()).work; - if let Some(name) = name { - let info = crate::raw::SEGGER_SYSVIEW_TASKINFO { - TaskID: this.work.get() as ::core::ffi::c_ulong, - sName: name.as_ptr(), - Prio: 1, - StackBase: 0, - StackSize: 32, - }; - crate::raw::SEGGER_SYSVIEW_OnTaskCreate(this.work.get() as ::core::ffi::c_ulong); - crate::raw::SEGGER_SYSVIEW_SendTaskInfo(&info); - } - } - } - - this - } - - /// Submit this work to the Zephyr work queue. This consumes the Box, with the primary owner - /// being the work thread itself. Not that canceling work will leak the worker. - fn submit(mut this: Pin>) -> crate::Result { - // SAFETY: This is unsafe because the pointer lose the Pin guarantee, but C code will not - // move it. 
- let this_ref = unsafe { Pin::get_unchecked_mut(this.as_mut()) }; - - let result = if let Some(queue) = this_ref.info.queue { - unsafe { - // SAFETY: We're transferring ownership of the box to the enqueued work. For - // regular re-submission as the worker runs, the worker won't be run until this - // method exits. For initial creation, there is a possible period where our - // reference here survives while the worker is schedule (when the work queue is - // higher priority than this. I'm not sure if this fully followes the rules, as - // there is still a reference to this here, but as long as we only use it to leak - // the box, I believe we are safe. If this is deemed unsafe, these values could be - // copied to variables and the box leaked before we enqueue. - k_work_poll_submit_to_queue( - queue.as_ptr(), - this_ref.work.get(), - this_ref.info.events.as_mut_ptr() as *mut k_poll_event, - this.info.events.len() as c_int, - this.info.timeout.0, - ) - } - } else { - unsafe { - // SAFETY: See above, safety here is the same. - k_work_poll_submit( - this_ref.work.get(), - this_ref.info.events.as_mut_ptr() as *mut k_poll_event, - this_ref.info.events.len() as c_int, - this_ref.info.timeout.0, - ) - } - }; - - // The Box has been handed to C. Consume the box, leaking the value. We use `into_raw` as - // it is the raw pointer we will be recovering the Box with when the worker runs. - let _ = Self::into_raw(this); - - match result { - 0 => Ok(SubmitResult::Enqueued), - code => panic!("Unexpected result from work poll submit: {}", code), - } - } - - /// The work callback, coming from the Zephyr C world. The box was into_raw(), We recover the - /// WorkWaker by using container_of and recovering it back into a box, which we will leak when - /// we re-submit it. - extern "C" fn handler(work: *mut k_work) { - // Note that we want to avoid needing a `repr(C)` on our struct, so the k_work pointer is - // not necessarily at the beginning of the struct. 
- let mut this = unsafe { Self::from_raw(work) }; - - let this_ref = unsafe { Pin::get_unchecked_mut(this.as_mut()) }; - - // Set the next work to Forever, with no events. TODO: This prevents the next poll from - // being able to determine the reason for the wakeup. - this_ref.info.events.clear(); - this_ref.info.timeout = Forever.into(); - - // SAFETY: poll requires the pointer to be pinned, in case that is needed. We rely on the - // Boxing of the pointer, and that our code does not move the future. - let future = unsafe { Pin::new_unchecked(&mut this_ref.future) }; - #[cfg(CONFIG_SEGGER_SYSTEMVIEW)] - unsafe { - crate::raw::SEGGER_SYSVIEW_OnTaskStartExec(work as u32); - } - match future.poll(&mut this_ref.info.context) { - Poll::Pending => { - #[cfg(CONFIG_SEGGER_SYSTEMVIEW)] - unsafe { - crate::raw::SEGGER_SYSVIEW_OnTaskStopExec(); - } - // With pending, use the timeout and events to schedule ourselves to do more work. - // TODO: If we want to support a real Waker, this would need to detect that, and - // schedule a possible wake on this no wake case. - // Currently, this check is only testing that something is missed, and is really - // more of a debug assertion. - if this.info.events.is_empty() && this.info.timeout == Forever.into() { - printkln!("Warning: worker scheduled to never wake up"); - } - - // The re-submission will give ownership of the box back to the scheduled work. - Self::submit(this).expect("Unable to schedule work"); - } - Poll::Ready(answer) => { - #[cfg(CONFIG_SEGGER_SYSTEMVIEW)] - unsafe { - crate::raw::SEGGER_SYSVIEW_OnTaskStopExec(); - } - // TODO: Delete the task as well. - // If the spawning task is still interested in the answer, provide it. - if let Some(store) = this.answer.upgrade() { - store.place(answer); - } - - // Work is finished, so allow the Box to be dropped. - } - } - } - - /// Consume the pinned box containing Self, and return the internal pointer. 
- fn into_raw(this: Pin>) -> *mut Self { - // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't - // generally use move. - let this = unsafe { Pin::into_inner_unchecked(this) }; - Box::into_raw(this) - } - - /// Given a pointer to the work_q burried within, recover the Pinned Box containing our data. - unsafe fn from_raw(ptr: *mut k_work) -> Pin> { - // SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the - // pointer is valid. - let ptr = ptr - .cast::() - .sub(mem::offset_of!(k_work_poll, work)) - .sub(mem::offset_of!(Self, work)) - .cast::(); - let this = Box::from_raw(ptr); - Pin::new_unchecked(this) - } - - /// Determine the size of WorkData for a given Future. - /// - /// It is difficult to otherwise calculate this. The future will be dropped by this. - pub fn size_of(_: F) -> usize { - mem::size_of::() - } -} - -/// A VoidWaker is used when we don't use the Waker mechanism. There is no data associated with -/// this waker, and it panics if anyone tries to clone it or use it to wake a task. -/// This is static to simplify lifetimes. -static VOID_WAKER: Waker = unsafe { - Waker::from_raw(RawWaker::new( - ptr::null(), - &RawWakerVTable::new(void_clone, void_wake, void_wake_by_ref, void_drop), - )) -}; - -/// Void clone operation. Panics for now. If we want to implement a real waker, this will need -/// to be managed. -unsafe fn void_clone(_: *const ()) -> RawWaker { - panic!("Zephyr Wakers not yet supported for general 'Waker' use"); -} - -/// Void wake operation. Panics for now. If we want to implement a real waker, this will need -/// to be managed. -unsafe fn void_wake(_: *const ()) { - panic!("Zephyr Wakers not yet supported for general 'Waker' use"); -} - -/// Void wake_by_ref operation. Panics for now. If we want to implement a real waker, this will need -/// to be managed. 
-unsafe fn void_wake_by_ref(_: *const ()) { - panic!("Zephyr Wakers not yet supported for general 'Waker' use"); -} - -/// The void drop will be called when the Context is dropped after the first invocation. Because -/// clone above panics, we know there aren't references hanging around. So, it is safe to just -/// do nothing. -unsafe fn void_drop(_: *const ()) {} -/// To avoid having to parameterize everything, we limit the size of the ArrayVec of events to -/// this amount. The amount needed her depends on overall use, but so far, 1 is sufficient. -type EventArray = ArrayVec, 1>; - -/// Async sleep. -pub fn sleep(duration: Duration) -> Sleep { - Sleep { - ticks_left: duration.ticks(), - } -} - -/// A future that sleeps for a while. -pub struct Sleep { - // How much time is left. TODO: Change this into an absolute sleep once we have the ability to - // determine why were were scheduled. - ticks_left: Tick, -} - -impl Future for Sleep { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // If the sleep is done, so are we. - if self.ticks_left == 0 { - return Poll::Ready(()); - } - - // Otherwise, queue outselves back. - let this = unsafe { WakeInfo::from_context(cx) }; - - this.timeout = Duration::from_ticks(self.ticks_left).into(); - self.ticks_left = 0; - - Poll::Pending - } -}