Skip to content

Commit e34fd58

Browse files
committed
zephyr: sync: channel: Implement Async operations
Add async operations to the channels, where it makes sense. Signed-off-by: David Brown <[email protected]>
1 parent 006d6d9 commit e34fd58

File tree

6 files changed

+184
-12
lines changed

6 files changed

+184
-12
lines changed

zephyr-sys/wrapper.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,4 @@ const uintptr_t ZR_STACK_RESERVED = K_KERNEL_STACK_RESERVED;
4848

4949
const uint32_t ZR_POLL_TYPE_SEM_AVAILABLE = K_POLL_TYPE_SEM_AVAILABLE;
5050
const uint32_t ZR_POLL_TYPE_SIGNAL = K_POLL_TYPE_SIGNAL;
51+
const uint32_t ZR_POLL_TYPE_DATA_AVAILABLE = K_POLL_TYPE_DATA_AVAILABLE;

zephyr/src/sync/channel.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@ use alloc::boxed::Box;
4545
use core::cell::UnsafeCell;
4646
use core::ffi::c_void;
4747
use core::fmt;
48+
use core::future::Future;
4849
use core::marker::PhantomData;
4950
use core::mem::MaybeUninit;
5051
use core::pin::Pin;
52+
use core::task::Poll;
5153

5254
use crate::sys::queue::Queue;
5355
use crate::time::{Forever, NoWait, Timeout};
56+
use crate::work::futures::WorkWaker;
5457

5558
mod counter;
5659

@@ -205,6 +208,88 @@ impl<T> Sender<T> {
205208
}
206209
}
207210

