From 8fe87eac03401812929e51412a30c87a679ae7fe Mon Sep 17 00:00:00 2001 From: Dominic Clifton Date: Thu, 9 Oct 2025 18:39:56 +0200 Subject: [PATCH 1/4] Initial implementation of UnconnectedUdp from the embedded_nal_async crate. --- embassy-net/src/udp.rs | 63 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/embassy-net/src/udp.rs b/embassy-net/src/udp.rs index 448c25eccc..f3fce6f05c 100644 --- a/embassy-net/src/udp.rs +++ b/embassy-net/src/udp.rs @@ -3,7 +3,6 @@ use core::future::{Future, poll_fn}; use core::mem; use core::task::{Context, Poll}; - use smoltcp::iface::{Interface, SocketHandle}; use smoltcp::socket::udp; pub use smoltcp::socket::udp::{PacketMetadata, UdpMetadata}; @@ -45,6 +44,8 @@ pub enum RecvError { pub struct UdpSocket<'a> { stack: Stack<'a>, handle: SocketHandle, + + bind_endpoint: Option, } impl<'a> UdpSocket<'a> { @@ -67,7 +68,7 @@ impl<'a> UdpSocket<'a> { )) }); - Self { stack, handle } + Self { stack, handle, bind_endpoint: None } } /// Bind the socket to a local endpoint. @@ -394,3 +395,61 @@ impl Drop for UdpSocket<'_> { fn _assert_covariant<'a, 'b: 'a>(x: UdpSocket<'b>) -> UdpSocket<'a> { x } + +pub mod socket { + use core::net::SocketAddr; + use embedded_nal_async::UnconnectedUdp; + use super::*; + + impl<'a> UnconnectedUdp for UdpSocket<'a> { + type Error = UnconnectedUdpError; + + async fn send(&mut self, local: SocketAddr, remote: SocketAddr, data: &[u8]) -> Result<(), Self::Error> { + self.send_to(data, remote).await + .map_err(UnconnectedUdpError::SendError) + } + + async fn receive_into(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr, SocketAddr), Self::Error> { + self.recv_from(buffer).await + .map(|(size, meta)| { + // Safety: 'reciving' always has local_address, can't receive without being bound. + let local_socket_address: SocketAddr = SocketAddr::new(meta.local_address.unwrap().into(), self.bind_endpoint.unwrap().port); + let remote_socket_address: SocketAddr = SocketAddr::new(meta.endpoint.addr.into(), meta.endpoint.port); + (size, local_socket_address, remote_socket_address) + }) + .map_err(UnconnectedUdpError::RecvError) + } + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug)] + #[cfg_attr(feature = "defmt", derive(defmt::Format))] + pub enum UnconnectedUdpError { + SendError(SendError), + RecvError(RecvError), + } +} + +mod embedded_io_impls { + use embedded_io_async::ErrorKind; + use crate::udp::{RecvError, SendError}; + use crate::udp::socket::UnconnectedUdpError; + + impl embedded_io_async::Error for UnconnectedUdpError { + fn kind(&self) -> ErrorKind { + match self { + UnconnectedUdpError::SendError(send_error) => { + match send_error { + SendError::NoRoute => ErrorKind::NotFound, + SendError::SocketNotBound => ErrorKind::Other, + SendError::PacketTooLarge => ErrorKind::Other, + } + } + UnconnectedUdpError::RecvError(receive_error) => { + match receive_error { + RecvError::Truncated => ErrorKind::BrokenPipe + } + } + } + } + } +} \ No newline at end of file From 42e1246c97ece6355c5de2d9d352fe7bf3800afa Mon Sep 17 00:00:00 2001 From: Dominic Clifton Date: Thu, 9 Oct 2025 18:40:24 +0200 Subject: [PATCH 2/4] Update the stm32h7 eth_client.rs example to use UDP. --- examples/stm32h7/Cargo.toml | 2 +- examples/stm32h7/src/bin/eth_client.rs | 45 ++++++++++++++++++++++---- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/examples/stm32h7/Cargo.toml b/examples/stm32h7/Cargo.toml index 5993110de0..53c1db9384 100644 --- a/examples/stm32h7/Cargo.toml +++ b/examples/stm32h7/Cargo.toml @@ -12,7 +12,7 @@ embassy-sync = { version = "0.7.2", path = "../../embassy-sync", features = ["de embassy-embedded-hal = { version = "0.5.0", path = "../../embassy-embedded-hal" } embassy-executor = { version = "0.9.0", path = "../../embassy-executor", features = ["arch-cortex-m", "executor-thread", "executor-interrupt", "defmt"] } embassy-time = { version = "0.5.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-hz-32_768"] } -embassy-net = { version = "0.7.1", path = "../../embassy-net", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "proto-ipv6", "dns"] } +embassy-net = { version = "0.7.1", path = "../../embassy-net", features = ["defmt", "tcp", "udp", "dhcpv4", "medium-ethernet", "proto-ipv6", "dns"] } embassy-usb = { version = "0.5.1", path = "../../embassy-usb", features = ["defmt"] } embassy-futures = { version = "0.1.2", path = "../../embassy-futures" } diff --git a/examples/stm32h7/src/bin/eth_client.rs b/examples/stm32h7/src/bin/eth_client.rs index fed8f1a9c2..c281182389 100644 --- a/examples/stm32h7/src/bin/eth_client.rs +++ b/examples/stm32h7/src/bin/eth_client.rs @@ -1,6 +1,7 @@ #![no_std] #![no_main] +use embassy_net::udp::{PacketMetadata, UdpSocket}; use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use defmt::*; @@ -13,9 +14,10 @@ use embassy_stm32::rng::Rng; use embassy_stm32::{Config, bind_interrupts, eth, peripherals, rng}; use embassy_time::Timer; use embedded_io_async::Write; -use embedded_nal_async::TcpConnect; +use embedded_nal_async::{TcpConnect, UnconnectedUdp}; use static_cell::StaticCell; use {defmt_rtt as _, panic_probe as _}; +use embassy_net::udp::socket::UnconnectedUdpError; bind_interrupts!(struct Irqs { ETH => eth::InterruptHandler; @@ -91,7 +93,7 @@ async fn main(spawner: Spawner) -> ! { //}); // Init network stack - static RESOURCES: StaticCell> = StaticCell::new(); + static RESOURCES: StaticCell> = StaticCell::new(); let (stack, runner) = embassy_net::new(device, config, RESOURCES.init(StackResources::new()), seed); // Launch network task @@ -99,29 +101,60 @@ async fn main(spawner: Spawner) -> ! { // Ensure DHCP configuration is up before trying connect stack.wait_config_up().await; + let config_v4 = unwrap!(stack.config_v4()); info!("Network task initialized"); + + let mut rx_meta = [PacketMetadata::EMPTY; 16]; + let mut rx_buffer = [0; 4096]; + let mut tx_meta = [PacketMetadata::EMPTY; 16]; + let mut tx_buffer = [0; 4096]; + + let mut socket = UdpSocket::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer); + socket.bind(8001).unwrap(); + + let local_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.address().into(), 8001).into(); + info!("udp local address: {}", local_socket_address); + let broadcast_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.broadcast().unwrap().into(), 8001).into(); + info!("udp broadcast address: {}", broadcast_socket_address); + let state: TcpClientState<1, 1024, 1024> = TcpClientState::new(); let client = TcpClient::new(stack, &state); loop { // You need to start a server on the host machine, for example: `nc -l 8000` - let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(10, 42, 0, 1), 8000)); + let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8000)); + + // You need to start a server on the host machine, for example: `nc -b -l 8001` + let udp_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8001)); info!("connecting..."); + + let r = socket.send(local_socket_address, broadcast_socket_address, b"Broadcast UDP\n").await; + if let Err(e) = r { + info!("udp broadcast error: {:?}", e); + } + + let r = client.connect(addr).await; if let Err(e) = r { - info!("connect error: {:?}", e); + info!("tcp connect error: {:?}", e); Timer::after_secs(1).await; continue; } let mut connection = r.unwrap(); - info!("connected!"); + info!("tcp connected!"); loop { + + let r = socket.send(local_socket_address, udp_addr, b"Hello UDP\n").await; + if let Err(e) = r { + info!("udp write error: {:?}", e); + } + let r = connection.write_all(b"Hello\n").await; if let Err(e) = r { - info!("write error: {:?}", e); + info!("tcp write error: {:?}", e); break; } Timer::after_secs(1).await; From 06b5e7453e3287d8fdf1a432ee4425001e346d90 Mon Sep 17 00:00:00 2001 From: Dominic Clifton Date: Thu, 9 Oct 2025 18:15:46 +0200 Subject: [PATCH 3/4] Refactor stm32h7 eth_client to use a task for sending UDP messages. --- examples/stm32h7/src/bin/eth_client.rs | 102 ++++++++++++++++--------- 1 file changed, 68 insertions(+), 34 deletions(-) diff --git a/examples/stm32h7/src/bin/eth_client.rs b/examples/stm32h7/src/bin/eth_client.rs index c281182389..4f6af3d0ca 100644 --- a/examples/stm32h7/src/bin/eth_client.rs +++ b/examples/stm32h7/src/bin/eth_client.rs @@ -1,12 +1,11 @@ #![no_std] #![no_main] -use embassy_net::udp::{PacketMetadata, UdpSocket}; use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; - use defmt::*; use embassy_executor::Spawner; -use embassy_net::StackResources; +use embassy_net::{Stack, StackResources}; +use embassy_net::udp::{PacketMetadata, UdpSocket}; use embassy_net::tcp::client::{TcpClient, TcpClientState}; use embassy_stm32::eth::{Ethernet, GenericPhy, PacketQueue}; use embassy_stm32::peripherals::ETH; @@ -18,6 +17,16 @@ use embedded_nal_async::{TcpConnect, UnconnectedUdp}; use static_cell::StaticCell; use {defmt_rtt as _, panic_probe as _}; use embassy_net::udp::socket::UnconnectedUdpError; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::mutex::Mutex; + +enum TcpState { + Connecting, + Connected, +} + +type MessageType = Mutex>; +static MESSAGE: MessageType = Mutex::new(None); bind_interrupts!(struct Irqs { ETH => eth::InterruptHandler; @@ -87,9 +96,9 @@ async fn main(spawner: Spawner) -> ! { let config = embassy_net::Config::dhcpv4(Default::default()); //let config = embassy_net::Config::ipv4_static(embassy_net::StaticConfigV4 { - // address: Ipv4Cidr::new(Ipv4Address::new(10, 42, 0, 61), 24), + // address: Ipv4Cidr::new(Ipv4Address::new(192, 168, 18, 64), 24), // dns_servers: Vec::new(), - // gateway: Some(Ipv4Address::new(10, 42, 0, 1)), + // gateway: Some(Ipv4Address::new(192, 168, 18, 1)), //}); // Init network stack @@ -105,53 +114,41 @@ async fn main(spawner: Spawner) -> ! { info!("Network task initialized"); - - let mut rx_meta = [PacketMetadata::EMPTY; 16]; - let mut rx_buffer = [0; 4096]; - let mut tx_meta = [PacketMetadata::EMPTY; 16]; - let mut tx_buffer = [0; 4096]; - - let mut socket = UdpSocket::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer); - socket.bind(8001).unwrap(); + let state: TcpClientState<1, 1024, 1024> = TcpClientState::new(); + let client = TcpClient::new(stack, &state); let local_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.address().into(), 8001).into(); info!("udp local address: {}", local_socket_address); let broadcast_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.broadcast().unwrap().into(), 8001).into(); info!("udp broadcast address: {}", broadcast_socket_address); - let state: TcpClientState<1, 1024, 1024> = TcpClientState::new(); - let client = TcpClient::new(stack, &state); - - loop { - // You need to start a server on the host machine, for example: `nc -l 8000` - let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8000)); + // You need to start a server on the host machine, for example: `nc -b -l 8001` + let udp_address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8001)); - // You need to start a server on the host machine, for example: `nc -b -l 8001` - let udp_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8001)); + // You need to start a server on the host machine, for example: `nc -l 8000` + let tcp_address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8000)); + + spawner.spawn(unwrap!(broadcast_task(stack, local_socket_address, broadcast_socket_address, udp_address, &MESSAGE))); + loop { info!("connecting..."); - - let r = socket.send(local_socket_address, broadcast_socket_address, b"Broadcast UDP\n").await; - if let Err(e) = r { - info!("udp broadcast error: {:?}", e); + { + *(MESSAGE.lock().await) = Some(TcpState::Connecting); } - - - let r = client.connect(addr).await; + let r = client.connect(tcp_address).await; if let Err(e) = r { info!("tcp connect error: {:?}", e); Timer::after_secs(1).await; continue; } + let mut connection = r.unwrap(); info!("tcp connected!"); - loop { - - let r = socket.send(local_socket_address, udp_addr, b"Hello UDP\n").await; - if let Err(e) = r { - info!("udp write error: {:?}", e); - } + { + *(MESSAGE.lock().await) = Some(TcpState::Connected); + } + loop { let r = connection.write_all(b"Hello\n").await; if let Err(e) = r { info!("tcp write error: {:?}", e); @@ -161,3 +158,40 @@ async fn main(spawner: Spawner) -> ! { } } } + +#[embassy_executor::task] +async fn broadcast_task(stack: Stack<'static>, local_socket_address: SocketAddr, broadcast_socket_address: SocketAddr, udp_address: SocketAddr, message: &'static MessageType) -> ! { + + let mut rx_meta = [PacketMetadata::EMPTY; 16]; + let mut rx_buffer = [0; 4096]; + let mut tx_meta = [PacketMetadata::EMPTY; 16]; + let mut tx_buffer = [0; 4096]; + + let mut socket = UdpSocket::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer); + socket.bind(8001).unwrap(); + + loop { + let mut message_unlocked = message.lock().await; + if let Some(message_ref) = message_unlocked.as_mut() { + + match message_ref { + TcpState::Connecting => { + let r = socket.send(local_socket_address, broadcast_socket_address, b"UDP: Waiting for TCP connect\n").await; + if let Err(e) = r { + info!("udp broadcast error: {:?}", e); + } + } + TcpState::Connected => { + let r = socket.send(local_socket_address, udp_address, b"UDP: TCP connection OK\n").await; + if let Err(e) = r { + info!("udp write error: {:?}", e); + } + } + } + } + // release the mutex + drop(message_unlocked); + + Timer::after_secs(1).await; + } +} \ No newline at end of file From f592a1a5a95f0ebd32fe2ef3d3daf5857ae0efc2 Mon Sep 17 00:00:00 2001 From: Dominic Clifton Date: Thu, 9 Oct 2025 18:52:38 +0200 Subject: [PATCH 4/4] Use a task for the TCP connection. --- examples/stm32h7/src/bin/eth_client.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/examples/stm32h7/src/bin/eth_client.rs b/examples/stm32h7/src/bin/eth_client.rs index 4f6af3d0ca..7e4765d29c 100644 --- a/examples/stm32h7/src/bin/eth_client.rs +++ b/examples/stm32h7/src/bin/eth_client.rs @@ -16,7 +16,6 @@ use embedded_io_async::Write; use embedded_nal_async::{TcpConnect, UnconnectedUdp}; use static_cell::StaticCell; use {defmt_rtt as _, panic_probe as _}; -use embassy_net::udp::socket::UnconnectedUdpError; use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; use embassy_sync::mutex::Mutex; @@ -114,9 +113,6 @@ async fn main(spawner: Spawner) -> ! { info!("Network task initialized"); - let state: TcpClientState<1, 1024, 1024> = TcpClientState::new(); - let client = TcpClient::new(stack, &state); - let local_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.address().into(), 8001).into(); info!("udp local address: {}", local_socket_address); let broadcast_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.broadcast().unwrap().into(), 8001).into(); @@ -129,11 +125,23 @@ async fn main(spawner: Spawner) -> ! { let tcp_address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8000)); spawner.spawn(unwrap!(broadcast_task(stack, local_socket_address, broadcast_socket_address, udp_address, &MESSAGE))); + spawner.spawn(unwrap!(tcp_communication_task(stack, tcp_address, &MESSAGE))); + + loop { + Timer::after_secs(1).await; + } +} + +#[embassy_executor::task] +async fn tcp_communication_task(stack: Stack<'static>, tcp_address: SocketAddr, message: &'static MessageType) -> ! { + + let state: TcpClientState<1, 1024, 1024> = TcpClientState::new(); + let client = TcpClient::new(stack, &state); loop { info!("connecting..."); { - *(MESSAGE.lock().await) = Some(TcpState::Connecting); + *(message.lock().await) = Some(TcpState::Connecting); } let r = client.connect(tcp_address).await; if let Err(e) = r { @@ -145,7 +153,7 @@ async fn main(spawner: Spawner) -> ! { let mut connection = r.unwrap(); info!("tcp connected!"); { - *(MESSAGE.lock().await) = Some(TcpState::Connected); + *(message.lock().await) = Some(TcpState::Connected); } loop {