Skip to content

Commit 65de36b

Browse files
committed
Upgrade to hyper v0.14
1 parent 441e2b5 commit 65de36b

File tree

5 files changed

+82
-80
lines changed

5 files changed

+82
-80
lines changed

Cargo.toml

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
[package]
22
name = "hyper-timeout"
3-
version = "0.3.1"
3+
version = "0.4.0"
44
authors = ["Herman J. Radtke III <[email protected]>"]
5+
edition = "2018"
56
description = "A connect, read and write timeout aware connector to be used with hyper Client."
67
license = "MIT/Apache-2.0"
78
documentation = "https://github.com/hjr3/hyper-timeout"
89
homepage = "https://github.com/hjr3/hyper-timeout"
910
repository = "https://github.com/hjr3/hyper-timeout"
10-
edition = "2018"
11+
readme = "README.md"
1112

1213
[badges]
1314
travis-ci = { repository = "https://github.com/hjr3/hyper-timeout", branch = "master" }
1415

1516
[dependencies]
16-
bytes = "0.5"
17-
hyper = { version = "0.13", default-features = false, features = ["tcp"] }
18-
tokio = "0.2"
19-
tokio-io-timeout = "0.4"
17+
bytes = "1.0.0"
18+
hyper = { version = "0.14", default-features = false, features = ["client", "http1", "tcp"] }
19+
pin-project-lite = "0.2"
20+
tokio = "1.0.0"
21+
tokio-io-timeout = "1.0.1"
2022

2123
[dev-dependencies]
22-
hyper-tls = "0.4"
23-
tokio = { version = "0.2", features = ["io-std", "macros"] }
24+
#FIXME enable when https://github.com/hyperium/hyper-tls/pull/79 lands
25+
#hyper-tls = "0.4"
26+
tokio = { version = "1.0.0", features = ["io-std", "io-util", "macros"] }

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This c
2020
Hyper version compatibility:
2121

2222
* The `master` branch will track on going development for hyper.
23+
* The `0.4` release supports hyper 0.14.
2324
* The `0.3` release supports hyper 0.13.
2425
* The `0.2` release supports hyper 0.12.
2526
* The `0.1` release supports hyper 0.11.
2627

27-
First, (assuming you are using hyper 0.13) add this to your `Cargo.toml`:
28+
Assuming you are using hyper 0.14, add this to your `Cargo.toml`:
2829

2930
```toml
3031
[dependencies]
31-
hyper-timeout = "0.3"
32+
hyper-timeout = "0.4"
3233
```
3334

3435
See the [client example](./examples/client.rs) for a working example.

examples/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use std::time::Duration;
44
use hyper::{Client, body::HttpBody as _};
55
use tokio::io::{self, AsyncWriteExt as _};
66

7-
//use hyper::client::HttpConnector;
8-
use hyper_tls::HttpsConnector;
7+
use hyper::client::HttpConnector;
8+
//use hyper_tls::HttpsConnector;
99

1010
use hyper_timeout::TimeoutConnector;
1111

