Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions embassy-net/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use core::future::{Future, poll_fn};
use core::mem;
use core::task::{Context, Poll};

use smoltcp::iface::{Interface, SocketHandle};
use smoltcp::socket::udp;
pub use smoltcp::socket::udp::{PacketMetadata, UdpMetadata};
Expand Down Expand Up @@ -45,6 +44,8 @@ pub enum RecvError {
pub struct UdpSocket<'a> {
stack: Stack<'a>,
handle: SocketHandle,

bind_endpoint: Option<IpListenEndpoint>,
}

impl<'a> UdpSocket<'a> {
Expand All @@ -67,7 +68,7 @@ impl<'a> UdpSocket<'a> {
))
});

Self { stack, handle }
Self { stack, handle, bind_endpoint: None }
}

/// Bind the socket to a local endpoint.
Expand Down Expand Up @@ -394,3 +395,61 @@ impl Drop for UdpSocket<'_> {
fn _assert_covariant<'a, 'b: 'a>(x: UdpSocket<'b>) -> UdpSocket<'a> {
x
}

pub mod socket {
use core::net::SocketAddr;
use embedded_nal_async::UnconnectedUdp;
use super::*;

impl<'a> UnconnectedUdp for UdpSocket<'a> {
type Error = UnconnectedUdpError;

async fn send(&mut self, local: SocketAddr, remote: SocketAddr, data: &[u8]) -> Result<(), Self::Error> {
self.send_to(data, remote).await
.map_err(UnconnectedUdpError::SendError)
}

async fn receive_into(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr, SocketAddr), Self::Error> {
self.recv_from(buffer).await
.map(|(size, meta)| {
// Safety: 'reciving' always has local_address, can't receive without being bound.
let local_socket_address: SocketAddr = SocketAddr::new(meta.local_address.unwrap().into(), self.bind_endpoint.unwrap().port);
let remote_socket_address: SocketAddr = SocketAddr::new(meta.endpoint.addr.into(), meta.endpoint.port);
(size, local_socket_address, remote_socket_address)
})
.map_err(UnconnectedUdpError::RecvError)
}
}

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum UnconnectedUdpError {
SendError(SendError),
RecvError(RecvError),
}
}

