Skip to content

Commit ef637ba

Browse files
authored
Merge pull request #2 from hjr3/read_write-timeout
Include support for read and write timeouts
2 parents 0ecf5c7 + 0b5c2b8 commit ef637ba

File tree

5 files changed

+157
-31
lines changed

5 files changed

+157
-31
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ version = "0.1.0"
44
authors = ["Herman J. Radtke III <[email protected]>"]
55

66
[dependencies]
7-
futures = "0.1.11"
8-
hyper = "0.11.0"
7+
futures = "0.1"
8+
hyper = "0.11"
99
tokio-core = "0.1"
1010
tokio-io = "0.1"
11-
tokio-service = "0.1.0"
11+
tokio-service = "0.1"
12+
tokio-io-timeout = "0.1"
1213

1314
[dev-dependencies]
1415
native-tls = "0.1"
15-
hyper-tls = "0.1.1"
16+
hyper-tls = "0.1"

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ At the time this crate was created, hyper does not support timeouts. There is a
1313

1414
There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This connector wraps around `HttpConnector` or `HttpsConnector` values and provides timeouts.
1515

16+
**Note:** Because of the way `tokio_proto::ClientProto` works, a read or write timeout will return a _broken pipe_ error.
17+
1618
## Usage
1719

1820
First, add this to your `Cargo.toml`:
@@ -28,6 +30,8 @@ Next, add this to your crate:
2830
extern crate hyper_timeout;
2931
```
3032

33+
See the [client example](./examples/client.rs) for a working example.
34+
3135
## License
3236

3337
Licensed under either of

examples/client.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
extern crate futures;
2+
extern crate tokio_core;
3+
extern crate hyper;
4+
extern crate hyper_tls;
5+
extern crate hyper_timeout;
6+
7+
use std::env;
8+
use std::time::Duration;
9+
10+
use futures::Future;
11+
use futures::stream::Stream;
12+
13+
use hyper::Client;
14+
15+
//use hyper::client::HttpConnector;
16+
use hyper_tls::HttpsConnector;
17+
18+
use hyper_timeout::TimeoutConnector;
19+
20+
fn main() {
21+
22+
let url = match env::args().nth(1) {
23+
Some(url) => url,
24+
None => {
25+
println!("Usage: client <url>");
26+
return;
27+
}
28+
};
29+
30+
let url = url.parse::<hyper::Uri>().unwrap();
31+
32+
let mut core = tokio_core::reactor::Core::new().unwrap();
33+
let handle = core.handle();
34+
35+
// This example uses `HttpsConnector`, but you can also use the default hyper `HttpConnector`
36+
//let connector = HttpConnector::new(4, &handle);
37+
let connector = HttpsConnector::new(4, &handle).unwrap();
38+
let mut tm = TimeoutConnector::new(connector, &handle);
39+
tm.set_connect_timeout(Some(Duration::from_secs(5)));
40+
tm.set_read_timeout(Some(Duration::from_secs(5)));
41+
tm.set_write_timeout(Some(Duration::from_secs(5)));
42+
let client = Client::configure().connector(tm).build(&handle);
43+
44+
let get = client.get(url).and_then(|res| {
45+
println!("Response: {}", res.status());
46+
println!("Headers: \n{}", res.headers());
47+
48+
res.body().concat2()
49+
});
50+
51+
let got = core.run(get).unwrap();
52+
let output = String::from_utf8_lossy(&got);
53+
println!("{}", output);
54+
}

src/lib.rs

Lines changed: 80 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ extern crate futures;
22
extern crate tokio_core;
33
extern crate tokio_io;
44
extern crate tokio_service;
5+
extern crate tokio_io_timeout;
56
extern crate hyper;
67

78
use std::time::Duration;
@@ -12,6 +13,7 @@ use futures::future::{Either, Future};
1213
use tokio_core::reactor::{Handle, Timeout};
1314
use tokio_io::{AsyncRead, AsyncWrite};
1415
use tokio_service::Service;
16+
use tokio_io_timeout::TimeoutStream;
1517

1618
use hyper::client::Connect;
1719

@@ -23,46 +25,94 @@ pub struct TimeoutConnector<T> {
2325
/// Handle to be used to set the timeout within tokio's core
2426
handle: Handle,
2527
/// Amount of time to wait connecting
26-
connect_timeout: Duration,
28+
connect_timeout: Option<Duration>,
29+
/// Amount of time to wait reading response
30+
read_timeout: Option<Duration>,
31+
/// Amount of time to wait writing request
32+
write_timeout: Option<Duration>,
2733
}
2834

2935
impl<T: Connect> TimeoutConnector<T> {
3036
/// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait
31-
pub fn new(connector: T, handle: &Handle, timeout: Duration) -> Self {
37+
pub fn new(connector: T, handle: &Handle) -> Self {
3238
TimeoutConnector {
3339
connector: connector,
3440
handle: handle.clone(),
35-
connect_timeout: timeout,
41+
connect_timeout: None,
42+
read_timeout: None,
43+
write_timeout: None,
3644
}
3745
}
46+
47+
/// Set the timeout for connecting to a URL.
48+
///
49+
/// Default is no timeout.
50+
#[inline]
51+
pub fn set_connect_timeout(&mut self, val: Option<Duration>) {
52+
self.connect_timeout = val;
53+
}
54+
55+
/// Set the timeout for the response.
56+
///
57+
/// Default is no timeout.
58+
#[inline]
59+
pub fn set_read_timeout(&mut self, val: Option<Duration>) {
60+
self.read_timeout = val;
61+
}
62+
63+
/// Set the timeout for the request.
64+
///
65+
/// Default is no timeout.
66+
#[inline]
67+
pub fn set_write_timeout(&mut self, val: Option<Duration>) {
68+
self.write_timeout = val;
69+
}
3870
}
3971

4072
impl<T> Service for TimeoutConnector<T>
41-
where T: Service<Error=io::Error> + 'static,
42-
T::Response: AsyncRead + AsyncWrite,
43-
T::Future: Future<Error=io::Error>,
73+
where
74+
T: Service<Error = io::Error> + 'static,
75+
T::Response: AsyncRead + AsyncWrite,
76+
T::Future: Future<Error = io::Error>,
4477
{
4578
type Request = T::Request;
46-
type Response = T::Response;
79+
type Response = TimeoutStream<T::Response>;
4780
type Error = T::Error;
48-
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
81+
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
4982

5083
fn call(&self, req: Self::Request) -> Self::Future {
84+
let handle = self.handle.clone();
85+
let read_timeout = self.read_timeout.clone();
86+
let write_timeout = self.write_timeout.clone();
5187
let connecting = self.connector.call(req);
52-
let timeout = Timeout::new(self.connect_timeout, &self.handle).unwrap();
53-
54-
Box::new(connecting.select2(timeout).then(|res| {
55-
match res {
56-
Ok(Either::A((io, _))) => Ok(io),
57-
Ok(Either::B((_, _))) => {
58-
Err(io::Error::new(
59-
io::ErrorKind::TimedOut,
60-
"Client timed out while connecting"
61-
))
62-
}
63-
Err(Either::A((e, _))) => Err(e),
64-
Err(Either::B((e, _))) => Err(e),
88+
89+
if self.connect_timeout.is_none() {
90+
return Box::new(connecting.map(move |io| {
91+
let mut tm = TimeoutStream::new(io, &handle);
92+
tm.set_read_timeout(read_timeout);
93+
tm.set_write_timeout(write_timeout);
94+
tm
95+
}));
96+
}
97+
98+
let connect_timeout = self.connect_timeout.expect("Connect timeout should be set");
99+
let timeout = Timeout::new(connect_timeout, &self.handle).unwrap();
100+
101+
Box::new(connecting.select2(timeout).then(move |res| match res {
102+
Ok(Either::A((io, _))) => {
103+
let mut tm = TimeoutStream::new(io, &handle);
104+
tm.set_read_timeout(read_timeout);
105+
tm.set_write_timeout(write_timeout);
106+
Ok(tm)
65107
}
108+
Ok(Either::B((_, _))) => {
109+
Err(io::Error::new(
110+
io::ErrorKind::TimedOut,
111+
"Client timed out while connecting",
112+
))
113+
}
114+
Err(Either::A((e, _))) => Err(e),
115+
Err(Either::B((e, _))) => Err(e),
66116
}))
67117
}
68118
}
@@ -80,12 +130,15 @@ mod tests {
80130
let mut core = Core::new().unwrap();
81131
// 10.255.255.1 is a not a routable IP address
82132
let url = "http://10.255.255.1".parse().unwrap();
83-
let connector = TimeoutConnector::new(
84-
HttpConnector::new(1, &core.handle()),
85-
&core.handle(),
86-
Duration::from_millis(1)
87-
);
133+
let mut connector =
134+
TimeoutConnector::new(HttpConnector::new(1, &core.handle()), &core.handle());
135+
connector.set_connect_timeout(Some(Duration::from_millis(1)));
88136

89-
assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::TimedOut);
137+
match core.run(connector.connect(url)) {
138+
Err(e) => {
139+
assert_eq!(e.kind(), io::ErrorKind::TimedOut);
140+
}
141+
_ => panic!("Expected timeout error"),
142+
}
90143
}
91144
}

0 commit comments

Comments
 (0)