Skip to content

Commit 980c966

Browse files
jk-ozlabsmkj
authored andcommitted
mctp-linux: Add implementations for MCTP async traits
Add an Async socket plus AsyncReqChannel, AsyncRespChannel and AsyncListener implementations. Include a small async-based example too. Signed-off-by: Jeremy Kerr <[email protected]>
1 parent 3f10471 commit 980c966

File tree

4 files changed

+268
-13
lines changed

4 files changed

+268
-13
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mctp-linux/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ categories = ["network-programming", "embedded", "hardware-support", "os::linux-
1010
[dependencies]
1111
libc = "0.2"
1212
mctp = { workspace = true, features = ["std"] }
13+
smol = { version = "2.0" }
1314

1415
[[example]]
1516
name = "mctp-req"

mctp-linux/examples/async-req.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// SPDX-License-Identifier: MIT OR Apache-2.0
2+
/*
3+
* Simple MCTP example using Linux sockets in async mode.
4+
*
5+
* Copyright (c) 2025 Code Construct
6+
*/
7+
8+
use mctp::{AsyncReqChannel, Eid, MCTP_TYPE_CONTROL};
9+
use mctp_linux::MctpLinuxAsyncReq;
10+
11+
fn main() -> std::io::Result<()> {
12+
const EID: Eid = Eid(8);
13+
14+
let mut ep = MctpLinuxAsyncReq::new(EID, None)?;
15+
16+
let tx_buf = vec![0x02u8];
17+
let mut rx_buf = vec![0u8; 16];
18+
19+
let (typ, ic, rx_buf) = smol::block_on(async {
20+
ep.send(MCTP_TYPE_CONTROL, &tx_buf).await?;
21+
ep.recv(&mut rx_buf).await
22+
})?;
23+
24+
println!("response type {typ}, ic {ic:?}: {rx_buf:x?}");
25+
26+
Ok(())
27+
}

mctp-linux/src/lib.rs

Lines changed: 239 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@
4040
//! socket model.
4141
4242
use core::mem;
43+
use smol::Async;
4344
use std::fmt;
4445
use std::io::Error;
45-
use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd, RawFd};
46+
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd};
4647
use std::time::Duration;
4748

