22
33use std:: future:: Future ;
44use std:: io;
5- use std:: marker:: Unpin ;
65use std:: pin:: Pin ;
76use std:: task:: { Context , Poll } ;
87use std:: time:: { Duration , Instant } ;
@@ -15,27 +14,37 @@ use super::Delay;
1514///
1615/// [`FutureExt.timeout`]: trait.FutureExt.html
1716#[ derive( Debug ) ]
18- pub struct Timeout < F : Future + Unpin > {
17+ pub struct Timeout < F : Future > {
1918 future : F ,
2019 delay : Delay ,
2120}
2221
23- impl < F : Future + Unpin > Future for Timeout < F > {
22+ impl < F : Future > Future for Timeout < F > {
2423 type Output = Result < F :: Output , io:: Error > ;
2524
26- fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
27- if let Poll :: Ready ( t) = Pin :: new ( & mut self . future ) . poll ( cx) {
25+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
26+ // This pinning projection is safe because:
27+ // 1. `Timeout` is only Unpin when `F` is Unpin. (Ok for default auto impl)
28+ // 2. `drop` never moves out of `F`. (No manual `Drop` impl and no `#[repr(packed)]`)
29+ // 3. `drop` on `F` must be called before overwritten or deallocated. (No manual `Drop` impl)
30+ // 4. No other operation provided for moving out `F`. (Ok)
31+ let ( future, delay) = unsafe {
32+ let Timeout { future, delay } = self . get_unchecked_mut ( ) ;
33+ ( Pin :: new_unchecked ( future) , Pin :: new ( delay) )
34+ } ;
35+
36+ if let Poll :: Ready ( t) = future. poll ( cx) {
2837 return Poll :: Ready ( Ok ( t) ) ;
2938 }
3039
31- Pin :: new ( & mut self . delay )
40+ delay
3241 . poll ( cx)
3342 . map ( |_| Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , "future timed out" ) ) )
3443 }
3544}
3645
3746/// Extend `Future` with methods to time out execution.
38- pub trait FutureExt : Future + Sized + Unpin {
47+ pub trait FutureExt : Future + Sized {
3948 /// Creates a new future which will take at most `dur` time to resolve from
4049 /// the point at which this method is called.
4150 ///
@@ -52,21 +61,32 @@ pub trait FutureExt: Future + Sized + Unpin {
5261 /// # #![feature(async_await)]
5362 /// use futures::prelude::*;
5463 /// use runtime::prelude::*;
55- /// use std::time::Duration;
64+ /// use std::time::{Duration, Instant};
65+ ///
66+ /// async fn long_future(dur: Duration) {
67+ /// // Simulate some network operations...
68+ /// runtime::time::Delay::new(dur).await;
69+ /// }
5670 ///
57- /// # fn long_future() -> impl Future<Output = std::io::Result<()>> {
58- /// # futures::future::ok(())
59- /// # }
60- /// #
6171 /// #[runtime::main]
6272 /// async fn main() {
63- /// let future = long_future();
64- /// let timed_out = future.timeout(Duration::from_millis(100));
73+ /// // Fast operation
74+ /// let begin_inst = Instant::now();
75+ /// let short = long_future(Duration::from_millis(100))
76+ /// .timeout(Duration::from_millis(5000)) // Set timeout
77+ /// .await;
78+ /// assert!(short.is_ok()); // Success
79+ /// assert!(begin_inst.elapsed() >= Duration::from_millis(100));
80+ /// assert!(begin_inst.elapsed() < Duration::from_millis(5000));
6581 ///
66- /// match timed_out.await {
67- /// Ok(item) => println!("got {:?} within enough time!", item),
68- /// Err(_) => println!("took too long to produce the item"),
69- /// }
82+ /// // Slow operation
83+ /// let begin_inst = Instant::now();
84+ /// let long = long_future(Duration::from_millis(5000))
85+ /// .timeout(Duration::from_millis(100)) // Set timeout
86+ /// .await;
87+ /// assert!(long.is_err()); // Timeout
88+ /// assert!(begin_inst.elapsed() >= Duration::from_millis(100));
89+ /// assert!(begin_inst.elapsed() < Duration::from_millis(5000));
7090 /// }
7191 /// ```
7292 fn timeout ( self , dur : Duration ) -> Timeout < Self > {
@@ -116,29 +136,40 @@ pub trait FutureExt: Future + Sized + Unpin {
116136 }
117137}
118138
119- impl < T : Future + Unpin > FutureExt for T { }
139+ impl < T : Future > FutureExt for T { }
120140
121141/// A stream returned by methods in the [`StreamExt`] trait.
122142///
123143/// [`StreamExt`]: trait.StreamExt.html
124144#[ derive( Debug ) ]
125- pub struct TimeoutStream < S : Stream + Unpin > {
145+ pub struct TimeoutStream < S : Stream > {
126146 timeout : Delay ,
127147 dur : Duration ,
128148 stream : S ,
129149}
130150
131- impl < S : Stream + Unpin > Stream for TimeoutStream < S > {
151+ impl < S : Stream > Stream for TimeoutStream < S > {
132152 type Item = Result < S :: Item , io:: Error > ;
133153
134- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
135- if let Poll :: Ready ( s) = Pin :: new ( & mut self . stream ) . poll_next ( cx) {
136- self . timeout = Delay :: new ( self . dur ) ;
154+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
155+ // This pinning projection is safe.
156+ // See detail in `Timeout::poll`.
157+ let ( mut timeout, dur, stream) = unsafe {
158+ let TimeoutStream {
159+ timeout,
160+ dur,
161+ stream,
162+ } = self . get_unchecked_mut ( ) ;
163+ ( Pin :: new ( timeout) , Pin :: new ( dur) , Pin :: new_unchecked ( stream) )
164+ } ;
165+
166+ if let Poll :: Ready ( s) = stream. poll_next ( cx) {
167+ timeout. set ( Delay :: new ( * dur) ) ;
137168 return Poll :: Ready ( Ok ( s) . transpose ( ) ) ;
138169 }
139170
140- Pin :: new ( & mut self . timeout ) . poll ( cx) . map ( |_| {
141- self . timeout = Delay :: new ( self . dur ) ;
171+ Pin :: new ( & mut * timeout) . poll ( cx) . map ( |_| {
172+ timeout. set ( Delay :: new ( * dur) ) ;
142173 Some ( Err ( io:: Error :: new (
143174 io:: ErrorKind :: TimedOut ,
144175 "future timed out" ,
@@ -148,7 +179,7 @@ impl<S: Stream + Unpin> Stream for TimeoutStream<S> {
148179}
149180
150181/// Extend `Stream` with methods to time out execution.
151- pub trait StreamExt : Stream + Sized + Unpin {
182+ pub trait StreamExt : Stream + Sized {
152183 /// Creates a new stream which will take at most `dur` time to yield each
153184 /// item of the stream.
154185 ///
@@ -190,38 +221,49 @@ pub trait StreamExt: Stream + Sized + Unpin {
190221 }
191222}
192223
193- impl < S : Stream + Unpin > StreamExt for S { }
224+ impl < S : Stream > StreamExt for S { }
194225
195226/// A stream returned by methods in the [`StreamExt`] trait.
196227///
197228/// [`StreamExt`]: trait.StreamExt.html
198229#[ derive( Debug ) ]
199- pub struct TimeoutAsyncRead < S : AsyncRead + Unpin > {
230+ pub struct TimeoutAsyncRead < S : AsyncRead > {
200231 timeout : Delay ,
201232 dur : Duration ,
202233 stream : S ,
203234}
204235
205- impl < S : AsyncRead + Unpin > AsyncRead for TimeoutAsyncRead < S > {
236+ impl < S : AsyncRead > AsyncRead for TimeoutAsyncRead < S > {
206237 fn poll_read (
207- mut self : Pin < & mut Self > ,
238+ self : Pin < & mut Self > ,
208239 cx : & mut Context < ' _ > ,
209240 buf : & mut [ u8 ] ,
210241 ) -> Poll < Result < usize , io:: Error > > {
211- if let Poll :: Ready ( s) = Pin :: new ( & mut self . stream ) . poll_read ( cx, buf) {
212- self . timeout = Delay :: new ( self . dur ) ;
242+ // This pinning projection is safe.
243+ // See detail in `Timeout::poll`.
244+ let ( mut timeout, dur, stream) = unsafe {
245+ let TimeoutAsyncRead {
246+ timeout,
247+ dur,
248+ stream,
249+ } = self . get_unchecked_mut ( ) ;
250+ ( Pin :: new ( timeout) , Pin :: new ( dur) , Pin :: new_unchecked ( stream) )
251+ } ;
252+
253+ if let Poll :: Ready ( s) = stream. poll_read ( cx, buf) {
254+ timeout. set ( Delay :: new ( * dur) ) ;
213255 return Poll :: Ready ( s) ;
214256 }
215257
216- Pin :: new ( & mut self . timeout ) . poll ( cx) . map ( |_| {
217- self . timeout = Delay :: new ( self . dur ) ;
258+ Pin :: new ( & mut * timeout) . poll ( cx) . map ( |_| {
259+ timeout. set ( Delay :: new ( * dur) ) ;
218260 Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , "future timed out" ) )
219261 } )
220262 }
221263}
222264
223265/// Extend `AsyncRead` with methods to time out execution.
224- pub trait AsyncReadExt : AsyncRead + Sized + Unpin {
266+ pub trait AsyncReadExt : AsyncRead + Sized {
225267 /// Creates a new stream which will take at most `dur` time to yield each
226268 /// item of the stream.
227269 ///
@@ -260,4 +302,4 @@ pub trait AsyncReadExt: AsyncRead + Sized + Unpin {
260302 }
261303}
262304
263- impl < S : AsyncRead + Unpin > AsyncReadExt for S { }
305+ impl < S : AsyncRead > AsyncReadExt for S { }
0 commit comments