diff --git a/embedded-io-adapters/Cargo.toml b/embedded-io-adapters/Cargo.toml index 89721f1b..a2e13847 100644 --- a/embedded-io-adapters/Cargo.toml +++ b/embedded-io-adapters/Cargo.toml @@ -16,8 +16,10 @@ categories = [ std = ["embedded-io/std"] tokio-1 = ["std", "dep:tokio", "dep:embedded-io-async", "embedded-io-async?/std"] futures-03 = ["std", "dep:futures", "dep:embedded-io-async", "embedded-io-async?/std"] +unblock = ["std", "dep:embedded-io-async", "dep:blocking"] [dependencies] +blocking = { version = "1.6.0", optional = true } embedded-io = { version = "0.6", path = "../embedded-io" } embedded-io-async = { version = "0.6.1", path = "../embedded-io-async", optional = true } @@ -25,5 +27,5 @@ futures = { version = "0.3.21", features = ["std"], default-features = false, op tokio = { version = "1", features = ["io-util"], default-features = false, optional = true } [package.metadata.docs.rs] -features = ["std", "tokio-1", "futures-03"] +features = ["std", "tokio-1", "futures-03", "unblock"] rustdoc-args = ["--cfg", "docsrs"] diff --git a/embedded-io-adapters/src/lib.rs b/embedded-io-adapters/src/lib.rs index c5e6710a..5a2edeb9 100644 --- a/embedded-io-adapters/src/lib.rs +++ b/embedded-io-adapters/src/lib.rs @@ -16,3 +16,7 @@ pub mod futures_03; #[cfg(feature = "tokio-1")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio-1")))] pub mod tokio_1; + +#[cfg(feature = "unblock")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +pub mod unblock; diff --git a/embedded-io-adapters/src/unblock.rs b/embedded-io-adapters/src/unblock.rs new file mode 100644 index 00000000..630918ea --- /dev/null +++ b/embedded-io-adapters/src/unblock.rs @@ -0,0 +1,168 @@ +//! Adapters to/from `futures::io` traits. + +// MSRV is 1.60 if you don't enable async, 1.80 if you do. +// Cargo.toml has 1.60, which makes Clippy complain that `poll_fn` was introduced +// in 1.64. So, just silence it for this file. +#![allow(clippy::incompatible_msrv)] + +use std::sync::{Arc, Mutex}; + +use blocking::unblock; + +/// Adapter from `embedded_io` traits to `embedded_io_async` traits. +/// +/// This is not suitable for use in embedded environments, but it can be useful for quickly +/// iterating on driver code from your desktop without constantly re-flashing development boards. +/// +/// This is quite inefficient, because it does IO operations on a threadpool, and does +/// an awful lot of copying. No attempt has been made to optimize this. +/// +/// If you have access to a port implementing std::io::Read + std::io::Write and either +/// std::os::unix::io::AsRawFd or std::os::windows::io::AsRawSocket, you should attempt to use +/// `async_io::Async` followed by `embedded_io_adapters::futures_03::FromFutures` instead. +/// +/// If you only need `embedded_io_async::Read` or `embedded_io_async::Write`, you can use +/// `UnblockRead` or `UnblockWrite`. In practice, most of the time you should just use this adapter. +/// +/// The ergonomics of this are a bit worse than the other adapters because we need to avoid +/// overlapping impls of embedded_io::ErrorType. +pub struct Unblock { + read: UnblockRead, + write: UnblockWrite, +} + +impl Unblock { + /// Create a new adapter. + pub fn new(port: T) -> Self { + let inner = Arc::new(Mutex::new(port)); + Self { + read: UnblockRead { + inner: inner.clone(), + }, + write: UnblockWrite { inner }, + } + } +} + +impl embedded_io::ErrorType + for Unblock +{ + type Error = T::Error; +} + +impl embedded_io_async::Read + for Unblock +where + T::Error: Send + 'static, +{ + async fn read(&mut self, buf: &mut [u8]) -> Result { + self.read.read(buf).await + } +} + +impl embedded_io_async::Write + for Unblock +where + T::Error: Send + 'static, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + self.write.write(buf).await + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + self.write.flush().await + } +} + +/// Use this if you have a port that only implements `embedded_io::Read`. Otherwise, use `Unblock`. +/// +/// The ergonomics of this are a bit worse than the other adapters because we need to avoid +/// overlapping impls of embedded_io::ErrorType. +pub struct UnblockRead { + inner: Arc>, +} + +impl UnblockRead { + /// Create a new adapter. + pub fn new(port: T) -> Self { + Self { + inner: Arc::new(Mutex::new(port)), + } + } +} + +impl embedded_io_async::Read for UnblockRead +where + T::Error: Send + 'static, +{ + async fn read(&mut self, buf: &mut [u8]) -> Result { + let max_len = buf.len(); + let inner = self.inner.clone(); + + let result = unblock(move || { + let mut inner_buf: Vec<_> = std::iter::repeat(0u8).take(max_len).collect(); + match inner.lock().unwrap().read(&mut inner_buf) { + Ok(count) => { + inner_buf.resize(count, 0); + Ok(inner_buf) + } + Err(e) => Err(e), + } + }) + .await; + + match result { + Ok(inner_buf) => { + buf[..inner_buf.len()].copy_from_slice(&inner_buf); + Ok(inner_buf.len()) + } + Err(e) => Err(e), + } + } +} + +impl embedded_io::ErrorType for UnblockRead { + type Error = T::Error; +} + +/// Use this if you have a port that only implements `embedded_io::Write`. Otherwise, use `Unblock`. +/// +/// The ergonomics of this are a bit worse than the other adapters because we need to avoid +/// overlapping impls of embedded_io::ErrorType. +pub struct UnblockWrite { + inner: Arc>, +} + +impl UnblockWrite { + /// Create a new adapter. + pub fn new(port: T) -> Self { + Self { + inner: Arc::new(Mutex::new(port)), + } + } +} + +impl embedded_io::ErrorType for UnblockWrite { + type Error = T::Error; +} + +impl embedded_io_async::Write for UnblockWrite +where + T::Error: Send + 'static, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + let inner = self.inner.clone(); + let inner_buf = Vec::from(buf); + + unblock(move || { + let inner_buf = inner_buf; + inner.lock().unwrap().write(&inner_buf) + }) + .await + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + let inner = self.inner.clone(); + unblock(move || inner.lock().unwrap().flush()).await + } +}