Skip to content

Commit 0cc01b1

Browse files
jk-ozlabsmkj
authored andcommitted
estack: Convert serial transport to packet-transfer-style API
Currently, MctpSerialHandler's public API gets passed a mctp::Stack object, and interacts with that for packet receive/transmit. Newer-style end-usage of the stack objects tends to use the routing infrastructure though, which uses a simpler packet tx/rx interface to the transport (ie, populating [u8]s rather than handling the MCTP packets internally). This change converts the current interface to: pub async fn recv_async( &mut self, input: &mut impl Read, ) -> mctp::Result<&[u8]> { pub async fn send_async( &mut self, pkt: &[u8], output: &mut impl Write, ) -> mctp::Result<()> { - which is easier to glue into the routing interface. In doing this, convert the standalone crate to use the new MctpSerialHandler API, where it needs to own the mctp::Stack internally. Signed-off-by: Jeremy Kerr <[email protected]>
1 parent 742ca42 commit 0cc01b1

File tree

2 files changed

+51
-121
lines changed

2 files changed

+51
-121
lines changed

mctp-estack/src/serial.rs

Lines changed: 10 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@
77
88
#[allow(unused)]
99
use crate::fmt::{debug, error, info, trace, warn};
10-
11-
use crate::{
12-
AppCookie, MctpMessage, ReceiveHandle, SendOutput, Stack, MAX_PAYLOAD,
13-
};
14-
use mctp::{Eid, Error, MsgType, Result, Tag};
10+
use mctp::{Error, Result};
1511

1612
use crc::Crc;
1713
use heapless::Vec;
@@ -21,7 +17,7 @@ use embedded_io_async::{Read, Write};
2117
const MCTP_SERIAL_REVISION: u8 = 0x01;
2218

2319
// Limited by u8 bytecount field, minus MCTP headers
24-
const MCTP_SERIAL_MAXMTU: usize = 0xff - 4;
20+
pub const MTU_MAX: usize = 0xff - 4;
2521

2622
// Received frame after unescaping. Bytes 1-N+1 in Figure 1 (serial protocol
2723
// revision to frame check seq lsb)
@@ -33,9 +29,6 @@ const FRAMING_ESCAPE: u8 = 0x7d;
3329
const FLAG_ESCAPED: u8 = 0x5e;
3430
const ESCAPE_ESCAPED: u8 = 0x5d;
3531

36-
// 6 serial header/footer bytes, 0xff MCTP packet bytes
37-
const TXFRAGBUF: usize = 6 + 0xff;
38-
3932
// Rx byte position in DSP0253 Table 1
4033
// Indicates the expected position of the next read byte.
4134
#[derive(Debug, PartialEq)]
@@ -57,9 +50,6 @@ pub struct MctpSerialHandler {
5750
rxbuf: Vec<u8, MAX_RX>,
5851
// Last-seen byte count field
5952
rxcount: usize,
60-
61-
send_message: Vec<u8, MAX_PAYLOAD>,
62-
send_fragment: [u8; TXFRAGBUF],
6353
}
6454

6555
// https://www.rfc-editor.org/rfc/rfc1662
@@ -72,29 +62,13 @@ impl MctpSerialHandler {
7262
rxpos: Pos::FrameSearch,
7363
rxcount: 0,
7464
rxbuf: Vec::new(),
75-
76-
send_message: Vec::new(),
77-
send_fragment: [0u8; TXFRAGBUF],
7865
}
7966
}
8067

