Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 10 additions & 87 deletions mctp-estack/src/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@

#[allow(unused)]
use crate::fmt::{debug, error, info, trace, warn};

use crate::{
AppCookie, MctpMessage, ReceiveHandle, SendOutput, Stack, MAX_PAYLOAD,
};
use mctp::{Eid, Error, MsgType, Result, Tag};
use mctp::{Error, Result};

use crc::Crc;
use heapless::Vec;
Expand All @@ -21,7 +17,7 @@ use embedded_io_async::{Read, Write};
const MCTP_SERIAL_REVISION: u8 = 0x01;

// Limited by u8 bytecount field, minus MCTP headers
const MCTP_SERIAL_MAXMTU: usize = 0xff - 4;
pub const MTU_MAX: usize = 0xff - 4;

// Received frame after unescaping. Bytes 1-N+1 in Figure 1 (serial protocol
// revision to frame check seq lsb)
Expand All @@ -33,9 +29,6 @@ const FRAMING_ESCAPE: u8 = 0x7d;
const FLAG_ESCAPED: u8 = 0x5e;
const ESCAPE_ESCAPED: u8 = 0x5d;

// 6 serial header/footer bytes, 0xff MCTP packet bytes
const TXFRAGBUF: usize = 6 + 0xff;

// Rx byte position in DSP0253 Table 1
// Indicates the expected position of the next read byte.
#[derive(Debug, PartialEq)]
Expand All @@ -57,9 +50,6 @@ pub struct MctpSerialHandler {
rxbuf: Vec<u8, MAX_RX>,
// Last-seen byte count field
rxcount: usize,

send_message: Vec<u8, MAX_PAYLOAD>,
send_fragment: [u8; TXFRAGBUF],
}

// https://www.rfc-editor.org/rfc/rfc1662
Expand All @@ -72,29 +62,13 @@ impl MctpSerialHandler {
rxpos: Pos::FrameSearch,
rxcount: 0,
rxbuf: Vec::new(),

send_message: Vec::new(),
send_fragment: [0u8; TXFRAGBUF],
}
}

/// Receive with a timeout.
pub async fn receive_async<'f>(
&mut self,
input: &mut impl Read,
mctp: &'f mut Stack,
) -> Result<Option<(MctpMessage<'f>, ReceiveHandle)>> {
let packet = self.read_frame_async(input).await?;
mctp.receive(packet)
}

