diff --git a/mctp-estack/src/serial.rs b/mctp-estack/src/serial.rs index 991b3c0..dc59e08 100644 --- a/mctp-estack/src/serial.rs +++ b/mctp-estack/src/serial.rs @@ -7,11 +7,7 @@ #[allow(unused)] use crate::fmt::{debug, error, info, trace, warn}; - -use crate::{ - AppCookie, MctpMessage, ReceiveHandle, SendOutput, Stack, MAX_PAYLOAD, -}; -use mctp::{Eid, Error, MsgType, Result, Tag}; +use mctp::{Error, Result}; use crc::Crc; use heapless::Vec; @@ -21,7 +17,7 @@ use embedded_io_async::{Read, Write}; const MCTP_SERIAL_REVISION: u8 = 0x01; // Limited by u8 bytecount field, minus MCTP headers -const MCTP_SERIAL_MAXMTU: usize = 0xff - 4; +pub const MTU_MAX: usize = 0xff - 4; // Received frame after unescaping. Bytes 1-N+1 in Figure 1 (serial protocol // revision to frame check seq lsb) @@ -33,9 +29,6 @@ const FRAMING_ESCAPE: u8 = 0x7d; const FLAG_ESCAPED: u8 = 0x5e; const ESCAPE_ESCAPED: u8 = 0x5d; -// 6 serial header/footer bytes, 0xff MCTP packet bytes -const TXFRAGBUF: usize = 6 + 0xff; - // Rx byte position in DSP0253 Table 1 // Indicates the expected position of the next read byte. #[derive(Debug, PartialEq)] @@ -57,9 +50,6 @@ pub struct MctpSerialHandler { rxbuf: Vec, // Last-seen byte count field rxcount: usize, - - send_message: Vec, - send_fragment: [u8; TXFRAGBUF], } // https://www.rfc-editor.org/rfc/rfc1662 @@ -72,29 +62,13 @@ impl MctpSerialHandler { rxpos: Pos::FrameSearch, rxcount: 0, rxbuf: Vec::new(), - - send_message: Vec::new(), - send_fragment: [0u8; TXFRAGBUF], } } - /// Receive with a timeout. - pub async fn receive_async<'f>( - &mut self, - input: &mut impl Read, - mctp: &'f mut Stack, - ) -> Result, ReceiveHandle)>> { - let packet = self.read_frame_async(input).await?; - mctp.receive(packet) - } - /// Read a frame. /// /// This is async cancel-safe. - async fn read_frame_async( - &mut self, - input: &mut impl Read, - ) -> Result<&[u8]> { + pub async fn recv_async(&mut self, input: &mut impl Read) -> Result<&[u8]> { // TODO: This reads one byte a time, might need a buffering wrapper // for performance. Will require more thought about cancel-safety @@ -211,65 +185,14 @@ impl MctpSerialHandler { None } - // Returns SendOutput::Complete or SendOutput::Error - pub async fn send_fill( + pub async fn send_async( &mut self, - eid: Eid, - typ: MsgType, - tag: Option, - ic: bool, - cookie: Option, + pkt: &[u8], output: &mut impl Write, - mctp: &mut Stack, - fill_msg: F, - ) -> SendOutput - where - F: FnOnce(&mut Vec) -> Option<()>, - { - // Fetch the message from input - self.send_message.clear(); - if fill_msg(&mut self.send_message).is_none() { - return SendOutput::Error { - err: Error::Other, - cookie: None, - }; - } - - let mut fragmenter = match mctp.start_send( - eid, - typ, - tag, - true, - ic, - Some(MCTP_SERIAL_MAXMTU), - cookie, - ) { - Ok(f) => f, - Err(err) => return SendOutput::Error { err, cookie: None }, - }; - - loop { - let r = fragmenter - .fragment(&self.send_message, &mut self.send_fragment); - match r { - SendOutput::Packet(p) => { - trace!( - "packet len {} msg {}", - p.len(), - self.send_message.len() - ); - // Write to serial - if let Err(_e) = Self::frame_to_serial(p, output).await { - trace!("Serial write error"); - return SendOutput::Error { - err: Error::TxFailure, - cookie: None, - }; - } - } - _ => return r.unborrowed().unwrap(), - } - } + ) -> Result<()> { + Self::frame_to_serial(pkt, output) + .await + .map_err(|_e| Error::TxFailure) } async fn frame_to_serial( @@ -354,7 +277,7 @@ mod tests { let mut h = MctpSerialHandler::new(); let mut s = FromFutures::new(esc.as_slice()); - let packet = h.read_frame_async(&mut s).await.unwrap(); + let packet = h.recv_async(&mut s).await.unwrap(); debug_assert_eq!(payload, packet); } diff --git a/standalone/src/serial.rs b/standalone/src/serial.rs index d0076d1..2434982 100644 --- a/standalone/src/serial.rs +++ b/standalone/src/serial.rs @@ -55,29 +55,32 @@ impl Inner { ) -> Result { let _ = self.mctp.update(self.now()); let cookie = None; - let r = self.mctpserial.send_fill( + let mut fragmenter = self.mctp.start_send( eid, typ, tag, + true, integrity_check, + Some(mctp_estack::serial::MTU_MAX), cookie, - &mut self.serial, - &mut self.mctp, - |v| { - for b in bufs { - v.extend_from_slice(b).ok()? - } - trace!("v len {}", v.len()); - Some(()) - }, - ); + )?; - let r = smol::block_on(r); + let mut tx_msg = Vec::new(); + for buf in bufs { + tx_msg.extend_from_slice(buf); + } - match r { - SendOutput::Packet(_) => unreachable!(), - SendOutput::Complete { tag, .. } => Ok(tag), - SendOutput::Error { err, .. } => Err(err), + loop { + let mut tx_pkt = [0u8; mctp_estack::serial::MTU_MAX]; + let r = fragmenter.fragment(&tx_msg, &mut tx_pkt); + match r { + SendOutput::Packet(p) => { + let fut = self.mctpserial.send_async(p, &mut self.serial); + smol::block_on(fut)?; + } + SendOutput::Complete { tag, .. } => break Ok(tag), + SendOutput::Error { err, .. } => break Err(err), + }; } } @@ -102,26 +105,30 @@ impl Inner { loop { let _ = self.mctp.update(self.now()); - let r = self - .mctpserial - .receive_async(&mut self.serial, &mut self.mctp) - .or(async { - if let Some(deadline) = deadline { - Timer::at(deadline) - } else { - Timer::never() - } - .await; - Err(mctp::Error::TimedOut) - }); + let r = self.mctpserial.recv_async(&mut self.serial).or(async { + if let Some(deadline) = deadline { + Timer::at(deadline) + } else { + Timer::never() + } + .await; + Err(mctp::Error::TimedOut) + }); - let r = smol::block_on(r)?; + let pkt = smol::block_on(r)?; - if let Some((_msg, handle)) = r { - // Tricks here for loops+lifetimes. - // Could return (msg, handle) directly once Rust polonius merged. - let msg = self.mctp.fetch_message(&handle); - return Ok((msg, handle)); + let r = self.mctp.receive(pkt); + + match r { + Ok(Some((_msg, handle))) => { + // Tricks here for loops+lifetimes. + // Could return (msg, handle) directly once Rust polonius + // merged. + let msg = self.mctp.fetch_message(&handle); + break Ok((msg, handle)); + } + Ok(None) => (), + Err(e) => break Err(e), } } }