Commit 6003752

zephyr: Add initial implementation of work queue and async support

This provides a first pass at an implementation of management of Zephyr work queues, and an executor that schedules work using Zephyr's work queues.

TODO: Clean up how Futures can use the Context to indicate scheduling.
TODO: Move a few things to make the used modules a bit cleaner.

Signed-off-by: David Brown <[email protected]>

1 parent da280a6 commit 6003752
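
For orientation, here is a minimal usage sketch (not part of the commit) based on the `spawn` signature and `JoinHandle` description in `zephyr/src/kio.rs` below. The `worker` queue is assumed to be created elsewhere, the function name `run_on_worker` is hypothetical, the C-string literal assumes a recent toolchain, and `join` is assumed to hand back the future's output.

    use zephyr::kio::spawn;
    use zephyr::work::WorkQueue;

    /// Hypothetical caller: schedule a small future on an existing work queue.
    fn run_on_worker(worker: &WorkQueue) {
        // `spawn` takes a `&'static CStr` name for the underlying work item.
        let handle = spawn(async { 2 + 2 }, worker, c"demo");

        // Block this (non-async) thread until the spawned future completes;
        // `join` is assumed here to return the future's output.
        let answer = handle.join();
        assert_eq!(answer, 4);
    }

The pieces this relies on (`WorkQueue`, `WorkBuilder`, `JoinHandle`, and the work-queue executor itself) are introduced in the per-file diffs that follow.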

File tree

15 files changed: +1881, -6 lines changed


zephyr-sys/build.rs

Lines changed: 1 addition & 0 deletions

@@ -92,6 +92,7 @@ fn main() -> Result<()> {
         .allowlist_item("K_.*")
         .allowlist_item("ZR_.*")
         .allowlist_item("LOG_LEVEL_.*")
+        .allowlist_item("k_poll_modes")
         // Deprecated
         .blocklist_function("sys_clock_timeout_end_calc")
         .parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))

zephyr-sys/src/lib.rs

Lines changed: 2 additions & 0 deletions

@@ -49,3 +49,5 @@ macro_rules! derive_copy {

 derive_copy!(z_spinlock_key);
 derive_clone!(z_spinlock_key);
+derive_copy!(k_timeout_t);
+derive_clone!(k_timeout_t);

zephyr-sys/wrapper.h

Lines changed: 4 additions & 0 deletions

@@ -52,3 +52,7 @@ extern int errno;
  */
 const uintptr_t ZR_STACK_ALIGN = Z_KERNEL_STACK_OBJ_ALIGN;
 const uintptr_t ZR_STACK_RESERVED = K_KERNEL_STACK_RESERVED;
+
+const uint32_t ZR_POLL_TYPE_SEM_AVAILABLE = K_POLL_TYPE_SEM_AVAILABLE;
+const uint32_t ZR_POLL_TYPE_SIGNAL = K_POLL_TYPE_SIGNAL;
+const uint32_t ZR_POLL_TYPE_DATA_AVAILABLE = K_POLL_TYPE_DATA_AVAILABLE;

zephyr/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -20,6 +20,7 @@ cfg-if = "1.0"

 # The log create is used if the user desires logging, and calls `set_logger()`.
 log = "0.4.22"
+arrayvec = { version = "0.7.6", default-features = false }

 [dependencies.fugit]
 version = "0.3.7"

zephyr/src/kio.rs

Lines changed: 96 additions & 0 deletions (new file)

//! Async IO for Zephyr
//!
//! This implements the basics of using Zephyr's work queues to implement async code on Zephyr.
//!
//! Most of the work happens in [`work`] and in [`futures`].
//!
//! [`work`]: crate::work
//! [`futures`]: crate::work::futures

use core::ffi::CStr;
use core::task::Poll;
use core::{future::Future, pin::Pin};

use crate::time::NoWait;
use crate::work::futures::WakeInfo;
use crate::work::{futures::JoinHandle, futures::WorkBuilder, WorkQueue};

pub mod sync;

/// Run an async future on the given worker thread.
///
/// Arrange to have the given future run on the given worker thread. The resulting `JoinHandle` has
/// `join` and `join_async` methods that can be used to wait for the given thread.
pub fn spawn<F>(future: F, worker: &WorkQueue, name: &'static CStr) -> JoinHandle<F>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    WorkBuilder::new()
        .set_worker(worker)
        .set_name(name)
        .start(future)
}

/// Run an async future on the current worker thread.
///
/// Arrange to have the given future run on the current worker thread. The resulting `JoinHandle`
/// has `join` and `join_async` methods that can be used to wait for the given thread.
///
/// The main use for this is to allow work threads to use `Rc` and `Rc<RefCell<T>>` within async
/// tasks. The main constraint is that references inside cannot be held across an `.await`.
///
/// # Panics
/// If this is called other than from a worker task running on a work thread, it will panic.
pub fn spawn_local<F>(future: F, name: &'static CStr) -> JoinHandle<F>
where
    F: Future + 'static,
    F::Output: Send + 'static,
{
    WorkBuilder::new()
        .set_name(name)
        .start_local(future)
}

/// Yield the current thread, returning it to the work queue to be run after other work on that
/// queue. (This has to be called `yield_now` in Rust, because `yield` is a keyword.)
pub fn yield_now() -> impl Future<Output = ()> {
    YieldNow { waited: false }
}

struct YieldNow {
    waited: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output> {
        if self.waited {
            Poll::Ready(())
        } else {
            // Enqueue ourselves with no wait and no events.
            let info = unsafe { WakeInfo::from_context(cx) };

            // Unsafely check if the work queue running us is empty. We only check explicitly
            // specified workers (TODO: access the system work queue). The check is racy, but should
            // always fail indicating that the queue is not empty when it could be. Checking this
            // avoids re-scheduling the only worker back into the queue.
            // SAFETY: The check is racy, but will fail with us yielding when we didn't need to.
            if let Some(wq) = info.queue {
                let wq = unsafe { wq.as_ref() };
                if wq.pending.head == wq.pending.tail {
                    return Poll::Ready(());
                }
            }

            info.timeout = NoWait.into();
            self.waited = true;

            Poll::Pending
        }
    }
}
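
As an illustration only (not part of the diff), here is a sketch combining `spawn_local` and `yield_now` from this module. The task body and the names `cooperative_steps` and `start_from_worker` are hypothetical, and the C-string literal assumes a toolchain new enough to support it.

    use zephyr::kio::{spawn_local, yield_now};

    /// Hypothetical task body: do work in bounded steps, yielding between them
    /// so other futures queued on the same work queue get a chance to run.
    async fn cooperative_steps() {
        for _step in 0..3 {
            // ... a bounded chunk of work would go here ...

            // Hand the thread back to the work queue; this task resumes after
            // any other already-queued work has run.
            yield_now().await;
        }
    }

    /// Per the `spawn_local` docs above, this must itself be running on a work
    /// queue thread, otherwise it panics.
    fn start_from_worker() {
        let handle = spawn_local(cooperative_steps(), c"coop");
        // `handle.join()` or `handle.join_async()` can be used to wait for completion.
        let _ = handle;
    }

Yielding matters because several futures can share one work queue; per the `yield_now` documentation, the yielded task is simply placed back on the queue behind any other pending work.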

zephyr/src/kio/sync.rs

Lines changed: 126 additions & 0 deletions (new file)

//! Synchronization mechanisms that work with async.
//!
//! Notably, Zephyr's `k_mutex` type isn't supported as a type that can be waited for
//! asynchronously.
//!
//! The main problem with `k_mutex` (meaning [`crate::sync::Mutex`]) is that the `lock` operation
//! can block, and since multiple tasks may be scheduled for the same work queue, the system can
//! deadlock, as the scheduler may not run to allow the task that actually holds the mutex to run.
//!
//! As an initial stopgap, we provide a [`Mutex`] type that is usable within an async context. We
//! do not currently implement an associated `Condvar`.
//!
//! Note that using Semaphores for locking means that this mechanism doesn't handle priority
//! inversion issues. Be careful with workers that run at different priorities.

// Use the same error types as the regular sync version.

use core::{
    cell::UnsafeCell,
    fmt,
    marker::PhantomData,
    ops::{Deref, DerefMut},
};

use crate::{
    sync::{LockResult, TryLockError, TryLockResult},
    sys::sync::Semaphore,
    time::{Forever, NoWait},
};

/// A mutual exclusion primitive useful for protecting shared data. Async version.
///
/// This mutex will block a task waiting for the lock to become available.
pub struct Mutex<T: ?Sized> {
    /// The semaphore indicating ownership of the data. When it is "0", the task that did the
    /// `take` on it owns the data, and will use `give` when it is unlocked. This mechanism works
    /// for a simple Mutex that protects the data without needing a condition variable.
    inner: Semaphore,
    data: UnsafeCell<T>,
}

// SAFETY: The semaphore, with the semantics provided here, provides Send and Sync.
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

impl<T> fmt::Debug for Mutex<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Mutex {:?}", self.inner)
    }
}

/// An RAII implementation of a held lock.
pub struct MutexGuard<'a, T: ?Sized + 'a> {
    lock: &'a Mutex<T>,
    // Mark !Send explicitly until support is added to Rust for this.
    _nosend: PhantomData<UnsafeCell<()>>,
}

// unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}

impl<T> Mutex<T> {
    /// Construct a new Mutex.
    pub fn new(t: T) -> Mutex<T> {
        Mutex {
            inner: Semaphore::new(1, 1).unwrap(),
            data: UnsafeCell::new(t),
        }
    }
}

impl<T: ?Sized> Mutex<T> {
    /// Acquire the mutex, blocking the current thread until it is able to do so.
    ///
    /// This is the sync version; calling it from an async task may block the async work
    /// thread, potentially causing deadlock.
    pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
        self.inner.take(Forever).unwrap();
        unsafe { Ok(MutexGuard::new(self)) }
    }

    /// Acquire the mutex, async version.
    pub async fn lock_async(&self) -> LockResult<MutexGuard<'_, T>> {
        self.inner.take_async(Forever).await.unwrap();
        unsafe { Ok(MutexGuard::new(self)) }
    }

    /// Attempt to acquire the lock.
    pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
        match self.inner.take(NoWait) {
            Ok(()) => unsafe { Ok(MutexGuard::new(self)) },
            // TODO: Distinguish timeout from other errors.
            Err(_) => Err(TryLockError::WouldBlock),
        }
    }
}

impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> {
    unsafe fn new(lock: &'mutex Mutex<T>) -> MutexGuard<'mutex, T> {
        MutexGuard {
            lock,
            _nosend: PhantomData,
        }
    }
}

impl<T: ?Sized> Deref for MutexGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.data.get() }
    }
}

impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.data.get() }
    }
}

impl<T: ?Sized> Drop for MutexGuard<'_, T> {
    #[inline]
    fn drop(&mut self) {
        self.lock.inner.give();
    }
}
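
For illustration only (not part of the diff), a sketch of using this Mutex from an async task; the path `zephyr::kio::sync::Mutex` follows the module layout added in this commit, and `bump` is a hypothetical function assumed to run on a work queue.

    use zephyr::kio::sync::Mutex;

    /// Hypothetical shared counter protected by the async-aware mutex.
    async fn bump(counter: &Mutex<u32>) -> u32 {
        // `lock_async` suspends this task rather than blocking the work-queue
        // thread, so other futures on the same queue can still make progress.
        let mut guard = counter.lock_async().await.unwrap();
        *guard += 1;
        *guard
        // Dropping the guard at the end of scope `give`s the semaphore back.
    }

Unlike the blocking `lock`, which the module documentation warns can deadlock when multiple tasks share a work queue, `lock_async` only parks the waiting future.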

zephyr/src/lib.rs

Lines changed: 13 additions & 0 deletions

@@ -15,11 +15,16 @@ pub mod device;
 pub mod error;
 pub mod logging;
 pub mod object;
+pub mod simpletls;
 pub mod sync;
 pub mod sys;
 pub mod time;
 #[cfg(CONFIG_RUST_ALLOC)]
 pub mod timer;
+#[cfg(CONFIG_RUST_ALLOC)]
+pub mod work;
+#[cfg(CONFIG_RUST_ALLOC)]
+pub mod kio;

 pub use error::{Error, Result};

@@ -108,3 +113,11 @@ pub mod _export {
 // If allocation has been requested, provide the allocator.
 #[cfg(CONFIG_RUST_ALLOC)]
 pub mod alloc_impl;
+
+#[cfg(CONFIG_RUST_ALLOC)]
+pub mod task {
+    //! Provides the portable-atomic version of `alloc::task::Wake`, which uses the compatible
+    //! version of Arc.
+
+    pub use portable_atomic_util::task::Wake;
+}