4849
use mctp::{
@@ -160,12 +161,12 @@ impl MctpSocket {
160161
Ok(MctpSocket(fd))
161162
}
162163

163-
/// Blocking receive from a socket, into `buf`, returning a length
164-
/// and source address
165-
///
166-
/// Essentially a wrapper around [libc::recvfrom], using MCTP-specific
167-
/// addressing.
168-
pub fn recvfrom(&self, buf: &mut [u8]) -> Result<(usize, MctpSockAddr)> {
164+
// Inner recvfrom, returning an io::Error on failure. This can be
165+
// used with async wrappers.
166+
fn io_recvfrom(
167+
&self,
168+
buf: &mut [u8],
169+
) -> std::io::Result<(usize, MctpSockAddr)> {
169170
let mut addr = MctpSockAddr::zero();
170171
let (addr_ptr, mut addr_len) = addr.as_raw_mut();
171172
let buf_ptr = buf.as_mut_ptr() as *mut libc::c_void;
@@ -177,17 +178,26 @@ impl MctpSocket {
177178
};
178179

179180
if rc < 0 {
180-
Err(last_os_error())
181+
Err(Error::last_os_error())
181182
} else {
182183
Ok((rc as usize, addr))
183184
}
184185
}
185186

186-
/// Blocking send to a socket, given a buffer and address, returning
187-
/// the number of bytes sent.
187+
/// Blocking receive from a socket, into `buf`, returning a length
188+
/// and source address
188189
///
189-
/// Essentially a wrapper around [libc::sendto].
190-
pub fn sendto(&self, buf: &[u8], addr: &MctpSockAddr) -> Result<usize> {
190+
/// Essentially a wrapper around [libc::recvfrom], using MCTP-specific
191+
/// addressing.
192+
pub fn recvfrom(&self, buf: &mut [u8]) -> Result<(usize, MctpSockAddr)> {
193+
self.io_recvfrom(buf).map_err(mctp::Error::Io)
194+
}
195+
196+
fn io_sendto(
197+
&self,
198+
buf: &[u8],
199+
addr: &MctpSockAddr,
200+
) -> std::io::Result<usize> {
191201
let (addr_ptr, addr_len) = addr.as_raw();
192202
let buf_ptr = buf.as_ptr() as *const libc::c_void;
193203
let buf_len = buf.len() as libc::size_t;
@@ -198,12 +208,20 @@ impl MctpSocket {
198208
};
199209

200210
if rc < 0 {
201-
Err(last_os_error())
211+
Err(Error::last_os_error())
202212
} else {
203213
Ok(rc as usize)
204214
}
205215
}
206216

217+
/// Blocking send to a socket, given a buffer and address, returning
218+
/// the number of bytes sent.
219+
///
220+
/// Essentially a wrapper around [libc::sendto].
221+
pub fn sendto(&self, buf: &[u8], addr: &MctpSockAddr) -> Result<usize> {
222+
self.io_sendto(buf, addr).map_err(mctp::Error::Io)
223+
}
224+
207225
/// Bind the socket to a local address.
208226
pub fn bind(&self, addr: &MctpSockAddr) -> Result<()> {
209227
let (addr_ptr, addr_len) = addr.as_raw();
@@ -298,6 +316,57 @@ impl std::os::fd::AsRawFd for MctpSocket {
298316
}
299317
}
300318

319+
impl AsFd for MctpSocket {
320+
fn as_fd(&self) -> BorrowedFd<'_> {
321+
self.0.as_fd()
322+
}
323+
}
324+
325+
/// MCTP socket for async use
326+
pub struct MctpSocketAsync(Async<MctpSocket>);
327+
328+
impl MctpSocketAsync {
329+
/// Create a new async MCTP socket
330+
pub fn new() -> Result<Self> {
331+
let sock = MctpSocket::new()?;
332+
let sock = Async::new(sock).map_err(mctp::Error::Io)?;
333+
334+
Ok(Self(sock))
335+
}
336+
337+
/// Bind the socket to a local address.
338+
pub fn bind(&self, addr: &MctpSockAddr) -> Result<()> {
339+
self.0.as_ref().bind(addr)
340+
}
341+
342+
/// Receive a message from this socket
343+
///
344+
/// Returns the length of buffer read, and the peer address.
345+
pub async fn recvfrom(
346+
&self,
347+
buf: &mut [u8],
348+
) -> Result<(usize, MctpSockAddr)> {
349+
self.0
350+
.read_with(|io| io.io_recvfrom(buf))
351+
.await
352+
.map_err(mctp::Error::Io)
353+
}
354+
355+
/// Send a message to a given address
356+
///
357+
/// Returns the number of bytes sent
358+
pub async fn sendto(
359+
&self,
360+
buf: &[u8],
361+
addr: &MctpSockAddr,
362+
) -> Result<usize> {
363+
self.0
364+
.write_with(|io| io.io_sendto(buf, addr))
365+
.await
366+
.map_err(mctp::Error::Io)
367+
}
368+
}
369+
301370
/// Encapsulation of a remote endpoint: a socket and an Endpoint ID.
302371
pub struct MctpLinuxReq {
303372
eid: Eid,
@@ -383,6 +452,71 @@ impl mctp::ReqChannel for MctpLinuxReq {
383452
}
384453
}
385454

455+
/// Encapsulation of a remote endpoint: a socket and an Endpoint ID.
456+
pub struct MctpLinuxAsyncReq {
457+
eid: Eid,
458+
net: u32,
459+
sock: MctpSocketAsync,
460+
sent: bool,
461+
}
462+
463+
impl MctpLinuxAsyncReq {
464+
/// Create a new asynchronous request channel.
465+
pub fn new(eid: Eid, net: Option<u32>) -> Result<Self> {
466+
let net = net.unwrap_or(MCTP_NET_ANY);
467+
Ok(Self {
468+
eid,
469+
net,
470+
sock: MctpSocketAsync::new()?,
471+
sent: false,
472+
})
473+
}
474+
}
475+
476+
impl mctp::AsyncReqChannel for MctpLinuxAsyncReq {
477+
fn remote_eid(&self) -> Eid {
478+
self.eid
479+
}
480+
481+
async fn send_vectored(
482+
&mut self,
483+
typ: MsgType,
484+
ic: MsgIC,
485+
bufs: &[&[u8]],
486+
) -> Result<()> {
487+
let typ_ic = mctp::encode_type_ic(typ, ic);
488+
let addr = MctpSockAddr::new(
489+
self.eid.0,
490+
self.net,
491+
typ_ic,
492+
mctp::MCTP_TAG_OWNER,
493+
);
494+
let concat = bufs
495+
.iter()
496+
.flat_map(|b| b.iter().cloned())
497+
.collect::<Vec<u8>>();
498+
self.sock.sendto(&concat, &addr).await?;
499+
self.sent = true;
500+
Ok(())
501+
}
502+
503+
async fn recv<'f>(
504+
&mut self,
505+
buf: &'f mut [u8],
506+
) -> Result<(MsgType, MsgIC, &'f mut [u8])> {
507+
if !self.sent {
508+
return Err(mctp::Error::BadArgument);
509+
}
510+
let (sz, addr) = self.sock.recvfrom(buf).await?;
511+
let src = Eid(addr.0.smctp_addr);
512+
let (typ, ic) = mctp::decode_type_ic(addr.0.smctp_type);
513+
if src != self.eid {
514+
return Err(mctp::Error::Other);
515+
}
516+
Ok((typ, ic, &mut buf[..sz]))
517+
}
518+
}
519+
386520
/// A Listener for Linux MCTP messages
387521
pub struct MctpLinuxListener {
388522
sock: MctpSocket,
@@ -453,6 +587,62 @@ impl mctp::Listener for MctpLinuxListener {
453587
}
454588
}
455589

590+
/// An MCTP Listener for asynchronous IO
591+
pub struct MctpLinuxAsyncListener {
592+
sock: MctpSocketAsync,
593+
net: u32,
594+
typ: MsgType,
595+
}
596+
597+
impl MctpLinuxAsyncListener {
598+
/// Create a new `MctpLinuxAsyncListener`.
599+
///
600+
/// This will listen for MCTP message type `typ`, on an optional
601+
/// Linux network `net`. `None` network defaults to `MCTP_NET_ANY`.
602+
pub fn new(typ: MsgType, net: Option<u32>) -> Result<Self> {
603+
let sock = MctpSocketAsync::new()?;
604+
// Linux requires MCTP_ADDR_ANY for binds.
605+
let net = net.unwrap_or(MCTP_NET_ANY);
606+
let addr = MctpSockAddr::new(
607+
MCTP_ADDR_ANY.0,
608+
net,
609+
typ.0,
610+
mctp::MCTP_TAG_OWNER,
611+
);
612+
sock.bind(&addr)?;
613+
Ok(Self { sock, net, typ })
614+
}
615+
}
616+
617+
impl mctp::AsyncListener for MctpLinuxAsyncListener {
618+
type RespChannel<'a> = MctpLinuxAsyncResp<'a>;
619+
620+
async fn recv<'f>(
621+
&mut self,
622+
buf: &'f mut [u8],
623+
) -> Result<(MsgType, MsgIC, &'f mut [u8], Self::RespChannel<'_>)> {
624+
let (sz, addr) = self.sock.recvfrom(buf).await?;
625+
let src = Eid(addr.0.smctp_addr);
626+
let (typ, ic) = mctp::decode_type_ic(addr.0.smctp_type);
627+
let tag = tag_from_smctp(addr.0.smctp_tag);
628+
if let Tag::Unowned(_) = tag {
629+
// bind() shouldn't give non-owned packets.
630+
return Err(mctp::Error::InternalError);
631+
}
632+
if typ != self.typ {
633+
// bind() should return the requested type
634+
return Err(mctp::Error::InternalError);
635+
}
636+
let ep = MctpLinuxAsyncResp {
637+
eid: src,
638+
tv: tag.tag(),
639+
listener: self,
640+
typ,
641+
};
642+
Ok((typ, ic, &mut buf[..sz], ep))
643+
}
644+
}
645+
456646
/// A Linux MCTP Listener response channel
457647
pub struct MctpLinuxResp<'a> {
458648
eid: Eid,
@@ -492,6 +682,42 @@ impl mctp::RespChannel for MctpLinuxResp<'_> {
492682
}
493683
}
494684

