1
1
use super :: { AsyncPollable , AsyncRead , AsyncWrite } ;
2
2
use std:: cell:: OnceCell ;
3
- use std:: io:: Result ;
3
+ use std:: future:: { poll_fn, Future } ;
4
+ use std:: pin:: Pin ;
5
+ use std:: task:: { Context , Poll } ;
4
6
use wasi:: io:: streams:: { InputStream , OutputStream , StreamError } ;
5
7
6
8
/// A wrapper for WASI's `InputStream` resource that provides implementations of `AsyncRead` and
@@ -21,18 +23,23 @@ impl AsyncInputStream {
21
23
stream,
22
24
}
23
25
}
24
- /// Await for read readiness.
25
- async fn ready ( & self ) {
26
+ fn poll_ready ( & self , cx : & mut Context < ' _ > ) -> Poll < ( ) > {
26
27
// Lazily initialize the AsyncPollable
27
28
let subscription = self
28
29
. subscription
29
30
. get_or_init ( || AsyncPollable :: new ( self . stream . subscribe ( ) ) ) ;
30
31
// Wait on readiness
31
- subscription. wait_for ( ) . await ;
32
+ let wait_for = subscription. wait_for ( ) ;
33
+ let mut pinned = std:: pin:: pin!( wait_for) ;
34
+ pinned. as_mut ( ) . poll ( cx)
35
+ }
36
+ /// Await for read readiness.
37
+ async fn ready ( & self ) {
38
+ poll_fn ( |cx| self . poll_ready ( cx) ) . await
32
39
}
33
40
/// Asynchronously read from the input stream.
34
41
/// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`.
35
- pub async fn read ( & self , buf : & mut [ u8 ] ) -> Result < usize > {
42
+ pub async fn read ( & self , buf : & mut [ u8 ] ) -> std :: io :: Result < usize > {
36
43
let read = loop {
37
44
self . ready ( ) . await ;
38
45
// Ideally, the ABI would be able to read directly into buf.
@@ -56,10 +63,40 @@ impl AsyncInputStream {
56
63
buf[ 0 ..len] . copy_from_slice ( & read) ;
57
64
Ok ( len)
58
65
}
66
+
67
+ /// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
68
+ /// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
69
+ /// will be at most 8k. If you want to control chunk size, use
70
+ /// `Self::into_stream_of`.
71
+ pub fn into_stream ( self ) -> AsyncInputChunkStream {
72
+ AsyncInputChunkStream {
73
+ stream : self ,
74
+ chunk_size : 8 * 1024 ,
75
+ }
76
+ }
77
+
78
+ /// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
79
+ /// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
80
+ /// will be at most the `chunk_size` argument specified.
81
+ pub fn into_stream_of ( self , chunk_size : usize ) -> AsyncInputChunkStream {
82
+ AsyncInputChunkStream {
83
+ stream : self ,
84
+ chunk_size,
85
+ }
86
+ }
87
+
88
+ /// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
89
+ /// items of `Result<u8, std::io::Error>`.
90
+ pub fn into_bytestream ( self ) -> AsyncInputByteStream {
91
+ AsyncInputByteStream {
92
+ stream : self . into_stream ( ) ,
93
+ buffer : std:: io:: Read :: bytes ( std:: io:: Cursor :: new ( Vec :: new ( ) ) ) ,
94
+ }
95
+ }
59
96
}
60
97
61
98
impl AsyncRead for AsyncInputStream {
62
- async fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < usize > {
99
+ async fn read ( & mut self , buf : & mut [ u8 ] ) -> std :: io :: Result < usize > {
63
100
Self :: read ( self , buf) . await
64
101
}
65
102
@@ -69,6 +106,87 @@ impl AsyncRead for AsyncInputStream {
69
106
}
70
107
}
71
108
109
+ /// Wrapper of `AsyncInputStream` that impls `futures_core::stream::Stream`
110
+ /// with an item of `Result<Vec<u8>, std::io::Error>`
111
+ pub struct AsyncInputChunkStream {
112
+ stream : AsyncInputStream ,
113
+ chunk_size : usize ,
114
+ }
115
+
116
+ impl AsyncInputChunkStream {
117
+ /// Extract the `AsyncInputStream` which backs this stream.
118
+ pub fn into_inner ( self ) -> AsyncInputStream {
119
+ self . stream
120
+ }
121
+ }
122
+
123
+ impl futures_core:: stream:: Stream for AsyncInputChunkStream {
124
+ type Item = Result < Vec < u8 > , std:: io:: Error > ;
125
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
126
+ match self . stream . poll_ready ( cx) {
127
+ Poll :: Pending => Poll :: Pending ,
128
+ Poll :: Ready ( ( ) ) => match self . stream . stream . read ( self . chunk_size as u64 ) {
129
+ Ok ( r) if r. is_empty ( ) => Poll :: Pending ,
130
+ Ok ( r) => Poll :: Ready ( Some ( Ok ( r) ) ) ,
131
+ Err ( StreamError :: LastOperationFailed ( err) ) => {
132
+ Poll :: Ready ( Some ( Err ( std:: io:: Error :: other ( err. to_debug_string ( ) ) ) ) )
133
+ }
134
+ Err ( StreamError :: Closed ) => Poll :: Ready ( None ) ,
135
+ } ,
136
+ }
137
+ }
138
+ }
139
+
140
+ pin_project_lite:: pin_project! {
141
+ /// Wrapper of `AsyncInputStream` that impls
142
+ /// `futures_core::stream::Stream` with item `Result<u8, std::io::Error>`.
143
+ pub struct AsyncInputByteStream {
144
+ #[ pin]
145
+ stream: AsyncInputChunkStream ,
146
+ buffer: std:: io:: Bytes <std:: io:: Cursor <Vec <u8 >>>,
147
+ }
148
+ }
149
+
150
+ impl AsyncInputByteStream {
151
+ /// Extract the `AsyncInputStream` which backs this stream, and any bytes
152
+ /// read from the `AsyncInputStream` which have not yet been yielded by
153
+ /// the byte stream.
154
+ pub fn into_inner ( self ) -> ( AsyncInputStream , Vec < u8 > ) {
155
+ (
156
+ self . stream . into_inner ( ) ,
157
+ self . buffer
158
+ . collect :: < Result < Vec < u8 > , std:: io:: Error > > ( )
159
+ . expect ( "read of Cursor<Vec<u8>> is infallible" ) ,
160
+ )
161
+ }
162
+ }
163
+
164
+ impl futures_core:: stream:: Stream for AsyncInputByteStream {
165
+ type Item = Result < u8 , std:: io:: Error > ;
166
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
167
+ let this = self . project ( ) ;
168
+ match this. buffer . next ( ) {
169
+ Some ( byte) => Poll :: Ready ( Some ( Ok ( byte. expect ( "cursor on Vec<u8> is infallible" ) ) ) ) ,
170
+ None => match futures_core:: stream:: Stream :: poll_next ( this. stream , cx) {
171
+ Poll :: Ready ( Some ( Ok ( bytes) ) ) => {
172
+ let mut bytes = std:: io:: Read :: bytes ( std:: io:: Cursor :: new ( bytes) ) ;
173
+ match bytes. next ( ) {
174
+ Some ( Ok ( byte) ) => {
175
+ * this. buffer = bytes;
176
+ Poll :: Ready ( Some ( Ok ( byte) ) )
177
+ }
178
+ Some ( Err ( err) ) => Poll :: Ready ( Some ( Err ( err) ) ) ,
179
+ None => Poll :: Ready ( None ) ,
180
+ }
181
+ }
182
+ Poll :: Ready ( Some ( Err ( err) ) ) => Poll :: Ready ( Some ( Err ( err) ) ) ,
183
+ Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
184
+ Poll :: Pending => Poll :: Pending ,
185
+ } ,
186
+ }
187
+ }
188
+ }
189
+
72
190
/// A wrapper for WASI's `output-stream` resource that provides implementations of `AsyncWrite` and
73
191
/// `AsyncPollable`.
74
192
#[ derive( Debug ) ]
@@ -104,7 +222,7 @@ impl AsyncOutputStream {
104
222
/// a `std::io::Error` indicating either an error returned by the stream write
105
223
/// using the debug string provided by the WASI error, or else that the,
106
224
/// indicated by `std::io::ErrorKind::ConnectionReset`.
107
- pub async fn write ( & self , buf : & [ u8 ] ) -> Result < usize > {
225
+ pub async fn write ( & self , buf : & [ u8 ] ) -> std :: io :: Result < usize > {
108
226
// Loops at most twice.
109
227
loop {
110
228
match self . stream . check_write ( ) {
@@ -145,7 +263,7 @@ impl AsyncOutputStream {
145
263
/// the stream flush, using the debug string provided by the WASI error,
146
264
/// or else that the stream is closed, indicated by
147
265
/// `std::io::ErrorKind::ConnectionReset`.
148
- pub async fn flush ( & self ) -> Result < ( ) > {
266
+ pub async fn flush ( & self ) -> std :: io :: Result < ( ) > {
149
267
match self . stream . flush ( ) {
150
268
Ok ( ( ) ) => {
151
269
self . ready ( ) . await ;
@@ -162,10 +280,10 @@ impl AsyncOutputStream {
162
280
}
163
281
impl AsyncWrite for AsyncOutputStream {
164
282
// Required methods
165
- async fn write ( & mut self , buf : & [ u8 ] ) -> Result < usize > {
283
+ async fn write ( & mut self , buf : & [ u8 ] ) -> std :: io :: Result < usize > {
166
284
Self :: write ( self , buf) . await
167
285
}
168
- async fn flush ( & mut self ) -> Result < ( ) > {
286
+ async fn flush ( & mut self ) -> std :: io :: Result < ( ) > {
169
287
Self :: flush ( self ) . await
170
288
}
171
289
@@ -180,7 +298,7 @@ pub(crate) async fn splice(
180
298
reader : & AsyncInputStream ,
181
299
writer : & AsyncOutputStream ,
182
300
len : u64 ,
183
- ) -> core :: result :: Result < u64 , StreamError > {
301
+ ) -> Result < u64 , StreamError > {
184
302
// Wait for both streams to be ready.
185
303
let r = reader. ready ( ) ;
186
304
writer. ready ( ) . await ;
0 commit comments