Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions edge-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async fn request<'b, const N: usize, T: TcpConnect>(

```rust
use edge_http::io::server::{Connection, DefaultServer, Handler};
use edge_http::io::Error;
use edge_http::Method;
use edge_nal::TcpBind;

Expand All @@ -130,7 +131,7 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> {
.bind(addr.parse().unwrap())
.await?;

server.run(acceptor, HttpHandler, None).await?;
server.run(acceptor, HttpHandler, None, None).await?;

Ok(())
}
Expand All @@ -140,9 +141,8 @@ struct HttpHandler;
impl<'b, T, const N: usize> Handler<'b, T, N> for HttpHandler
where
T: Read + Write,
T::Error: Send + Sync + std::error::Error + 'static,
{
type Error = anyhow::Error;
type Error = Error<T::Error>;

async fn handle(&self, conn: &mut Connection<'b, T, N>) -> Result<(), Self::Error> {
let headers = conn.headers()?;
Expand Down
2 changes: 0 additions & 2 deletions edge-http/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub enum Error<E> {
IncompleteHeaders,
IncompleteBody,
InvalidState,
Timeout,
ConnectionClosed,
HeadersMismatchError(HeadersMismatchError),
WsUpgradeError(UpgradeError),
Expand Down Expand Up @@ -87,7 +86,6 @@ where
Self::IncompleteHeaders => write!(f, "HTTP headers section is incomplete"),
Self::IncompleteBody => write!(f, "HTTP body is incomplete"),
Self::InvalidState => write!(f, "Connection is not in requested state"),
Self::Timeout => write!(f, "Timeout"),
Self::HeadersMismatchError(e) => write!(f, "Headers mismatch: {e}"),
Self::WsUpgradeError(e) => write!(f, "WebSocket upgrade error: {e}"),
Self::ConnectionClosed => write!(f, "Connection closed"),
Expand Down
21 changes: 16 additions & 5 deletions edge-http/src/io/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use core::str;

use embedded_io_async::{ErrorType, Read, Write};

use edge_nal::TcpConnect;
use edge_nal::{Close, TcpConnect, TcpShutdown};

use crate::{
ws::{upgrade_request_headers, MAX_BASE64_KEY_LEN, MAX_BASE64_KEY_RESPONSE_LEN, NONCE_LEN},
Expand Down Expand Up @@ -39,6 +39,11 @@ where
{
/// Create a new client connection.
///
/// Note that the connection does not have any built-in read/write timeouts:
/// - To add a timeout on each IO operation, wrap the `socket` type with the `edge_nal::WithTimeout` wrapper.
/// - To add a global request-response timeout, wrap your complete request-response processing
/// logic with the `edge_nal::with_timeout` function.
///
/// Parameters:
/// - `buf`: A buffer to use for reading and writing data.
/// - `socket`: The TCP stack to use for the connection.
Expand Down Expand Up @@ -234,11 +239,17 @@ where
let mut state = self.unbind();

match result {
Ok(true) | Err(_) => state.io = None,
_ => (),
};
Ok(true) | Err(_) => {
let mut io = state.io.take().unwrap();
*self = Self::Unbound(state);

*self = Self::Unbound(state);
io.close(Close::Both).await.map_err(Error::Io)?;
let _ = io.abort().await;
}
_ => {
*self = Self::Unbound(state);
}
};

result?;

Expand Down
115 changes: 71 additions & 44 deletions edge-http/src/io/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use core::fmt::{self, Debug};
use core::mem::{self, MaybeUninit};
use core::pin::pin;

use embassy_futures::select::Either;
use edge_nal::{with_timeout, Close, TcpShutdown, WithTimeout, WithTimeoutError};
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::mutex::Mutex;
use embassy_time::{Duration, Timer};

use embedded_io_async::{ErrorType, Read, Write};

Expand All @@ -22,7 +20,8 @@ pub use embedded_svc_compat::*;

pub const DEFAULT_HANDLER_TASKS_COUNT: usize = 4;
pub const DEFAULT_BUF_SIZE: usize = 2048;
pub const DEFAULT_TIMEOUT_MS: u32 = 5000;
pub const DEFAULT_REQUEST_TIMEOUT_MS: u32 = 30 * 60 * 1000; // 30 minutes
pub const DEFAULT_IO_TIMEOUT_MS: u32 = 50 * 1000; // 50 seconds

const COMPLETION_BUF_SIZE: usize = 64;

Expand All @@ -41,30 +40,21 @@ where
{
/// Create a new connection state machine for an incoming request
///
/// Note that the connection does not have any built-in read/write timeouts:
/// - To add a timeout on each IO operation, wrap the `io` type with the `edge_nal::WithTimeout` wrapper.
/// - To add a global request-response timeout, wrap your complete request-response processing
/// logic with the `edge_nal::with_timeout` function.
///
/// Parameters:
/// - `buf`: A buffer to store the request headers
/// - `io`: A socket stream
/// - `timeout_ms`: An optional timeout in milliseconds to wait for a new incoming request
pub async fn new(
buf: &'b mut [u8],
mut io: T,
timeout_ms: Option<u32>,
) -> Result<Connection<'b, T, N>, Error<T::Error>> {
let mut request = RequestHeaders::new();

let (buf, read_len) = {
let timeout_ms = timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);

let receive = pin!(request.receive(buf, &mut io, true));
let timer = Timer::after(Duration::from_millis(timeout_ms as _));

let result = embassy_futures::select::select(receive, timer).await;

match result {
Either::First(result) => result,
Either::Second(_) => Err(Error::Timeout),
}?
};
let (buf, read_len) = request.receive(buf, &mut io, true).await?;

let (connection_type, body_type) = request.resolve::<T::Error>()?;

Expand Down Expand Up @@ -436,13 +426,20 @@ where
pub async fn handle_connection<const N: usize, T, H>(
io: T,
buf: &mut [u8],
timeout_ms: Option<u32>,
request_timeout_ms: Option<u32>,
handler: H,
) where
H: for<'b> Handler<'b, &'b mut T, N>,
T: Read + Write,
T: Read + Write + TcpShutdown,
{
handle_task_connection(io, buf, timeout_ms, 0, TaskHandlerAdaptor::new(handler)).await
handle_task_connection(
io,
buf,
request_timeout_ms,
0,
TaskHandlerAdaptor::new(handler),
)
.await
}

/// A convenience function to handle multiple HTTP requests over a single socket stream,
Expand All @@ -453,41 +450,52 @@ pub async fn handle_connection<const N: usize, T, H>(
pub async fn handle_task_connection<const N: usize, T, H>(
mut io: T,
buf: &mut [u8],
timeout_ms: Option<u32>,
request_timeout_ms: Option<u32>,
task_id: usize,
handler: H,
) where
H: for<'b> TaskHandler<'b, &'b mut T, N>,
T: Read + Write,
T: Read + Write + TcpShutdown,
{
loop {
let close = loop {
debug!("Handler task {task_id}: Waiting for new request");

let result =
handle_task_request::<N, _, _>(buf, &mut io, task_id, timeout_ms, &handler).await;
let result = with_timeout(
request_timeout_ms.unwrap_or(DEFAULT_REQUEST_TIMEOUT_MS),
handle_task_request::<N, _, _>(buf, &mut io, task_id, &handler),
)
.await;

match result {
Err(HandleRequestError::Connection(Error::Timeout)) => {
info!("Handler task {task_id}: Connection closed due to timeout");
break;
Err(WithTimeoutError::Timeout) => {
info!("Handler task {task_id}: Connection closed due to request timeout");
break false;
}
Err(HandleRequestError::Connection(Error::ConnectionClosed)) => {
Err(WithTimeoutError::IO(HandleRequestError::Connection(Error::ConnectionClosed))) => {
debug!("Handler task {task_id}: Connection closed");
break;
break false;
}
Err(e) => {
warn!("Handler task {task_id}: Error when handling request: {e:?}");
break;
break true;
}
Ok(needs_close) => {
if needs_close {
debug!("Handler task {task_id}: Request complete; closing connection");
break;
break true;
} else {
debug!("Handler task {task_id}: Request complete");
}
}
}
};

if close {
if let Err(e) = io.close(Close::Both).await {
warn!("Handler task {task_id}: Error when closing the socket: {e:?}");
}
} else {
let _ = io.abort().await;
}
}

Expand Down Expand Up @@ -519,6 +527,19 @@ where
}
}

impl<C, E> embedded_io_async::Error for HandleRequestError<C, E>
where
C: Debug + embedded_io_async::Error,
E: Debug,
{
fn kind(&self) -> embedded_io_async::ErrorKind {
match self {
Self::Connection(Error::Io(e)) => e.kind(),
_ => embedded_io_async::ErrorKind::Other,
}
}
}

#[cfg(feature = "std")]
impl<C, E> std::error::Error for HandleRequestError<C, E>
where
Expand All @@ -532,14 +553,13 @@ where
pub async fn handle_request<'b, const N: usize, H, T>(
buf: &'b mut [u8],
io: T,
timeout_ms: Option<u32>,
handler: H,
) -> Result<bool, HandleRequestError<T::Error, H::Error>>
where
H: Handler<'b, T, N>,
T: Read + Write,
{
handle_task_request(buf, io, 0, timeout_ms, TaskHandlerAdaptor::new(handler)).await
handle_task_request(buf, io, 0, TaskHandlerAdaptor::new(handler)).await
}

/// A convenience function to handle a single HTTP request over a socket stream,
Expand All @@ -548,14 +568,13 @@ pub async fn handle_task_request<'b, const N: usize, H, T>(
buf: &'b mut [u8],
io: T,
task_id: usize,
timeout_ms: Option<u32>,
handler: H,
) -> Result<bool, HandleRequestError<T::Error, H::Error>>
where
H: TaskHandler<'b, T, N>,
T: Read + Write,
{
let mut connection = Connection::<_, N>::new(buf, io, timeout_ms).await?;
let mut connection = Connection::<_, N>::new(buf, io).await?;

let result = handler.handle(task_id, &mut connection).await;

Expand Down Expand Up @@ -601,11 +620,12 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
&mut self,
acceptor: A,
handler: H,
timeout_ms: Option<u32>,
request_timeout_ms: Option<u32>,
io_timeout_ms: Option<u32>,
) -> Result<(), Error<A::Error>>
where
A: edge_nal::TcpAccept,
H: for<'b, 't> Handler<'b, &'b mut A::Socket<'t>, N>,
H: for<'b, 't> Handler<'b, &'b mut WithTimeout<A::Socket<'t>>, N>,
{
let handler = TaskHandlerAdaptor::new(handler);

Expand Down Expand Up @@ -637,12 +657,15 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
acceptor.accept().await.map_err(Error::Io)?.1
};

let io =
WithTimeout::new(io_timeout_ms.unwrap_or(DEFAULT_IO_TIMEOUT_MS), io);

debug!("Handler task {task_id}: Got connection request");

handle_task_connection::<N, _, _>(
io,
unsafe { buf.as_mut() }.unwrap(),
timeout_ms,
request_timeout_ms,
task_id,
handler,
)
Expand All @@ -667,11 +690,12 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
&mut self,
acceptor: A,
handler: H,
timeout_ms: Option<u32>,
request_timeout_ms: Option<u32>,
io_timeout_ms: Option<u32>,
) -> Result<(), Error<A::Error>>
where
A: edge_nal::TcpAccept,
H: for<'b, 't> TaskHandler<'b, &'b mut A::Socket<'t>, N>,
H: for<'b, 't> TaskHandler<'b, &'b mut WithTimeout<A::Socket<'t>>, N>,
{
let mutex = Mutex::<NoopRawMutex, _>::new(());
let mut tasks = heapless::Vec::<_, P>::new();
Expand Down Expand Up @@ -699,12 +723,15 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
acceptor.accept().await.map_err(Error::Io)?.1
};

let io =
WithTimeout::new(io_timeout_ms.unwrap_or(DEFAULT_IO_TIMEOUT_MS), io);

debug!("Handler task {task_id}: Got connection request");

handle_task_connection::<N, _, _>(
io,
unsafe { buf.as_mut() }.unwrap(),
timeout_ms,
request_timeout_ms,
task_id,
handler,
)
Expand Down
3 changes: 2 additions & 1 deletion edge-nal-embassy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ categories = [
embedded-io-async = { workspace = true }
edge-nal = { workspace = true }
heapless = { workspace = true }
# Do not require these features and conditionalize the code instead
# TODO: Do not require these features and conditionalize the code instead
embassy-net = { version = "0.4", features = ["tcp", "udp", "dns", "proto-ipv6", "medium-ethernet", "proto-ipv4", "igmp"] }
embassy-futures = { workspace = true }
6 changes: 4 additions & 2 deletions edge-nal-embassy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ A bare-metal implementation of `edge-nal` based on the [embassy-net](https://cra

### TCP

All traits.
All traits except `Readable` which - while implemented - panics if called.

### UDP

* All traits except `UdpConnect` and `Multicast`.
* All traits except `UdpConnect`.
* `MulticastV6` - while implemented - panics if `join_v6` / `leave_v6` are called.
* `Readable` - while implemented - panics if called.

### Raw sockets

Expand Down
Loading
Loading