12-
#[tokio::main]
12+
#[tokio::main(flavor = "current_thread")]
1313
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1414
let url = match env::args().nth(1) {
1515
Some(url) => url,
@@ -23,9 +23,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2323
let url = url.parse::<hyper::Uri>().unwrap();
2424

2525
// This example uses `HttpsConnector`, but you can also use hyper `HttpConnector`
26-
//let http = HttpConnector::new();
27-
let https = HttpsConnector::new();
28-
let mut connector = TimeoutConnector::new(https);
26+
let h = HttpConnector::new();
27+
//let h = HttpsConnector::new();
28+
let mut connector = TimeoutConnector::new(h);
2929
connector.set_connect_timeout(Some(Duration::from_secs(5)));
3030
connector.set_read_timeout(Some(Duration::from_secs(5)));
3131
connector.set_write_timeout(Some(Duration::from_secs(5)));

src/lib.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ use std::future::Future;
22
use std::io;
33
use std::pin::Pin;
44
use std::task::{Context, Poll};
5-
use std::time::Duration;
65

76
use tokio::io::{AsyncRead, AsyncWrite};
8-
use tokio::time::timeout;
7+
use tokio::time::{timeout, Duration};
98
use tokio_io_timeout::TimeoutStream;
109

1110
use hyper::{service::Service, Uri};
@@ -44,21 +43,17 @@ impl<T: Connect> TimeoutConnector<T> {
4443

4544
impl<T> Service<Uri> for TimeoutConnector<T>
4645
where
47-
T: Service<Uri>,
48-
T::Response: AsyncRead + AsyncWrite + Send + Unpin,
46+
T: Service<Uri> + Send,
47+
T::Response: AsyncRead + AsyncWrite + Connection + Send + Unpin,
4948
T::Future: Send + 'static,
5049
T::Error: Into<BoxError>,
5150
{
52-
type Response = TimeoutConnectorStream<T::Response>;
51+
type Response = Pin<Box<TimeoutConnectorStream<T::Response>>>;
5352
type Error = BoxError;
5453
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
5554

5655
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57-
match self.connector.poll_ready(cx) {
58-
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
59-
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
60-
Poll::Pending => Poll::Pending,
61-
}
56+
self.connector.poll_ready(cx).map_err(Into::into)
6257
}
6358

6459
fn call(&mut self, dst: Uri) -> Self::Future {
@@ -73,7 +68,7 @@ where
7368
let mut tm = TimeoutConnectorStream::new(TimeoutStream::new(io));
7469
tm.set_read_timeout(read_timeout);
7570
tm.set_write_timeout(write_timeout);
76-
Ok(tm)
71+
Ok(Box::pin(tm))
7772
};
7873

7974
return Box::pin(fut);
@@ -89,7 +84,7 @@ where
8984
let mut tm = TimeoutConnectorStream::new(TimeoutStream::new(io));
9085
tm.set_read_timeout(read_timeout);
9186
tm.set_write_timeout(write_timeout);
92-
Ok(tm)
87+
Ok(Box::pin(tm))
9388
};
9489

9590
Box::pin(fut)
@@ -122,9 +117,12 @@ impl<T> TimeoutConnector<T> {
122117
}
123118
}
124119

125-
impl<T> Connection for TimeoutConnector<T> {
120+
impl<T: Connect> Connection for TimeoutConnector<T>
121+
where
122+
T: AsyncRead + AsyncWrite + Connection + Unpin,
123+
{
126124
fn connected(&self) -> Connected {
127-
Connected::new()
125+
self.connector.connected()
128126
}
129127
}
130128

src/stream.rs

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
use std::io;
2-
use std::mem::MaybeUninit;
2+
use std::io::IoSlice;
33
use std::pin::Pin;
44
use std::task::{Context, Poll};
55
use std::time::{Duration};
66

7-
use bytes::{Buf, BufMut};
87
use hyper::client::connect::{Connected, Connection};
9-
use tokio::io::{AsyncRead, AsyncWrite};
8+
use pin_project_lite::pin_project;
9+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1010
use tokio_io_timeout::TimeoutStream;
1111

12-
/// A timeout stream that implements required traits to be a Connector
13-
#[derive(Debug)]
14-
pub struct TimeoutConnectorStream<S>(TimeoutStream<S>);
12+
pin_project! {
13+
/// A timeout stream that implements required traits to be a Connector
14+
#[derive(Debug)]
15+
pub struct TimeoutConnectorStream<S> {
16+
#[pin]
17+
stream: TimeoutStream<S>
18+
}
19+
}
1520

1621
impl<S> TimeoutConnectorStream<S>
1722
where
@@ -21,74 +26,59 @@ where
2126
///
2227
/// There is initially no read or write timeout.
2328
pub fn new(stream: TimeoutStream<S>) -> TimeoutConnectorStream<S> {
24-
TimeoutConnectorStream(stream)
29+
TimeoutConnectorStream { stream }
2530
}
2631

2732
/// Returns the current read timeout.
2833
pub fn read_timeout(&self) -> Option<Duration> {
29-
self.0.read_timeout()
34+
self.stream.read_timeout()
3035
}
3136

3237
/// Sets the read timeout.
3338
///
3439
/// This will reset any pending read timeout.
3540
pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
36-
self.0.set_read_timeout(timeout)
41+
self.stream.set_read_timeout(timeout)
3742
}
3843

3944
/// Returns the current write timeout.
4045
pub fn write_timeout(&self) -> Option<Duration> {
41-
self.0.write_timeout()
46+
self.stream.write_timeout()
4247
}
4348

