Skip to content

Commit d78fd67

Browse files
committed
zephyr: work: Create ContextExt
Because Zephyr's workqueue re-scheduling is based on the Context struct, not the waker, we need a way to get back to our `WorkInfo` struct that this Context is embedded in. Create a `ContextExt` that adds some methods to the `Context` to be able to indicate when the scheduler should reschedule this work. Signed-off-by: David Brown <[email protected]>
1 parent cf6f5df commit d78fd67

File tree

5 files changed

+81
-34
lines changed

5 files changed

+81
-34
lines changed

zephyr/src/kio.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,20 @@
88
//! [`futures`]: crate::work::futures
99
1010
use core::ffi::CStr;
11-
use core::task::Poll;
11+
use core::task::{Context, Poll};
1212
use core::{future::Future, pin::Pin};
1313

14-
use crate::time::NoWait;
14+
use crate::sys::queue::Queue;
15+
use crate::sys::sync::Semaphore;
16+
use crate::time::{NoWait, Timeout};
1517
use crate::work::futures::WakeInfo;
18+
use crate::work::Signal;
1619
use crate::work::{futures::JoinHandle, futures::WorkBuilder, WorkQueue};
1720

1821
pub mod sync;
1922

23+
pub use crate::work::futures::sleep;
24+
2025
/// Run an async future on the given worker thread.
2126
///
2227
/// Arrange to have the given future run on the given worker thread. The resulting `JoinHandle` has
@@ -94,3 +99,67 @@ impl Future for YieldNow {
9499
}
95100
}
96101
}
102+
103+
/// Extensions on [`Context`] to support scheduling via Zephyr's workqueue system.
104+
///
105+
/// All of these are called from within the context of running work, and indicate what _next_
106+
/// should cause this work to be run again. If none of these methods are called before the work
107+
/// exits, the work will be scheduled to run after `Forever`, which is not useful. There may be
108+
/// later support for having a `Waker` that can schedule work from another context.
109+
///
110+
/// Note that the events to wait on, such as Semaphores or channels, if there are multiple threads
111+
/// that can wait for them, might cause this worker to run, but not actually be available. As such,
112+
/// to maintain the non-blocking requirements of Work, [`Semaphore::take`], and the blocking `send`
113+
/// and `recv` operations on channels should not be used, even after being woken.
114+
///
115+
/// For the timeout [`Forever`] is useful to indicate there is no timeout. If called with
116+
/// [`NoWait`], the work will be immediately scheduled. In general, it is better to query the
117+
/// underlying object directly rather than have the overhead of being rescheduled.
118+
///
119+
/// # Safety
120+
///
121+
/// The lifetime bounds on the items waited for ensure that these items live at least as long as the
122+
/// work queue. Practically, this can only be satisfied by using something with 'static' lifetime,
123+
/// or embedding the value in the Future itself.
124+
///
125+
/// With the Zephyr executor, the `Context` is embedded within a `WakeInfo` struct, which this makes
126+
/// use of. If a different executor were to be used, these calls would result in undefined
127+
/// behavior.
128+
///
129+
/// This could be checked at runtime, but it would have runtime cost.
130+
pub trait ContextExt {
131+
/// Indicate the work should next be scheduled based on a semaphore being available for "take".
132+
///
133+
/// The work will be scheduled either when the given semaphore becomes available to 'take', or
134+
/// after the timeout.
135+
fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore, timeout: impl Into<Timeout>);
136+
137+
/// Indicate that the work should be scheduled after receiving the given [`Signal`], or the
138+
/// timeout occurs.
139+
fn add_signal<'a>(&'a mut self, signal: &'a Signal, timeout: impl Into<Timeout>);
140+
141+
/// Indicate that the work should be scheduled when the given [`Queue`] has data available to
142+
/// recv, or the timeout occurs.
143+
fn add_queue<'a>(&'a mut self, queue: &'a Queue, timeout: impl Into<Timeout>);
144+
}
145+
146+
/// Implementation of ContextExt for the Rust [`Context`] type.
147+
impl<'b> ContextExt for Context<'b> {
148+
fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore, timeout: impl Into<Timeout>) {
149+
let info = unsafe { WakeInfo::from_context(self) };
150+
info.add_semaphore(sem);
151+
info.timeout = timeout.into();
152+
}
153+
154+
fn add_signal<'a>(&'a mut self, signal: &'a Signal, timeout: impl Into<Timeout>) {
155+
let info = unsafe { WakeInfo::from_context(self) };
156+
info.add_signal(signal);
157+
info.timeout = timeout.into();
158+
}
159+
160+
fn add_queue<'a>(&'a mut self, queue: &'a Queue, timeout: impl Into<Timeout>) {
161+
let info = unsafe { WakeInfo::from_context(self) };
162+
info.add_queue(queue);
163+
info.timeout = timeout.into();
164+
}
165+
}

zephyr/src/sync/channel.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ use core::mem::MaybeUninit;
5151
use core::pin::Pin;
5252
use core::task::Poll;
5353

54+
use crate::kio::ContextExt;
5455
use crate::sys::queue::Queue;
5556
use crate::time::{Duration, Forever, NoWait, Timeout};
56-
use crate::work::futures::WakeInfo;
5757

5858
mod counter;
5959

