diff --git a/edge-captive/src/io.rs b/edge-captive/src/io.rs index 3cc4ece..487e3c3 100644 --- a/edge-captive/src/io.rs +++ b/edge-captive/src/io.rs @@ -12,12 +12,26 @@ pub const DEFAULT_SOCKET: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSP const PORT: u16 = 53; -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum DnsIoError { DnsError(DnsError), IoError(E), } +pub type DnsIoErrorKind = DnsIoError; + +impl DnsIoError +where + E: edge_nal::io::Error, +{ + pub fn erase(&self) -> DnsIoError { + match self { + Self::DnsError(e) => DnsIoError::DnsError(*e), + Self::IoError(e) => DnsIoError::IoError(e.kind()), + } + } +} + impl From for DnsIoError { fn from(err: DnsError) -> Self { Self::DnsError(err) diff --git a/edge-captive/src/lib.rs b/edge-captive/src/lib.rs index 6dadc01..8a653e5 100644 --- a/edge-captive/src/lib.rs +++ b/edge-captive/src/lib.rs @@ -24,7 +24,7 @@ use domain::{ #[cfg(feature = "io")] pub mod io; -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum DnsError { ShortBuf, InvalidMessage, diff --git a/edge-dhcp/src/io.rs b/edge-dhcp/src/io.rs index 4e15fcf..5b4d280 100644 --- a/edge-dhcp/src/io.rs +++ b/edge-dhcp/src/io.rs @@ -9,12 +9,26 @@ pub mod server; pub const DEFAULT_SERVER_PORT: u16 = 67; pub const DEFAULT_CLIENT_PORT: u16 = 68; -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum Error { Io(E), Format(dhcp::Error), } +pub type ErrorKind = Error; + +impl Error +where + E: edge_nal::io::Error, +{ + pub fn erase(&self) -> Error { + match self { + Self::Io(e) => Error::Io(e.kind()), + Self::Format(e) => Error::Format(*e), + } + } +} + impl From for Error { fn from(value: dhcp::Error) -> Self { Self::Format(value) diff --git a/edge-dhcp/src/lib.rs b/edge-dhcp/src/lib.rs index a98cb36..23538f5 100644 --- a/edge-dhcp/src/lib.rs +++ b/edge-dhcp/src/lib.rs @@ -17,7 +17,7 @@ pub mod server; #[cfg(feature = "io")] pub mod io; -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum Error { DataUnderflow, BufferOverflow, diff --git a/edge-http/README.md b/edge-http/README.md index c1ae228..11459d0 100644 --- a/edge-http/README.md +++ b/edge-http/README.md @@ -68,8 +68,8 @@ where Ok(()) } -async fn request<'b, const N: usize, T: TcpConnect>( - conn: &mut Connection<'b, T, N>, +async fn request( + conn: &mut Connection<'_, T, N>, uri: &str, ) -> Result<(), Error> { conn.initiate_request(true, Method::Get, uri, &[("Host", "httpbin.org")]) @@ -103,7 +103,7 @@ async fn request<'b, const N: usize, T: TcpConnect>( ### HTTP server ```rust -use core::fmt::Display; +use core::fmt::{Debug, Display}; use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::io::Error; @@ -133,24 +133,27 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { .bind(addr.parse().unwrap()) .await?; - server.run(acceptor, HttpHandler).await?; + server.run(None, acceptor, HttpHandler).await?; Ok(()) } struct HttpHandler; -impl<'b, T, const N: usize> Handler<'b, T, N> for HttpHandler -where - T: Read + Write, -{ - type Error = Error; +impl Handler for HttpHandler { + type Error + = Error + where + E: Debug; - async fn handle( + async fn handle( &self, _task_id: impl Display + Copy, - conn: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { + conn: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { let headers = conn.headers()?; if headers.method != Method::Get { diff --git a/edge-http/src/io.rs b/edge-http/src/io.rs index 3d58acd..487c73c 100644 --- a/edge-http/src/io.rs +++ b/edge-http/src/io.rs @@ -18,7 +18,7 @@ pub mod client; pub mod server; /// An error in parsing the headers or the body. -#[derive(Debug)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum Error { InvalidHeaders, InvalidBody, @@ -34,6 +34,30 @@ pub enum Error { Io(E), } +pub type ErrorKind = Error; + +impl Error +where + E: edge_nal::io::Error, +{ + pub fn erase(&self) -> Error { + match self { + Self::InvalidHeaders => Error::InvalidHeaders, + Self::InvalidBody => Error::InvalidBody, + Self::TooManyHeaders => Error::TooManyHeaders, + Self::TooLongHeaders => Error::TooLongHeaders, + Self::TooLongBody => Error::TooLongBody, + Self::IncompleteHeaders => Error::IncompleteHeaders, + Self::IncompleteBody => Error::IncompleteBody, + Self::InvalidState => Error::InvalidState, + Self::ConnectionClosed => Error::ConnectionClosed, + Self::HeadersMismatchError(e) => Error::HeadersMismatchError(*e), + Self::WsUpgradeError(e) => Error::WsUpgradeError(*e), + Self::Io(e) => Error::Io(e.kind()), + } + } +} + impl From for Error { fn from(e: httparse::Error) -> Self { match e { @@ -373,7 +397,7 @@ where Ok((connection_type, body_type)) } -impl<'b, const N: usize> Headers<'b, N> { +impl Headers<'_, N> { fn resolve( &self, carry_over_connection_type: Option, @@ -497,7 +521,7 @@ where type Error = Error; } -impl<'b, R> Read for Body<'b, R> +impl Read for Body<'_, R> where R: Read, { @@ -545,7 +569,7 @@ where type Error = R::Error; } -impl<'b, R> Read for PartiallyRead<'b, R> +impl Read for PartiallyRead<'_, R> where R: Read, { @@ -824,7 +848,7 @@ where type Error = Error; } -impl<'b, R> Read for ChunkedRead<'b, R> +impl Read for ChunkedRead<'_, R> where R: Read, { diff --git a/edge-http/src/io/client.rs b/edge-http/src/io/client.rs index 6f2fb10..b0663ac 100644 --- a/edge-http/src/io/client.rs +++ b/edge-http/src/io/client.rs @@ -228,7 +228,7 @@ where let needs_close = if self.response_mut().is_ok() { self.complete_response().await? } else { - true + false }; Result::<_, Error>::Ok(needs_close) @@ -239,11 +239,13 @@ where match result { Ok(true) | Err(_) => { - let mut io = state.io.take().unwrap(); + let io = state.io.take(); *self = Self::Unbound(state); - io.close(Close::Both).await.map_err(Error::Io)?; - let _ = io.abort().await; + if let Some(mut io) = io { + io.close(Close::Both).await.map_err(Error::Io)?; + let _ = io.abort().await; + } } _ => { *self = Self::Unbound(state); @@ -255,6 +257,17 @@ where Ok(()) } + pub async fn close(mut self) -> Result<(), Error> { + let res = self.complete().await; + + if let Some(mut io) = self.unbind().io.take() { + io.close(Close::Both).await.map_err(Error::Io)?; + let _ = io.abort().await; + } + + res + } + async fn complete_request(&mut self) -> Result<(), Error> { self.request_mut()?.io.finish().await?; @@ -401,7 +414,7 @@ where type Error = Error; } -impl<'b, T, const N: usize> Read for Connection<'b, T, N> +impl Read for Connection<'_, T, N> where T: TcpConnect, { @@ -410,7 +423,7 @@ where } } -impl<'b, T, const N: usize> Write for Connection<'b, T, N> +impl Write for Connection<'_, T, N> where T: TcpConnect, { @@ -473,7 +486,7 @@ mod embedded_svc_compat { use embedded_svc::http::client::asynch::{Connection, Headers, Method, Status}; - impl<'b, T, const N: usize> Headers for super::Connection<'b, T, N> + impl Headers for super::Connection<'_, T, N> where T: TcpConnect, { @@ -484,7 +497,7 @@ mod embedded_svc_compat { } } - impl<'b, T, const N: usize> Status for super::Connection<'b, T, N> + impl Status for super::Connection<'_, T, N> where T: TcpConnect, { diff --git a/edge-http/src/io/server.rs b/edge-http/src/io/server.rs index c87485f..c286706 100644 --- a/edge-http/src/io/server.rs +++ b/edge-http/src/io/server.rs @@ -1,7 +1,8 @@ use core::fmt::{self, Debug, Display}; use core::mem::{self, MaybeUninit}; +use core::pin::pin; -use edge_nal::{with_timeout, Close, TcpShutdown, WithTimeout, WithTimeoutError}; +use edge_nal::{with_timeout, Close, Readable, TcpShutdown, WithTimeout, WithTimeoutError}; use embassy_sync::blocking_mutex::raw::NoopRawMutex; use embassy_sync::mutex::Mutex; @@ -286,7 +287,7 @@ where type Error = Error; } -impl<'b, T, const N: usize> Read for Connection<'b, T, N> +impl Read for Connection<'_, T, N> where T: Read + Write, { @@ -295,7 +296,7 @@ where } } -impl<'b, T, const N: usize> Write for Connection<'b, T, N> +impl Write for Connection<'_, T, N> where T: Read + Write, { @@ -330,70 +331,101 @@ where } } +#[derive(Debug)] +pub enum HandlerError { + Io(T), + Connection(Error), + Handler(E), +} + +impl From> for HandlerError { + fn from(e: Error) -> Self { + Self::Connection(e) + } +} + /// A trait (async callback) for handling incoming HTTP requests -pub trait Handler<'b, T, const N: usize> -where - T: Read + Write, -{ - type Error: Debug; +pub trait Handler { + type Error: Debug + where + E: Debug; /// Handle an incoming HTTP request /// /// Parameters: /// - `task_id`: An identifier for the task, thast can be used by the handler for logging purposes /// - `connection`: A connection state machine for the request-response cycle - async fn handle( + async fn handle( &self, task_id: impl Display + Copy, - connection: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error>; + connection: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write; } -impl<'b, const N: usize, T, H> Handler<'b, T, N> for &H +impl Handler for &H where - T: Read + Write, - H: Handler<'b, T, N>, + H: Handler, { - type Error = H::Error; + type Error + = H::Error + where + E: Debug; - async fn handle( + async fn handle( &self, task_id: impl Display + Copy, - connection: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { + connection: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { (**self).handle(task_id, connection).await } } -impl<'b, const N: usize, T, H> Handler<'b, T, N> for &mut H +impl Handler for &mut H where - T: Read + Write, - H: Handler<'b, T, N>, + H: Handler, { - type Error = H::Error; + type Error + = H::Error + where + E: Debug; - async fn handle( + async fn handle( &self, task_id: impl Display + Copy, - connection: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { + connection: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { (**self).handle(task_id, connection).await } } -impl<'b, const N: usize, T, H> Handler<'b, T, N> for WithTimeout +impl Handler for WithTimeout where - T: Read + Write, - H: Handler<'b, T, N>, + H: Handler, { - type Error = WithTimeoutError; + type Error + = WithTimeoutError> + where + E: Debug; - async fn handle( + async fn handle( &self, task_id: impl Display + Copy, - connection: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { - with_timeout(self.timeout_ms(), self.io().handle(task_id, connection)).await?; + connection: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { + let mut io = pin!(self.io().handle(task_id, connection)); + + with_timeout(self.timeout_ms(), &mut io).await?; Ok(()) } @@ -405,27 +437,56 @@ where /// The socket stream will be closed only in case of error, or until the client explicitly requests that /// either with a hard socket close, or with a `Connection: Close` header. /// +/// A note on timeouts: +/// - The function does NOT - by default - establish any timeouts on the IO operations _except_ +/// an optional timeout for detecting idle connections, so that they can be closed and thus make +/// the server available for accepting new connections. +/// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish +/// timeouts on the socket produced by the acceptor. +/// - Similarly, the server does NOT establish any timeouts on the complete request-response cycle. +/// It is up to the caller to wrap their complete or partial handling logic with +/// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish +/// a global or semi-global request-response timeout. +/// /// Parameters: /// - `io`: A socket stream /// - `buf`: A work-area buffer used by the implementation +/// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive connection +/// that should be closed. If not provided, the server will not close idle connections. /// - `task_id`: An identifier for the task, used for logging purposes /// - `handler`: An implementation of `Handler` to handle incoming requests -pub async fn handle_connection( +pub async fn handle_connection( mut io: T, buf: &mut [u8], + keepalive_timeout_ms: Option, task_id: impl Display + Copy, handler: H, ) where - H: for<'b> Handler<'b, &'b mut T, N>, - T: Read + Write + TcpShutdown, + H: Handler, + T: Read + Write + Readable + TcpShutdown, { let close = loop { - debug!("Handler task {task_id}: Waiting for new request"); + debug!("Handler task {task_id}: Waiting for a new request"); - let result = handle_request::(buf, &mut io, task_id, &handler).await; + if let Some(keepalive_timeout_ms) = keepalive_timeout_ms { + let wait_data = with_timeout(keepalive_timeout_ms, io.readable()).await; + match wait_data { + Err(WithTimeoutError::Timeout) => { + info!("Handler task {task_id}: Closing connection due to inactivity"); + break true; + } + Err(e) => { + warn!("Handler task {task_id}: Error when handling request: {e:?}"); + break true; + } + Ok(_) => {} + } + } + + let result = handle_request::<_, _, N>(buf, &mut io, task_id, &handler).await; match result { - Err(HandleRequestError::Connection(Error::ConnectionClosed)) => { + Err(HandlerError::Connection(Error::ConnectionClosed)) => { debug!("Handler task {task_id}: Connection closed"); break false; } @@ -515,14 +576,14 @@ where /// - `io`: A socket stream /// - `task_id`: An identifier for the task, used for logging purposes /// - `handler`: An implementation of `Handler` to handle incoming requests -pub async fn handle_request<'b, const N: usize, H, T>( - buf: &'b mut [u8], +pub async fn handle_request( + buf: &mut [u8], io: T, task_id: impl Display + Copy, handler: H, -) -> Result> +) -> Result>> where - H: Handler<'b, T, N>, + H: Handler, T: Read + Write, { let mut connection = Connection::<_, N>::new(buf, io).await?; @@ -534,7 +595,7 @@ where Result::Err(e) => connection .complete_err("INTERNAL ERROR") .await - .map_err(|_| HandleRequestError::Handler(e))?, + .map_err(|_| HandlerError::Handler(e))?, } Ok(connection.needs_close()) @@ -566,25 +627,34 @@ impl Server { /// Run the server with the specified acceptor and handler /// + /// A note on timeouts: + /// - The function does NOT - by default - establish any timeouts on the IO operations _except_ + /// an optional timeout on idle connections, so that they can be closed. + /// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish + /// timeouts on the socket produced by the acceptor. + /// - Similarly, the function does NOT establish any timeouts on the complete request-response cycle. + /// It is up to the caller to wrap their complete or partial handling logic with + /// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish + /// a global or semi-global request-response timeout. + /// /// Parameters: + /// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive + /// connection that should be closed. If not provided, the function will not close idle connections + /// and the connection - in the absence of other timeouts - will remain active forever. /// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections /// - `handler`: An implementation of `Handler` to handle incoming requests /// If not provided, a default timeout of 50 seconds is used. - /// - /// Note that the server does NOT - by default - establish any timeouts on the IO operations. - /// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish - /// timeouts on the socket produced by the acceptor. - /// - /// Similarly, the server does NOT establish any timeouts on the complete request-response cycle. - /// It is up to the caller to wrap their complete or partial handling logic with - /// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish - /// a global or semi-global request-response timeout. #[inline(never)] #[cold] - pub async fn run(&mut self, acceptor: A, handler: H) -> Result<(), Error> + pub async fn run( + &mut self, + keepalive_timeout_ms: Option, + acceptor: A, + handler: H, + ) -> Result<(), Error> where A: edge_nal::TcpAccept, - H: for<'b, 't> Handler<'b, &'b mut A::Socket<'t>, N>, + H: Handler, { let mutex = Mutex::::new(()); let mut tasks = heapless::Vec::<_, P>::new(); @@ -614,9 +684,10 @@ impl Server { debug!("Handler task {task_id}: Got connection request"); - handle_connection::( + handle_connection::<_, _, N>( io, unsafe { buf.as_mut() }.unwrap(), + keepalive_timeout_ms, task_id, handler, ) @@ -646,14 +717,11 @@ mod embedded_svc_compat { use embedded_io_async::{Read, Write}; use embedded_svc::http::server::asynch::{Connection, Headers, Query}; - use embedded_svc::utils::http::server::registration::{ChainHandler, ChainRoot}; use crate::io::Body; use crate::RequestHeaders; - use super::*; - - impl<'b, T, const N: usize> Headers for super::Connection<'b, T, N> + impl Headers for super::Connection<'_, T, N> where T: Read + Write, { @@ -665,7 +733,7 @@ mod embedded_svc_compat { } } - impl<'b, T, const N: usize> Query for super::Connection<'b, T, N> + impl Query for super::Connection<'_, T, N> where T: Read + Write, { @@ -721,47 +789,33 @@ mod embedded_svc_compat { } } - impl<'b, T, const N: usize> Handler<'b, T, N> for ChainRoot - where - T: Read + Write, - { - type Error = Error; - - async fn handle( - &self, - _task_id: impl core::fmt::Display + Copy, - connection: &mut super::Connection<'b, T, N>, - ) -> Result<(), Self::Error> { - connection.initiate_response(404, None, &[]).await - } - } - - impl<'b, const N: usize, T, H, Q> Handler<'b, T, N> for ChainHandler - where - H: embedded_svc::http::server::asynch::Handler>, - Q: Handler<'b, T, N>, - Q::Error: Into, - T: Read + Write, - { - type Error = H::Error; - - async fn handle( - &self, - task_id: impl core::fmt::Display + Copy, - connection: &mut super::Connection<'b, T, N>, - ) -> Result<(), Self::Error> { - let headers = connection.headers().ok(); - - if let Some(headers) = headers { - if headers.path == self.path && headers.method == self.method.into() { - return self.handler.handle(connection).await; - } - } - - self.next - .handle(task_id, connection) - .await - .map_err(Into::into) - } - } + // NOTE: Currently, the `edge-http` and the `embedded-svc` Handler traits are + // incompatible, in that the `edge-http` async `Handler`'s `handle` method is generic, + // while the `embedded-svc` `Handler`'s `handle` method is not. + // + // Code below is commented out until `embedded-svc`'s `Handler` signature is changed + // to match the `edge-http` `Handler` signature. + + // pub struct SvcHandler(H); + + // impl<'b, T, const N: usize, H> Handler for SvcHandler + // where + // H: embedded_svc::http::server::asynch::Handler>, + // T: Read + Write, + // { + // type Error = Error where E: Debug; + + // async fn handle( + // &self, + // _task_id: impl core::fmt::Display + Copy, + // connection: &mut super::Connection<'_, T, N>, + // ) -> Result<(), Self::Error> + // where + // T: Read + Write, + // { + // self.0.handle(connection).await.unwrap(); + + // Ok(()) + // } + // } } diff --git a/edge-http/src/lib.rs b/edge-http/src/lib.rs index 6fe9546..7051203 100644 --- a/edge-http/src/lib.rs +++ b/edge-http/src/lib.rs @@ -16,7 +16,7 @@ pub mod io; /// Errors related to invalid combinations of connection type /// and body type (Content-Length, Transfer-Encoding) in the headers -#[derive(Debug)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum HeadersMismatchError { /// Connection type mismatch: Keep-Alive connection type in the response, /// while the request contained a Close connection type @@ -734,7 +734,7 @@ impl RequestHeaders<'_, N> { } } -impl<'b, const N: usize> Default for RequestHeaders<'b, N> { +impl Default for RequestHeaders<'_, N> { #[inline(always)] fn default() -> Self { Self::new() @@ -795,7 +795,7 @@ impl ResponseHeaders<'_, N> { } } -impl<'b, const N: usize> Default for ResponseHeaders<'b, N> { +impl Default for ResponseHeaders<'_, N> { #[inline(always)] fn default() -> Self { Self::new() @@ -888,7 +888,7 @@ pub mod ws { } /// Websocket upgrade errors - #[derive(Debug, Copy, Clone, Eq, PartialEq)] + #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum UpgradeError { /// No `Sec-WebSocket-Version` header NoVersion, @@ -1411,7 +1411,7 @@ mod embedded_svc_compat { } } - impl<'b, const N: usize> embedded_svc::http::Query for super::RequestHeaders<'b, N> { + impl embedded_svc::http::Query for super::RequestHeaders<'_, N> { fn uri(&self) -> &'_ str { self.path } @@ -1421,13 +1421,13 @@ mod embedded_svc_compat { } } - impl<'b, const N: usize> embedded_svc::http::Headers for super::RequestHeaders<'b, N> { + impl embedded_svc::http::Headers for super::RequestHeaders<'_, N> { fn header(&self, name: &str) -> Option<&'_ str> { self.headers.get(name) } } - impl<'b, const N: usize> embedded_svc::http::Status for super::ResponseHeaders<'b, N> { + impl embedded_svc::http::Status for super::ResponseHeaders<'_, N> { fn status(&self) -> u16 { self.code } @@ -1437,13 +1437,13 @@ mod embedded_svc_compat { } } - impl<'b, const N: usize> embedded_svc::http::Headers for super::ResponseHeaders<'b, N> { + impl embedded_svc::http::Headers for super::ResponseHeaders<'_, N> { fn header(&self, name: &str) -> Option<&'_ str> { self.headers.get(name) } } - impl<'b, const N: usize> embedded_svc::http::Headers for super::Headers<'b, N> { + impl embedded_svc::http::Headers for super::Headers<'_, N> { fn header(&self, name: &str) -> Option<&'_ str> { self.get(name) } diff --git a/edge-mdns/src/io.rs b/edge-mdns/src/io.rs index 25889d2..f3cb775 100644 --- a/edge-mdns/src/io.rs +++ b/edge-mdns/src/io.rs @@ -5,6 +5,7 @@ use core::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddr use core::pin::pin; use buf::BufferAccess; + use embassy_futures::select::{select, Either}; use embassy_sync::blocking_mutex; use embassy_sync::blocking_mutex::raw::RawMutex; @@ -14,7 +15,8 @@ use embassy_sync::signal::Signal; use edge_nal::{MulticastV4, MulticastV6, Readable, UdpBind, UdpReceive, UdpSend}; use embassy_time::{Duration, Timer}; -use log::{info, warn}; + +use log::{debug, warn}; use super::*; @@ -31,7 +33,7 @@ pub const IPV6_BROADCAST_ADDR: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0 pub const PORT: u16 = 5353; /// A wrapper for mDNS and IO errors. -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum MdnsIoError { MdnsError(MdnsError), NoRecvBufError, @@ -39,6 +41,22 @@ pub enum MdnsIoError { IoError(E), } +pub type MdnsIoErrorKind = MdnsIoError; + +impl MdnsIoError +where + E: edge_nal::io::Error, +{ + pub fn erase(&self) -> MdnsIoError { + match self { + Self::MdnsError(e) => MdnsIoError::MdnsError(*e), + Self::NoRecvBufError => MdnsIoError::NoRecvBufError, + Self::NoSendBufError => MdnsIoError::NoSendBufError, + Self::IoError(e) => MdnsIoError::IoError(e.kind()), + } + } +} + impl From for MdnsIoError { fn from(err: MdnsError) -> Self { Self::MdnsError(err) @@ -301,7 +319,7 @@ where // Support one-shot legacy queries by replying privately // to the remote address, if the query was not sent from the mDNS port (as per the spec) - info!("Replying privately to a one-shot mDNS query from {remote}"); + debug!("Replying privately to a one-shot mDNS query from {remote}"); if let Err(err) = send.send(remote, data).await { warn!("Failed to reply privately to {remote}: {err:?}"); @@ -313,7 +331,7 @@ where self.delay().await; } - info!("Re-broadcasting due to mDNS query from {remote}"); + debug!("Re-broadcasting due to mDNS query from {remote}"); self.broadcast_once(send, data).await?; } @@ -340,7 +358,7 @@ where ) { if !data.is_empty() { - info!("Broadcasting mDNS entry to {remote_addr}"); + debug!("Broadcasting mDNS entry to {remote_addr}"); let fut = pin!(send.send(remote_addr, data)); diff --git a/edge-mdns/src/lib.rs b/edge-mdns/src/lib.rs index dd4cb03..d6748ed 100644 --- a/edge-mdns/src/lib.rs +++ b/edge-mdns/src/lib.rs @@ -37,7 +37,7 @@ pub const DNS_SD_OWNER: NameSlice = NameSlice::new(&["_services", "_dns-sd", "_u /// A wrapper type for the errors returned by the `domain` library during parsing and /// constructing mDNS messages. -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum MdnsError { ShortBuf, InvalidMessage, diff --git a/edge-nal-embassy/Cargo.toml b/edge-nal-embassy/Cargo.toml index 0a33735..63a3c83 100644 --- a/edge-nal-embassy/Cargo.toml +++ b/edge-nal-embassy/Cargo.toml @@ -3,7 +3,7 @@ name = "edge-nal-embassy" version = "0.3.0" edition = "2021" rust-version = "1.77" -description = "Am implementation of edge-nal based on `embassy-net`" +description = "An implementation of edge-nal based on `embassy-net`" repository = "https://github.com/ivmarkov/edge-net" readme = "README.md" license = "MIT OR Apache-2.0" @@ -11,13 +11,21 @@ categories = [ "embedded", "no-std::no-alloc", "asynchronous", - "network-programming" + "network-programming", ] [dependencies] embedded-io-async = { workspace = true } edge-nal = { workspace = true } heapless = { workspace = true } -# TODO: Do not require these features and conditionalize the code instead -embassy-net = { version = "0.4", features = ["tcp", "udp", "dns", "proto-ipv6", "medium-ethernet", "proto-ipv4", "igmp"] } +# Do not require these features and conditionalize the code instead +embassy-net = { version = "0.5", features = [ + "tcp", + "udp", + "dns", + "proto-ipv6", + "medium-ethernet", + "proto-ipv4", + "multicast", +] } embassy-futures = { workspace = true } diff --git a/edge-nal-embassy/README.md b/edge-nal-embassy/README.md index 3df5501..bde536b 100644 --- a/edge-nal-embassy/README.md +++ b/edge-nal-embassy/README.md @@ -14,9 +14,8 @@ All traits except `Readable` which - while implemented - panics if called. ### UDP -* All traits except `UdpConnect`. +* All traits except `UdpConnect`. * `MulticastV6` - while implemented - panics if `join_v6` / `leave_v6` are called. -* `Readable` - while implemented - panics if called. ### Raw sockets diff --git a/edge-nal-embassy/src/dns.rs b/edge-nal-embassy/src/dns.rs index 1d16cbb..a5b4194 100644 --- a/edge-nal-embassy/src/dns.rs +++ b/edge-nal-embassy/src/dns.rs @@ -4,7 +4,6 @@ use edge_nal::AddrType; use embassy_net::{ dns::{DnsQueryType, Error}, - driver::Driver, Stack, }; use embedded_io_async::ErrorKind; @@ -12,29 +11,20 @@ use embedded_io_async::ErrorKind; use crate::to_net_addr; /// A struct that implements the `Dns` trait from `edge-nal` -pub struct Dns<'a, D> -where - D: Driver + 'static, -{ - stack: &'a Stack, +pub struct Dns<'a> { + stack: Stack<'a>, } -impl<'a, D> Dns<'a, D> -where - D: Driver + 'static, -{ +impl<'a> Dns<'a> { /// Create a new `Dns` instance for the provided Embassy networking stack /// /// NOTE: If using DHCP, make sure it has reconfigured the stack to ensure the DNS servers are updated - pub fn new(stack: &'a Stack) -> Self { + pub fn new(stack: Stack<'a>) -> Self { Self { stack } } } -impl<'a, D> edge_nal::Dns for Dns<'a, D> -where - D: Driver + 'static, -{ +impl<'a> edge_nal::Dns for Dns<'a> { type Error = DnsError; async fn get_host_by_name( diff --git a/edge-nal-embassy/src/lib.rs b/edge-nal-embassy/src/lib.rs index 6761a3f..0f99f6e 100644 --- a/edge-nal-embassy/src/lib.rs +++ b/edge-nal-embassy/src/lib.rs @@ -89,11 +89,11 @@ pub(crate) fn to_emb_bind_socket(socket: SocketAddr) -> IpListenEndpoint { pub(crate) fn to_net_addr(addr: IpAddress) -> IpAddr { match addr { //#[cfg(feature = "proto-ipv4")] - IpAddress::Ipv4(addr) => addr.0.into(), + IpAddress::Ipv4(addr) => addr.into(), // #[cfg(not(feature = "proto-ipv4"))] // IpAddr::V4(_) => panic!("ipv4 support not enabled"), //#[cfg(feature = "proto-ipv6")] - IpAddress::Ipv6(addr) => addr.0.into(), + IpAddress::Ipv6(addr) => addr.into(), // #[cfg(not(feature = "proto-ipv6"))] // IpAddr::V6(_) => panic!("ipv6 support not enabled"), } @@ -102,11 +102,11 @@ pub(crate) fn to_net_addr(addr: IpAddress) -> IpAddr { pub(crate) fn to_emb_addr(addr: IpAddr) -> IpAddress { match addr { //#[cfg(feature = "proto-ipv4")] - IpAddr::V4(addr) => IpAddress::Ipv4(embassy_net::Ipv4Address::from_bytes(&addr.octets())), + IpAddr::V4(addr) => IpAddress::Ipv4(addr), // #[cfg(not(feature = "proto-ipv4"))] // IpAddr::V4(_) => panic!("ipv4 support not enabled"), //#[cfg(feature = "proto-ipv6")] - IpAddr::V6(addr) => IpAddress::Ipv6(embassy_net::Ipv6Address::from_bytes(&addr.octets())), + IpAddr::V6(addr) => IpAddress::Ipv6(addr), // #[cfg(not(feature = "proto-ipv6"))] // IpAddr::V6(_) => panic!("ipv6 support not enabled"), } diff --git a/edge-nal-embassy/src/tcp.rs b/edge-nal-embassy/src/tcp.rs index 8e86906..b697834 100644 --- a/edge-nal-embassy/src/tcp.rs +++ b/edge-nal-embassy/src/tcp.rs @@ -6,7 +6,6 @@ use edge_nal::{Close, Readable, TcpBind, TcpConnect, TcpShutdown, TcpSplit}; use embassy_futures::join::join; -use embassy_net::driver::Driver; use embassy_net::tcp::{AcceptError, ConnectError, Error, TcpReader, TcpWriter}; use embassy_net::Stack; @@ -16,27 +15,24 @@ use crate::{to_emb_bind_socket, to_emb_socket, to_net_socket, Pool}; /// A struct that implements the `TcpConnect` and `TcpBind` factory traits from `edge-nal` /// Capable of managing up to N concurrent connections with TX and RX buffers according to TX_SZ and RX_SZ. -pub struct Tcp<'d, D: Driver, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> -{ - stack: &'d Stack, +pub struct Tcp<'d, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> { + stack: Stack<'d>, buffers: &'d TcpBuffers, } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize> - Tcp<'d, D, N, TX_SZ, RX_SZ> -{ +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> Tcp<'d, N, TX_SZ, RX_SZ> { /// Create a new `Tcp` instance for the provided Embassy networking stack, using the provided TCP buffers /// /// Ensure that the number of buffers `N` fits within StackResources of /// [embassy_net::Stack], while taking into account the sockets used for DHCP, DNS, etc. else /// [smoltcp::iface::SocketSet] will panic with `adding a socket to a full SocketSet`. - pub fn new(stack: &'d Stack, buffers: &'d TcpBuffers) -> Self { + pub fn new(stack: Stack<'d>, buffers: &'d TcpBuffers) -> Self { Self { stack, buffers } } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnect - for Tcp<'d, D, N, TX_SZ, RX_SZ> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnect + for Tcp<'d, N, TX_SZ, RX_SZ> { type Error = TcpError; @@ -54,13 +50,13 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpC } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpBind - for Tcp<'d, D, N, TX_SZ, RX_SZ> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpBind + for Tcp<'d, N, TX_SZ, RX_SZ> { type Error = TcpError; type Accept<'a> - = TcpAccept<'a, D, N, TX_SZ, RX_SZ> + = TcpAccept<'a, N, TX_SZ, RX_SZ> where Self: 'a; @@ -70,19 +66,13 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpB } /// Represents an acceptor for incoming TCP client connections. Implements the `TcpAccept` factory trait from `edge-nal` -pub struct TcpAccept< - 'd, - D: Driver, - const N: usize, - const TX_SZ: usize = 1024, - const RX_SZ: usize = 1024, -> { - stack: &'d Tcp<'d, D, N, TX_SZ, RX_SZ>, +pub struct TcpAccept<'d, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> { + stack: &'d Tcp<'d, N, TX_SZ, RX_SZ>, local: SocketAddr, } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize> edge_nal::TcpAccept - for TcpAccept<'d, D, N, TX_SZ, RX_SZ> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> edge_nal::TcpAccept + for TcpAccept<'d, N, TX_SZ, RX_SZ> { type Error = TcpError; @@ -111,8 +101,8 @@ pub struct TcpSocket<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> } impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpSocket<'d, N, TX_SZ, RX_SZ> { - fn new( - stack: &'d Stack, + fn new( + stack: Stack<'d>, stack_buffers: &'d TcpBuffers, ) -> Result { let mut socket_buffers = stack_buffers.pool.alloc().ok_or(TcpError::NoBuffers)?; @@ -214,7 +204,7 @@ impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> Readable for TcpSocket<'d, N, TX_SZ, RX_SZ> { async fn readable(&mut self) -> Result<(), Self::Error> { - panic!("Not implemented yet") + Ok(self.socket.wait_read_ready().await) } } @@ -246,7 +236,7 @@ impl<'a> Read for TcpSocketRead<'a> { impl<'a> Readable for TcpSocketRead<'a> { async fn readable(&mut self) -> Result<(), Self::Error> { - panic!("Not implemented yet") + Ok(self.0.wait_read_ready().await) } } diff --git a/edge-nal-embassy/src/udp.rs b/edge-nal-embassy/src/udp.rs index eafca46..96b39c6 100644 --- a/edge-nal-embassy/src/udp.rs +++ b/edge-nal-embassy/src/udp.rs @@ -3,7 +3,6 @@ use core::ptr::NonNull; use edge_nal::{MulticastV4, MulticastV6, Readable, UdpBind, UdpReceive, UdpSend, UdpSplit}; -use embassy_net::driver::Driver; use embassy_net::udp::{BindError, PacketMetadata, RecvError, SendError}; use embassy_net::{MulticastError, Stack}; @@ -15,32 +14,35 @@ use crate::{to_emb_addr, to_emb_bind_socket, to_emb_socket, to_net_socket, Pool} /// Capable of managing up to N concurrent connections with TX and RX buffers according to TX_SZ and RX_SZ, and packet metadata according to `M`. pub struct Udp< 'd, - D: Driver, const N: usize, const TX_SZ: usize = 1500, const RX_SZ: usize = 1500, const M: usize = 2, > { - stack: &'d Stack, + stack: Stack<'d>, buffers: &'d UdpBuffers, } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - Udp<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> + Udp<'d, N, TX_SZ, RX_SZ, M> { /// Create a new `Udp` instance for the provided Embassy networking stack using the provided UDP buffers. - pub fn new(stack: &'d Stack, buffers: &'d UdpBuffers) -> Self { + /// + /// Ensure that the number of buffers `N` fits within StackResources of + /// [embassy_net::Stack], while taking into account the sockets used for DHCP, DNS, etc. else + /// [smoltcp::iface::SocketSet] will panic with `adding a socket to a full SocketSet`. + pub fn new(stack: Stack<'d>, buffers: &'d UdpBuffers) -> Self { Self { stack, buffers } } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpBind - for Udp<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpBind + for Udp<'d, N, TX_SZ, RX_SZ, M> { type Error = UdpError; type Socket<'a> - = UdpSocket<'a, D, N, TX_SZ, RX_SZ, M> + = UdpSocket<'a, N, TX_SZ, RX_SZ, M> where Self: 'a; @@ -55,26 +57,19 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons /// A UDP socket /// Implements the `UdpReceive` `UdpSend` and `UdpSplit` traits from `edge-nal` -pub struct UdpSocket< - 'd, - D: Driver, - const N: usize, - const TX_SZ: usize, - const RX_SZ: usize, - const M: usize, -> { - stack: &'d embassy_net::Stack, +pub struct UdpSocket<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> { + stack: embassy_net::Stack<'d>, socket: embassy_net::udp::UdpSocket<'d>, stack_buffers: &'d UdpBuffers, socket_buffers: NonNull<([u8; TX_SZ], [u8; RX_SZ])>, socket_meta_buffers: NonNull<([PacketMetadata; M], [PacketMetadata; M])>, } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> + UdpSocket<'d, N, TX_SZ, RX_SZ, M> { fn new( - stack: &'d Stack, + stack: Stack<'d>, stack_buffers: &'d UdpBuffers, ) -> Result { let mut socket_buffers = stack_buffers.pool.alloc().ok_or(UdpError::NoBuffers)?; @@ -98,8 +93,8 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> Drop - for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> Drop + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { fn drop(&mut self) { unsafe { @@ -110,24 +105,24 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - ErrorType for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> ErrorType + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { type Error = UdpError; } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - UdpReceive for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpReceive + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), Self::Error> { let (len, remote_endpoint) = self.socket.recv_from(buffer).await?; - Ok((len, to_net_socket(remote_endpoint))) + Ok((len, to_net_socket(remote_endpoint.endpoint))) } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpSend - for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpSend + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn send(&mut self, remote: SocketAddr, data: &[u8]) -> Result<(), Self::Error> { self.socket.send_to(data, to_emb_socket(remote)).await?; @@ -136,24 +131,24 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - ErrorType for &UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> ErrorType + for &UdpSocket<'d, N, TX_SZ, RX_SZ, M> { type Error = UdpError; } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - UdpReceive for &UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpReceive + for &UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), Self::Error> { let (len, remote_endpoint) = self.socket.recv_from(buffer).await?; - Ok((len, to_net_socket(remote_endpoint))) + Ok((len, to_net_socket(remote_endpoint.endpoint))) } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpSend - for &UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpSend + for &UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn send(&mut self, remote: SocketAddr, data: &[u8]) -> Result<(), Self::Error> { self.socket.send_to(data, to_emb_socket(remote)).await?; @@ -162,16 +157,16 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> Readable - for &UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> Readable + for &UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn readable(&mut self) -> Result<(), Self::Error> { - panic!("Not implemented yet") + Ok(self.socket.wait_recv_ready().await) } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpSplit - for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> UdpSplit + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { type Receive<'a> = &'a Self @@ -188,8 +183,8 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - MulticastV4 for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> MulticastV4 + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn join_v4( &mut self, @@ -197,8 +192,7 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons _interface: Ipv4Addr, ) -> Result<(), Self::Error> { self.stack - .join_multicast_group(to_emb_addr(IpAddr::V4(multicast_addr))) - .await?; + .join_multicast_group(to_emb_addr(IpAddr::V4(multicast_addr)))?; Ok(()) } @@ -209,15 +203,14 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons _interface: Ipv4Addr, ) -> Result<(), Self::Error> { self.stack - .leave_multicast_group(to_emb_addr(IpAddr::V4(multicast_addr))) - .await?; + .leave_multicast_group(to_emb_addr(IpAddr::V4(multicast_addr)))?; Ok(()) } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> - MulticastV6 for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> MulticastV6 + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn join_v6( &mut self, @@ -236,11 +229,11 @@ impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, cons } } -impl<'d, D: Driver, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> Readable - for UdpSocket<'d, D, N, TX_SZ, RX_SZ, M> +impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize, const M: usize> Readable + for UdpSocket<'d, N, TX_SZ, RX_SZ, M> { async fn readable(&mut self) -> Result<(), Self::Error> { - panic!("Not implemented yet") + Ok(self.socket.wait_recv_ready().await) } } diff --git a/edge-nal-std/src/lib.rs b/edge-nal-std/src/lib.rs index 72b89b7..0053faf 100644 --- a/edge-nal-std/src/lib.rs +++ b/edge-nal-std/src/lib.rs @@ -22,10 +22,10 @@ use edge_nal::{ TcpSplit, UdpBind, UdpConnect, UdpReceive, UdpSend, UdpSplit, }; -#[cfg(all(unix, not(target_os = "espidf")))] +#[cfg(any(target_os = "linux", target_os = "android"))] pub use raw::*; -#[derive(Default)] +#[derive(Default, Clone)] pub struct Stack(()); impl Stack { @@ -101,7 +101,12 @@ impl TcpAccept for TcpAcceptor { match self.0.as_ref().accept() { Ok((socket, _)) => break Ok((socket.peer_addr()?, TcpSocket(Async::new(socket)?))), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { - async_io::Timer::after(core::time::Duration::from_millis(20)).await; + #[cfg(not(feature = "async-io-mini"))] + use async_io::Timer; + #[cfg(feature = "async-io-mini")] + use async_io_mini::Timer; + + Timer::after(core::time::Duration::from_millis(20)).await; } Err(err) => break Err(err), } @@ -576,7 +581,9 @@ fn dns_lookup_host(host: &str, addr_type: AddrType) -> Result .ok_or_else(|| io::ErrorKind::AddrNotAvailable.into()) } -#[cfg(all(unix, not(target_os = "espidf")))] +// TODO: Figure out if the RAW socket implementation can be used on any other OS. +// It seems, that would be difficult on Darwin; wondering about the other BSDs though? +#[cfg(any(target_os = "linux", target_os = "android"))] mod raw { use core::ops::Deref; use core::pin::pin; @@ -816,6 +823,7 @@ mod raw { } } +#[cfg(any(target_os = "linux", target_os = "android", target_os = "espidf"))] mod sys { pub use libc::*; diff --git a/edge-nal/src/lib.rs b/edge-nal/src/lib.rs index 2bb9279..993fd41 100644 --- a/edge-nal/src/lib.rs +++ b/edge-nal/src/lib.rs @@ -17,3 +17,7 @@ mod stack; mod tcp; mod timeout; mod udp; + +pub mod io { + pub use embedded_io_async::*; +} diff --git a/edge-nal/src/timeout.rs b/edge-nal/src/timeout.rs index 1be0995..b12c2e4 100644 --- a/edge-nal/src/timeout.rs +++ b/edge-nal/src/timeout.rs @@ -194,7 +194,8 @@ where { type Error = WithTimeoutError; - type Socket<'a> = WithTimeout> + type Socket<'a> + = WithTimeout> where Self: 'a; diff --git a/edge-raw/src/bytes.rs b/edge-raw/src/bytes.rs index f7abbf3..ae0e208 100644 --- a/edge-raw/src/bytes.rs +++ b/edge-raw/src/bytes.rs @@ -1,4 +1,4 @@ -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum Error { BufferOverflow, DataUnderflow, diff --git a/edge-raw/src/io.rs b/edge-raw/src/io.rs index c7648d3..5790462 100644 --- a/edge-raw/src/io.rs +++ b/edge-raw/src/io.rs @@ -9,7 +9,7 @@ use edge_nal::{MacAddr, RawReceive, RawSend, RawSplit, Readable, UdpReceive, Udp use crate as raw; /// An error that can occur when sending or receiving UDP packets over a raw socket. -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum Error { Io(E), UnsupportedProtocol, diff --git a/edge-raw/src/lib.rs b/edge-raw/src/lib.rs index 5c7e39c..569ab40 100644 --- a/edge-raw/src/lib.rs +++ b/edge-raw/src/lib.rs @@ -18,7 +18,7 @@ pub mod udp; use bytes::BytesIn; /// An error type for decoding and encoding IP and UDP oackets -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum Error { DataUnderflow, BufferOverflow, diff --git a/edge-ws/README.md b/edge-ws/README.md index 5bbf20a..7d1cf69 100644 --- a/edge-ws/README.md +++ b/edge-ws/README.md @@ -148,7 +148,7 @@ where ### Websocket echo server ```rust -use core::fmt::Display; +use core::fmt::{Debug, Display}; use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::io::Error; @@ -180,36 +180,39 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { .bind(addr.parse().unwrap()) .await?; - server.run(acceptor, WsHandler).await?; + server.run(None, acceptor, WsHandler).await?; Ok(()) } #[derive(Debug)] enum WsHandlerError { - ConnectionError(C), - WsError(W), + Connection(C), + Ws(W), } impl From for WsHandlerError { fn from(e: C) -> Self { - Self::ConnectionError(e) + Self::Connection(e) } } struct WsHandler; -impl<'b, T, const N: usize> Handler<'b, T, N> for WsHandler -where - T: Read + Write, -{ - type Error = WsHandlerError, edge_ws::Error>; +impl Handler for WsHandler { + type Error + = WsHandlerError, edge_ws::Error> + where + E: Debug; - async fn handle( + async fn handle( &self, _task_id: impl Display + Clone, - conn: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { + conn: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { let headers = conn.headers()?; if headers.method != Method::Get { @@ -241,11 +244,11 @@ where loop { let mut header = FrameHeader::recv(&mut socket) .await - .map_err(WsHandlerError::WsError)?; + .map_err(WsHandlerError::Ws)?; let payload = header .recv_payload(&mut socket, &mut buf) .await - .map_err(WsHandlerError::WsError)?; + .map_err(WsHandlerError::Ws)?; match header.frame_type { FrameType::Text(_) => { @@ -276,14 +279,11 @@ where info!("Echoing back as {header}"); - header - .send(&mut socket) - .await - .map_err(WsHandlerError::WsError)?; + header.send(&mut socket).await.map_err(WsHandlerError::Ws)?; header .send_payload(&mut socket, payload) .await - .map_err(WsHandlerError::WsError)?; + .map_err(WsHandlerError::Ws)?; } } diff --git a/edge-ws/src/io.rs b/edge-ws/src/io.rs index c4c3a0f..8b94e8f 100644 --- a/edge-ws/src/io.rs +++ b/edge-ws/src/io.rs @@ -9,6 +9,21 @@ pub use embedded_svc_compat::*; pub type Error = super::Error; +impl Error +where + E: embedded_io_async::Error, +{ + pub fn erase(&self) -> Error { + match self { + Self::Incomplete(size) => Error::Incomplete(*size), + Self::Invalid => Error::Invalid, + Self::BufferOverflow => Error::BufferOverflow, + Self::InvalidLen => Error::InvalidLen, + Self::Io(e) => Error::Io(e.kind()), + } + } +} + impl From> for Error { fn from(e: ReadExactError) -> Self { match e { diff --git a/edge-ws/src/lib.rs b/edge-ws/src/lib.rs index 625e225..5b7a243 100644 --- a/edge-ws/src/lib.rs +++ b/edge-ws/src/lib.rs @@ -63,7 +63,7 @@ impl fmt::Display for FrameType { } } -#[derive(Clone, Eq, PartialEq, Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum Error { Incomplete(usize), Invalid, diff --git a/examples/http_client.rs b/examples/http_client.rs index 30b9fcc..75f0efe 100644 --- a/examples/http_client.rs +++ b/examples/http_client.rs @@ -43,8 +43,8 @@ where Ok(()) } -async fn request<'b, const N: usize, T: TcpConnect>( - conn: &mut Connection<'b, T, N>, +async fn request( + conn: &mut Connection<'_, T, N>, uri: &str, ) -> Result<(), Error> { conn.initiate_request(true, Method::Get, uri, &[("Host", "httpbin.org")]) diff --git a/examples/http_server.rs b/examples/http_server.rs index 8923506..57063d4 100644 --- a/examples/http_server.rs +++ b/examples/http_server.rs @@ -1,4 +1,4 @@ -use core::fmt::Display; +use core::fmt::{Debug, Display}; use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::io::Error; @@ -28,24 +28,27 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { .bind(addr.parse().unwrap()) .await?; - server.run(acceptor, HttpHandler).await?; + server.run(None, acceptor, HttpHandler).await?; Ok(()) } struct HttpHandler; -impl<'b, T, const N: usize> Handler<'b, T, N> for HttpHandler -where - T: Read + Write, -{ - type Error = Error; +impl Handler for HttpHandler { + type Error + = Error + where + E: Debug; - async fn handle( + async fn handle( &self, _task_id: impl Display + Copy, - conn: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { + conn: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { let headers = conn.headers()?; if headers.method != Method::Get { diff --git a/examples/ws_server.rs b/examples/ws_server.rs index e0f2f6d..f1ed6bc 100644 --- a/examples/ws_server.rs +++ b/examples/ws_server.rs @@ -1,4 +1,4 @@ -use core::fmt::Display; +use core::fmt::{Debug, Display}; use edge_http::io::server::{Connection, DefaultServer, Handler}; use edge_http::io::Error; @@ -30,36 +30,39 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> { .bind(addr.parse().unwrap()) .await?; - server.run(acceptor, WsHandler).await?; + server.run(None, acceptor, WsHandler).await?; Ok(()) } #[derive(Debug)] enum WsHandlerError { - ConnectionError(C), - WsError(W), + Connection(C), + Ws(W), } impl From for WsHandlerError { fn from(e: C) -> Self { - Self::ConnectionError(e) + Self::Connection(e) } } struct WsHandler; -impl<'b, T, const N: usize> Handler<'b, T, N> for WsHandler -where - T: Read + Write, -{ - type Error = WsHandlerError, edge_ws::Error>; +impl Handler for WsHandler { + type Error + = WsHandlerError, edge_ws::Error> + where + E: Debug; - async fn handle( + async fn handle( &self, _task_id: impl Display + Clone, - conn: &mut Connection<'b, T, N>, - ) -> Result<(), Self::Error> { + conn: &mut Connection<'_, T, N>, + ) -> Result<(), Self::Error> + where + T: Read + Write, + { let headers = conn.headers()?; if headers.method != Method::Get { @@ -91,11 +94,11 @@ where loop { let mut header = FrameHeader::recv(&mut socket) .await - .map_err(WsHandlerError::WsError)?; + .map_err(WsHandlerError::Ws)?; let payload = header .recv_payload(&mut socket, &mut buf) .await - .map_err(WsHandlerError::WsError)?; + .map_err(WsHandlerError::Ws)?; match header.frame_type { FrameType::Text(_) => { @@ -126,14 +129,11 @@ where info!("Echoing back as {header}"); - header - .send(&mut socket) - .await - .map_err(WsHandlerError::WsError)?; + header.send(&mut socket).await.map_err(WsHandlerError::Ws)?; header .send_payload(&mut socket, payload) .await - .map_err(WsHandlerError::WsError)?; + .map_err(WsHandlerError::Ws)?; } }