81-
/// Receive with a timeout.
82-
pub async fn receive_async<'f>(
83-
&mut self,
84-
input: &mut impl Read,
85-
mctp: &'f mut Stack,
86-
) -> Result<Option<(MctpMessage<'f>, ReceiveHandle)>> {
87-
let packet = self.read_frame_async(input).await?;
88-
mctp.receive(packet)
89-
}
90-
9168
/// Read a frame.
9269
///
9370
/// This is async cancel-safe.
94-
async fn read_frame_async(
95-
&mut self,
96-
input: &mut impl Read,
97-
) -> Result<&[u8]> {
71+
pub async fn recv_async(&mut self, input: &mut impl Read) -> Result<&[u8]> {
9872
// TODO: This reads one byte a time, might need a buffering wrapper
9973
// for performance. Will require more thought about cancel-safety
10074

@@ -211,65 +185,14 @@ impl MctpSerialHandler {
211185
None
212186
}
213187

214-
// Returns SendOutput::Complete or SendOutput::Error
215-
pub async fn send_fill<F>(
188+
pub async fn send_async(
216189
&mut self,
217-
eid: Eid,
218-
typ: MsgType,
219-
tag: Option<Tag>,
220-
ic: bool,
221-
cookie: Option<AppCookie>,
190+
pkt: &[u8],
222191
output: &mut impl Write,
223-
mctp: &mut Stack,
224-
fill_msg: F,
225-
) -> SendOutput
226-
where
227-
F: FnOnce(&mut Vec<u8, MAX_PAYLOAD>) -> Option<()>,
228-
{
229-
// Fetch the message from input
230-
self.send_message.clear();
231-
if fill_msg(&mut self.send_message).is_none() {
232-
return SendOutput::Error {
233-
err: Error::Other,
234-
cookie: None,
235-
};
236-
}
237-
238-
let mut fragmenter = match mctp.start_send(
239-
eid,
240-
typ,
241-
tag,
242-
true,
243-
ic,
244-
Some(MCTP_SERIAL_MAXMTU),
245-
cookie,
246-
) {
247-
Ok(f) => f,
248-
Err(err) => return SendOutput::Error { err, cookie: None },
249-
};
250-
251-
loop {
252-
let r = fragmenter
253-
.fragment(&self.send_message, &mut self.send_fragment);
254-
match r {
255-
SendOutput::Packet(p) => {
256-
trace!(
257-
"packet len {} msg {}",
258-
p.len(),
259-
self.send_message.len()
260-
);
261-
// Write to serial
262-
if let Err(_e) = Self::frame_to_serial(p, output).await {
263-
trace!("Serial write error");
264-
return SendOutput::Error {
265-
err: Error::TxFailure,
266-
cookie: None,
267-
};
268-
}
269-
}
270-
_ => return r.unborrowed().unwrap(),
271-
}
272-
}
192+
) -> Result<()> {
193+
Self::frame_to_serial(pkt, output)
194+
.await
195+
.map_err(|_e| Error::TxFailure)
273196
}
274197

275198
async fn frame_to_serial<W>(
@@ -354,7 +277,7 @@ mod tests {
354277

355278
let mut h = MctpSerialHandler::new();
356279
let mut s = FromFutures::new(esc.as_slice());
357-
let packet = h.read_frame_async(&mut s).await.unwrap();
280+
let packet = h.recv_async(&mut s).await.unwrap();
358281
debug_assert_eq!(payload, packet);
359282
}
360283

standalone/src/serial.rs

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -55,29 +55,32 @@ impl<S: Read + Write> Inner<S> {
5555
) -> Result<Tag> {
5656
let _ = self.mctp.update(self.now());
5757
let cookie = None;
58-
let r = self.mctpserial.send_fill(
58+
let mut fragmenter = self.mctp.start_send(
5959
eid,
6060
typ,
6161
tag,
62+
true,
6263
integrity_check,
64+
Some(mctp_estack::serial::MTU_MAX),
6365
cookie,
64-
&mut self.serial,
65-
&mut self.mctp,
66-
|v| {
67-
for b in bufs {
68-
v.extend_from_slice(b).ok()?
69-
}
70-
trace!("v len {}", v.len());
71-
Some(())
72-
},
73-
);
66+
)?;
7467

75-
let r = smol::block_on(r);
68+
let mut tx_msg = Vec::new();
69+
for buf in bufs {
70+
tx_msg.extend_from_slice(buf);
71+
}
7672

77-
match r {
78-
SendOutput::Packet(_) => unreachable!(),
79-
SendOutput::Complete { tag, .. } => Ok(tag),
80-
SendOutput::Error { err, .. } => Err(err),
73+
loop {
74+
let mut tx_pkt = [0u8; mctp_estack::serial::MTU_MAX];
75+
let r = fragmenter.fragment(&tx_msg, &mut tx_pkt);
76+
match r {
77+
SendOutput::Packet(p) => {
78+
let fut = self.mctpserial.send_async(p, &mut self.serial);
79+
smol::block_on(fut)?;
80+
}
81+
SendOutput::Complete { tag, .. } => break Ok(tag),
82+
SendOutput::Error { err, .. } => break Err(err),
83+
};
8184
}
8285
}
8386

@@ -102,26 +105,30 @@ impl<S: Read + Write> Inner<S> {
102105
loop {
103106
let _ = self.mctp.update(self.now());
104107

105-
let r = self
106-
.mctpserial
107-
.receive_async(&mut self.serial, &mut self.mctp)
108-
.or(async {
109-
if let Some(deadline) = deadline {
110-
Timer::at(deadline)
111-
} else {
112-
Timer::never()
113-
}
114-
.await;
115-
Err(mctp::Error::TimedOut)
116-
});
108+
let r = self.mctpserial.recv_async(&mut self.serial).or(async {
109+
if let Some(deadline) = deadline {
110+
Timer::at(deadline)
111+
} else {
112+
Timer::never()
113+
}
114+
.await;
115+
Err(mctp::Error::TimedOut)
116+
});
117117

118-
let r = smol::block_on(r)?;
118+
let pkt = smol::block_on(r)?;
119119

120-
if let Some((_msg, handle)) = r {
121-
// Tricks here for loops+lifetimes.
122-
// Could return (msg, handle) directly once Rust polonius merged.
123-
let msg = self.mctp.fetch_message(&handle);
124-
return Ok((msg, handle));
120+
let r = self.mctp.receive(pkt);
121+
122+
match r {
123+
Ok(Some((_msg, handle))) => {
124+
// Tricks here for loops+lifetimes.
125+
// Could return (msg, handle) directly once Rust polonius
126+
// merged.
127+
let msg = self.mctp.fetch_message(&handle);
128+
break Ok((msg, handle));
129+
}
130+
Ok(None) => (),
131+
Err(e) => break Err(e),
125132
}
126133
}
127134
}

0 commit comments

Comments
 (0)