1
- extern crate futures;
2
- extern crate tokio;
3
- extern crate tokio_io;
4
- extern crate tokio_service;
5
- extern crate tokio_io_timeout;
6
- extern crate hyper;
7
-
8
- use std:: time:: Duration ;
1
+ use std:: future:: Future ;
9
2
use std:: io;
3
+ use std:: pin:: Pin ;
4
+ use std:: task:: { Context , Poll } ;
5
+ use std:: time:: Duration ;
10
6
11
- use futures:: Future ;
12
-
13
- use tokio:: timer:: Timeout ;
7
+ use tokio:: io:: { AsyncRead , AsyncWrite } ;
8
+ use tokio:: time:: timeout;
14
9
use tokio_io_timeout:: TimeoutStream ;
15
10
16
- use hyper:: client:: connect:: { Connect , Connected , Destination } ;
11
+ use hyper:: { service:: Service , Uri } ;
12
+ use hyper:: client:: connect:: { Connect , Connected , Connection } ;
13
+
14
+ mod stream;
15
+
16
+ use stream:: TimeoutConnectorStream ;
17
+
18
+ type BoxError = Box < dyn std:: error:: Error + Send + Sync > ;
17
19
18
20
/// A connector that enforces as connection timeout
19
- #[ derive( Debug ) ]
21
+ #[ derive( Debug , Clone ) ]
20
22
pub struct TimeoutConnector < T > {
21
23
/// A connector implementing the `Connect` trait
22
24
connector : T ,
@@ -40,42 +42,57 @@ impl<T: Connect> TimeoutConnector<T> {
40
42
}
41
43
}
42
44
43
- impl < T : Connect > Connect for TimeoutConnector < T >
45
+ impl < T > Service < Uri > for TimeoutConnector < T >
44
46
where
45
- T : Connect < Error = io:: Error > + ' static ,
46
- T :: Future : ' static ,
47
+ T : Service < Uri > ,
48
+ T :: Response : AsyncRead + AsyncWrite + Send + Unpin ,
49
+ T :: Future : Send + ' static ,
50
+ T :: Error : Into < BoxError > ,
47
51
{
48
- type Transport = TimeoutStream < T :: Transport > ;
49
- type Error = T :: Error ;
50
- type Future = Box < Future < Item = ( Self :: Transport , Connected ) , Error = Self :: Error > + Send > ;
51
-
52
- fn connect ( & self , dst : Destination ) -> Self :: Future {
52
+ type Response = TimeoutConnectorStream < T :: Response > ;
53
+ type Error = BoxError ;
54
+ type Future = Pin < Box < dyn Future < Output = Result < Self :: Response , Self :: Error > > + Send > > ;
55
+
56
+ fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
57
+ match self . connector . poll_ready ( cx) {
58
+ Poll :: Ready ( Ok ( ( ) ) ) => Poll :: Ready ( Ok ( ( ) ) ) ,
59
+ Poll :: Ready ( Err ( e) ) => Poll :: Ready ( Err ( e. into ( ) ) ) ,
60
+ Poll :: Pending => Poll :: Pending ,
61
+ }
62
+ }
53
63
64
+ fn call ( & mut self , dst : Uri ) -> Self :: Future {
54
65
let read_timeout = self . read_timeout . clone ( ) ;
55
66
let write_timeout = self . write_timeout . clone ( ) ;
56
- let connecting = self . connector . connect ( dst) ;
67
+ let connecting = self . connector . call ( dst) ;
57
68
58
69
if self . connect_timeout . is_none ( ) {
59
- return Box :: new ( connecting. map ( move |( io, c) | {
60
- let mut tm = TimeoutStream :: new ( io) ;
70
+ let fut = async move {
71
+ let io = connecting. await . map_err ( Into :: into) ?;
72
+
73
+ let mut tm = TimeoutConnectorStream :: new ( TimeoutStream :: new ( io) ) ;
61
74
tm. set_read_timeout ( read_timeout) ;
62
75
tm. set_write_timeout ( write_timeout) ;
63
- ( tm, c)
64
- } ) ) ;
76
+ Ok ( tm)
77
+ } ;
78
+
79
+ return Box :: pin ( fut) ;
65
80
}
66
81
67
82
let connect_timeout = self . connect_timeout . expect ( "Connect timeout should be set" ) ;
68
- let timeout = Timeout :: new ( connecting , connect_timeout ) ;
83
+ let timeout = timeout ( connect_timeout , connecting ) ;
69
84
70
- Box :: new ( timeout. then ( move |res| match res {
71
- Ok ( ( io, c) ) => {
72
- let mut tm = TimeoutStream :: new ( io) ;
73
- tm. set_read_timeout ( read_timeout) ;
74
- tm. set_write_timeout ( write_timeout) ;
75
- Ok ( ( tm, c) )
76
- }
77
- Err ( e) => Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , e) ) ,
78
- } ) )
85
+ let fut = async move {
86
+ let connecting = timeout. await . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: TimedOut , e) ) ?;
87
+ let io = connecting. map_err ( Into :: into) ?;
88
+
89
+ let mut tm = TimeoutConnectorStream :: new ( TimeoutStream :: new ( io) ) ;
90
+ tm. set_read_timeout ( read_timeout) ;
91
+ tm. set_write_timeout ( write_timeout) ;
92
+ Ok ( tm)
93
+ } ;
94
+
95
+ Box :: pin ( fut)
79
96
}
80
97
}
81
98
@@ -105,32 +122,35 @@ impl<T> TimeoutConnector<T> {
105
122
}
106
123
}
107
124
125
+ impl < T > Connection for TimeoutConnector < T > {
126
+ fn connected ( & self ) -> Connected {
127
+ Connected :: new ( )
128
+ }
129
+ }
130
+
108
131
#[ cfg( test) ]
109
132
mod tests {
110
133
use std:: error:: Error ;
111
134
use std:: io;
112
135
use std:: time:: Duration ;
113
- use futures:: future;
114
- use tokio:: runtime:: current_thread:: Runtime ;
136
+
115
137
use hyper:: Client ;
116
138
use hyper:: client:: HttpConnector ;
139
+
117
140
use super :: TimeoutConnector ;
118
141
119
- #[ test]
120
- fn test_timeout_connector ( ) {
121
- let mut rt = Runtime :: new ( ) . unwrap ( ) ;
122
- let res = rt. block_on ( future:: lazy ( || {
123
- // 10.255.255.1 is a not a routable IP address
124
- let url = "http://10.255.255.1" . parse ( ) . unwrap ( ) ;
142
+ #[ tokio:: test]
143
+ async fn test_timeout_connector ( ) {
144
+ // 10.255.255.1 is a not a routable IP address
145
+ let url = "http://10.255.255.1" . parse ( ) . unwrap ( ) ;
125
146
126
- let http = HttpConnector :: new ( 1 ) ;
127
- let mut connector = TimeoutConnector :: new ( http) ;
128
- connector. set_connect_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
147
+ let http = HttpConnector :: new ( ) ;
148
+ let mut connector = TimeoutConnector :: new ( http) ;
149
+ connector. set_connect_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
129
150
130
- let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
151
+ let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
131
152
132
- client. get ( url)
133
- } ) ) ;
153
+ let res = client. get ( url) . await ;
134
154
135
155
match res {
136
156
Ok ( _) => panic ! ( "Expected a timeout" ) ,
@@ -144,21 +164,18 @@ mod tests {
144
164
}
145
165
}
146
166
147
- #[ test]
148
- fn test_read_timeout ( ) {
149
- let mut rt = Runtime :: new ( ) . unwrap ( ) ;
150
- let res = rt. block_on ( future:: lazy ( || {
151
- let url = "http://example.com" . parse ( ) . unwrap ( ) ;
167
+ #[ tokio:: test]
168
+ async fn test_read_timeout ( ) {
169
+ let url = "http://example.com" . parse ( ) . unwrap ( ) ;
152
170
153
- let http = HttpConnector :: new ( 1 ) ;
154
- let mut connector = TimeoutConnector :: new ( http) ;
155
- // A 1 ms read timeout should be so short that we trigger a timeout error
156
- connector. set_read_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
171
+ let http = HttpConnector :: new ( ) ;
172
+ let mut connector = TimeoutConnector :: new ( http) ;
173
+ // A 1 ms read timeout should be so short that we trigger a timeout error
174
+ connector. set_read_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
157
175
158
- let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
176
+ let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
159
177
160
- client. get ( url)
161
- } ) ) ;
178
+ let res = client. get ( url) . await ;
162
179
163
180
match res {
164
181
Ok ( _) => panic ! ( "Expected a timeout" ) ,
0 commit comments