4449
/// Sets the write timeout.
4550
///
4651
/// This will reset any pending write timeout.
4752
pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
48-
self.0.set_write_timeout(timeout)
53+
self.stream.set_write_timeout(timeout)
4954
}
5055

5156
/// Returns a shared reference to the inner stream.
5257
pub fn get_ref(&self) -> &S {
53-
self.0.get_ref()
58+
self.stream.get_ref()
5459
}
5560

5661
/// Returns a mutable reference to the inner stream.
5762
pub fn get_mut(&mut self) -> &mut S {
58-
self.0.get_mut()
63+
self.stream.get_mut()
5964
}
6065

6166
/// Consumes the stream, returning the inner stream.
6267
pub fn into_inner(self) -> S {
63-
self.0.into_inner()
68+
self.stream.into_inner()
6469
}
6570
}
6671

6772
impl<S> AsyncRead for TimeoutConnectorStream<S>
6873
where
6974
S: AsyncRead + AsyncWrite + Unpin,
7075
{
71-
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
72-
self.0.prepare_uninitialized_buffer(buf)
73-
}
74-
7576
fn poll_read(
76-
mut self: Pin<&mut Self>,
77-
cx: &mut Context,
78-
buf: &mut [u8],
79-
) -> Poll<Result<usize, io::Error>> {
80-
Pin::new(&mut self.0).poll_read(cx, buf)
81-
}
82-
83-
fn poll_read_buf<B>(
84-
mut self: Pin<&mut Self>,
77+
self: Pin<&mut Self>,
8578
cx: &mut Context,
86-
buf: &mut B,
87-
) -> Poll<Result<usize, io::Error>>
88-
where
89-
B: BufMut,
90-
{
91-
Pin::new(&mut self.0).poll_read_buf(cx, buf)
79+
buf: &mut ReadBuf,
80+
) -> Poll<Result<(), io::Error>> {
81+
self.project().stream.poll_read(cx, buf)
9282
}
9383
}
9484

@@ -97,30 +87,31 @@ where
9787
S: AsyncRead + AsyncWrite + Unpin,
9888
{
9989
fn poll_write(
100-
mut self: Pin<&mut Self>,
101-
cx: &mut Context,
90+
self: Pin<&mut Self>,
91+
cx: &mut Context<'_>,
10292
buf: &[u8],
10393
) -> Poll<Result<usize, io::Error>> {
104-
Pin::new(&mut self.0).poll_write(cx, buf)
94+
self.project().stream.poll_write(cx, buf)
95+
}
96+
97+
fn poll_write_vectored(
98+
self: Pin<&mut Self>,
99+
cx: &mut Context<'_>,
100+
bufs: &[IoSlice<'_>],
101+
) -> Poll<Result<usize, io::Error>> {
102+
self.project().stream.poll_write_vectored(cx, bufs)
105103
}
106104

107-
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
108-
Pin::new(&mut self.0).poll_flush(cx)
105+
fn is_write_vectored(&self) -> bool {
106+
self.stream.is_write_vectored()
109107
}
110108

111-
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
112-
Pin::new(&mut self.0).poll_shutdown(cx)
109+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
110+
self.project().stream.poll_flush(cx)
113111
}
114112

115-
fn poll_write_buf<B>(
116-
mut self: Pin<&mut Self>,
117-
cx: &mut Context,
118-
buf: &mut B,
119-
) -> Poll<Result<usize, io::Error>>
120-
where
121-
B: Buf,
122-
{
123-
Pin::new(&mut self.0).poll_write_buf(cx, buf)
113+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
114+
self.project().stream.poll_shutdown(cx)
124115
}
125116
}
126117

@@ -129,6 +120,15 @@ where
129120
S: AsyncRead + AsyncWrite + Connection + Unpin,
130121
{
131122
fn connected(&self) -> Connected {
132-
self.0.get_ref().connected()
123+
self.stream.get_ref().connected()
124+
}
125+
}
126+
127+
impl<S> Connection for Pin<Box<TimeoutConnectorStream<S>>>
128+
where
129+
S: AsyncRead + AsyncWrite + Connection + Unpin,
130+
{
131+
fn connected(&self) -> Connected {
132+
self.stream.get_ref().connected()
133133
}
134134
}

0 commit comments

Comments
 (0)