Skip to content

Commit 84e34c8

Browse files
committed
Unblock adapter from embedded_io::{Read, Write} to embedded_io_async::{Read, Write}
1 parent 0f31514 commit 84e34c8

File tree

3 files changed

+175
-1
lines changed

3 files changed

+175
-1
lines changed

embedded-io-adapters/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ categories = [
1616
std = ["embedded-io/std"]
1717
tokio-1 = ["std", "dep:tokio", "dep:embedded-io-async", "embedded-io-async?/std"]
1818
futures-03 = ["std", "dep:futures", "dep:embedded-io-async", "embedded-io-async?/std"]
19+
unblock = ["std", "dep:embedded-io-async", "dep:blocking"]
1920

2021
[dependencies]
22+
blocking = { version = "1.6.0", optional = true }
2123
embedded-io = { version = "0.6", path = "../embedded-io" }
2224
embedded-io-async = { version = "0.6.1", path = "../embedded-io-async", optional = true }
2325

2426
futures = { version = "0.3.21", features = ["std"], default-features = false, optional = true }
2527
tokio = { version = "1", features = ["io-util"], default-features = false, optional = true }
2628

2729
[package.metadata.docs.rs]
28-
features = ["std", "tokio-1", "futures-03"]
30+
features = ["std", "tokio-1", "futures-03", "unblock"]
2931
rustdoc-args = ["--cfg", "docsrs"]

embedded-io-adapters/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ pub mod futures_03;
1616
#[cfg(feature = "tokio-1")]
1717
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-1")))]
1818
pub mod tokio_1;
19+
20+
#[cfg(feature = "unblock")]
21+
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
22+
pub mod unblock;
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
//! Adapters to/from `futures::io` traits.
2+
3+
// MSRV is 1.60 if you don't enable async, 1.80 if you do.
4+
// Cargo.toml has 1.60, which makes Clippy complain that `poll_fn` was introduced
5+
// in 1.64. So, just silence it for this file.
6+
#![allow(clippy::incompatible_msrv)]
7+
8+
use std::sync::{Arc, Mutex};
9+
10+
use blocking::unblock;
11+
12+
/// Adapter from `embedded_io` traits to `embedded_io_async` traits.
13+
///
14+
/// This is not suitable for use in embedded environments, but it can be useful for quickly
15+
/// iterating on driver code from your desktop without constantly re-flashing development boards.
16+
///
17+
/// This is quite inefficient, because it does IO operations on a threadpool, and does
18+
/// an awful lot of copying. No attempt has been made to optimize this.
19+
///
20+
/// If you have access to a port implementing std::io::Read + std::io::Write and either
21+
/// std::os::unix::io::AsRawFd or std::os::windows::io::AsRawSocket, you should attempt to use
22+
/// `async_io::Async` followed by `embedded_io_adapters::futures_03::FromFutures` instead.
23+
///
24+
/// If you only need `embedded_io_async::Read` or `embedded_io_async::Write`, you can use
25+
/// `UnblockRead` or `UnblockWrite`. In practice, most of the time you should just use this adapter.
26+
///
27+
/// The ergonomics of this are a bit worse than the other adapters because we need to avoid
28+
/// overlapping impls of embedded_io::ErrorType.
29+
pub struct Unblock<T: Send + Sync> {
30+
read: UnblockRead<T>,
31+
write: UnblockWrite<T>,
32+
}
33+
34+
impl<T: Send + Sync + 'static> Unblock<T> {
35+
/// Create a new adapter.
36+
pub fn new(port: T) -> Self {
37+
let inner = Arc::new(Mutex::new(port));
38+
Self {
39+
read: UnblockRead {
40+
inner: inner.clone(),
41+
},
42+
write: UnblockWrite { inner },
43+
}
44+
}
45+
}
46+
47+
impl<T: embedded_io::Read + embedded_io::Write + Send + Sync> embedded_io::ErrorType
48+
for Unblock<T>
49+
{
50+
type Error = T::Error;
51+
}
52+
53+
impl<T: embedded_io::Read + embedded_io::Write + Send + Sync + 'static> embedded_io_async::Read
54+
for Unblock<T>
55+
where
56+
T::Error: Send + 'static,
57+
{
58+
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, T::Error> {
59+
self.read.read(buf).await
60+
}
61+
}
62+
63+
impl<T: embedded_io::Read + embedded_io::Write + Send + Sync + 'static> embedded_io_async::Write
64+
for Unblock<T>
65+
where
66+
T::Error: Send + 'static,
67+
{
68+
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
69+
self.write.write(buf).await
70+
}
71+
72+
async fn flush(&mut self) -> Result<(), Self::Error> {
73+
self.write.flush().await
74+
}
75+
}
76+
77+
/// Use this if you have a port that only implements `embedded_io::Read`. Otherwise, use `Unblock`.
78+
///
79+
/// The ergonomics of this are a bit worse than the other adapters because we need to avoid
80+
/// overlapping impls of embedded_io::ErrorType.
81+
pub struct UnblockRead<T: Send + Sync> {
82+
inner: Arc<Mutex<T>>,
83+
}
84+
85+
impl<T: Send + Sync + 'static> UnblockRead<T> {
86+
/// Create a new adapter.
87+
pub fn new(port: T) -> Self {
88+
Self {
89+
inner: Arc::new(Mutex::new(port)),
90+
}
91+
}
92+
}
93+
94+
impl<T: embedded_io::Read + Send + Sync + 'static> embedded_io_async::Read for UnblockRead<T>
95+
where
96+
T::Error: Send + 'static,
97+
{
98+
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, T::Error> {
99+
let max_len = buf.len();
100+
let inner = self.inner.clone();
101+
102+
let result = unblock(move || {
103+
let mut inner_buf: Vec<_> = std::iter::repeat(0u8).take(max_len).collect();
104+
match inner.lock().unwrap().read(&mut inner_buf) {
105+
Ok(count) => {
106+
inner_buf.resize(count, 0);
107+
Ok(inner_buf)
108+
}
109+
Err(e) => Err(e),
110+
}
111+
})
112+
.await;
113+
114+
match result {
115+
Ok(inner_buf) => {
116+
buf[..inner_buf.len()].copy_from_slice(&inner_buf);
117+
Ok(inner_buf.len())
118+
}
119+
Err(e) => Err(e),
120+
}
121+
}
122+
}
123+
124+
impl<T: embedded_io::Read + Send + Sync> embedded_io::ErrorType for UnblockRead<T> {
125+
type Error = T::Error;
126+
}
127+
128+
/// Use this if you have a port that only implements `embedded_io::Write`. Otherwise, use `Unblock`.
129+
///
130+
/// The ergonomics of this are a bit worse than the other adapters because we need to avoid
131+
/// overlapping impls of embedded_io::ErrorType.
132+
pub struct UnblockWrite<T: Send + Sync> {
133+
inner: Arc<Mutex<T>>,
134+
}
135+
136+
impl<T: Send + Sync + 'static> UnblockWrite<T> {
137+
/// Create a new adapter.
138+
pub fn new(port: T) -> Self {
139+
Self {
140+
inner: Arc::new(Mutex::new(port)),
141+
}
142+
}
143+
}
144+
145+
impl<T: embedded_io::Write + Send + Sync> embedded_io::ErrorType for UnblockWrite<T> {
146+
type Error = T::Error;
147+
}
148+
149+
impl<T: embedded_io::Write + Send + Sync + 'static> embedded_io_async::Write for UnblockWrite<T>
150+
where
151+
T::Error: Send + 'static,
152+
{
153+
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
154+
let inner = self.inner.clone();
155+
let inner_buf = Vec::from(buf);
156+
157+
unblock(move || {
158+
let inner_buf = inner_buf;
159+
inner.lock().unwrap().write(&inner_buf)
160+
})
161+
.await
162+
}
163+
164+
async fn flush(&mut self) -> Result<(), Self::Error> {
165+
let inner = self.inner.clone();
166+
unblock(move || inner.lock().unwrap().flush()).await
167+
}
168+
}

0 commit comments

Comments
 (0)