@@ -5,7 +5,258 @@ use tokio::io::{
5
5
} ;
6
6
7
7
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
8
- /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
8
+ /// a [`tokio::io::AsyncWrite`] synchronously as a [`std::io::Write`].
9
+ ///
10
+ /// # Alternatives
11
+ ///
12
+ /// In many cases, there are better alternatives to using `SyncIoBridge`, especially
13
+ /// if you want to avoid blocking the async runtime. Consider the following scenarios:
14
+ ///
15
+ /// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and
16
+ /// might not fully leverage the async capabilities of the system.
17
+ ///
18
+ /// ### Why It Matters:
19
+ ///
20
+ /// `SyncIoBridge` allows you to use asynchronous I/O operations in an synchronous
21
+ /// context by blocking the current thread. However, this can be inefficient because:
22
+ /// - **Inefficient Resource Usage**: `SyncIoBridge` takes up an entire OS thread,
23
+ /// which is inefficient compared to asynchronous code that can multiplex many
24
+ /// tasks on a single thread.
25
+ /// - **Thread Pool Saturation**: Excessive use of `SyncIoBridge` can exhaust the
26
+ /// async runtime's thread pool, reducing the number of threads available for
27
+ /// other tasks and impacting overall performance.
28
+ /// - **Missed Concurrency Benefits**: By using synchronous operations with
29
+ /// `SyncIoBridge`, you lose the ability to interleave tasks efficiently,
30
+ /// which is a key advantage of asynchronous programming.
31
+ ///
32
+ /// ## Example 1: Hashing Data
33
+ ///
34
+ /// The use of `SyncIoBridge` is unnecessary when hashing data. Instead, you can
35
+ /// process the data asynchronously by reading it into memory, which avoids blocking
36
+ /// the async runtime.
37
+ ///
38
+ /// There are two strategies for avoiding `SyncIoBridge` when hashing data. When
39
+ /// the data fits into memory, the easiest is to read the data into a `Vec<u8>`
40
+ /// and hash it:
41
+ ///
42
+ /// Explanation: This example demonstrates how to asynchronously read data from a
43
+ /// reader into memory and hash it using a synchronous hashing function. The
44
+ /// `SyncIoBridge` is avoided, ensuring that the async runtime is not blocked.
45
+ /// ```rust
46
+ /// use tokio::io::AsyncReadExt;
47
+ /// use tokio::io::AsyncRead;
48
+ /// use std::io::Cursor;
49
+ /// # mod blake3 { pub fn hash(_: &[u8]) {} }
50
+ ///
51
+ /// async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
52
+ /// // Read all data from the reader into a Vec<u8>.
53
+ /// let mut data = Vec::new();
54
+ /// reader.read_to_end(&mut data).await?;
55
+ ///
56
+ /// // Hash the data using the blake3 hashing function.
57
+ /// let hash = blake3::hash(&data);
58
+ ///
59
+ /// Ok(hash)
60
+ ///}
61
+ ///
62
+ /// #[tokio::main]
63
+ /// async fn main() -> Result<(), std::io::Error> {
64
+ /// // Example: In-memory data.
65
+ /// let data = b"Hello, world!"; // A byte slice.
66
+ /// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
67
+ /// hash_contents(reader).await
68
+ /// }
69
+ /// ```
70
+ ///
71
+ /// When the data doesn't fit into memory, the hashing library will usually
72
+ /// provide a `hasher` that you can repeatedly call `update` on to hash the data
73
+ /// one chunk at the time.
74
+ ///
75
+ /// Explanation: This example demonstrates how to asynchronously stream data in
76
+ /// chunks for hashing. Each chunk is read asynchronously, and the hash is updated
77
+ /// incrementally. This avoids blocking and improves performance over using
78
+ /// `SyncIoBridge`.
79
+ ///
80
+ /// ```rust
81
+ /// use tokio::io::AsyncReadExt;
82
+ /// use tokio::io::AsyncRead;
83
+ /// use std::io::Cursor;
84
+ /// # struct Hasher;
85
+ /// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
86
+ ///
87
+ /// /// Asynchronously streams data from an async reader, processes it in chunks,
88
+ /// /// and hashes the data incrementally.
89
+ /// async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> {
90
+ /// // Create a buffer to read data into, sized for performance.
91
+ /// let mut data = vec![0; 64 * 1024];
92
+ /// loop {
93
+ /// // Read data from the reader into the buffer.
94
+ /// let len = reader.read(&mut data).await?;
95
+ /// if len == 0 { break; } // Exit loop if no more data.
96
+ ///
97
+ /// // Update the hash with the data read.
98
+ /// hasher.update(&data[..len]);
99
+ /// }
100
+ ///
101
+ /// // Finalize the hash after all data has been processed.
102
+ /// let hash = hasher.finalize();
103
+ ///
104
+ /// Ok(hash)
105
+ ///}
106
+ ///
107
+ /// #[tokio::main]
108
+ /// async fn main() -> Result<(), std::io::Error> {
109
+ /// // Example: In-memory data.
110
+ /// let data = b"Hello, world!"; // A byte slice.
111
+ /// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
112
+ /// let hasher = Hasher;
113
+ /// hash_stream(reader, hasher).await
114
+ /// }
115
+ /// ```
116
+ ///
117
+ ///
118
+ /// ## Example 2: Compressing Data
119
+ ///
120
+ /// When compressing data, the use of `SyncIoBridge` is unnecessary as it introduces
121
+ /// blocking and inefficient code. Instead, you can utilize an async compression library
122
+ /// such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/)
123
+ /// crate, which is built to handle asynchronous data streams efficiently.
124
+ ///
125
+ /// Explanation: This example shows how to asynchronously compress data using an
126
+ /// async compression library. By reading and writing asynchronously, it avoids
127
+ /// blocking and is more efficient than using `SyncIoBridge` with a non-async
128
+ /// compression library.
129
+ ///
130
+ /// ```ignore
131
+ /// use async_compression::tokio::write::GzipEncoder;
132
+ /// use std::io::Cursor;
133
+ /// use tokio::io::AsyncRead;
134
+ ///
135
+ /// /// Asynchronously compresses data from an async reader using Gzip and an async encoder.
136
+ /// async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
137
+ /// let writer = tokio::io::sink();
138
+ ///
139
+ /// // Create a Gzip encoder that wraps the writer.
140
+ /// let mut encoder = GzipEncoder::new(writer);
141
+ ///
142
+ /// // Copy data from the reader to the encoder, compressing it.
143
+ /// tokio::io::copy(&mut reader, &mut encoder).await?;
144
+ ///
145
+ /// Ok(())
146
+ ///}
147
+ ///
148
+ /// #[tokio::main]
149
+ /// async fn main() -> Result<(), std::io::Error> {
150
+ /// // Example: In-memory data.
151
+ /// let data = b"Hello, world!"; // A byte slice.
152
+ /// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
153
+ /// compress_data(reader).await?;
154
+ ///
155
+ /// Ok(())
156
+ /// }
157
+ /// ```
158
+ ///
159
+ ///
160
+ /// ## Example 3: Parsing Data Formats
161
+ ///
162
+ ///
163
+ /// `SyncIoBridge` is not ideal when parsing data formats such as `JSON`, as it
164
+ /// blocks async operations. A more efficient approach is to read data asynchronously
165
+ /// into memory and then `deserialize` it, avoiding unnecessary synchronization overhead.
166
+ ///
167
+ /// Explanation: This example shows how to asynchronously read data into memory
168
+ /// and then parse it as `JSON`. By avoiding `SyncIoBridge`, the asynchronous runtime
169
+ /// remains unblocked, leading to better performance when working with asynchronous
170
+ /// I/O streams.
171
+ ///
172
+ /// ```rust,no_run
173
+ /// use tokio::io::AsyncRead;
174
+ /// use tokio::io::AsyncReadExt;
175
+ /// use std::io::Cursor;
176
+ /// # mod serde {
177
+ /// # pub trait DeserializeOwned: 'static {}
178
+ /// # impl<T: 'static> DeserializeOwned for T {}
179
+ /// # }
180
+ /// # mod serde_json {
181
+ /// # use super::serde::DeserializeOwned;
182
+ /// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
183
+ /// # unimplemented!()
184
+ /// # }
185
+ /// # }
186
+ /// # #[derive(Debug)] struct MyStruct;
187
+ ///
188
+ ///
189
+ /// async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
190
+ /// // Read all data from the reader into a Vec<u8>.
191
+ /// let mut data = Vec::new();
192
+ /// reader.read_to_end(&mut data).await?;
193
+ ///
194
+ /// // Deserialize the data from the Vec<u8> into a MyStruct instance.
195
+ /// let value: MyStruct = serde_json::from_slice(&data)?;
196
+ ///
197
+ /// Ok(value)
198
+ ///}
199
+ ///
200
+ /// #[tokio::main]
201
+ /// async fn main() -> Result<(), std::io::Error> {
202
+ /// // Example: In-memory data.
203
+ /// let data = b"Hello, world!"; // A byte slice.
204
+ /// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
205
+ /// parse_json(reader).await?;
206
+ /// Ok(())
207
+ /// }
208
+ /// ```
209
+ ///
210
+ /// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
211
+ ///
212
+ /// `SyncIoBridge` is mainly useful when you need to interface with synchronous
213
+ /// libraries from an asynchronous context.
214
+ ///
215
+ /// Explanation: This example shows how to use `SyncIoBridge` inside a `spawn_blocking`
216
+ /// task to safely perform synchronous I/O without blocking the async runtime. The
217
+ /// `spawn_blocking` ensures that the synchronous code is offloaded to a dedicated
218
+ /// thread pool, preventing it from interfering with the async tasks.
219
+ ///
220
+ /// ```rust
221
+ /// use tokio::task::spawn_blocking;
222
+ /// use tokio_util::io::SyncIoBridge;
223
+ /// use tokio::io::AsyncRead;
224
+ /// use std::marker::Unpin;
225
+ /// use std::io::Cursor;
226
+ ///
227
+ /// /// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
228
+ /// async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
229
+ /// // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
230
+ /// let mut sync_reader = SyncIoBridge::new(reader);
231
+ ///
232
+ /// // Spawn a blocking task to perform synchronous I/O operations.
233
+ /// let result = spawn_blocking(move || {
234
+ /// // Create an in-memory buffer to hold the copied data.
235
+ /// let mut buffer = Vec::new();
236
+ /// // Copy data from the sync_reader to the buffer.
237
+ /// std::io::copy(&mut sync_reader, &mut buffer)?;
238
+ /// // Return the buffer containing the copied data.
239
+ /// Ok::<_, std::io::Error>(buffer)
240
+ /// })
241
+ /// .await??;
242
+ ///
243
+ /// // Return the result from the blocking task.
244
+ /// Ok(result)
245
+ ///}
246
+ ///
247
+ /// #[tokio::main]
248
+ /// async fn main() -> Result<(), std::io::Error> {
249
+ /// // Example: In-memory data.
250
+ /// let data = b"Hello, world!"; // A byte slice.
251
+ /// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
252
+ /// let result = process_sync_io(reader).await?;
253
+ ///
254
+ /// // You can use `result` here as needed.
255
+ ///
256
+ /// Ok(())
257
+ /// }
258
+ /// ```
259
+ ///
9
260
#[ derive( Debug ) ]
10
261
pub struct SyncIoBridge < T > {
11
262
src : T ,
0 commit comments