685+
/// A Linux MCTP Async Listener response channel
686+
pub struct MctpLinuxAsyncResp<'l> {
687+
eid: Eid,
688+
tv: TagValue,
689+
listener: &'l MctpLinuxAsyncListener,
690+
typ: MsgType,
691+
}
692+
693+
impl<'l> mctp::AsyncRespChannel for MctpLinuxAsyncResp<'l> {
694+
type ReqChannel<'a>
695+
= MctpLinuxAsyncReq
696+
where
697+
Self: 'a;
698+
699+
async fn send_vectored(&mut self, ic: MsgIC, bufs: &[&[u8]]) -> Result<()> {
700+
let typ_ic = mctp::encode_type_ic(self.typ, ic);
701+
let tag = tag_to_smctp(&Tag::Unowned(self.tv));
702+
let addr =
703+
MctpSockAddr::new(self.eid.0, self.listener.net, typ_ic, tag);
704+
let concat = bufs
705+
.iter()
706+
.flat_map(|b| b.iter().cloned())
707+
.collect::<Vec<u8>>();
708+
self.listener.sock.sendto(&concat, &addr).await?;
709+
Ok(())
710+
}
711+
712+
fn remote_eid(&self) -> Eid {
713+
self.eid
714+
}
715+
716+
fn req_channel(&self) -> Result<Self::ReqChannel<'_>> {
717+
MctpLinuxAsyncReq::new(self.eid, Some(self.listener.net))
718+
}
719+
}
720+
495721
/// Helper for applications taking an MCTP address as an argument,
496722
/// configuration, etc.
497723
///

0 commit comments

Comments
 (0)