@@ -270,18 +270,14 @@ impl<'a, T: Unpin> Future for SendFuture<'a, T> {
270270
this.msg = Some(msg);
271271

272272
// Otherwise, schedule to wake up on receipt or timeout.
273-
let info = unsafe { WakeInfo::from_context(cx) };
274273
match &this.sender.flavor {
275274
SenderFlavor::Unbounded { .. } => {
276275
panic!("Implementation error: unbounded queues should never fail");
277276
}
278277
SenderFlavor::Bounded(chan) => {
279-
unsafe {
280-
info.add_queue(&chan.free);
281-
}
278+
cx.add_queue(&chan.free, this.timeout);
282279
}
283280
}
284-
info.timeout = this.timeout;
285281

286282
Poll::Pending
287283
}
@@ -522,11 +518,7 @@ impl<'a, T> Future for RecvFuture<'a, T> {
522518
}
523519

524520
// Otherwise, schedule to wakeup on receipt or timeout.
525-
let info = unsafe { WakeInfo::from_context(cx) };
526-
unsafe {
527-
info.add_queue(self.receiver.as_queue());
528-
}
529-
info.timeout = self.timeout;
521+
cx.add_queue(self.receiver.as_queue(), self.timeout);
530522
self.waited = true;
531523

532524
Poll::Pending

zephyr/src/sys/sync/semaphore.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@ use core::mem;
2525
#[cfg(CONFIG_RUST_ALLOC)]
2626
use zephyr_sys::ETIMEDOUT;
2727

28+
use crate::kio::ContextExt;
2829
#[cfg(CONFIG_RUST_ALLOC)]
2930
use crate::time::NoWait;
30-
#[cfg(CONFIG_RUST_ALLOC)]
31-
use crate::work::futures::WakeInfo;
3231
use crate::{
3332
error::{to_result_void, Result},
3433
object::{Fixed, StaticKernelObject, Wrapped},
@@ -144,13 +143,7 @@ impl<'a> Future for SemTake<'a> {
144143
}
145144

146145
// TODO: Clean this up.
147-
let info = unsafe { WakeInfo::from_context(cx) };
148-
unsafe {
149-
// SAFETY: The semaphore must outlive the queued event. The lifetime ensures that the
150-
// Future won't outlive the semaphore.
151-
info.add_semaphore(self.sem);
152-
}
153-
info.timeout = self.timeout;
146+
cx.add_semaphore(self.sem, self.timeout);
154147
self.ran = true;
155148

156149
Poll::Pending

zephyr/src/work.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,14 @@ use core::{
179179
ptr,
180180
task::Poll,
181181
};
182-
use futures::WakeInfo;
183182

184183
use zephyr_sys::{
185184
k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
186185
k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
187186
k_work_queue_start, k_work_submit, k_work_submit_to_queue, ETIMEDOUT,
188187
};
189188

190-
use crate::{error::to_result_void, object::Fixed, simpletls::StaticTls, sys::thread::ThreadStack, time::Timeout};
189+
use crate::{error::to_result_void, kio::ContextExt, object::Fixed, simpletls::StaticTls, sys::thread::ThreadStack, time::Timeout};
191190

192191
pub mod futures;
193192

@@ -485,13 +484,7 @@ impl<'a> Future for SignalWait<'a> {
485484
return Poll::Ready(Err(crate::Error(ETIMEDOUT)));
486485
}
487486

488-
let info = unsafe { WakeInfo::from_context(cx) };
489-
unsafe {
490-
// SAFETY: The Signal must outlive the queued event. The lifetime ensure that the
491-
// Future can't outlive the Signal.
492-
info.add_signal(self.signal);
493-
}
494-
info.timeout = self.timeout;
487+
cx.add_signal(self.signal, self.timeout);
495488
self.ran = true;
496489

497490
Poll::Pending

zephyr/src/work/futures.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ impl WakeInfo {
285285
}
286286

287287
/// Add an event that represents waiting for a semaphore to be available for "take".
288-
pub unsafe fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore) {
288+
pub fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore) {
289289
// SAFETY: Fill with zeroed memory, initializatuon happens in the init function next.
290290
self.events.push(unsafe { mem::zeroed() });
291291
let ev = self.events.last().unwrap();
@@ -301,7 +301,7 @@ impl WakeInfo {
301301
}
302302

303303
/// Add an event that represents waiting for a signal.
304-
pub unsafe fn add_signal<'a>(&'a mut self, signal: &'a Signal) {
304+
pub fn add_signal<'a>(&'a mut self, signal: &'a Signal) {
305305
// SAFETY: Fill with zeroed memory, initializatuon happens in the init function next.
306306
self.events.push(unsafe { mem::zeroed() });
307307
let ev = self.events.last().unwrap();
@@ -317,7 +317,7 @@ impl WakeInfo {
317317
}
318318

319319
/// Add an event that represents waiting for a queue to have a message.
320-
pub unsafe fn add_queue<'a>(&'a mut self, queue: &'a Queue) {
320+
pub fn add_queue<'a>(&'a mut self, queue: &'a Queue) {
321321
// SAFETY: Fill with zeroed memory, initializatuon happens in the init function next.
322322
self.events.push(unsafe { mem::zeroed() });
323323
let ev = self.events.last().unwrap();

0 commit comments

Comments
 (0)