mod embedded_io_impls {
use embedded_io_async::ErrorKind;
use crate::udp::{RecvError, SendError};
use crate::udp::socket::UnconnectedUdpError;

impl embedded_io_async::Error for UnconnectedUdpError {
fn kind(&self) -> ErrorKind {
match self {
UnconnectedUdpError::SendError(send_error) => {
match send_error {
SendError::NoRoute => ErrorKind::NotFound,
SendError::SocketNotBound => ErrorKind::Other,
SendError::PacketTooLarge => ErrorKind::Other,
}
}
UnconnectedUdpError::RecvError(receive_error) => {
match receive_error {
RecvError::Truncated => ErrorKind::BrokenPipe
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion examples/stm32h7/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ embassy-sync = { version = "0.7.2", path = "../../embassy-sync", features = ["de
embassy-embedded-hal = { version = "0.5.0", path = "../../embassy-embedded-hal" }
embassy-executor = { version = "0.9.0", path = "../../embassy-executor", features = ["arch-cortex-m", "executor-thread", "executor-interrupt", "defmt"] }
embassy-time = { version = "0.5.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-hz-32_768"] }
embassy-net = { version = "0.7.1", path = "../../embassy-net", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "proto-ipv6", "dns"] }
embassy-net = { version = "0.7.1", path = "../../embassy-net", features = ["defmt", "tcp", "udp", "dhcpv4", "medium-ethernet", "proto-ipv6", "dns"] }
embassy-usb = { version = "0.5.1", path = "../../embassy-usb", features = ["defmt"] }
embassy-futures = { version = "0.1.2", path = "../../embassy-futures" }

Expand Down
101 changes: 88 additions & 13 deletions examples/stm32h7/src/bin/eth_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@
#![no_main]

use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

use defmt::*;
use embassy_executor::Spawner;
use embassy_net::StackResources;
use embassy_net::{Stack, StackResources};
use embassy_net::udp::{PacketMetadata, UdpSocket};
use embassy_net::tcp::client::{TcpClient, TcpClientState};
use embassy_stm32::eth::{Ethernet, GenericPhy, PacketQueue};
use embassy_stm32::peripherals::ETH;
use embassy_stm32::rng::Rng;
use embassy_stm32::{Config, bind_interrupts, eth, peripherals, rng};
use embassy_time::Timer;
use embedded_io_async::Write;
use embedded_nal_async::TcpConnect;
use embedded_nal_async::{TcpConnect, UnconnectedUdp};
use static_cell::StaticCell;
use {defmt_rtt as _, panic_probe as _};
use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex;
use embassy_sync::mutex::Mutex;

enum TcpState {
Connecting,
Connected,
}

type MessageType = Mutex<ThreadModeRawMutex, Option<TcpState>>;
static MESSAGE: MessageType = Mutex::new(None);

bind_interrupts!(struct Irqs {
ETH => eth::InterruptHandler;
Expand Down Expand Up @@ -85,46 +95,111 @@ async fn main(spawner: Spawner) -> ! {

let config = embassy_net::Config::dhcpv4(Default::default());
//let config = embassy_net::Config::ipv4_static(embassy_net::StaticConfigV4 {
// address: Ipv4Cidr::new(Ipv4Address::new(10, 42, 0, 61), 24),
// address: Ipv4Cidr::new(Ipv4Address::new(192, 168, 18, 64), 24),
// dns_servers: Vec::new(),
// gateway: Some(Ipv4Address::new(10, 42, 0, 1)),
// gateway: Some(Ipv4Address::new(192, 168, 18, 1)),
//});

// Init network stack
static RESOURCES: StaticCell<StackResources<3>> = StaticCell::new();
static RESOURCES: StaticCell<StackResources<4>> = StaticCell::new();
let (stack, runner) = embassy_net::new(device, config, RESOURCES.init(StackResources::new()), seed);

// Launch network task
spawner.spawn(unwrap!(net_task(runner)));

// Ensure DHCP configuration is up before trying connect
stack.wait_config_up().await;
let config_v4 = unwrap!(stack.config_v4());

info!("Network task initialized");

let local_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.address().into(), 8001).into();
info!("udp local address: {}", local_socket_address);
let broadcast_socket_address: SocketAddr = SocketAddrV4::new(config_v4.address.broadcast().unwrap().into(), 8001).into();
info!("udp broadcast address: {}", broadcast_socket_address);

// You need to start a server on the host machine, for example: `nc -b -l 8001`
let udp_address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8001));

// You need to start a server on the host machine, for example: `nc -l 8000`
let tcp_address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 18, 41), 8000));

spawner.spawn(unwrap!(broadcast_task(stack, local_socket_address, broadcast_socket_address, udp_address, &MESSAGE)));
spawner.spawn(unwrap!(tcp_communication_task(stack, tcp_address, &MESSAGE)));

loop {
Timer::after_secs(1).await;
}
}

#[embassy_executor::task]
async fn tcp_communication_task(stack: Stack<'static>, tcp_address: SocketAddr, message: &'static MessageType) -> ! {

let state: TcpClientState<1, 1024, 1024> = TcpClientState::new();
let client = TcpClient::new(stack, &state);

loop {
// You need to start a server on the host machine, for example: `nc -l 8000`
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(10, 42, 0, 1), 8000));

info!("connecting...");
let r = client.connect(addr).await;
{
*(message.lock().await) = Some(TcpState::Connecting);
}
let r = client.connect(tcp_address).await;
if let Err(e) = r {
info!("connect error: {:?}", e);
info!("tcp connect error: {:?}", e);
Timer::after_secs(1).await;
continue;
}

let mut connection = r.unwrap();
info!("connected!");
info!("tcp connected!");
{
*(message.lock().await) = Some(TcpState::Connected);
}

loop {
let r = connection.write_all(b"Hello\n").await;
if let Err(e) = r {
info!("write error: {:?}", e);
info!("tcp write error: {:?}", e);
break;
}
Timer::after_secs(1).await;
}
}
}

#[embassy_executor::task]
async fn broadcast_task(stack: Stack<'static>, local_socket_address: SocketAddr, broadcast_socket_address: SocketAddr, udp_address: SocketAddr, message: &'static MessageType) -> ! {

let mut rx_meta = [PacketMetadata::EMPTY; 16];
let mut rx_buffer = [0; 4096];
let mut tx_meta = [PacketMetadata::EMPTY; 16];
let mut tx_buffer = [0; 4096];

let mut socket = UdpSocket::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer);
socket.bind(8001).unwrap();

loop {
let mut message_unlocked = message.lock().await;
if let Some(message_ref) = message_unlocked.as_mut() {

match message_ref {
TcpState::Connecting => {
let r = socket.send(local_socket_address, broadcast_socket_address, b"UDP: Waiting for TCP connect\n").await;
if let Err(e) = r {
info!("udp broadcast error: {:?}", e);
}
}
TcpState::Connected => {
let r = socket.send(local_socket_address, udp_address, b"UDP: TCP connection OK\n").await;
if let Err(e) = r {
info!("udp write error: {:?}", e);
}
}
}
}
// release the mutex
drop(message_unlocked);

Timer::after_secs(1).await;
}
}
Loading