/// Read a frame.
///
/// This is async cancel-safe.
async fn read_frame_async(
&mut self,
input: &mut impl Read,
) -> Result<&[u8]> {
pub async fn recv_async(&mut self, input: &mut impl Read) -> Result<&[u8]> {
// TODO: This reads one byte a time, might need a buffering wrapper
// for performance. Will require more thought about cancel-safety

Expand Down Expand Up @@ -211,65 +185,14 @@ impl MctpSerialHandler {
None
}

// Returns SendOutput::Complete or SendOutput::Error
pub async fn send_fill<F>(
pub async fn send_async(
&mut self,
eid: Eid,
typ: MsgType,
tag: Option<Tag>,
ic: bool,
cookie: Option<AppCookie>,
pkt: &[u8],
output: &mut impl Write,
mctp: &mut Stack,
fill_msg: F,
) -> SendOutput
where
F: FnOnce(&mut Vec<u8, MAX_PAYLOAD>) -> Option<()>,
{
// Fetch the message from input
self.send_message.clear();
if fill_msg(&mut self.send_message).is_none() {
return SendOutput::Error {
err: Error::Other,
cookie: None,
};
}

let mut fragmenter = match mctp.start_send(
eid,
typ,
tag,
true,
ic,
Some(MCTP_SERIAL_MAXMTU),
cookie,
) {
Ok(f) => f,
Err(err) => return SendOutput::Error { err, cookie: None },
};

loop {
let r = fragmenter
.fragment(&self.send_message, &mut self.send_fragment);
match r {
SendOutput::Packet(p) => {
trace!(
"packet len {} msg {}",
p.len(),
self.send_message.len()
);
// Write to serial
if let Err(_e) = Self::frame_to_serial(p, output).await {
trace!("Serial write error");
return SendOutput::Error {
err: Error::TxFailure,
cookie: None,
};
}
}
_ => return r.unborrowed().unwrap(),
}
}
) -> Result<()> {
Self::frame_to_serial(pkt, output)
.await
.map_err(|_e| Error::TxFailure)
}

async fn frame_to_serial<W>(
Expand Down Expand Up @@ -354,7 +277,7 @@ mod tests {

let mut h = MctpSerialHandler::new();
let mut s = FromFutures::new(esc.as_slice());
let packet = h.read_frame_async(&mut s).await.unwrap();
let packet = h.recv_async(&mut s).await.unwrap();
debug_assert_eq!(payload, packet);
}

Expand Down
75 changes: 41 additions & 34 deletions standalone/src/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,32 @@ impl<S: Read + Write> Inner<S> {
) -> Result<Tag> {
let _ = self.mctp.update(self.now());
let cookie = None;
let r = self.mctpserial.send_fill(
let mut fragmenter = self.mctp.start_send(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider revamping standalone to use Router and then serial.rs would be two simpler loops. That can be done later though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was debating doing that, but settled on the direct stack approach as a minimum change. Happy to convert later if Router makes more sense in general.

eid,
typ,
tag,
true,
integrity_check,
Some(mctp_estack::serial::MTU_MAX),
cookie,
&mut self.serial,
&mut self.mctp,
|v| {
for b in bufs {
v.extend_from_slice(b).ok()?
}
trace!("v len {}", v.len());
Some(())
},
);
)?;

let r = smol::block_on(r);
let mut tx_msg = Vec::new();
for buf in bufs {
tx_msg.extend_from_slice(buf);
}

match r {
SendOutput::Packet(_) => unreachable!(),
SendOutput::Complete { tag, .. } => Ok(tag),
SendOutput::Error { err, .. } => Err(err),
loop {
let mut tx_pkt = [0u8; mctp_estack::serial::MTU_MAX];
let r = fragmenter.fragment(&tx_msg, &mut tx_pkt);
match r {
SendOutput::Packet(p) => {
let fut = self.mctpserial.send_async(p, &mut self.serial);
smol::block_on(fut)?;
}
SendOutput::Complete { tag, .. } => break Ok(tag),
SendOutput::Error { err, .. } => break Err(err),
};
}
}

Expand All @@ -102,26 +105,30 @@ impl<S: Read + Write> Inner<S> {
loop {
let _ = self.mctp.update(self.now());

let r = self
.mctpserial
.receive_async(&mut self.serial, &mut self.mctp)
.or(async {
if let Some(deadline) = deadline {
Timer::at(deadline)
} else {
Timer::never()
}
.await;
Err(mctp::Error::TimedOut)
});
let r = self.mctpserial.recv_async(&mut self.serial).or(async {
if let Some(deadline) = deadline {
Timer::at(deadline)
} else {
Timer::never()
}
.await;
Err(mctp::Error::TimedOut)
});

let r = smol::block_on(r)?;
let pkt = smol::block_on(r)?;

if let Some((_msg, handle)) = r {
// Tricks here for loops+lifetimes.
// Could return (msg, handle) directly once Rust polonius merged.
let msg = self.mctp.fetch_message(&handle);
return Ok((msg, handle));
let r = self.mctp.receive(pkt);

match r {
Ok(Some((_msg, handle))) => {
// Tricks here for loops+lifetimes.
// Could return (msg, handle) directly once Rust polonius
// merged.
let msg = self.mctp.fetch_message(&handle);
break Ok((msg, handle));
}
Ok(None) => (),
Err(e) => break Err(e),
}
}
}
Expand Down