From 1edfe8bd71b200c53483f48ad58c65ec95161ec9 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 28 Jul 2025 11:18:36 +0800 Subject: [PATCH 01/22] mctp-estack: Remove Stack mtu argument The stack now defaults to 64 (MCTP protocol minimum MTU). send_message() calls can provide a larger value if desired. Signed-off-by: Matt Johnston --- mctp-estack/src/fragment.rs | 22 ++++++++++------------ mctp-estack/src/lib.rs | 19 ++++--------------- standalone/src/serial.rs | 3 +-- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/mctp-estack/src/fragment.rs b/mctp-estack/src/fragment.rs index 5f4298c..c19d1c9 100644 --- a/mctp-estack/src/fragment.rs +++ b/mctp-estack/src/fragment.rs @@ -10,7 +10,7 @@ use crate::fmt::{debug, error, info, trace, warn}; use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag}; -use crate::{AppCookie, MctpHeader, MAX_MTU}; +use crate::{AppCookie, MctpHeader}; /// Fragments a MCTP message. /// @@ -44,15 +44,10 @@ impl Fragmenter { } debug_assert!(typ.0 & 0x80 == 0, "IC bit's set in typ"); debug_assert!(initial_seq & !mctp::MCTP_SEQ_MASK == 0); - if mtu < MctpHeader::LEN + 1 { + if mtu < mctp::MCTP_MIN_MTU { debug!("mtu too small"); return Err(Error::BadArgument); } - if mtu > MAX_MTU { - debug!("mtu too large"); - return Err(Error::BadArgument); - } - // TODO other validity checks let header = MctpHeader { dest, @@ -93,6 +88,8 @@ impl Fragmenter { /// /// The same input message `payload` should be passed to each `fragment()` call. /// In `SendOutput::Packet(buf)`, `out` is borrowed as the returned fragment, filled with packet contents. + /// + /// `out` must be at least as large as the specified `mtu`. 
pub fn fragment<'f>( &mut self, payload: &[u8], @@ -102,11 +99,11 @@ impl Fragmenter { return SendOutput::success(self); } - // first fragment needs type byte - let min = MctpHeader::LEN + self.header.som as usize; - - if out.len() < min { - return SendOutput::failure(Error::NoSpace, self); + // Require at least MTU buffer size, to ensure that all non-end + // fragments are the same size per the spec. + if out.len() < self.mtu { + debug!("small out buffer"); + return SendOutput::failure(Error::BadArgument, self); } // Reserve header space, the remaining buffer keeps being @@ -123,6 +120,7 @@ impl Fragmenter { if payload.len() < self.payload_used { // Caller is passing varying payload buffers + debug!("varying payload"); return SendOutput::failure(Error::BadArgument, self); } diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 9d8f345..3661db8 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -164,8 +164,6 @@ pub struct Stack { /// cached next expiry time from update() next_timeout: u64, - mtu: usize, - // Arbitrary counter to make tag allocation more variable. next_tag: u8, @@ -180,22 +178,15 @@ impl Stack { /// /// `now_millis` is the current timestamp, the same style as would be /// passed to `update_clock()`. - /// - /// `mtu` is the default MTU of the stack. Specific [`start_send()`](Self::start_send) - /// calls may use a smaller MTU if needed (for example a per-link or per-EID MTU). - /// `new()` will panic if a MTU smaller than 5 is given (minimum MCTP header and type byte). - pub fn new(own_eid: Eid, mtu: usize, now_millis: u64) -> Self { + pub fn new(own_eid: Eid, now_millis: u64) -> Self { let now = EventStamp { clock: now_millis, counter: 0, }; - assert!(mtu >= MctpHeader::LEN + 1); - Self { own_eid, now, next_timeout: 0, - mtu, flows: Default::default(), reassemblers: Default::default(), next_tag: 0, @@ -298,7 +289,7 @@ impl Stack { /// /// Returns a [`Fragmenter`] that will packetize the message. 
/// - /// `mtu` is an optional override, will be the min of the stack MTU and the argument. + /// `mtu` is optional, the default and minimum is 64 (MCTP protocol minimum). /// /// The provided cookie will be returned when `send_fill()` completes. /// @@ -330,10 +321,8 @@ impl Stack { Some(Tag::Unowned(tv)) => Tag::Unowned(tv), }; - let mut frag_mtu = self.mtu; - if let Some(m) = mtu { - frag_mtu = frag_mtu.min(m); - } + let frag_mtu = mtu.unwrap_or(mctp::MCTP_MIN_MTU); + // mtu size checked by Fragmenter::new() // Vary the starting seq self.next_seq = (self.next_seq + 1) & mctp::MCTP_SEQ_MASK; diff --git a/standalone/src/serial.rs b/standalone/src/serial.rs index ac782a0..572f097 100644 --- a/standalone/src/serial.rs +++ b/standalone/src/serial.rs @@ -31,8 +31,7 @@ struct Inner { impl Inner { fn new(own_eid: Eid, serial: S) -> Self { let start_time = Instant::now(); - let todo_mtu = 64; - let mctp = Stack::new(own_eid, todo_mtu, 0); + let mctp = Stack::new(own_eid, 0); let mctpserial = MctpSerialHandler::new(); Self { mctp, From 4f9000384ae97017c3f0023fd8f0af17067a8779 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Wed, 11 Jun 2025 09:32:22 +0800 Subject: [PATCH 02/22] mctp-estack: Make PortLookup take const references A const reference will make it easier to update the PortLookup externally. Implementations can use internal mutability with Cell if needed. Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 9438ab2..d9287bc 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -49,7 +49,7 @@ type PortRawMutex = embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; pub struct PortId(pub u8); /// A trait implemented by applications to determine the routing table. -pub trait PortLookup: Send { +pub trait PortLookup: Sync + Send { /// Returns the `PortId` for a destination EID. 
/// /// `PortId` is an index into the array of `ports` provided to [`Router::new`] @@ -60,11 +60,7 @@ pub trait PortLookup: Send { /// /// `source_port` is the incoming interface of a forwarded packet, /// or `None` for locally generated packets. - fn by_eid( - &mut self, - eid: Eid, - source_port: Option, - ) -> Option; + fn by_eid(&self, eid: Eid, source_port: Option) -> Option; } /// Used like `heapless::Vec`, but lets the mut buffer be written into @@ -416,7 +412,7 @@ pub struct RouterInner<'r> { /// Core MCTP stack stack: Stack, - lookup: &'r mut dyn PortLookup, + lookup: &'r dyn PortLookup, } impl<'r> Router<'r> { @@ -431,7 +427,7 @@ impl<'r> Router<'r> { pub fn new( stack: Stack, ports: &'r [PortTop<'r>], - lookup: &'r mut dyn PortLookup, + lookup: &'r dyn PortLookup, ) -> Self { let inner = RouterInner { stack, lookup }; From ed6a83b43e0e40140bae87d806aad359a7768bbe Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 28 Jul 2025 11:43:13 +0800 Subject: [PATCH 03/22] mctp-estack: Temporarily vendor zerocopy_channel This is temporarily copied from embassy-sync, with modifications to add FixedChannel and Channel sender()/receiver(). It will be replaced once upstream. https://github.com/embassy-rs/embassy/pull/4299 Based on upstream 6186d111a5c150946ee5b7e9e68d987a38c1a463 plus 0ab37d4dbdcf ("embassy-sync: zerocopy Channel sender()/receiver()") e9db18bf5051 ("embassy-sync: Add zerocopy_channel::FixedChannel") Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 8 +- mctp-estack/src/router.rs | 2 +- mctp-estack/src/zerocopy_channel.rs | 767 ++++++++++++++++++++++++++++ 3 files changed, 775 insertions(+), 2 deletions(-) create mode 100644 mctp-estack/src/zerocopy_channel.rs diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 3661db8..56089a4 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -26,7 +26,9 @@ //! 
These can be configured at build time, see [`config`] #![cfg_attr(not(feature = "std"), no_std)] -#![forbid(unsafe_code)] +// Temporarily relaxed for zerocopy_channel vendored code. +#![deny(unsafe_code)] +// #![forbid(unsafe_code)] #![allow(clippy::int_plus_one)] #![allow(clippy::too_many_arguments)] // defmt does not currently allow inline format arguments, so we don't want @@ -65,6 +67,10 @@ pub mod usb; mod util; mod proto; +#[rustfmt::skip] +#[allow(clippy::needless_lifetimes)] +mod zerocopy_channel; + use fragment::{Fragmenter, SendOutput}; use reassemble::Reassembler; pub use router::Router; diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index d9287bc..04cf16f 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -20,8 +20,8 @@ use crate::{ }; use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue}; +use crate::zerocopy_channel::{Channel, Receiver, Sender}; use embassy_sync::waitqueue::WakerRegistration; -use embassy_sync::zerocopy_channel::{Channel, Receiver, Sender}; use heapless::{Entry, FnvIndexMap, Vec}; diff --git a/mctp-estack/src/zerocopy_channel.rs b/mctp-estack/src/zerocopy_channel.rs new file mode 100644 index 0000000..bfeec78 --- /dev/null +++ b/mctp-estack/src/zerocopy_channel.rs @@ -0,0 +1,767 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 +/* Copyright (c) Embassy project contributors */ + +/* This is temporarily copied from embassy-sync, with modifications to add + * FixedChannel and Channel sender()/receiver(). It will be replaced + * once upstream. + * https://github.com/embassy-rs/embassy/pull/4299 + * + * Based on upstream 6186d111a5c150946ee5b7e9e68d987a38c1a463 + * plus + * 0ab37d4dbdcf ("embassy-sync: zerocopy Channel sender()/receiver()") + * e9db18bf5051 ("embassy-sync: Add zerocopy_channel::FixedChannel") + */ + +#![allow(unsafe_code)] +#![allow(dead_code)] + +//! A zero-copy queue for sending values between asynchronous tasks. +//! +//! 
It can be used concurrently by a producer (sender) and a +//! consumer (receiver), i.e. it is an "SPSC channel". +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. + +use core::cell::{RefCell, UnsafeCell}; +use core::future::{poll_fn, Future}; +use core::marker::PhantomData; +use core::task::{Context, Poll}; + +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::blocking_mutex::Mutex; +use embassy_sync::waitqueue::WakerRegistration; + +struct ChannelInner { + state: Mutex>>, +} + +impl ChannelInner { + /// Initialize a new [`Channel`]. + /// + /// The provided buffer will be used and reused by the channel's logic, and thus dictates the + /// channel's capacity. + fn new(len: usize, buf: BufferPtr) -> Self { + assert!(len != 0); + + Self { + state: Mutex::new(RefCell::new(State { + capacity: len, + buf, + front: 0, + back: 0, + full: false, + send_waker: WakerRegistration::new(), + receive_waker: WakerRegistration::new(), + have_sender: false, + have_receiver: false, + })), + } + } + + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. 
+ pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { + let mut s = self.state.get_mut().borrow_mut(); + // We can unconditionally add a sender/receiver since + // split() takes a mut reference, so there must be no + // existing Sender or Receiver. + s.add_sender(); + s.add_receiver(); + drop(s); + (Sender { channel: self }, Receiver { channel: self }) + } + + /// Create a [`Receiver`] from an existing channel. + /// + /// Only one `Receiver` may be borrowed. + pub fn receiver(&self) -> Option> { + self.state + .lock(|s| s.borrow_mut().add_receiver()) + .then(|| Receiver { channel: self }) + } + + /// Create a [`Sender`] from an existing channel. + /// + /// Only one `Sender` may be borrowed. + pub fn sender(&self) -> Option> { + self.state + .lock(|s| s.borrow_mut().add_sender()) + .then(|| Sender { channel: self }) + } + + /// Clears all elements in the channel. + pub fn clear(&mut self) { + self.state.get_mut().borrow_mut().clear(); + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.state.lock(|s| s.borrow().len()) + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.state.lock(|s| s.borrow().is_empty()) + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.state.lock(|s| s.borrow().is_full()) + } +} + +#[repr(transparent)] +struct BufferPtr(*mut T); + +impl BufferPtr { + unsafe fn add(&self, count: usize) -> *mut T { + self.0.add(count) + } +} + +unsafe impl Send for BufferPtr {} +unsafe impl Sync for BufferPtr {} + +/// A bounded zero-copy channel for communicating between asynchronous tasks +/// with backpressure. Uses a borrowed buffer. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. 
+/// +/// The channel requires a buffer of recyclable elements. Writing to the channel is done through +/// an `&mut T`. +pub struct Channel<'a, M: RawMutex, T> { + channel: ChannelInner, + phantom: PhantomData<&'a mut T>, +} + +impl<'a, M: RawMutex, T> Channel<'a, M, T> { + /// Initialize a new [`Channel`]. + /// + /// The provided buffer will be used and reused by the channel's logic, and thus dictates the + /// channel's capacity. + pub fn new(buf: &'a mut [T]) -> Self { + Self { + channel: ChannelInner::new(buf.len(), BufferPtr(buf.as_mut_ptr())), + phantom: PhantomData, + } + } + + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. + pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { + self.channel.split() + } + + /// Create a [`Receiver`] from an existing channel. + /// + /// Only one `Receiver` may be borrowed. + pub fn receiver(&self) -> Option> { + self.channel.receiver() + } + + /// Create a [`Sender`] from an existing channel. + /// + /// Only one `Sender` may be borrowed. + pub fn sender(&self) -> Option> { + self.channel.sender() + } + + /// Clears all elements in the channel. + pub fn clear(&mut self) { + self.channel.clear() + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() + } +} + +/// A bounded zero-copy channel for communicating between asynchronous tasks +/// with backpressure. Uses a local buffer. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. 
+/// +/// All data sent will become available in the same order as it was sent. +/// +/// The channel uses an internal buffer of `N` elements, they must implement +/// `Default` for initial placeholders. +// TODO could make buf MaybeUninit then write through that? +pub struct FixedChannel { + channel: ChannelInner, + // Storage must not be accessed directly, only in update_ptr() + storage: Storage, +} + +// Storage is always accessed locked by channel +// +// The storage is safe from aliasing because only a single Sender or Receiver +// can access any array element at a time. +// +// It is safe to implement Sync and Send because any Sender or Receiver will +// borrow from the ChannelInner (having the same lifetime as storage), and storage +// is only manipulated through Sender and Receiver. +#[repr(transparent)] +struct Storage(UnsafeCell<[T; N]>); +unsafe impl Sync for Storage {} +unsafe impl Send for Storage {} + +impl FixedChannel { + /// Initialize a new [`FixedChannel`]. + pub fn new() -> Self { + // Initial pointer is null, set before use with update_ptr() + let channel = ChannelInner::new(N, BufferPtr(core::ptr::null_mut())); + Self { + channel, + storage: Storage(UnsafeCell::new([(); N].map(|_| Default::default()))), + } + } +} + +impl Default for FixedChannel { + fn default() -> Self { + Self::new() + } +} + +impl FixedChannel { + /// Initialize a new [`FixedChannel`]. + /// + /// This takes an initial buffer value to clone. + pub fn new_cloned(initial: &T) -> Self { + // Initial pointer is null, set before use with update_ptr() + let channel = ChannelInner::new(N, BufferPtr(core::ptr::null_mut())); + Self { + channel, + storage: Storage(UnsafeCell::new([(); N].map(|_| initial.clone()))), + } + } +} + +impl FixedChannel { + /// Update the buf pointer. + /// + /// This must occur before each Sender/Receiver borrow to ensure it's not stale. 
+ /// The lifetime of Sender/Receiver guarantees that it won't go stale + /// while one of them is active, and buf is only used by a Sender or Receiver. + fn update_ptr(&self) { + self.channel.state.lock(|s| { + // Point to first storage array element + s.borrow_mut().buf = BufferPtr(self.storage.0.get() as *mut T); + }); + } + + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. + pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { + self.update_ptr(); + self.channel.split() + } + + /// Create a [`Receiver`] from an existing channel. + /// + /// Only one `Receiver` may be borrowed. + pub fn receiver(&self) -> Option> { + self.update_ptr(); + self.channel.receiver() + } + + /// Create a [`Sender`] from an existing channel. + /// + /// Only one `Sender` may be borrowed. + pub fn sender(&self) -> Option> { + self.update_ptr(); + self.channel.sender() + } + + /// Clears all elements in the channel. + pub fn clear(&mut self) { + self.channel.clear() + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() + } +} + +/// Send-only access to a [`Channel`] or [`FixedChannel`]. +pub struct Sender<'a, M: RawMutex, T> { + channel: &'a ChannelInner, +} + +impl<'a, M: RawMutex, T> Sender<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. + pub fn borrow(&mut self) -> Sender<'_, M, T> { + Sender { channel: self.channel } + } + + /// Attempts to send a value over the channel. 
+ pub fn try_send(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Some(unsafe { &mut *s.buf.add(i) }), + None => None, + } + }) + } + + /// Attempts to send a value over the channel. + pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(unsafe { &mut *s.buf.add(i) }), + None => { + s.receive_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + /// Asynchronously send a value over the channel. + pub fn send(&mut self) -> impl Future { + poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => { + let r = unsafe { &mut *s.buf.add(i) }; + Poll::Ready(r) + } + None => { + s.receive_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + } + + /// Notify the channel that the sending of the value has been finalized. + pub fn send_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().push_done()) + } + + /// Clears all elements in the channel. + pub fn clear(&mut self) { + self.channel.state.lock(|s| { + s.borrow_mut().clear(); + }); + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.channel.state.lock(|s| s.borrow().len()) + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.channel.state.lock(|s| s.borrow().is_empty()) + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.channel.state.lock(|s| s.borrow().is_full()) + } +} + +impl Drop for Sender<'_, M, T> { + fn drop(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().remove_sender()) + } +} + +/// Receive-only access to a [`Channel`] or [`FixedChannel`]. 
+pub struct Receiver<'a, M: RawMutex, T> { + channel: &'a ChannelInner, +} + +impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + /// Creates one further [`Receiver`] over the same channel. + pub fn borrow(&mut self) -> Receiver<'_, M, T> { + Receiver { channel: self.channel } + } + + /// Attempts to receive a value over the channel. + pub fn try_receive(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Some(unsafe { &mut *s.buf.add(i) }), + None => None, + } + }) + } + + /// Attempts to asynchronously receive a value over the channel. + pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(unsafe { &mut *s.buf.add(i) }), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + /// Asynchronously receive a value over the channel. + pub fn receive(&mut self) -> impl Future { + poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => { + let r = unsafe { &mut *s.buf.add(i) }; + Poll::Ready(r) + } + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + } + + /// Notify the channel that the receiving of the value has been finalized. + pub fn receive_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().pop_done()) + } + + /// Clears all elements in the channel. + pub fn clear(&mut self) { + self.channel.state.lock(|s| { + s.borrow_mut().clear(); + }); + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.channel.state.lock(|s| s.borrow().len()) + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.channel.state.lock(|s| s.borrow().is_empty()) + } + + /// Returns whether the channel is full. 
+ pub fn is_full(&self) -> bool { + self.channel.state.lock(|s| s.borrow().is_full()) + } +} + +impl Drop for Receiver<'_, M, T> { + fn drop(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().remove_receiver()) + } +} + +struct State { + /// Maximum number of elements the channel can hold. + capacity: usize, + + /// Pointer to the channel's buffer. + /// + /// Will always/only be valid when a Sender or Receiver + /// is borrowed. + buf: BufferPtr, + + /// Front index. Always 0..=(N-1) + front: usize, + /// Back index. Always 0..=(N-1). + back: usize, + + /// Used to distinguish "empty" and "full" cases when `front == back`. + /// May only be `true` if `front == back`, always `false` otherwise. + full: bool, + + send_waker: WakerRegistration, + receive_waker: WakerRegistration, + + have_receiver: bool, + have_sender: bool, +} + +impl State { + fn increment(&self, i: usize) -> usize { + if i + 1 == self.capacity { + 0 + } else { + i + 1 + } + } + + fn clear(&mut self) { + if self.full { + self.receive_waker.wake(); + } + self.front = 0; + self.back = 0; + self.full = false; + } + + fn len(&self) -> usize { + if !self.full { + if self.back >= self.front { + self.back - self.front + } else { + self.capacity + self.back - self.front + } + } else { + self.capacity + } + } + + fn is_full(&self) -> bool { + self.full + } + + fn is_empty(&self) -> bool { + self.front == self.back && !self.full + } + + fn push_index(&mut self) -> Option { + match self.is_full() { + true => None, + false => Some(self.back), + } + } + + fn push_done(&mut self) { + assert!(!self.is_full()); + self.back = self.increment(self.back); + if self.back == self.front { + self.full = true; + } + self.send_waker.wake(); + } + + fn pop_index(&mut self) -> Option { + match self.is_empty() { + true => None, + false => Some(self.front), + } + } + + fn pop_done(&mut self) { + assert!(!self.is_empty()); + self.front = self.increment(self.front); + self.full = false; + self.receive_waker.wake(); + } + + 
// Returns true if a sender was added, false if one already existed + fn add_sender(&mut self) -> bool { + !core::mem::replace(&mut self.have_sender, true) + } + + fn remove_sender(&mut self) { + debug_assert!(self.have_sender); + self.have_sender = false; + } + + // Returns true if a receiver was added, false if one already existed + fn add_receiver(&mut self) -> bool { + !core::mem::replace(&mut self.have_receiver, true) + } + + fn remove_receiver(&mut self) { + debug_assert!(self.have_receiver); + self.have_receiver = false; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + extern crate std; + use core::ops::{Deref, DerefMut}; + use std::boxed::Box; + + #[test] + fn split() { + let mut buf = [0u32; 10]; + let mut c = Channel::::new(&mut buf); + + let (mut s, mut r) = c.split(); + + *s.try_send().unwrap() = 4; + s.send_done(); + + let b = r.try_receive().unwrap(); + assert_eq!(*b, 4); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 4); + r.receive_done(); + let b = r.try_receive(); + assert!(b.is_none()); + } + + #[test] + fn sender_receiver() { + let mut buf = [0u32; 10]; + let c = Channel::::new(&mut buf); + + let mut s = c.sender().unwrap(); + assert!(c.sender().is_none(), "can't borrow again"); + + *s.try_send().unwrap() = 4; + s.send_done(); + drop(s); + + // borrow again + let mut s = c.sender().unwrap(); + *s.try_send().unwrap() = 5; + s.send_done(); + + let mut r = c.receiver().unwrap(); + assert!(c.receiver().is_none(), "can't borrow again"); + + let b = r.try_receive().unwrap(); + assert_eq!(*b, 4); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 4); + r.receive_done(); + + drop(r); + // borrow again + let mut r = c.receiver().unwrap(); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 5); + r.receive_done(); + let b = r.try_receive(); + assert!(b.is_none()); + } + + #[test] + fn fixed() { + let c = FixedChannel::::new(); + + let mut s = 
c.sender().unwrap(); + assert!(c.sender().is_none(), "can't borrow again"); + assert!(s.is_empty()); + + *s.try_send().unwrap() = 4; + assert!(s.is_empty()); + s.send_done(); + assert!(!s.is_empty()); + drop(s); + + let mut s = c.sender().unwrap(); + *s.try_send().unwrap() = 5; + assert!(!s.is_full()); + s.send_done(); + assert!(s.try_send().is_none(), "queue is full"); + assert!(s.is_full()); + + let mut r = c.receiver().unwrap(); + assert!(c.receiver().is_none(), "can't borrow again"); + + let b = r.try_receive().unwrap(); + assert_eq!(*b, 4); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 4); + r.receive_done(); + + drop(r); + let mut r = c.receiver().unwrap(); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 5); + assert!(!s.is_empty()); + r.receive_done(); + assert!(s.is_empty()); + let b = r.try_receive(); + assert!(b.is_none()); + + // Later send works + *s.try_send().unwrap() = 6; + assert!(r.is_empty()); + s.send_done(); + assert!(!r.is_empty()); + } + + #[test] + fn fixed_move() { + // Check that the buffer pointer updates if moved + let c = FixedChannel::::new_cloned(&123); + + let p1 = &c as *const _; + let mut s = c.sender().unwrap(); + *s.try_send().unwrap() = 99u32; + s.send_done(); + drop(s); + + let mut cbox = Box::new(Some(c)); + let c = cbox.deref().as_ref().unwrap(); + let p2 = c as *const _; + + let mut r = c.receiver().unwrap(); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 99); + r.receive_done(); + drop(r); + + let mut cbox = Box::new(cbox.take()); + let c = cbox.deref_mut().as_mut().unwrap(); + let p3 = c as *const _; + + let (mut s, mut r) = c.split(); + *s.try_send().unwrap() = 44; + s.send_done(); + let b = r.try_receive().unwrap(); + assert_eq!(*b, 44); + + assert!(p1 != p2, "Ensure data moved"); + assert!(p1 != p3, "Ensure data moved"); + assert!(p2 != p3, "Ensure data moved"); + } +} From 14b885c4413529be59e1d7f25b89d9549b44b28c Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Thu, 7 Aug 2025 15:45:47 +0800 Subject: 
[PATCH 04/22] mctp-estack: Add critical-section dev-dependency This will be required for tests, using std feature. Signed-off-by: Matt Johnston --- Cargo.lock | 1 + mctp-estack/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index ced7b73..580711c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -892,6 +892,7 @@ name = "mctp-estack" version = "0.1.0" dependencies = [ "crc", + "critical-section", "defmt", "embassy-sync", "embedded-io-adapters", diff --git a/mctp-estack/Cargo.toml b/mctp-estack/Cargo.toml index 2615fd9..6266ff3 100644 --- a/mctp-estack/Cargo.toml +++ b/mctp-estack/Cargo.toml @@ -20,6 +20,7 @@ smbus-pec = { version = "1.0", features = ["lookup-table"] } uuid = { workspace = true } [dev-dependencies] +critical-section = { version = "1.2.0", features = ["std"] } embedded-io-adapters = { workspace = true } env_logger = { workspace = true } proptest = { workspace = true } From dac69c43ffe23cdc8912c8c2f9a61a6c2078476c Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 6 Jun 2025 15:23:46 +0800 Subject: [PATCH 05/22] mctp-estack: Separate lifetimes for Router Previously RouterAsyncReqChannel etc used &'r Router<'r>, which is only usable for static lifetimes. Use a different lifetime for the reference. Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 40 ++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 04cf16f..4147bbc 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -674,14 +674,17 @@ impl<'r> Router<'r> { } /// Create a `AsyncReqChannel` instance. - pub fn req(&'r self, eid: Eid) -> RouterAsyncReqChannel<'r> { + pub fn req(&self, eid: Eid) -> RouterAsyncReqChannel<'_, 'r> { RouterAsyncReqChannel::new(eid, self) } /// Create a `AsyncListener` instance. /// /// Will receive incoming messages with the TO bit set for the given `typ`. 
- pub fn listener(&'r self, typ: MsgType) -> Result> { + pub fn listener( + &self, + typ: MsgType, + ) -> Result> { let cookie = self.app_bind(typ)?; Ok(RouterAsyncListener { cookie, @@ -703,20 +706,20 @@ impl<'r> Router<'r> { } /// A request channel. -pub struct RouterAsyncReqChannel<'r> { +pub struct RouterAsyncReqChannel<'g, 'r> { /// Destination EID eid: Eid, /// Tag from the last `send()`. /// /// Cleared upon receiving a response, except in the case of !tag_expires. last_tag: Option, - router: &'r Router<'r>, + router: &'g Router<'r>, tag_expires: bool, cookie: Option, } -impl<'r> RouterAsyncReqChannel<'r> { - fn new(eid: Eid, router: &'r Router<'r>) -> Self { +impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { + fn new(eid: Eid, router: &'g Router<'r>) -> Self { RouterAsyncReqChannel { eid, last_tag: None, @@ -756,7 +759,7 @@ impl<'r> RouterAsyncReqChannel<'r> { } } -impl Drop for RouterAsyncReqChannel<'_> { +impl Drop for RouterAsyncReqChannel<'_, '_> { fn drop(&mut self) { if !self.tag_expires && self.last_tag.is_some() { warn!("Didn't call async_drop()"); @@ -770,7 +773,7 @@ impl Drop for RouterAsyncReqChannel<'_> { /// A request channel /// /// Created with [`Router::req()`](Router::req). -impl mctp::AsyncReqChannel for RouterAsyncReqChannel<'_> { +impl mctp::AsyncReqChannel for RouterAsyncReqChannel<'_, '_> { /// Send a message. /// /// This will async block until the message has been enqueued to the physical port. @@ -862,16 +865,16 @@ impl mctp::AsyncReqChannel for RouterAsyncReqChannel<'_> { /// A response channel. /// /// Returned by [`RouterAsyncListener::recv`](mctp::AsyncListener::recv). 
-pub struct RouterAsyncRespChannel<'r> { +pub struct RouterAsyncRespChannel<'g, 'r> { eid: Eid, tv: TagValue, - router: &'r Router<'r>, + router: &'g Router<'r>, typ: MsgType, } -impl<'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'r> { +impl<'g, 'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'g, 'r> { type ReqChannel<'a> - = RouterAsyncReqChannel<'r> + = RouterAsyncReqChannel<'g, 'r> where Self: 'a; @@ -902,7 +905,7 @@ impl<'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'r> { self.eid } - fn req_channel(&self) -> mctp::Result> { + fn req_channel(&self) -> mctp::Result> { Ok(RouterAsyncReqChannel::new(self.eid, self.router)) } } @@ -910,15 +913,14 @@ impl<'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'r> { /// A listener. /// /// Created with [`Router::listener()`](Router::listener). -pub struct RouterAsyncListener<'r> { - router: &'r Router<'r>, +pub struct RouterAsyncListener<'g, 'r> { + router: &'g Router<'r>, cookie: AppCookie, } -impl<'r> mctp::AsyncListener for RouterAsyncListener<'r> { - // type RespChannel<'a> = RouterAsyncRespChannel<'a> where Self: 'a; +impl<'g, 'r> mctp::AsyncListener for RouterAsyncListener<'g, 'r> { type RespChannel<'a> - = RouterAsyncRespChannel<'r> + = RouterAsyncRespChannel<'g, 'r> where Self: 'a; @@ -945,7 +947,7 @@ impl<'r> mctp::AsyncListener for RouterAsyncListener<'r> { } } -impl Drop for RouterAsyncListener<'_> { +impl Drop for RouterAsyncListener<'_, '_> { fn drop(&mut self) { self.router.app_unbind(self.cookie) } From 66d1d42218715377ab2be441effedd1fc4566392 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 10 Jun 2025 14:47:45 +0800 Subject: [PATCH 06/22] mctp-estack: Move packet queue into PortTop Now the application provides PortTop mut references to the Router. Router now has a single lifetime rather than the previous chain of lifetimes (also for the zero_copy_channel::Channel). PortStorage has been replaced by PortTop. PortBottom is now renamed to Port. 
This uses zerocopy_channel::FixedChannel. Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 16 ++++ mctp-estack/src/router.rs | 158 +++++++++++++++----------------- mctp-usb-embassy/src/mctpusb.rs | 24 ++--- 3 files changed, 100 insertions(+), 98 deletions(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 56089a4..893f338 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -119,6 +119,8 @@ pub mod config { /// Customise with `MCTP_ESTACK_NUM_RECEIVE` environment variable. /// Number of outstanding waiting responses, default 64 pub const NUM_RECEIVE: usize = get_build_var!("MCTP_ESTACK_NUM_RECEIVE", 4); + + /// Maximum number of incoming flows, default 64. /// /// After a message is sent with Tag Owner (TO) bit set, the stack will accept /// response messages with the same tag and TO _unset_. `FLOWS` defines @@ -136,6 +138,20 @@ pub mod config { pub const MAX_MTU: usize = get_build_var!("MCTP_ESTACK_MAX_MTU", 255); const _: () = assert!(MAX_MTU >= crate::MctpHeader::LEN + 1, "MAX_MTU too small"); + + /// Per-port transmit queue length, default 4. + /// + /// This applies to [`Router`](crate::Router). + /// Each port will use `PORT_TXQUEUE` * `MAX_MTU` buffer space. + /// + /// Customise with `MCTP_ESTACK_PORT_TXQUEUE` environment variable. + pub const PORT_TXQUEUE: usize = + get_build_var!("MCTP_ESTACK_PORT_TXQUEUE", 4); + + /// Maximum number of ports for [`Router`](crate::Router), default 1. + /// + /// Customise with `MCTP_ESTACK_MAX_PORTS` environment variable. 
+ pub const MAX_PORTS: usize = get_build_var!("MCTP_ESTACK_MAX_PORTS", 2); } #[derive(Debug)] diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 4147bbc..6e7eb79 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -15,12 +15,12 @@ use core::pin::pin; use core::task::{Poll, Waker}; use crate::{ - AppCookie, Fragmenter, MctpHeader, MctpMessage, SendOutput, Stack, MAX_MTU, - MAX_PAYLOAD, + config, AppCookie, Fragmenter, MctpHeader, MctpMessage, SendOutput, Stack, + MAX_MTU, MAX_PAYLOAD, }; use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue}; -use crate::zerocopy_channel::{Channel, Receiver, Sender}; +use crate::zerocopy_channel::{FixedChannel, Receiver}; use embassy_sync::waitqueue::WakerRegistration; use heapless::{Entry, FnvIndexMap, Vec}; @@ -88,6 +88,12 @@ impl PktBuf { } } +impl Default for PktBuf { + fn default() -> Self { + Self::new() + } +} + impl core::ops::Deref for PktBuf { type Target = [u8]; @@ -99,15 +105,36 @@ impl core::ops::Deref for PktBuf { /// The "producer" side of a queue of packets to send out a MCTP port/interface. /// /// It will be used by `Routing` to enqueue packets to a port. -pub struct PortTop<'a> { +pub struct PortTop { /// Forwarded packet queue. - /// The outer mutex will not be held over an await. - packets: AsyncMutex>, + channel: FixedChannel, + send_mutex: AsyncMutex<()>, mtu: usize, } -impl PortTop<'_> { +impl PortTop { + pub fn new(mtu: usize) -> Result { + if mtu > MAX_MTU { + debug!("port mtu {} > MAX_MTU {}", mtu, MAX_MTU); + return Err(Error::BadArgument); + } + + Ok(Self { + channel: FixedChannel::new(), + send_mutex: AsyncMutex::new(()), + mtu, + }) + } + + /// Return the bottom half port for the channel. + /// + /// Applications call Router::port(). + /// Returns None if already borrowed. `id` is not checked, just passed through. + fn bottom(&self, id: PortId) -> Option> { + self.channel.receiver().map(|packets| Port { packets, id }) + } + /// Enqueues a packet. 
/// /// Do not call with locks held. @@ -116,14 +143,16 @@ impl PortTop<'_> { async fn forward_packet(&self, pkt: &[u8]) -> Result<()> { debug_assert!(MctpHeader::decode(pkt).is_ok()); - let mut sender = self.packets.lock().await; - // Note: must not await while holding `sender` - // Check space first (can't rollback after try_send) if pkt.len() > self.mtu { debug!("Forward packet too large"); return Err(Error::NoSpace); } + // With forwarded packets we don't want to block if + // the queue is full (we drop packets instead). + // Don't hold this sender across any await. + let _l = self.send_mutex.lock().await; + let mut sender = self.channel.sender().unwrap(); // Get a slot to send let slot = sender.try_send().ok_or_else(|| { @@ -164,7 +193,9 @@ impl PortTop<'_> { }; loop { - let mut sender = self.packets.lock().await; + let _l = self.send_mutex.lock().await; + // OK to unwrap, protected by send_mutex.lock() + let mut sender = self.channel.sender().unwrap(); let qpkt = sender.send().await; qpkt.len = 0; @@ -189,12 +220,13 @@ impl PortTop<'_> { /// /// An MCTP transport implementation will read packets to send with /// [`outbound()`](Self::outbound). -pub struct PortBottom<'a> { +pub struct Port<'a> { /// packet queue packets: Receiver<'a, PortRawMutex, PktBuf>, + id: PortId, } -impl PortBottom<'_> { +impl Port<'_> { /// Retrieve an outbound packet to send for this port. /// /// Should call [`outbound_done()`](Self::outbound_done) to consume the @@ -211,6 +243,11 @@ impl PortBottom<'_> { (pkt, dest) } + /// Retrieve the `PortId`. + pub fn id(&self) -> PortId { + self.id + } + /// Attempt to retrieve an outbound packet. /// /// This is the same as [`outbound()`](Self::outbound) but returns @@ -233,65 +270,6 @@ impl PortBottom<'_> { } } -/// Storage for a Port, being a physical MCTP interface. -// TODO: instead of storing Vec, it could -// store `&'r []` and a length field, which would allow different ports -// have different MAX_MESSAGE/MAX_MTU. 
Does add another lifetime parameter. -pub struct PortStorage { - /// forwarded packet queue - packets: [PktBuf; FORWARD_QUEUE], -} - -impl PortStorage { - pub fn new() -> Self { - Self { - packets: [const { PktBuf::new() }; FORWARD_QUEUE], - } - } -} - -impl Default for PortStorage { - fn default() -> Self { - Self::new() - } -} - -pub struct PortBuilder<'a> { - /// forwarded packet queue - packets: Channel<'a, PortRawMutex, PktBuf>, -} - -impl<'a> PortBuilder<'a> { - pub fn new( - storage: &'a mut PortStorage, - ) -> Self { - // PortBuilder and PortStorage need to be separate structs, since - // zerocopy_channel::Channel takes a slice. - Self { - packets: Channel::new(storage.packets.as_mut_slice()), - } - } - - pub fn build( - &mut self, - mtu: usize, - ) -> Result<(PortTop<'_>, PortBottom<'_>)> { - if mtu > MAX_MTU { - debug!("port mtu {} > MAX_MTU {}", mtu, MAX_MTU); - return Err(Error::BadArgument); - } - - let (ps, pr) = self.packets.split(); - - let t = PortTop { - packets: AsyncMutex::new(ps), - mtu, - }; - let b = PortBottom { packets: pr }; - Ok((t, b)) - } -} - #[derive(Default)] struct WakerPoolInner { pool: FnvIndexMap, @@ -382,18 +360,17 @@ impl WakerPool { /// Device-provided input handlers feed input MCTP packets to /// [`inbound()`](Self::inbound). /// -/// For outbound packets each port has queue split into `PortTop` and `PortBottom`. -/// `Router` will feed packets for a port into the top, and device output handlers -/// will read from [`PortBottom`] and write out the specific MCTP transport. -/// /// [`update_time()`](Self::update_time) should be called periodically to /// handle timeouts. /// /// Packets not destined for the local EID will be forwarded out a port /// determined by the user-provided [`PortLookup`] implementation. +/// +/// Outbound packets are provided to a transport's `Port` instance, +/// returned by [`port()`](Self::port). 
pub struct Router<'r> { inner: AsyncMutex>, - ports: &'r [PortTop<'r>], + ports: Vec<&'r mut PortTop, { config::MAX_PORTS }>, /// Listeners for different message types. // Has a separate non-async Mutex so it can be used by RouterAsyncListener::drop() @@ -424,11 +401,7 @@ impl<'r> Router<'r> { /// of the `ports` slice are used as `PortId` identifiers. /// /// `lookup` callbacks define the routing table for outbound packets. - pub fn new( - stack: Stack, - ports: &'r [PortTop<'r>], - lookup: &'r dyn PortLookup, - ) -> Self { + pub fn new(stack: Stack, lookup: &'r dyn PortLookup) -> Self { let inner = RouterInner { stack, lookup }; let app_listeners = BlockingMutex::new(RefCell::new(Vec::new())); @@ -436,12 +409,31 @@ impl<'r> Router<'r> { Self { inner: AsyncMutex::new(inner), app_listeners, + ports: Vec::new(), recv_wakers: Default::default(), - ports, work_msg: AsyncMutex::new(Vec::new()), } } + pub fn add_port(&mut self, top: &'r mut PortTop) -> Result { + self.ports.push(top).map_err(|_| Error::NoSpace)?; + Ok(PortId((self.ports.len() - 1) as u8)) + } + + /// Return a port. + /// + /// A port may only be borrowed once (may be reborrowed after dropping). + pub fn port(&self, id: PortId) -> Result> { + let port = self.ports.get(id.0 as usize).ok_or_else(|| { + debug!("Bad port index"); + Error::BadArgument + })?; + port.bottom(id).ok_or_else(|| { + debug!("Port already borrowed"); + Error::BadArgument + }) + } + /// Called periodically to update the clock and check timeouts. 
/// /// A suitable interval (milliseconds) for the next call to `update_time()` will diff --git a/mctp-usb-embassy/src/mctpusb.rs b/mctp-usb-embassy/src/mctpusb.rs index 401b62f..cddea11 100644 --- a/mctp-usb-embassy/src/mctpusb.rs +++ b/mctp-usb-embassy/src/mctpusb.rs @@ -24,9 +24,7 @@ use embassy_usb_driver::{ Driver, Endpoint, EndpointIn, EndpointOut, EndpointType, }; use heapless::Vec; -use mctp_estack::{ - router::PortBottom, router::PortId, usb::MctpUsbHandler, Router, -}; +use mctp_estack::{router::Port, router::PortId, usb::MctpUsbHandler, Router}; use crate::MCTP_USB_MAX_PACKET; @@ -86,7 +84,7 @@ impl<'d, D: Driver<'d>> Sender<'d, D> { } /// Run with a `mctp::Router` stack. - pub async fn run(mut self, mut bottom: PortBottom<'_>) -> ! { + pub async fn run(mut self, mut port: Port<'_>) -> ! { // Outer loop for reattaching USB loop { debug!("mctp usb send waiting"); @@ -94,18 +92,18 @@ impl<'d, D: Driver<'d>> Sender<'d, D> { debug!("mctp usb send attached"); 'sending: loop { // Wait for at least one MCTP packet enqueued - let (pkt, _dest) = bottom.outbound().await; + let (pkt, _dest) = port.outbound().await; let r = self.feed(pkt); // Consume it - bottom.outbound_done(); + port.outbound_done(); if r.is_err() { // MCTP packet too large for USB continue 'sending; } 'fill: loop { - let Some((pkt, _dest)) = bottom.try_outbound() else { + let Some((pkt, _dest)) = port.try_outbound() else { // No more packets break 'fill; }; @@ -113,7 +111,7 @@ impl<'d, D: Driver<'d>> Sender<'d, D> { // See if it fits in the payload match self.feed(pkt) { // Success, consume it - Ok(()) => bottom.outbound_done(), + Ok(()) => port.outbound_done(), // Won't fit, leave it until next 'sending iteration. Err(_) => break 'fill, } @@ -272,14 +270,10 @@ impl<'d, D: Driver<'d>> MctpUsbClass<'d, D> { } /// Run with a `mctp::Router` stack. - pub async fn run( - self, - router: &Router<'_>, - bottom: PortBottom<'_>, - port: PortId, - ) -> ! 
{ + pub async fn run(self, router: &Router<'_>, port: Port<'_>) -> ! { let (s, r) = self.split(); - let _ = join(s.run(bottom), r.run(router, port)).await; + let id = port.id(); + let _ = join(s.run(port), r.run(router, id)).await; unreachable!() } } From 7fac14a43b9e05461c214171932f429a615d686e Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 28 Jul 2025 14:39:34 +0800 Subject: [PATCH 07/22] mctp-estack: Revert send_message() loop change This reverts part of 4cc8969fde57 ("mctp-estack: Minor Fragmenter usage changes") It is better to break out of the loop immediately rather than waiting for another loop iteration - is the sender is full it will have to wait until a packet is sent. Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 6e7eb79..f5f4885 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -204,13 +204,19 @@ impl PortTop { SendOutput::Packet(p) => { qpkt.len = p.len(); sender.send_done(); + if fragmenter.is_done() { + // Break here rather than using SendOutput::Complete, + // since we don't want to call channel.sender() an extra time. + break Ok(fragmenter.tag()); + } } SendOutput::Error { err, .. } => { debug!("Error packetising"); + debug_assert!(false, "Shouldn't fail, can't roll back"); sender.send_done(); break Err(err); } - SendOutput::Complete { tag, cookie: _ } => break Ok(tag), + SendOutput::Complete { .. } => unreachable!(), } } } From 22c4ea2cfcc3002b2bbc9d81f6fa5dcfd4ede269 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 28 Jul 2025 15:28:46 +0800 Subject: [PATCH 08/22] mctp-estack: Better locking between send_message and forward_packets() Previously a blocking send_message() could also block progress for forward_packets(). 
After this change, forward_packets() will wait for access to the channel (send_message() only holds the lock momentarily), then will drop the packet and return if the channel is full. Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 107 +++++++++++++++++++++++--------------- 1 file changed, 65 insertions(+), 42 deletions(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index f5f4885..11d3756 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -21,7 +21,7 @@ use crate::{ use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue}; use crate::zerocopy_channel::{FixedChannel, Receiver}; -use embassy_sync::waitqueue::WakerRegistration; +use embassy_sync::waitqueue::{AtomicWaker, WakerRegistration}; use heapless::{Entry, FnvIndexMap, Vec}; @@ -108,9 +108,13 @@ impl core::ops::Deref for PktBuf { pub struct PortTop { /// Forwarded packet queue. channel: FixedChannel, - send_mutex: AsyncMutex<()>, mtu: usize, + + // Callers should hold send_mutex when using channel.sender(). + // send_message() will wait on send_mutex being available using sender_waker. + send_mutex: BlockingMutex<()>, + sender_waker: AtomicWaker, } impl PortTop { @@ -122,8 +126,9 @@ impl PortTop { Ok(Self { channel: FixedChannel::new(), - send_mutex: AsyncMutex::new(()), mtu, + send_mutex: BlockingMutex::new(()), + sender_waker: AtomicWaker::new(), }) } @@ -150,21 +155,27 @@ impl PortTop { } // With forwarded packets we don't want to block if // the queue is full (we drop packets instead). - // Don't hold this sender across any await. - let _l = self.send_mutex.lock().await; - let mut sender = self.channel.sender().unwrap(); - - // Get a slot to send - let slot = sender.try_send().ok_or_else(|| { - debug!("Dropped forward packet"); - Error::TxFailure - })?; + let r = self.send_mutex.lock(|_| { + // OK unwrap, we have the send_mutex + let mut sender = self.channel.sender().unwrap(); - // Fill the buffer - // OK unwrap: pkt.len() checked above. 
- slot.set(pkt).unwrap(); - sender.send_done(); - Ok(()) + // Get a slot to send + let slot = sender.try_send().ok_or_else(|| { + debug!("Dropped forward packet"); + Error::TxFailure + })?; + + // Fill the buffer + if slot.set(pkt).is_ok() { + sender.send_done(); + Ok(()) + } else { + debug!("Oversized forward packet"); + Err(Error::TxFailure) + } + }); + self.sender_waker.wake(); + r } /// Fragments and enqueues a message. @@ -192,33 +203,45 @@ impl PortTop { work_msg }; - loop { - let _l = self.send_mutex.lock().await; - // OK to unwrap, protected by send_mutex.lock() - let mut sender = self.channel.sender().unwrap(); - - let qpkt = sender.send().await; - qpkt.len = 0; - let r = fragmenter.fragment(payload, &mut qpkt.data); - match r { - SendOutput::Packet(p) => { - qpkt.len = p.len(); - sender.send_done(); - if fragmenter.is_done() { - // Break here rather than using SendOutput::Complete, - // since we don't want to call channel.sender() an extra time. - break Ok(fragmenter.tag()); + // send_message() needs to wait for packets to get enqueued to the PortTop channel. + // It shouldn't hold the send_mutex() across an await, since that would block + // forward_packet(). + poll_fn(|cx| { + self.send_mutex.lock(|_| { + // OK to unwrap, protected by send_mutex.lock() + let mut sender = self.channel.sender().unwrap(); + + // Send as much as we can in a loop without blocking. + // If it blocks the next poll_fn iteration will continue + // where it left off. + loop { + let Poll::Ready(qpkt) = sender.poll_send(cx) else { + self.sender_waker.register(cx.waker()); + break Poll::Pending; + }; + + qpkt.len = 0; + match fragmenter.fragment(payload, &mut qpkt.data) { + SendOutput::Packet(p) => { + qpkt.len = p.len(); + sender.send_done(); + if fragmenter.is_done() { + // Break here rather than using SendOutput::Complete, + // since we don't want to call channel.sender() an extra time. + break Poll::Ready(Ok(fragmenter.tag())); + } + } + SendOutput::Error { err, .. 
} => { + debug!("Error packetising"); + debug_assert!(false, "fragment () shouldn't fail"); + break Poll::Ready(Err(err)); + } + SendOutput::Complete { .. } => unreachable!(), } } - SendOutput::Error { err, .. } => { - debug!("Error packetising"); - debug_assert!(false, "Shouldn't fail, can't roll back"); - sender.send_done(); - break Err(err); - } - SendOutput::Complete { .. } => unreachable!(), - } - } + }) + }) + .await } } From 39705adceb5f06ad6d8882118f3c4f9dbb164f8f Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Mon, 28 Jul 2025 15:29:45 +0800 Subject: [PATCH 09/22] mctp-estack: Move MTU handling into PortTop::by_eid() Now the lookup function also returns an optional EID, replacing the fixed port MTU. Signed-off-by: Matt Johnston --- mctp-estack/src/fragment.rs | 1 - mctp-estack/src/router.rs | 52 ++++++++++++++++++------------------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/mctp-estack/src/fragment.rs b/mctp-estack/src/fragment.rs index c19d1c9..26ace50 100644 --- a/mctp-estack/src/fragment.rs +++ b/mctp-estack/src/fragment.rs @@ -109,7 +109,6 @@ impl Fragmenter { // Reserve header space, the remaining buffer keeps being // updated in `rest` let max_total = out.len().min(self.mtu); - // let out = &mut out[..max_total]; let (h, mut rest) = out[..max_total].split_at_mut(MctpHeader::LEN); // Append type byte diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 11d3756..c4c257e 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -50,17 +50,24 @@ pub struct PortId(pub u8); /// A trait implemented by applications to determine the routing table. pub trait PortLookup: Sync + Send { - /// Returns the `PortId` for a destination EID. + /// Returns `(PortId, MTU)` for a destination EID. /// - /// `PortId` is an index into the array of `ports` provided to [`Router::new`] - /// - /// Return `None` to drop the packet as unreachable. 
This lookup + /// Return a `None` `PortId` to drop the packet as unreachable. This lookup /// is only called for outbound packets - packets destined for the local EID /// will not be passed to this callback. /// + /// A MTU can optionally be returned, it will be applied to locally fragmented packets. + /// This MTU is ignored for forwarded packets in a bridge (the transport implementation + /// can drop packets if desired). + /// If MTU is `None`, the MCTP minimum 64 is used. + /// /// `source_port` is the incoming interface of a forwarded packet, /// or `None` for locally generated packets. - fn by_eid(&self, eid: Eid, source_port: Option) -> Option; + fn by_eid( + &self, + eid: Eid, + source_port: Option, + ) -> (Option, Option); } /// Used like `heapless::Vec`, but lets the mut buffer be written into @@ -108,9 +115,6 @@ impl core::ops::Deref for PktBuf { pub struct PortTop { /// Forwarded packet queue. channel: FixedChannel, - - mtu: usize, - // Callers should hold send_mutex when using channel.sender(). // send_message() will wait on send_mutex being available using sender_waker. send_mutex: BlockingMutex<()>, @@ -118,18 +122,12 @@ pub struct PortTop { } impl PortTop { - pub fn new(mtu: usize) -> Result { - if mtu > MAX_MTU { - debug!("port mtu {} > MAX_MTU {}", mtu, MAX_MTU); - return Err(Error::BadArgument); - } - - Ok(Self { + pub fn new() -> Self { + Self { channel: FixedChannel::new(), - mtu, send_mutex: BlockingMutex::new(()), sender_waker: AtomicWaker::new(), - }) + } } /// Return the bottom half port for the channel. @@ -148,11 +146,6 @@ impl PortTop { async fn forward_packet(&self, pkt: &[u8]) -> Result<()> { debug_assert!(MctpHeader::decode(pkt).is_ok()); - // Check space first (can't rollback after try_send) - if pkt.len() > self.mtu { - debug!("Forward packet too large"); - return Err(Error::NoSpace); - } // With forwarded packets we don't want to block if // the queue is full (we drop packets instead). 
let r = self.send_mutex.lock(|_| { @@ -245,6 +238,12 @@ impl PortTop { } } +impl Default for PortTop { + fn default() -> Self { + Self::new() + } +} + /// The "consumer" side of a queue of packets to send out a MCTP interface, /// /// An MCTP transport implementation will read packets to send with @@ -511,7 +510,8 @@ impl<'r> Router<'r> { } // Look for a route to forward to - let Some(p) = inner.lookup.by_eid(header.dest, Some(port)) else { + let (Some(p), _mtu) = inner.lookup.by_eid(header.dest, Some(port)) + else { debug!("No route for recv {}", header.dest); return ret_src; }; @@ -651,7 +651,8 @@ impl<'r> Router<'r> { ) -> Result { let mut inner = self.inner.lock().await; - let Some(p) = inner.lookup.by_eid(eid, None) else { + let (port, mtu) = inner.lookup.by_eid(eid, None); + let Some(p) = port else { debug!("No route for recv {}", eid); return Err(Error::TxFailure); }; @@ -661,7 +662,6 @@ impl<'r> Router<'r> { return Err(Error::TxFailure); }; - let mtu = top.mtu; let mut fragmenter = inner .stack .start_send( @@ -670,7 +670,7 @@ impl<'r> Router<'r> { tag, tag_expires, integrity_check, - Some(mtu), + mtu, cookie, ) .inspect_err(|e| trace!("error fragmenter {}", e))?; From f3e3a8125394e1714142ae7175877ca0f6a22812 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 29 Jul 2025 12:39:08 +0800 Subject: [PATCH 10/22] mctp-estack: Construct Stack within Router Stack doesn't allow much customisation, so construct it directly. Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index c4c257e..3cc467d 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -423,13 +423,19 @@ pub struct RouterInner<'r> { impl<'r> Router<'r> { /// Create a new Router. /// - /// The EID of the provided `stack` is used to match local destination packets. - /// - /// `ports` is a list of transport interfaces for the router. 
The indices - /// of the `ports` slice are used as `PortId` identifiers. + /// `own_eid` is the EID that will respond locally to messages, and + /// is used as a source address. /// /// `lookup` callbacks define the routing table for outbound packets. - pub fn new(stack: Stack, lookup: &'r dyn PortLookup) -> Self { + /// + /// `now_millis` is the current timestamp, as would be provided to + /// [`update_time`](Self::update_time). + pub fn new( + own_eid: Eid, + lookup: &'r dyn PortLookup, + now_millis: u64, + ) -> Self { + let stack = Stack::new(own_eid, now_millis); let inner = RouterInner { stack, lookup }; let app_listeners = BlockingMutex::new(RefCell::new(Vec::new())); From 07b86c1ffab15796afd8e9a3ebe9668a4555df2f Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 29 Jul 2025 17:45:59 +0800 Subject: [PATCH 11/22] mctp-estack: Don't remove non-expiring flows With non-expiring tags a non-owner response shouldn't remove the flow. Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 893f338..5a3afd2 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -47,7 +47,7 @@ extern crate std; /// released. pub use heapless::Vec; -use heapless::FnvIndexMap; +use heapless::{Entry, FnvIndexMap}; use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue}; @@ -396,13 +396,21 @@ impl Stack { match re.receive(packet, buf, self.now.increment()) { // Received a complete message Ok(Some(mut msg)) => { - // Have received a "response", flow is finished. - // TODO preallocated tags won't remove the flow. + // Have received a "response", flow may be finished. 
let re = &mut msg.reassembler; if !re.tag.is_owner() { - trace!("remove flow"); - let r = self.flows.remove(&(re.peer, re.tag.tag())); - debug_assert!(r.is_some(), "non-existent remove_flow"); + let e = self.flows.entry((re.peer, re.tag.tag())); + match e { + Entry::Occupied(e) => { + if e.get().expiry_stamp.is_some() { + trace!("remove flow"); + e.remove(); + } + } + Entry::Vacant(_) => { + debug_assert!(false, "non-existent remove_flow") + } + } } Ok(Some(msg)) From b98f29787a0c4939814f4ea72984d3f18fdea9e8 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Wed, 30 Jul 2025 14:12:00 +0800 Subject: [PATCH 12/22] mctp-estack: Add recv() timeouts for Listener, Req A receive timeout can be set with set_timeout(), expiring depending on update_clock(). Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 7 ++++ mctp-estack/src/router.rs | 68 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 5a3afd2..b836dd6 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -236,6 +236,13 @@ impl Stack { } } + /// Return the current internal timestamp. + /// + /// This is the time last set with `update_clock()`. + pub fn now(&self) -> u64 { + self.now.clock + } + /// Updates timeouts and returns the next timeout in milliseconds /// /// Must be called regularly to update the current clock value. diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index 3cc467d..ca58618 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -417,6 +417,9 @@ pub struct RouterInner<'r> { /// Core MCTP stack stack: Stack, + /// Minimum receive deadline. u64::MAX when cleared. 
+ recv_deadline: u64, + lookup: &'r dyn PortLookup, } @@ -436,7 +439,11 @@ impl<'r> Router<'r> { now_millis: u64, ) -> Self { let stack = Stack::new(own_eid, now_millis); - let inner = RouterInner { stack, lookup }; + let inner = RouterInner { + stack, + recv_deadline: u64::MAX, + lookup, + }; let app_listeners = BlockingMutex::new(RefCell::new(Vec::new())); @@ -474,12 +481,21 @@ impl<'r> Router<'r> { /// be returned, currently a maximum of 100 ms. pub async fn update_time(&self, now_millis: u64) -> Result { let mut inner = self.inner.lock().await; - let (next, expired) = inner.stack.update(now_millis)?; + let (next, mut expired) = inner.stack.update(now_millis)?; + + if inner.recv_deadline <= now_millis { + expired = true; + // app_recv() will update with next minimum deadline. + inner.recv_deadline = u64::MAX; + } + if expired { // Wake pending receivers in case one was waiting on a now-expired response. // TODO something more efficient, maybe Reassembler should hold a waker? + trace!("update_time expired"); self.recv_wakers.wake_all(); } + Ok(next) } @@ -610,10 +626,13 @@ impl<'r> Router<'r> { &self, cookie: AppCookie, buf: &'f mut [u8], + timeout: Option, ) -> Result<(&'f mut [u8], Eid, MsgType, Tag, MsgIC)> { // buf can only be taken once let mut buf = Some(buf); + let mut deadline = None; + // Wait for the message to arrive poll_fn(|cx| { let l = pin!(self.inner.lock()); @@ -621,7 +640,28 @@ impl<'r> Router<'r> { return Poll::Pending; }; + // Convert timeout to a deadline on the first iteration + if deadline.is_none() { + if let Some(timeout) = timeout { + deadline = Some(timeout + inner.stack.now()) + } + } + + let expired = + deadline.map(|d| inner.stack.now() >= d).unwrap_or(false); + + if let Some(deadline) = deadline { + // Update the Router-wide deadline. 
+ if !expired { + inner.recv_deadline = inner.recv_deadline.min(deadline); + } + } + let Some(msg) = inner.stack.get_deferred_bycookie(&[cookie]) else { + trace!("no message"); + if expired { + return Poll::Ready(Err(mctp::Error::TimedOut)); + } self.recv_wakers.register(cookie, cx.waker()); return Poll::Pending; }; @@ -716,6 +756,7 @@ impl<'r> Router<'r> { Ok(RouterAsyncListener { cookie, router: self, + timeout: None, }) } @@ -743,6 +784,7 @@ pub struct RouterAsyncReqChannel<'g, 'r> { router: &'g Router<'r>, tag_expires: bool, cookie: Option, + timeout: Option, } impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { @@ -753,6 +795,7 @@ impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { tag_expires: true, router, cookie: None, + timeout: None, } } @@ -784,6 +827,13 @@ impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { } } } + + /// Set a timeout. + /// + /// Specified in milliseconds. + pub fn set_timeout(&mut self, timeout: Option) { + self.timeout = timeout; + } } impl Drop for RouterAsyncReqChannel<'_, '_> { @@ -867,7 +917,7 @@ impl mctp::AsyncReqChannel for RouterAsyncReqChannel<'_, '_> { }; let recv_tag = Tag::Unowned(tv); let (buf, eid, typ, tag, ic) = - self.router.app_recv(cookie, buf).await?; + self.router.app_recv(cookie, buf, self.timeout).await?; debug_assert_eq!(tag, recv_tag); debug_assert_eq!(eid, self.eid); @@ -943,6 +993,16 @@ impl<'g, 'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'g, 'r> { pub struct RouterAsyncListener<'g, 'r> { router: &'g Router<'r>, cookie: AppCookie, + timeout: Option, +} + +impl RouterAsyncListener<'_, '_> { + /// Set a receive timeout. + /// + /// Specified in milliseconds. 
+ pub fn set_timeout(&mut self, timeout: Option) { + self.timeout = timeout; + } } impl<'g, 'r> mctp::AsyncListener for RouterAsyncListener<'g, 'r> { @@ -957,7 +1017,7 @@ impl<'g, 'r> mctp::AsyncListener for RouterAsyncListener<'g, 'r> { ) -> mctp::Result<(MsgType, MsgIC, &'f mut [u8], Self::RespChannel<'_>)> { let (msg, eid, typ, tag, ic) = - self.router.app_recv(self.cookie, buf).await?; + self.router.app_recv(self.cookie, buf, self.timeout).await?; let Tag::Owned(tv) = tag else { debug_assert!(false, "listeners only accept owned tags"); From b2cbf79e8190131cc3e762ff2092ca8f73074f88 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Wed, 30 Jul 2025 14:13:16 +0800 Subject: [PATCH 13/22] mctp-estack: impl Debug for Channels, Listener Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index ca58618..c98f8b0 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -773,7 +773,14 @@ impl<'r> Router<'r> { } } +impl core::fmt::Debug for Router<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Router").finish_non_exhaustive() + } +} + /// A request channel. +#[derive(Debug)] pub struct RouterAsyncReqChannel<'g, 'r> { /// Destination EID eid: Eid, @@ -942,6 +949,7 @@ impl mctp::AsyncReqChannel for RouterAsyncReqChannel<'_, '_> { /// A response channel. /// /// Returned by [`RouterAsyncListener::recv`](mctp::AsyncListener::recv). +#[derive(Debug)] pub struct RouterAsyncRespChannel<'g, 'r> { eid: Eid, tv: TagValue, @@ -990,6 +998,7 @@ impl<'g, 'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'g, 'r> { /// A listener. /// /// Created with [`Router::listener()`](Router::listener). 
+#[derive(Debug)] pub struct RouterAsyncListener<'g, 'r> { router: &'g Router<'r>, cookie: AppCookie, From 4a13f3dbbc61e31f72daf865aa598429c32ab0c2 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Thu, 7 Aug 2025 10:56:50 +0800 Subject: [PATCH 14/22] mctp-estack: Stack timeouts expire on >= deadline, not > This is slightly more accurate and consistent with other timeouts such as in Router. Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index b836dd6..918c577 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -769,7 +769,8 @@ impl EventStamp { return None; }; - timeout.checked_sub(elapsed) + // zero time_remaining should return None. + (elapsed < timeout).then(|| timeout - elapsed) } } From f1ce9d1c9e086f48c311af5b9659d7186f616319 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Thu, 7 Aug 2025 13:26:09 +0800 Subject: [PATCH 15/22] mctp-estack: Add cancel_flow_bycookie() cancel_flow is also updated, now has no return (was always Ok) and removed the debug assertion which was incorrect. A reassembler could be Done and still existing, but the flow would be removed. 
Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 20 ++++++++++++-------- mctp-estack/src/router.rs | 4 +--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 918c577..1b053a7 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -638,23 +638,27 @@ impl Stack { self.flows.get(&(peer, tv)) } - pub fn cancel_flow(&mut self, source: Eid, tv: TagValue) -> Result<()> { + pub fn cancel_flow(&mut self, source: Eid, tv: TagValue) { trace!("cancel flow {}", source); let tag = Tag::Unowned(tv); - let mut removed = false; for (re, _buf) in self.reassemblers.iter_mut() { if !re.is_unused() && re.tag == tag && re.peer == source { re.set_unused(); - removed = true; } } - trace!("removed flow"); - let r = self.flows.remove(&(source, tv)); - if removed { - debug_assert!(r.is_some()); + self.flows.remove(&(source, tv)); + } + + pub fn cancel_flow_bycookie(&mut self, cookie: AppCookie) { + trace!("cancel flow bycookie {:?}", cookie); + for (re, _buf) in self.reassemblers.iter_mut() { + if !re.is_unused() && re.cookie == Some(cookie) { + re.set_unused(); + } } - Ok(()) + + self.flows.retain(|_, f| f.cookie != Some(cookie)); } } diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index c98f8b0..e655726 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -735,9 +735,7 @@ impl<'r> Router<'r> { let Tag::Owned(tv) = tag else { unreachable!() }; let mut inner = self.inner.lock().await; - if let Err(e) = inner.stack.cancel_flow(eid, tv) { - warn!("flow cancel failed {}", e); - } + inner.stack.cancel_flow(eid, tv); } /// Create a `AsyncReqChannel` instance. 
From 185038ba3d6136a76274c4f8062c3e8f9a474d4f Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Thu, 7 Aug 2025 11:22:36 +0800 Subject: [PATCH 16/22] mctp-estack: Remove Stack's DEFERRED_TIMEOUT Messages now don't expire at all after retain(), it is up to the application to ensure that they are fetched with get_deferred...() then dropped. Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 45 ++++++++++++----------------------- mctp-estack/src/reassemble.rs | 16 ++++++++----- 2 files changed, 25 insertions(+), 36 deletions(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 1b053a7..4d6f45c 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -85,11 +85,6 @@ pub(crate) use proto::MctpHeader; /// In milliseconds. const REASSEMBLY_EXPIRY_TIMEOUT: u32 = 6000; -/// Timeout for calling [`get_deferred()`](Stack::get_deferred). -/// -/// See documentation for [`MctpMessage`]. -pub const DEFERRED_TIMEOUT: u32 = 6000; - /// Timeout granularity. /// /// Timeouts will be checked no more often than this interval (in milliseconds). @@ -269,18 +264,16 @@ impl Stack { // Check reassembler expiry for incomplete packets for (re, _buf) in self.reassemblers.iter_mut() { if !re.is_unused() { - match re.check_expired( - &self.now, - REASSEMBLY_EXPIRY_TIMEOUT, - DEFERRED_TIMEOUT, - ) { - None => { + match re.check_expired(&self.now, REASSEMBLY_EXPIRY_TIMEOUT) { + Some(0) => { trace!("Expired"); any_expired = true; re.set_unused(); } // Not expired, update the timeout Some(t) => timeout = timeout.min(t), + // No timeout + None => (), } } } @@ -295,11 +288,11 @@ impl Stack { .check_timeout(&self.now, REASSEMBLY_EXPIRY_TIMEOUT) { // expired, remove it - None => { + 0 => { any_expired = true; false } - Some(t) => { + t => { // still time left timeout = timeout.min(t); true @@ -435,10 +428,6 @@ impl Stack { /// /// Messages are selected by `(source_eid, tag)`. /// If multiple match the earliest is returned. 
- /// - /// Messages are only available for [`DEFERRED_TIMEOUT`], after - /// that time they will be discarded and the message slot/tag may - /// be reused. pub fn get_deferred( &mut self, source: Eid, @@ -464,10 +453,6 @@ impl Stack { /// /// If multiple match the earliest is returned. /// Multiple cookies to match may be provided. - /// - /// Messages are only available for [`DEFERRED_TIMEOUT`], after - /// that time they will be discarded and the message slot may - /// be reused. pub fn get_deferred_bycookie( &mut self, cookies: &[AppCookie], @@ -670,9 +655,10 @@ impl Stack { /// If the the message is going to be retrieved again using /// [`get_deferred()`](Stack::get_deferred) or /// [`get_deferred_bycookie()`](Stack::get_deferred_bycookie), the caller must -/// call [`retain()`](Self::retain). In that case the MCTP stack will keep the message -/// buffer available until [`DEFERRED_TIMEOUT`] (measured from when the final packet -/// of the message was received). +/// call [`retain()`](Self::retain). +/// +/// After `retain()` a caller must ensure that the packet is eventually retrieved, +/// otherwise it will forever consume a reassembly slot in the `Stack`. pub struct MctpMessage<'a> { pub source: Eid, pub dest: Eid, @@ -761,20 +747,19 @@ impl EventStamp { /// Check timeout /// - /// Returns `None` if expired, or `Some(time_remaining)`. + /// Returns 0 if expired, otherwise `time_remaining`. /// Times are in milliseconds. - pub fn check_timeout(&self, now: &EventStamp, timeout: u32) -> Option { + pub fn check_timeout(&self, now: &EventStamp, timeout: u32) -> u32 { let Some(elapsed) = now.clock.checked_sub(self.clock) else { debug_assert!(false, "Timestamp backwards"); - return None; + return 0; }; let Ok(elapsed) = u32::try_from(elapsed) else { // Longer than 49 days elapsed. It's expired. - return None; + return 0; }; - // zero time_remaining should return None. 
- (elapsed < timeout).then(|| timeout - elapsed) + timeout.saturating_sub(elapsed) } } diff --git a/mctp-estack/src/reassemble.rs b/mctp-estack/src/reassemble.rs index 5b154dc..974c452 100644 --- a/mctp-estack/src/reassemble.rs +++ b/mctp-estack/src/reassemble.rs @@ -222,25 +222,29 @@ impl Reassembler { /// Check timeouts /// - /// Returns `None` if timed out, `Some(remaining)` otherwise. + /// Returns `None` if no timeout, `Some(remaining)` otherwise. + /// `Some(0)` is expired. pub fn check_expired( &self, now: &EventStamp, reassemble_timeout: u32, - done_timeout: u32, ) -> Option { let timeout = match self.state { State::Active { .. } => reassemble_timeout, - State::Done { .. } => done_timeout, + State::Done { .. } => { + // Done reassemblers don't expire, they will be + // cleared when a MctpMessage is dropped without retain(). + return None; + } State::New | State::Bad => { // Bad ones should have been cleaned up, New ones should // have moved to Active prior to check_expired(). debug_assert!(false, "Bad or new reassembler"); - return None; + return Some(0); } - State::Unused => return None, + State::Unused => return Some(0), }; - self.stamp.check_timeout(now, timeout) + Some(self.stamp.check_timeout(now, timeout)) } pub(crate) fn set_cookie(&mut self, cookie: Option) { From 40a99db784c80cfc354e7f8bcd01aadfb73728a2 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Thu, 7 Aug 2025 12:57:39 +0800 Subject: [PATCH 17/22] mctp-estack: cleanup() to remove deferred cookies This replaces async_drop(), running a cleanup handler if needed prior to handing local incoming packets. This ensures that a dropped ReqChannel/Listener won't hold Reassembler slots used. 
Signed-off-by: Matt Johnston --- mctp-estack/src/router.rs | 109 ++++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 40 deletions(-) diff --git a/mctp-estack/src/router.rs b/mctp-estack/src/router.rs index e655726..23644f8 100644 --- a/mctp-estack/src/router.rs +++ b/mctp-estack/src/router.rs @@ -11,6 +11,7 @@ use crate::fmt::{debug, error, info, trace, warn}; use core::cell::RefCell; use core::debug_assert; use core::future::{poll_fn, Future}; +use core::mem::take; use core::pin::pin; use core::task::{Poll, Waker}; @@ -300,8 +301,17 @@ impl Port<'_> { #[derive(Default)] struct WakerPoolInner { - pool: FnvIndexMap, + // Value is None for AppCookies that are pending deallocation. + // Deallocation is deferred since it needs an async lock on the stack. + pool: FnvIndexMap, MAX_CHANNELS>, + + /// Next AppCookie to allocate. Arbitrary, but incrementing + /// values are nicer for debugging. next: usize, + + /// Set when remove() is called, lets cleanup() avoid scanning the + /// whole pool if not necessary. + need_cleanup: bool, } struct WakerPool { @@ -320,29 +330,37 @@ impl WakerPool { fn wake(&self, cookie: AppCookie) { self.inner.lock(|i| { let mut i = i.borrow_mut(); - if let Some(w) = i.pool.get_mut(&cookie) { + if let Some(Some(w)) = i.pool.get_mut(&cookie) { w.wake() } else { - // This can currently happen if a ReqChannel is dropped but the core stack - // subsequently receives a response message corresponding to that cookie. - // TODO fix expiring from the stack (?) and make this a debug_assertion. - // We can't expire in the ReqChannel drop handler since it needs async - // for locking. + // Some(None) case is when .remove() has removed the slot, but cleanup() + // hasn't run yet. + // + // None case is when a ReqChannel is dropped but the core stack + // subsequently receives a response message corresponding to that cookie, + // prior to cleanup(). + // + // In both cases we do nothing, a subsequent cleanup will handle it. 
} }) } fn wake_all(&self) { self.inner.lock(|i| { - for w in i.borrow_mut().pool.values_mut() { + for w in i.borrow_mut().pool.values_mut().flatten() { w.wake() } }) } fn register(&self, cookie: AppCookie, waker: &Waker) { - self.inner - .lock(|i| i.borrow_mut().pool[&cookie].register(waker)) + self.inner.lock(|i| { + if let Some(w) = i.borrow_mut().pool[&cookie].as_mut() { + w.register(waker); + } else { + debug_assert!(false, "register called after remove"); + } + }); } /// Returns `Error::NoSpace` if all slots are occupied. @@ -360,7 +378,7 @@ impl WakerPool { continue; }; - break if entry.insert(WakerRegistration::new()).is_err() { + break if entry.insert(Some(WakerRegistration::new())).is_err() { // Map is full Err(Error::NoSpace) } else { @@ -370,12 +388,39 @@ impl WakerPool { }) } + // Marks the cookie as unused. It will later be fully cleared by a call + // to cleanup(). They are split so that remove() can call from drop handlers + // (no async lock possible), while cleanup() can run later holding an async lock. fn remove(&self, cookie: AppCookie) { self.inner.lock(|i| { - let e = i.borrow_mut().pool.remove(&cookie); - debug_assert!(e.is_some()); + let mut i = i.borrow_mut(); + if let Some(e) = i.pool.get_mut(&cookie) { + debug_assert!(e.is_some(), "remove called twice"); + *e = None; + i.need_cleanup = true; + } }); } + + // Finalises items previously remove()d, calling a closure with the cookie. + // + // Does nothing if no cleanup is necessary. + fn cleanup(&self, mut f: F) + where + F: FnMut(AppCookie), + { + self.inner.lock(|i| { + let mut i = i.borrow_mut(); + if take(&mut i.need_cleanup) { + i.pool.retain(|cookie, w| { + if w.is_none() { + f(*cookie); + } + w.is_some() + }) + } + }) + } } /// An async MCTP stack with routing. @@ -514,6 +559,14 @@ impl<'r> Router<'r> { // Handle locally if possible if inner.stack.is_local_dest(pkt) { + // Clean up any outstanding reassembly slots, to ensure + // they don't prevent the new packet being received. 
+ // This is cheap. + self.recv_wakers.cleanup(|cookie| { + inner.stack.cancel_flow_bycookie(cookie); + while inner.stack.get_deferred_bycookie(&[cookie]).is_some() {} + }); + match inner.stack.receive(pkt) { // Complete message Ok(Some(msg)) => { @@ -728,16 +781,6 @@ impl<'r> Router<'r> { top.send_message(&mut fragmenter, buf, &mut work_msg).await } - /// Only needs to be called for tags allocated with tag_expires=false - /// - /// Must only be called for owned tags. - async fn app_release_tag(&self, eid: Eid, tag: Tag) { - let Tag::Owned(tv) = tag else { unreachable!() }; - let mut inner = self.inner.lock().await; - - inner.stack.cancel_flow(eid, tv); - } - /// Create a `AsyncReqChannel` instance. pub fn req(&self, eid: Eid) -> RouterAsyncReqChannel<'_, 'r> { RouterAsyncReqChannel::new(eid, self) @@ -806,8 +849,6 @@ impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { /// Set the tag to not expire. That allows multiple calls to `send()`. /// - /// `async_drop` must be called prior to drop. - /// /// Should be called prior to any `send()` and may only be called once /// for a `RouterAsyncReqChannel`. /// This can also be called after the `recv()` has completed. @@ -819,20 +860,6 @@ impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { Ok(()) } - /// This must be called prior to drop whenever `tag_noexpire()` is used. - /// - /// Failure to call will result in leaking tags in the Router. - /// - /// This is a workaround until async drop is implemented in Rust itself. - /// - pub async fn async_drop(mut self) { - if !self.tag_expires { - if let Some(tag) = self.last_tag.take() { - self.router.app_release_tag(self.eid, tag).await; - } - } - } - /// Set a timeout. /// /// Specified in milliseconds. 
@@ -844,8 +871,10 @@ impl<'g, 'r> RouterAsyncReqChannel<'g, 'r> { impl Drop for RouterAsyncReqChannel<'_, '_> { fn drop(&mut self) { if !self.tag_expires && self.last_tag.is_some() { - warn!("Didn't call async_drop()"); + // tag cleanup will require a cookie + debug_assert!(self.cookie.is_some()); } + if let Some(c) = self.cookie { self.router.recv_wakers.remove(c); } From 790c03a189cd6e342f0ab2acfe8660ec3b1b6633 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Thu, 7 Aug 2025 16:12:31 +0800 Subject: [PATCH 18/22] Remove needless lifetimes by clippy Clippy 1.85 warns about these unused lifetimes. Remove them with --fix. More recently clippy no longer warns, though the code is cleaner without them. Signed-off-by: Matt Johnston --- mctp-estack/src/lib.rs | 4 ++-- mctp-estack/src/zerocopy_channel.rs | 4 ++-- mctp-linux/src/lib.rs | 2 +- pldm-platform/src/proto.rs | 2 +- pldm/src/lib.rs | 8 ++++---- pldm/src/util.rs | 4 ++-- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/mctp-estack/src/lib.rs b/mctp-estack/src/lib.rs index 4d6f45c..576b57c 100644 --- a/mctp-estack/src/lib.rs +++ b/mctp-estack/src/lib.rs @@ -677,7 +677,7 @@ pub struct MctpMessage<'a> { retain: bool, } -impl<'a> MctpMessage<'a> { +impl MctpMessage<'_> { /// Retrieve the message's cookie. /// /// For response messages with `tag.is_owner() == false` this will be @@ -704,7 +704,7 @@ impl<'a> MctpMessage<'a> { } } -impl<'a> Drop for MctpMessage<'a> { +impl Drop for MctpMessage<'_> { fn drop(&mut self) { if !self.retain { self.reassembler.set_unused() diff --git a/mctp-estack/src/zerocopy_channel.rs b/mctp-estack/src/zerocopy_channel.rs index bfeec78..78ee875 100644 --- a/mctp-estack/src/zerocopy_channel.rs +++ b/mctp-estack/src/zerocopy_channel.rs @@ -331,7 +331,7 @@ pub struct Sender<'a, M: RawMutex, T> { channel: &'a ChannelInner, } -impl<'a, M: RawMutex, T> Sender<'a, M, T> { +impl Sender<'_, M, T> { /// Creates one further [`Sender`] over the same channel. 
pub fn borrow(&mut self) -> Sender<'_, M, T> { Sender { channel: self.channel } @@ -420,7 +420,7 @@ pub struct Receiver<'a, M: RawMutex, T> { channel: &'a ChannelInner, } -impl<'a, M: RawMutex, T> Receiver<'a, M, T> { +impl Receiver<'_, M, T> { /// Creates one further [`Receiver`] over the same channel. pub fn borrow(&mut self) -> Receiver<'_, M, T> { Receiver { channel: self.channel } diff --git a/mctp-linux/src/lib.rs b/mctp-linux/src/lib.rs index f490e2c..59f2af4 100644 --- a/mctp-linux/src/lib.rs +++ b/mctp-linux/src/lib.rs @@ -707,7 +707,7 @@ pub struct MctpLinuxAsyncResp<'l> { typ: MsgType, } -impl<'l> mctp::AsyncRespChannel for MctpLinuxAsyncResp<'l> { +impl mctp::AsyncRespChannel for MctpLinuxAsyncResp<'_> { type ReqChannel<'a> = MctpLinuxAsyncReq where diff --git a/pldm-platform/src/proto.rs b/pldm-platform/src/proto.rs index d1c9850..2c05d66 100644 --- a/pldm-platform/src/proto.rs +++ b/pldm-platform/src/proto.rs @@ -325,7 +325,7 @@ impl TryFrom<&str> for AsciiString { } } -impl<'a, Predicate, const N: usize> DekuReader<'a, (Limit, ())> +impl DekuReader<'_, (Limit, ())> for AsciiString where Predicate: FnMut(&u8) -> bool, diff --git a/pldm/src/lib.rs b/pldm/src/lib.rs index a224186..aab261d 100644 --- a/pldm/src/lib.rs +++ b/pldm/src/lib.rs @@ -203,7 +203,7 @@ pub struct PldmRequest<'a> { pub data: VecOrSlice<'a, u8>, } -impl<'a> Debug for PldmRequest<'a> { +impl Debug for PldmRequest<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let vs = match self.data { #[cfg(feature = "alloc")] @@ -222,7 +222,7 @@ impl<'a> Debug for PldmRequest<'a> { } #[cfg(feature = "alloc")] -impl<'a> PldmRequest<'a> { +impl PldmRequest<'_> { /// Converts any `PldmRequest` into one with allocated storage pub fn make_owned(self) -> PldmRequest<'static> { let d = match self.data { @@ -347,7 +347,7 @@ pub struct PldmResponse<'a> { pub data: VecOrSlice<'a, u8>, } -impl<'a> Debug for PldmResponse<'a> { +impl Debug for PldmResponse<'_> { fn fmt(&self, f: &mut 
fmt::Formatter) -> fmt::Result { let vs = match self.data { #[cfg(feature = "alloc")] @@ -367,7 +367,7 @@ impl<'a> Debug for PldmResponse<'a> { } #[cfg(feature = "alloc")] -impl<'a> PldmResponse<'a> { +impl PldmResponse<'_> { /// Set the data payload for this response pub fn set_data(&mut self, data: Vec) { self.data = data.into() diff --git a/pldm/src/util.rs b/pldm/src/util.rs index dd80d97..564d51f 100644 --- a/pldm/src/util.rs +++ b/pldm/src/util.rs @@ -21,14 +21,14 @@ pub enum VecOrSlice<'a, V> { Borrowed(&'a [V]), } -impl<'a, V> core::ops::Deref for VecOrSlice<'a, V> { +impl core::ops::Deref for VecOrSlice<'_, V> { type Target = [V]; fn deref(&self) -> &[V] { self.as_ref() } } -impl<'a, V> AsRef<[V]> for VecOrSlice<'a, V> { +impl AsRef<[V]> for VecOrSlice<'_, V> { fn as_ref(&self) -> &[V] { match self { #[cfg(feature = "alloc")] From 16d6f9df05666b1042af44bded55228cf4e26ee8 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Fri, 8 Aug 2025 14:47:39 +0800 Subject: [PATCH 19/22] ci: Don't run clippy for old toolchain Some lints are unwanted and have been removed in newer versions, for example operator precedence warnings. Signed-off-by: Matt Johnston --- .github/workflows/ci.yml | 15 ++++++++++++--- ci/runtests.sh | 4 +++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0b3a19b..b74345f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,9 +14,15 @@ jobs: ci: strategy: matrix: - # 1.82 is the earliest version that will build. Notice if it breaks, - # though MSRV may be bumped as needed. - rust_version: [stable, 1.82, nightly] + include: + - rust_version: stable + + # 1.82 is the earliest version that will build. Notice if it breaks, + # though MSRV may be bumped as needed. 
+          - rust_version: 1.82
+            no_clippy: yes
+
+          - rust_version: nightly
 
     runs-on: ubuntu-latest
 
@@ -41,4 +47,7 @@
           rustup override set ${{ matrix.rust_version }}
 
       - name: Build and test ${{ matrix.rust_version }}
+        env:
+          # Older clippy has some unwanted lints, ignore them.
+          NO_CLIPPY: ${{ matrix.no_clippy }}
         run: ./ci/runtests.sh
diff --git a/ci/runtests.sh b/ci/runtests.sh
index 5aa9934..c54cfec 100755
--- a/ci/runtests.sh
+++ b/ci/runtests.sh
@@ -15,7 +15,9 @@ cargo fmt -- --check
 
 # Check everything first
 cargo check --all-targets --locked
-cargo clippy --all-targets
+if [ -z "${NO_CLIPPY:-}" ]; then
+    cargo clippy --all-targets
+fi
 
 # stable, std
 cargo build --release
 
From ca289c391ba08b392a1c309f78d476180e85ae27 Mon Sep 17 00:00:00 2001
From: Matt Johnston
Date: Thu, 7 Aug 2025 16:08:11 +0800
Subject: [PATCH 20/22] ci: Move minimum version to Rust 1.85

Tests will use async closures, so update the version.

Signed-off-by: Matt Johnston
---
 .github/workflows/ci.yml | 4 ++--
 mctp-estack/Cargo.toml | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b74345f..5c7e53d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,9 +17,9 @@ jobs:
       include:
         - rust_version: stable
 
-          # 1.82 is the earliest version that will build. Notice if it breaks,
+          # 1.85 is the earliest version that will build. Notice if it breaks,
           # though MSRV may be bumped as needed.
- - rust_version: 1.82 + - rust_version: 1.85 no_clippy: yes - rust_version: nightly diff --git a/mctp-estack/Cargo.toml b/mctp-estack/Cargo.toml index 6266ff3..0652970 100644 --- a/mctp-estack/Cargo.toml +++ b/mctp-estack/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true repository.workspace = true categories = ["network-programming", "embedded", "hardware-support", "no-std"] -rust-version = "1.82" +rust-version = "1.85" [dependencies] crc = { workspace = true } From b583ad8c2927d09b390a6c54d8237bf335a7d5ee Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 5 Aug 2025 17:18:48 +0800 Subject: [PATCH 21/22] mctp-estack: Add StepExecutor for testing This is a specialised executor to run async futures with more control. Can be used to partially run a future such as recv(), then adjust the clock before proceeding, when testing timeouts. Signed-off-by: Matt Johnston --- Cargo.lock | 25 + mctp-estack/Cargo.toml | 2 + mctp-estack/tests/test_step_executor.rs | 666 ++++++++++++++++++++++++ 3 files changed, 693 insertions(+) create mode 100644 mctp-estack/tests/test_step_executor.rs diff --git a/Cargo.lock b/Cargo.lock index 580711c..4bb933e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -710,6 +711,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -729,6 +741,17 @@ dependencies = [ 
"pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -750,6 +773,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -898,6 +922,7 @@ dependencies = [ "embedded-io-adapters", "embedded-io-async", "env_logger", + "futures", "heapless", "log", "mctp", diff --git a/mctp-estack/Cargo.toml b/mctp-estack/Cargo.toml index 0652970..52e4563 100644 --- a/mctp-estack/Cargo.toml +++ b/mctp-estack/Cargo.toml @@ -23,9 +23,11 @@ uuid = { workspace = true } critical-section = { version = "1.2.0", features = ["std"] } embedded-io-adapters = { workspace = true } env_logger = { workspace = true } +futures = "0.3" proptest = { workspace = true } smol = { workspace = true } + [features] default = ["log"] std = ["mctp/std"] diff --git a/mctp-estack/tests/test_step_executor.rs b/mctp-estack/tests/test_step_executor.rs new file mode 100644 index 0000000..be55027 --- /dev/null +++ b/mctp-estack/tests/test_step_executor.rs @@ -0,0 +1,666 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 +/* + * Copyright (c) 2025 Code Construct + */ + +//! A specialised executor for testing. +//! +//! The `StepExecutor` allows run-until-idle, and also +//! also ensures that each task runs with a distinct waker. +//! +//! Currently this doesn't have a mechanism to run beneath other executors, +//! which would be required to interface with external IO triggers. +//! That could be added in future by proxying `wake()` calls to a parent executor. 
+ +#[allow(unused)] +use log::{debug, error, info, trace, warn}; + +use core::future::poll_fn; +use futures::FutureExt; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Mutex; +use std::sync::{Arc, Weak}; +use std::task::{Context, Poll, Wake, Waker}; + +/// A test executor +/// +/// This allows control of task execution. +/// The executor is not designed to be efficient. +#[derive(Debug)] +pub struct StepExecutor<'a> { + /// Futures that run at any time + tasks: Vec<(Task<'a>, Arc)>, + + /// Futures that only run when requested + sub_tasks: SubTaskRunner, + + /// A counter, incremented by any wake in any task or subtask. + /// + /// Used for pessimistic modification detection. Won't overflow. + counter: Arc, +} + +impl<'a> StepExecutor<'a> { + pub fn new() -> Self { + let counter = Arc::new(AtomicU64::new(1)); + Self { + tasks: Vec::new(), + sub_tasks: SubTaskRunner { + penders: Default::default(), + counter: counter.clone(), + }, + counter, + } + } + + /// Retrieves a `SubTaskRunner`. + /// + /// That can be used to start subtasks, and wait for idleness. + pub fn sub_runner(&self) -> SubTaskRunner { + self.sub_tasks.clone() + } + + /// Adds a future that will run at any time. + /// + /// The future will run to completion, with output discarded. + pub fn add + 'a + Sync + Send, R>(&mut self, f: F) { + let task = Task::new(f); + let pender = Arc::new(Pender::new(self.counter.clone())); + trace!( + "Add task {}. New pender {}", + self.tasks.len(), + pender.debug_id() + ); + self.tasks.push((task, pender)); + } + + /// Adds a future and runs the executor until the future completes. + /// + /// Returns an error if the executor went idle before the future completes. + /// + /// TODO: eventually it might be nice to take an `AsyncFnOnce(SubTaskRunner)` + /// argument instead, but at present (Rust 1.89) they can't be expressed as + /// `Sync+Send`. 
+ pub fn run_to_completion(&mut self, f: F) -> Result<(), IterLimit> + where + F: Future + 'a + Sync + Send, + { + self.add(f); + let (_t, pender) = self.tasks.last().unwrap(); + + // Keep running until the task is removed. + let pender = Arc::downgrade(pender); + self.until_idle(); + if pender.strong_count() > 0 { + trace!("Future didn't complete"); + Err(IterLimit) + } else { + Ok(()) + } + } + + fn all_main_idle(&self) -> bool { + self.tasks.iter().all(|(_t, p)| !p.is_pending()) + } + + /// Run the executor until no more forward progress can be made. + /// + /// It is possible for this to never complete if task(s) are continually + /// waking. + pub fn until_idle(&mut self) { + // A single CPU can't count to u64::MAX + self.until_idle_limit_iter(u64::MAX).unwrap() + } + + /// Run the executor until all tasks complete + pub fn run_all(mut self) { + while !self.tasks.is_empty() { + self.until_idle(); + } + } + + /// The same as until_idle but with an iteration limit + /// + /// Can be used when testing for expected infinite loops. + /// Note that scheduler order is undefined, so even finite loops + /// could possibly run for a very long time. + /// + /// Returns Ok((()) if idle, Err(IterLimit) on iteration limit hit. + pub fn until_idle_limit_iter( + &mut self, + iters: u64, + ) -> Result<(), IterLimit> { + '_outer: for _ in 0..iters { + trace!("loop top"); + // Check whether all tasks have completed, prior to looking for + // idle waiters. + let main_idle = self.all_main_idle(); + + let mut sub_idle = true; + // Iterate over all the live subtasks to find ones where + // another task is waiting for idleness. + for p in self + .sub_tasks + .penders + .lock() + .unwrap() + .iter() + .filter_map(|p| p.upgrade()) + { + p.with(|p| { + while let Some(p) = p.subtask_idle_waiters.pop() { + sub_idle = false; + p.wake(); + } + p.subtask_idle = main_idle; + // Read the counter after waking idle waiters. 
Otherwise + // subtask_idle_counter will always be outdated (wake() increments + // the global counter). + // + // There is still the chance that a future wake + // makes the counter outdated before being compared, + // but in that case another 'outer loop will occur and will eventually + // run with a not-outdated counter. + let now = self.counter.load(Ordering::SeqCst); + p.subtask_idle_counter = now; + }) + } + + trace!("main_idle {main_idle:?} sub_idle {sub_idle:?}"); + + if main_idle && sub_idle { + // Reached quiescent state. + return Ok(()); + } + + // Run tasks, removing completed ones. + self.tasks.retain_mut(|(t, pender)| { + // Clear pending flag + if pender.clear() { + // Poll pending tasks + let w = Waker::from(pender.clone()); + let mut cx = Context::from_waker(&w); + if t.fut.poll_unpin(&mut cx).is_ready() { + // Task is done, remove it. + return false; + } + } + true + }); + } + Err(IterLimit) + } +} + +impl Default for StepExecutor<'_> { + fn default() -> Self { + Self::new() + } +} + +/// Error returned when an iteration limit is reached. +#[derive(Debug)] +pub struct IterLimit; + +/// Represents the wakeup state of a task. +/// +/// An `Arc` is passed as a "task handle", and is convertible to/from a `Waker` +/// via `StepExecutor::task_from_waker()`. `PenderHandle` implements hash etc as a newtype. +#[derive(Debug)] +struct Pender { + inner: Mutex, + counter: Arc, +} + +impl Pender { + fn new(counter: Arc) -> Self { + trace!("Pender new, counter {}", counter.load(Ordering::SeqCst)); + assert_ne!(counter.load(Ordering::SeqCst), 0); + let inner = PenderInner { + // New tasks are pending to poll at least once. 
+ pending: true, + subtask_idle_waiters: Vec::new(), + subtask_idle: false, + subtask_idle_counter: 0, + }; + Self { + inner: Mutex::new(inner), + counter, + } + } + + fn debug_id(self: &Arc) -> String { + format!("Pender({:#x?})", Arc::as_ptr(self)) + } + + fn with(&self, mut f: F) -> R + where + F: FnMut(&mut PenderInner) -> R, + { + let mut p = self.inner.lock().unwrap(); + f(&mut p) + } + + /// Clears the pending bit, returns the previous value. + fn clear(&self) -> bool { + self.with(|p| core::mem::replace(&mut p.pending, false)) + } + + fn is_pending(&self) -> bool { + self.with(|p| p.pending) + } + + fn read_counter(&self) -> u64 { + self.counter.load(Ordering::SeqCst) + } +} + +impl Wake for Pender { + fn wake(self: Arc) { + trace!("wake {}", self.debug_id()); + self.counter.fetch_add(1, Ordering::SeqCst); + self.with(|p| { + p.pending = true; + // Wake any parent waiters of a subtask + for wt in &p.subtask_idle_waiters { + wt.wake_by_ref(); + } + }) + } +} + +#[derive(Debug)] +struct PenderInner { + /// Only set by wake() + pending: bool, + + /// For subtasks, a list of other tasks waiting for this to go idle. + subtask_idle_waiters: Vec, + + /// Set true when idle. + /// + /// Is set pessemistically. Should be discarded if counter has advanced + /// past subtask_idle_counter. + subtask_idle: bool, + + /// Counter value when subtask_idle was set, to track staleness. + subtask_idle_counter: u64, +} + +/// State shared between `StepExecutor` and tasks. +#[derive(Clone, Debug)] +pub struct SubTaskRunner { + penders: Arc>>>, + counter: Arc, +} + +impl SubTaskRunner { + /// Starts a subtask. + pub fn start<'a, F: Future + 'a + Sync + Send, R>( + &self, + f: F, + ) -> SubTask<'a, R> { + trace!("Start, counter {}", self.counter.load(Ordering::SeqCst)); + let st = SubTask::new(f, self.counter.clone()); + self.penders + .lock() + .unwrap() + .push(Arc::downgrade(&st.pender)); + st + } + + /// Starts a subtask and runs it until all tasks are idle. 
+    ///
+    /// Returns `Ok` if it goes idle, or `Err` with the result if it completes early.
+    pub async fn start_until_idle<
+        'a,
+        F: Future + 'a + Sync + Send,
+        R,
+    >(
+        &self,
+        f: F,
+    ) -> Result, R> {
+        let mut st = self.start(f);
+        match st.run_until_idle().await {
+            Some(r) => Err(r),
+            None => Ok(st),
+        }
+    }
+
+    /// Waits for all tasks to be idle.
+    pub async fn wait_idle(&self) {
+        self.start_until_idle(smol::future::pending::<()>())
+            .await
+            .unwrap();
+    }
+}
+
+pub struct SubTask<'a, R> {
+    // See Sync+Send comment on `struct Task`
+    fut: Pin + 'a + Sync + Send>>,
+    // Has a corresponding pender in StepExecutor's sub_tasks
+    pender: Arc,
+
+    // Task is complete.
+    done: bool,
+}
+
+impl core::fmt::Debug for SubTask<'_, R> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SubTask")
+            .field("pender", &self.pender)
+            .field("done", &self.done)
+            .finish_non_exhaustive()
+    }
+}
+
+impl<'a, R> SubTask<'a, R> {
+    /// Constructed via SubTaskRunner::start()
+    fn new + 'a + Sync + Send>(
+        fut: F,
+        counter: Arc,
+    ) -> Self {
+        let fut = Box::pin(fut);
+        Self {
+            fut,
+            pender: Arc::new(Pender::new(counter)),
+            done: false,
+        }
+    }
+
+    /// Called by another task to progress this task, until all tasks go idle.
+    ///
+    /// Returns Some if the subtask completes, None otherwise.
+    ///
+    /// Panics if called after `SubTask` completion.
+    /// `StepExecutor` will later panic if `SubTask::run_until_idle`
+    /// is called from a different executor.
+    ///
+    /// Note: This may only be called by main tasks (ones started with
+    /// `StepExecutor::add()`). Subtasks will currently panic if they call
+    /// run_until_idle(). In future that could be added. 
+ pub async fn run_until_idle(&mut self) -> Option { + if self.done { + panic!("Can't run_until_idle() after completion"); + } + + poll_fn(|cx| { + self.pender.clear(); + trace!("run_until_idle prepoll"); + if let Poll::Ready(r) = self.fut.poll_unpin(cx) { + // The subtask completed + trace!("run_until_idle ready {:?}", cx.waker()); + self.done = true; + return Poll::Ready(Some(r)); + } + + let (idle, idle_counter) = self + .pender + .with(|p| (p.subtask_idle, p.subtask_idle_counter)); + let now = self.pender.read_counter(); + if idle && idle_counter == now { + // All tasks were idle. + trace!("run_until_idle idle {:?}", cx.waker()); + return Poll::Ready(None); + } + trace!("idle {idle:?} idle_counter {idle_counter} now {now}"); + + // StepExecutor will periodically uniquify the list TODO check. + trace!("run_until_idle push {:?}", cx.waker()); + self.pender + .with(|p| p.subtask_idle_waiters.push(cx.waker().clone())); + Poll::Pending + }) + .await + } +} + +/// Poll the task's future to completion. +impl Future for SubTask<'_, R> { + type Output = R; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + if self.done { + panic!("Can't await after completion"); + } + + self.pender.clear(); + let r = self.fut.poll_unpin(cx); + self.done = r.is_ready(); + r + } +} + +/// Holds a pinned boxed future. +/// +/// The future's output is discarded on completion. +/// +/// TODO: Currently this has `Sync + Send` bounds (easier to relax later), +/// but they're expected to run in a single threaded `StepExecutor` instance. +/// Is there a use case for `Sync + Send` (sending to other executors?), or should +/// the bounds be removed? 
+struct Task<'a> { + fut: Pin + 'a + Sync + Send>>, +} + +impl core::fmt::Debug for Task<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Task").finish_non_exhaustive() + } +} + +impl<'a> Task<'a> { + fn new + 'a + Sync + Send, R>(fut: F) -> Self { + // Convert the future to return () + let fut = async { + fut.await; + }; + + let fut = Box::pin(fut); + Self { fut } + } +} + +fn start_log() { + let _ = env_logger::Builder::new() + .filter(None, log::LevelFilter::Trace) + .is_test(true) + .try_init(); +} + +#[test] +fn step_executor_1() { + start_log(); + + let mut ex = StepExecutor::default(); + let (ex1_s, ex1_r) = smol::channel::bounded(1); + let (ex2_s, ex2_r) = smol::channel::bounded(1); + let (ex3_s, ex3_r) = smol::channel::bounded(1); + let (sub_s, sub_r) = smol::channel::bounded(1); + + trace!("top"); + ex.add(async move { + for i in 0..5 { + trace!("ex1 {i}"); + smol::future::yield_now().await; + } + ex1_s.try_send(()).unwrap(); + + smol::future::pending::<()>().await; + unreachable!(); + }); + + ex.add(async move { + for i in 0..10 { + trace!("ex2 {i}"); + smol::future::yield_now().await; + } + ex2_s.try_send(()).unwrap(); + }); + + let sub = ex.sub_runner(); + + ex.add(async move { + let f = async { + for i in 0..10 { + trace!("f {i}"); + smol::future::yield_now().await; + smol::future::yield_now().await; + smol::future::yield_now().await; + } + sub_s.try_send(5).unwrap(); + smol::future::pending::().await; + }; + + let mut f = sub.start(f); + match f.run_until_idle().await { + Some(r) => { + info!("sub wait result {r:?}"); + panic!("No result should be returned"); + } + None => info!("sub wait idle"), + } + ex3_s.try_send(()).unwrap(); + + // let t = ex. 
+ }); + + assert!(ex1_r.try_recv().is_err()); + assert!(ex2_r.try_recv().is_err()); + assert!(ex3_r.try_recv().is_err()); + + ex.until_idle(); + + assert!(ex1_r.try_recv().is_ok()); + assert!(ex2_r.try_recv().is_ok()); + assert!(ex3_r.try_recv().is_ok()); + assert!(sub_r.try_recv().is_ok()); +} + +#[test] +fn step_executor_subtask_yield() { + start_log(); + + let mut ex = StepExecutor::default(); + let (ex3_s, ex3_r) = smol::channel::bounded(1); + + let sub = ex.sub_runner(); + + ex.add(async move { + let f = async { + for i in 0..10 { + trace!("f {i}"); + smol::future::yield_now().await; + smol::future::yield_now().await; + smol::future::yield_now().await; + } + "the end." + }; + + let mut f = sub.start(f); + let r = f.run_until_idle().await; + assert_eq!(r, Some("the end.")); + ex3_s.try_send(()).unwrap(); + }); + + assert!(ex3_r.try_recv().is_err()); + + ex.until_idle(); + + assert!(ex3_r.try_recv().is_ok()); +} + +#[test] +fn step_executor_subtask_pending() { + start_log(); + + let mut ex = StepExecutor::default(); + let (ex3_s, ex3_r) = smol::channel::bounded(1); + + let sub = ex.sub_runner(); + + ex.add(async move { + let f = async { + smol::future::yield_now().await; + smol::future::pending::().await + }; + + let mut f = sub.start(f); + let r = f.run_until_idle().await; + assert!(r.is_none()); + ex3_s.try_send("done").unwrap(); + }); + + assert!(ex3_r.try_recv().is_err()); + + ex.until_idle(); + + assert_eq!(ex3_r.try_recv(), Ok("done")) +} + +#[test] +fn step_executor_subtask_busy() { + start_log(); + + let mut ex = StepExecutor::default(); + + let sub = ex.sub_runner(); + + // Add an unrelated main task that is never idle + ex.add(async move { + for _ in 0..10 { + smol::future::yield_now().await; + } + }); + + ex.add(async move { + let f = async { + loop { + smol::future::yield_now().await; + } + }; + + let mut f = sub.start(f); + f.run_until_idle().await; + unreachable!() + }); + + let run = ex.until_idle_limit_iter(1000); + assert!(run.is_err(), "the 
subtask shouldn't return return"); +} + +#[test] +fn step_executor_subtask_other_busy() { + start_log(); + + let mut ex = StepExecutor::default(); + + let sub = ex.sub_runner(); + + // Add an unrelated main task that is never idle + ex.add(async move { + loop { + smol::future::yield_now().await; + } + }); + + ex.add(async move { + let f = async { + for _ in 0..5 { + smol::future::yield_now().await; + } + "done" + }; + + let mut f = sub.start(f); + assert_eq!(f.run_until_idle().await, Some("done")); + }); + + let run = ex.until_idle_limit_iter(1000); + assert!(run.is_err(), "the first loop shouldn't return"); +} From 88c9eba20c6ac62025438bb7ca9b9578c9f2679c Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 29 Jul 2025 12:37:42 +0800 Subject: [PATCH 22/22] mctp-estack: Add Router roundtrip tests These set up two router peers and test various send/receive scenarios. Signed-off-by: Matt Johnston --- mctp-estack/tests/roundtrip.rs | 534 +++++++++++++++++++++++++++++++++ 1 file changed, 534 insertions(+) create mode 100644 mctp-estack/tests/roundtrip.rs diff --git a/mctp-estack/tests/roundtrip.rs b/mctp-estack/tests/roundtrip.rs new file mode 100644 index 0000000..9a78cd1 --- /dev/null +++ b/mctp-estack/tests/roundtrip.rs @@ -0,0 +1,534 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 +/* + * Copyright (c) 2025 Code Construct + */ + +#[allow(unused)] +use log::{debug, error, info, trace, warn}; + +use mctp::{Eid, MsgType}; + +use mctp::{AsyncListener, AsyncReqChannel, AsyncRespChannel}; +use mctp_estack::config::NUM_RECEIVE; +use mctp_estack::router::{ + Port, PortId, PortLookup, PortTop, RouterAsyncReqChannel, +}; +use mctp_estack::{config, Router}; + +use futures::{select, FutureExt}; +use std::collections::VecDeque; +use std::future::Future; + +pub mod test_step_executor; +use test_step_executor::{StepExecutor, SubTaskRunner}; + +fn start_log() { + let _ = env_logger::Builder::new() + .filter(None, log::LevelFilter::Trace) + .is_test(true) + .try_init(); +} + 
+/// Always routes out port 0 +#[derive(Default)] +struct DefaultRoute; + +impl PortLookup for DefaultRoute { + fn by_eid( + &self, + _eid: Eid, + _src_port: Option, + ) -> (Option, Option) { + (Some(PortId(0)), None) + } +} + +const DEFAULT_LOOKUP: DefaultRoute = DefaultRoute; + +/// Formats hex plus printable ascii +struct HexFmt<'a>(&'a [u8]); +impl core::fmt::Debug for HexFmt<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:02x?} ", self.0)?; + for c in self.0 { + if c.is_ascii() && !c.is_ascii_control() { + write!(f, "{}", char::from(*c))?; + } else { + write!(f, ".")?; + } + } + Ok(()) + } +} + +async fn receive_loop( + name: &str, + router: &Router<'_>, + p: PortId, + mut port: Port<'_>, +) -> ! { + loop { + let (pkt, _eid) = port.outbound().await; + trace!("rx {name} {:?}", HexFmt(pkt)); + router.inbound(pkt, p).await; + port.outbound_done(); + } +} + +/// Run a loop forwarding packets between routera and routerb. +/// +/// The `test` Future runs to completion. +async fn router_loop(routera: &Router<'_>, routerb: &Router<'_>) -> ! { + let port1 = routera.port(PortId(0)).unwrap(); + let port2 = routerb.port(PortId(0)).unwrap(); + let a = receive_loop("a", routerb, port1.id(), port1); + let b = receive_loop("b", routera, port2.id(), port2); + + select! { + _ = a.fuse() => unreachable!(), + _ = b.fuse() => unreachable!(), + } +} + +fn run(routera: &Router<'_>, routerb: &Router<'_>, test: F) -> R +where + F: Future, +{ + let pktloop = router_loop(routera, routerb); + smol::block_on(async { + select! 
{ + res = test.fuse() => { + info!("Finished"); + res + } + _ = pktloop.fuse() => unreachable!(), + } + }) +} + +fn routers<'r>( + tops: &'r mut [PortTop], + lookup1: &'r dyn PortLookup, + lookup2: &'r dyn PortLookup, +) -> (Router<'r>, Router<'r>) { + start_log(); + let mut p = tops.iter_mut(); + let mut routera = Router::new(Eid(10), lookup1, 0); + let mut routerb = Router::new(Eid(20), lookup2, 0); + routera.add_port(p.next().unwrap()).unwrap(); + routerb.add_port(p.next().unwrap()).unwrap(); + (routera, routerb) +} + +/// Simple request/response +#[test] +fn router_requests() { + let mut tops = [PortTop::new(), PortTop::new()]; + let (routera, routerb) = + routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP); + + run(&routera, &routerb, async { + let typ = MsgType(0x33); + let mut buf = [0u8; 1000]; + + let mut lista = routera.listener(typ).unwrap(); + + let mut reqb = routerb.req(routera.get_eid().await); + reqb.send(typ, b"first").await.unwrap(); + reqb.send(typ, b"second").await.unwrap(); + + // check first request + let (_t, _ic, payload, _resp) = lista.recv(&mut buf).await.unwrap(); + assert_eq!(payload, b"first"); + + // respond only to the second request + let (_t, _ic, payload, mut resp) = lista.recv(&mut buf).await.unwrap(); + assert_eq!(payload, b"second"); + resp.send(b"reply2").await.unwrap(); + let (_t, _ic, payload) = reqb.recv(&mut buf).await.unwrap(); + assert_eq!(payload, b"reply2"); + }); +} + +/// Test a requester with tag_noexpire() +#[test] +fn router_noexpire() { + let mut tops = [PortTop::new(), PortTop::new()]; + let (routera, routerb) = + routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP); + + run(&routera, &routerb, async { + let typ = MsgType(0x33); + let mut buf = [0u8; 1000]; + + let mut lista = routera.listener(typ).unwrap(); + let mut reqb = routerb.req(routera.get_eid().await); + reqb.tag_noexpire().unwrap(); + + let mut counter = 0; + + // Greater iteration count than tag limit. 
+ for _ in 0..20 { + // Request + for s in 0..config::NUM_RECEIVE { + let msg = format!("req-{}", counter + s).into_bytes(); + reqb.send(typ, &msg).await.unwrap(); + } + + // Listener receive the request + let mut resps = VecDeque::new(); + for s in 0..config::NUM_RECEIVE { + let msg = format!("req-{}", counter + s).into_bytes(); + let (_t, _ic, payload, resp) = + lista.recv(&mut buf).await.unwrap(); + assert_eq!(payload, msg); + resps.push_front(resp); + } + + // Listener respond + for s in 0..config::NUM_RECEIVE { + let msg = format!("resp-{}", counter + s).into_bytes(); + let mut resp = resps.pop_front().unwrap(); + resp.send(&msg).await.unwrap(); + } + + // Check the response + for s in 0..config::NUM_RECEIVE { + let msg = format!("resp-{}", counter + s).into_bytes(); + let (_t, _ic, payload) = reqb.recv(&mut buf).await.unwrap(); + assert_eq!(payload, msg); + } + counter += config::NUM_RECEIVE; + } + }); +} + +#[test] +fn router_listener_timeout() { + let mut tops = [PortTop::new(), PortTop::new()]; + let (routera, routerb) = + routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP); + + let mut ex = StepExecutor::default(); + ex.add(router_loop(&routera, &routerb)); + + let test = async |sub: SubTaskRunner| { + let mut now = 0; + let typ = MsgType(0x33); + let mut buf = [0u8; 1000]; + + let mut lista = routera.listener(typ).unwrap(); + lista.set_timeout(Some(1000)); + + let mut reqb = routerb.req(routera.get_eid().await); + + // timed out case + let recv_task = + sub.start_until_idle(lista.recv(&mut buf)).await.unwrap(); + + // Send the message after the recv timeout has elapsed. 
+ now += 1000; + routera.update_time(now).await.unwrap(); + reqb.send(typ, b"late").await.unwrap(); + + let r = recv_task.await; + assert!(matches!(r, Err(mctp::Error::TimedOut))); + + // subsequent recv on the listener receives it + let (_typ, _ic, payload, _resp) = lista.recv(&mut buf).await.unwrap(); + assert_eq!(payload, b"late"); + + // not timed out case + let recv_task = + sub.start_until_idle(lista.recv(&mut buf)).await.unwrap(); + + // Send the message before the recv timeout has elapsed. + now += 999; + routera.update_time(now).await.unwrap(); + reqb.send(typ, b"in the nick of time").await.unwrap(); + let (_typ, _ic, payload, _resp) = recv_task.await.unwrap(); + assert_eq!(payload, b"in the nick of time"); + }; + ex.add(test(ex.sub_runner())); + ex.until_idle(); +} + +#[test] +fn router_req_timeout() { + let mut tops = [PortTop::new(), PortTop::new()]; + let (routera, routerb) = + routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP); + + let mut ex = StepExecutor::default(); + ex.add(router_loop(&routera, &routerb)); + + let test = async |sub: SubTaskRunner| { + let mut now = 0; + let typ = MsgType(0x33); + let mut bufa = [0u8; 1000]; + let mut bufb = [0u8; 1000]; + + let mut lista = routera.listener(typ).unwrap(); + + let mut reqb = routerb.req(routera.get_eid().await); + reqb.set_timeout(Some(1000)); + + info!("timed out case"); + reqb.send(typ, b"req").await.unwrap(); + let (_typ, _ic, _payload, mut resp) = + lista.recv(&mut bufa).await.unwrap(); + + let recv_task = + sub.start_until_idle(reqb.recv(&mut bufb)).await.unwrap(); + + now += 1000; + routerb.update_time(now).await.unwrap(); + trace!("new now {now}"); + let r = recv_task.await; + assert!( + matches!(r, Err(mctp::Error::TimedOut)), + "no response was sent" + ); + + info!("subsequent recv gets it"); + resp.send(b"later").await.unwrap(); + let (_typ, _ic, payload) = reqb.recv(&mut bufb).await.unwrap(); + assert_eq!(payload, b"later"); + + info!("late message"); + reqb.send(typ, 
b"req").await.unwrap();
+        let (_typ, _ic, _payload, mut resp) =
+            lista.recv(&mut bufa).await.unwrap();
+
+        let recv_task =
+            sub.start_until_idle(reqb.recv(&mut bufb)).await.unwrap();
+
+        now += 1000;
+        routerb.update_time(now).await.unwrap();
+
+        // Send the message after the recv timeout has elapsed.
+        resp.send(b"late").await.unwrap();
+
+        // Ensure the stack receives it before recv() runs.
+        sub.wait_idle().await;
+
+        let (_typ, _ic, payload) = recv_task.await.unwrap();
+        assert_eq!(payload, b"late");
+
+        info!("A new cycle succeeds within timeout.");
+        reqb.send(typ, b"req").await.unwrap();
+        let (_typ, _ic, _payload, mut resp) =
+            lista.recv(&mut bufa).await.unwrap();
+
+        let recv_task =
+            sub.start_until_idle(reqb.recv(&mut bufb)).await.unwrap();
+
+        // Before elapsed
+        now += 999;
+        routerb.update_time(now).await.unwrap();
+        resp.send(b"made it").await.unwrap();
+
+        let (_typ, _ic, payload) = recv_task.await.unwrap();
+        assert_eq!(payload, b"made it");
+    };
+    ex.add(test(ex.sub_runner()));
+    ex.until_idle();
+}
+
+#[test]
+fn router_reassembly_timeout() {
+    let mut tops = [PortTop::new(), PortTop::new()];
+    let (routera, routerb) =
+        routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP);
+
+    let mut ex = StepExecutor::default();
+    ex.add(router_loop(&routera, &routerb));
+
+    let test = async |sub: SubTaskRunner| {
+        let mut now = 0;
+        let typ = MsgType(0x33);
+        let mut bufa = [0u8; 1000];
+        let mut bufb = [0u8; 1000];
+
+        // 3000 is within the timeout, 8000 is past it. 
+ for delay in [3000, 8000] { + let mut lista = routera.listener(typ).unwrap(); + + // B -> A + let mut reqb = routerb.req(routera.get_eid().await); + reqb.send(typ, b"test").await.unwrap(); + let (_typ, _ic, _payload, mut resp) = + lista.recv(&mut bufa).await.unwrap(); + + now += delay + 100; + routerb.update_time(now).await.unwrap(); + + sub.wait_idle().await; + // A -> B + resp.send(b"response").await.unwrap(); + + let r = reqb.recv(&mut bufb).await; + warn!("delay {delay} r {r:?}"); + // REASSEMBLY_EXPIRY_TIMEOUT + if delay > 6000 { + assert!( + matches!(r, Err(mctp::Error::TimedOut)), + "packet shouldn't be received" + ); + } else { + assert!(r.is_ok()) + } + } + }; + ex.add(test(ex.sub_runner())); + ex.until_idle(); +} + +// Tests that retain()ed messages don't expire. +#[test] +fn router_retain_timeout() { + let mut tops = [PortTop::new(), PortTop::new()]; + let (routera, routerb) = + routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP); + + let mut ex = StepExecutor::default(); + ex.add(router_loop(&routera, &routerb)); + + let test = async |sub: SubTaskRunner| { + let mut now = 0; + let typ = MsgType(0x33); + let mut bufa = [0u8; 1000]; + let mut bufb = [0u8; 1000]; + + let mut lista = routera.listener(typ).unwrap(); + + let mut reqb = routerb.req(routera.get_eid().await); + reqb.send(typ, b"req").await.unwrap(); + let (_typ, _ic, _payload, mut resp) = + lista.recv(&mut bufa).await.unwrap(); + + resp.send(b"response").await.unwrap(); + + sub.wait_idle().await; + // routerb should now have retain()ed the received message + + // increment time possibly past reassembly timeout + now += 100_000; + routerb.update_time(now).await.unwrap(); + + let r = reqb.recv(&mut bufb).await; + assert!(r.is_ok()) + }; + ex.run_to_completion(test(ex.sub_runner())).unwrap(); +} + +// Checks that dropping reqchannels when full allows further ones to be created +#[test] +fn router_drop_cleanup() { + #![allow(clippy::needless_range_loop)] + let mut tops = [PortTop::new(), 
PortTop::new()]; + let (routera, routerb) = + routers(&mut tops, &DEFAULT_LOOKUP, &DEFAULT_LOOKUP); + + let mut ex = StepExecutor::default(); + ex.add(router_loop(&routera, &routerb)); + + let test = async |sub: SubTaskRunner| { + let mut now = 0; + let typ = MsgType(0x33); + let mut bufa = [0u8; 1000]; + let mut bufb = [0u8; 1000]; + + let mut send = async |n: usize| { + let mut lista = routera.listener(typ).unwrap(); + let mut reqb = routerb.req(routera.get_eid().await); + reqb.set_timeout(Some(1000)); + reqb.send(typ, b"req").await.unwrap(); + + // Send a response, it will get queued in routerb's Stack reassemblers. + let (_typ, _ic, _payload, mut resp) = + lista.recv(&mut bufb).await.unwrap(); + resp.send(&n.to_be_bytes()).await.unwrap(); + reqb + }; + + let mut recv = async |req: &mut RouterAsyncReqChannel| { + let mut recv_task = sub.start(req.recv(&mut bufa)); + match recv_task.run_until_idle().await { + // Already completed + Some(r) => r, + None => { + // Run to completion. + // Expire the timeout to avoid hangs + now += 2000; + routerb.update_time(now).await.unwrap(); + recv_task.await + } + } + .map(|(_typ, _ic, payload)| payload.to_vec()) + }; + + //// Test running out of receivers. Last ones are dropped. + { + let mut reqs = Vec::new(); + // Fill all the reassemblers. N+1 would be adequate, N+3 just to see. + for n in 0..config::NUM_RECEIVE + 3 { + reqs.push(send(n).await); + } + + // Ensure they all reached the reassemblers. + sub.wait_idle().await; + + // Test that the last one is dropped + for n in 0..config::NUM_RECEIVE + 3 { + let res = recv(&mut reqs[n]).await; + + if n < config::NUM_RECEIVE { + assert_eq!(res.unwrap(), &n.to_be_bytes()); + } else { + // Last message is dropped + assert!(matches!(res, Err(mctp::Error::TimedOut))); + } + } + } + + //// Test running out of receivers, then drop channels to allow more messages. 
+ { + let mut reqs = Vec::new(); + let skip = [2, 5]; + for n in 0..config::NUM_RECEIVE + 3 { + let r = send(n).await; + if skip.contains(&n) { + // Drop reqs 2 and 5 + reqs.push(None); + } else { + reqs.push(Some(r)); + }; + } + + // Ensure they all reached the reassemblers. + sub.wait_idle().await; + + // Test that all the non-dropped ones worked, apart from the last one which was full. + for n in 0..config::NUM_RECEIVE + 3 { + trace!("recv {n}"); + + let Some(req) = &mut reqs[n] else { + assert!(skip.contains(&n)); + continue; + }; + assert!(!skip.contains(&n)); + + let res = recv(req).await; + + if n == NUM_RECEIVE + 3 - 1 { + // Last message is dropped + assert!(matches!(res, Err(mctp::Error::TimedOut))); + } else { + assert_eq!(res.unwrap(), &n.to_be_bytes()); + } + } + } + }; + ex.run_to_completion(test(ex.sub_runner())).unwrap(); +}