211+
// A little note about the Unpin constraint here. Because Futures are pinned in Rust Async code,
212+
// and the future stores the messages, we can only send and receive messages that aren't pinned.
213+
impl<T: Unpin> Sender<T> {
214+
/// Waits for a message to be sent into the channel, but only for a limited time. Async
215+
/// version.
216+
///
217+
/// This has the same behavior as [`send_timeout`], but as an Async function.
218+
pub fn send_timeout_async<'a>(&'a self, msg: T, timeout: impl Into<Timeout>)
219+
-> impl Future<Output = Result<(), SendError<T>>> + 'a
220+
{
221+
SendFuture {
222+
sender: self,
223+
msg: Some(msg),
224+
timeout: timeout.into(),
225+
waited: false,
226+
}
227+
}
228+
229+
/// Sends a message over the given channel, waiting if necessary. Async version.
230+
pub async fn send_async(&self, msg: T) -> Result<(), SendError<T>> {
231+
self.send_timeout_async(msg, Forever).await
232+
}
233+
234+
// Note that there is no async version of `try_send`.
235+
}
236+
237+
/// The implementation of Future for Sender::send_timeout_async.
238+
struct SendFuture<'a, T: Unpin> {
239+
sender: &'a Sender<T>,
240+
msg: Option<T>,
241+
timeout: Timeout,
242+
waited: bool,
243+
}
244+
245+
impl<'a, T: Unpin> Future for SendFuture<'a, T> {
246+
type Output = Result<(), SendError<T>>;
247+
248+
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Self::Output> {
249+
/*
250+
let this = unsafe {
251+
Pin::get_unchecked_mut(self)
252+
};
253+
*/
254+
let this = Pin::get_mut(self);
255+
256+
// Take the message out in preparation to try sending it. It is a logic error if the unwrap
257+
// fails.
258+
let msg = this.msg.take().unwrap();
259+
260+
// Try sending the message, with no timeout.
261+
let msg = match this.sender.try_send(msg) {
262+
Ok(()) => return Poll::Ready(Ok(())),
263+
Err(SendError(msg)) => msg,
264+
};
265+
266+
if this.waited {
267+
// We already waited, and no message, so give the messagre back, indiciating a timeout.
268+
return Poll::Ready(Err(SendError(msg)));
269+
}
270+
271+
// Send didn't happen, put the message back to have for the next call.
272+
this.msg = Some(msg);
273+
274+
// Otherwise, schedule to wake up on receipt or timeout.
275+
let work = unsafe { WorkWaker::from_waker(cx.waker()) };
276+
let mut lock = work.lock().unwrap();
277+
match &this.sender.flavor {
278+
SenderFlavor::Unbounded { .. } => {
279+
panic!("Implementation error: unbounded queues should never fail");
280+
}
281+
SenderFlavor::Bounded(chan) => {
282+
unsafe {
283+
lock.add_queue(&chan.free);
284+
}
285+
}
286+
}
287+
lock.timeout = this.timeout;
288+
289+
Poll::Pending
290+
}
291+
}
292+
208293
impl<T> Drop for Sender<T> {
209294
fn drop(&mut self) {
210295
match &self.flavor {
@@ -341,6 +426,42 @@ impl<T> Receiver<T> {
341426
}
342427
}
343428

429+
// Note that receive doesn't need the Unpin constraint, as we aren't storing any message.
430+
impl<T> Receiver<T> {
431+
/// Waits for a message to be received from the channel, but only for a limited time.
432+
/// Async version.
433+
///
434+
/// If the channel is empty and not disconnected, this call will block until the receive
435+
/// operation can proceed or the operation times out.
436+
/// wake up and return an error.
437+
pub fn recv_timeout_async<'a>(&'a self, timeout: impl Into<Timeout>)
438+
-> impl Future<Output = Result<T, RecvError>> + 'a
439+
{
440+
RecvFuture {
441+
receiver: self,
442+
timeout: timeout.into(),
443+
waited: false,
444+
}
445+
}
446+
447+
/// Blocks the current thread until a message is received or the channel is empty and
448+
/// disconnected. Async version.
449+
///
450+
/// If the channel is empty and not disconnected, this call will block until the receive
451+
/// operation can proceed.
452+
pub async fn recv_async(&self) -> Result<T, RecvError> {
453+
self.recv_timeout_async(Forever).await
454+
}
455+
456+
/// Return a reference to the inner queue.
457+
fn as_queue(&self) -> &Queue {
458+
match &self.flavor {
459+
ReceiverFlavor::Unbounded { queue, .. } => queue,
460+
ReceiverFlavor::Bounded(chan) => &chan.chan,
461+
}
462+
}
463+
}
464+
344465
impl<T> Drop for Receiver<T> {
345466
fn drop(&mut self) {
346467
match &self.flavor {
@@ -390,6 +511,38 @@ impl<T> fmt::Debug for Receiver<T> {
390511
}
391512
}
392513

514+
struct RecvFuture<'a, T> {
515+
receiver: &'a Receiver<T>,
516+
timeout: Timeout,
517+
waited: bool,
518+
}
519+
520+
impl<'a, T> Future for RecvFuture<'a, T> {
521+
type Output = Result<T, RecvError>;
522+
523+
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
524+
// Try to receive a message.
525+
if let Ok(msg) = self.receiver.try_recv() {
526+
return Poll::Ready(Ok(msg));
527+
}
528+
529+
if self.waited {
530+
// Wait already happened, so this is a timeout.
531+
return Poll::Ready(Err(RecvError));
532+
}
533+
534+
// Otherwise, schedule to wakeup on receipt or timeout.
535+
let work = unsafe { WorkWaker::from_waker(cx.waker()) };
536+
let mut lock = work.lock().unwrap();
537+
unsafe {
538+
lock.add_queue(self.receiver.as_queue());
539+
}
540+
lock.timeout = self.timeout;
541+
542+
Poll::Pending
543+
}
544+
}
545+
393546
/// The "flavor" of a receiver. This maps to the type of the channel.
394547
enum ReceiverFlavor<T> {
395548
/// An unbounded queue. Messages were allocated with Box, and will be freed upon receipt.

zephyr/src/sys/queue.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::time::Timeout;
2323

2424
/// A wrapper around a Zephyr `k_queue` object.
2525
pub struct Queue {
26-
item: Fixed<k_queue>,
26+
pub(crate) item: Fixed<k_queue>,
2727
}
2828

2929
unsafe impl Sync for StaticKernelObject<k_queue> { }

zephyr/src/sys/sync/semaphore.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use core::fmt;
1818
#[cfg(CONFIG_RUST_ALLOC)]
1919
use core::mem;
2020

21-
use crate::printkln;
21+
use zephyr_sys::ETIMEDOUT;
22+
2223
use crate::time::NoWait;
2324
use crate::work::futures::WorkWaker;
2425
use crate::{
@@ -130,13 +131,14 @@ impl<'a> Future for SemTake<'a> {
130131
type Output = Result<()>;
131132

132133
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134+
// Always check if data is available.
135+
if let Ok(()) = self.sem.take(NoWait) {
136+
return Poll::Ready(Ok(()));
137+
}
138+
133139
if self.ran {
134-
// The worker ran, we don't actually care about the reason we were awoken, as we can
135-
// just do the take on the semaphore with NoWait, and use that as our result. This
136-
// eliminates the race where we wake due to timeout, but the semaphore is given before
137-
// we are actually run.
138-
printkln!("Semaphore kpoll woke and we're reading");
139-
return Poll::Ready(self.sem.take(NoWait));
140+
// If we ran once, and still don't have any data, indicate this as a timeout.
141+
return Poll::Ready(Err(crate::Error(ETIMEDOUT)));
140142

141143
}
142144

@@ -148,7 +150,7 @@ impl<'a> Future for SemTake<'a> {
148150
// Future won't outlive the semaphore.
149151
lock.add_semaphore(self.sem);
150152
}
151-
self.timeout = self.timeout.into();
153+
lock.timeout = self.timeout;
152154
self.ran = true;
153155

154156
Poll::Pending

zephyr/src/work.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ impl<'a> Future for SignalWait<'a> {
321321
// Future can't outlive the Signal.
322322
lock.add_signal(self.signal);
323323
}
324-
self.timeout = self.timeout.into();
324+
lock.timeout = self.timeout;
325325
self.ran = true;
326326

327327
Poll::Pending

zephyr/src/work/futures.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ extern crate alloc;
1616
use core::{cell::UnsafeCell, ffi::{c_int, c_void}, future::Future, mem, pin::Pin, ptr::{self, NonNull}, task::{Context, Poll, Waker}};
1717

1818
use arrayvec::ArrayVec;
19-
use zephyr_sys::{k_poll_event, k_poll_event_init, k_poll_modes_K_POLL_MODE_NOTIFY_ONLY, k_work, k_work_poll, k_work_poll_init, k_work_poll_submit, k_work_poll_submit_to_queue, k_work_q, ZR_POLL_TYPE_SEM_AVAILABLE, ZR_POLL_TYPE_SIGNAL};
19+
use zephyr_sys::{k_poll_event, k_poll_event_init, k_poll_modes_K_POLL_MODE_NOTIFY_ONLY, k_work, k_work_poll, k_work_poll_init, k_work_poll_submit, k_work_poll_submit_to_queue, k_work_q, ZR_POLL_TYPE_DATA_AVAILABLE, ZR_POLL_TYPE_SEM_AVAILABLE, ZR_POLL_TYPE_SIGNAL};
2020

21-
use crate::{printkln, sync::{Arc, Mutex}, sys::sync::Semaphore, task::Wake, time::{Duration, Forever, NoWait, Tick, Timeout}};
21+
use crate::{printkln, sync::{Arc, Mutex}, sys::{queue::Queue, sync::Semaphore}, task::Wake, time::{Duration, Forever, NoWait, Tick, Timeout}};
2222

2323
use super::{Signal, SubmitResult, WorkQueue};
2424

@@ -385,6 +385,22 @@ impl WorkWaker {
385385
);
386386
}
387387
}
388+
389+
/// Add an event that represents waiting for a queue to have a message.
390+
pub unsafe fn add_queue<'a>(&'a mut self, queue: &'a Queue) {
391+
// SAFETY: Fill with zeroed memory, initializatuon happens in the init function next.
392+
self.events.push(unsafe { mem::zeroed() });
393+
let ev = self.events.last().unwrap();
394+
395+
unsafe {
396+
k_poll_event_init(
397+
ev.get(),
398+
ZR_POLL_TYPE_DATA_AVAILABLE,
399+
k_poll_modes_K_POLL_MODE_NOTIFY_ONLY as i32,
400+
queue.item.get() as *mut c_void,
401+
);
402+
}
403+
}
388404
}
389405

390406
/// To avoid having to parameterize everything, we limit the size of the ArrayVec of events to

0 commit comments

Comments
 (0)