Skip to content
This repository was archived by the owner on Mar 13, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ hyper = "0.12.27"
tokio = "0.1.19"
tower = { git = "https://github.com/tower-rs/tower" }
tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
30 changes: 9 additions & 21 deletions src/http-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ use hyper::{
Request, Response, Uri,
};
use std::time::Duration;
use tower::{builder::ServiceBuilder, reconnect::Reconnect, Service, ServiceExt};
use tower_hyper::{
client::{Builder, Connect},
retry::{Body, RetryPolicy},
util::Connector,
};
use tower::{builder::ServiceBuilder, Service, ServiceExt};
use tower_hyper::{client::Connect, util::Connector, Body};
use tower_reconnect::Reconnect;

fn main() {
let fut = futures::lazy(|| {
Expand All @@ -22,29 +19,24 @@ fn main() {

fn request() -> impl Future<Item = Response<hyper::Body>, Error = ()> {
let connector = Connector::new(HttpConnector::new(1));
let hyper = Connect::new(connector, Builder::new());
let hyper = Connect::new(connector);

// RetryPolicy is a very simple policy that retries `n` times
// if the response has a 500 status code. Here, `n` is 5.
let policy = RetryPolicy::new(5);
// We're calling the tower/examples/server.rs.
let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap();

// Now, to build the service! We use two BufferLayers in order to:
// - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer
// - meet `RetryLayer`'s requirement that our service implement `Service + Clone`
// - ..and to provide cheap clones on the service.
let maker = ServiceBuilder::new()
let service = ServiceBuilder::new()
.buffer(5)
.rate_limit(5, Duration::from_secs(1))
.concurrency_limit(5)
.retry(policy)
.buffer(5)
.make_service(hyper);
.service(hyper);

// `Reconnect` accepts a destination and a MakeService, creating a new service
// `Reconnect` accepts a destination and a Service, creating a new service
// any time the connection encounters an error.
let client = Reconnect::new(maker, dst);
let client = Reconnect::new(service, dst);

let request = Request::builder()
.method("GET")
Expand All @@ -55,9 +47,5 @@ fn request() -> impl Future<Item = Response<hyper::Body>, Error = ()> {
client
.ready()
.map_err(|e| panic!("Service is not ready: {:?}", e))
.and_then(|mut c| {
c.call(request)
.map(|res| res.map(|b| b.into_inner()))
.map_err(|e| panic!("{:?}", e))
})
.and_then(|mut c| c.call(request).map_err(|e| panic!("{:?}", e)))
}
11 changes: 4 additions & 7 deletions src/http-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{future, Future, Poll, Stream};
use hyper::{self, Body, Request, Response};
use tokio::net::TcpListener;
use tower::{builder::ServiceBuilder, Service};
use tower_hyper::{body::LiftBody, server::Server};
use tower_hyper::server::Server;

fn main() {
hyper::rt::run(future::lazy(|| {
Expand All @@ -11,10 +11,7 @@ fn main() {

println!("Listening on http://{}", addr);

let maker = ServiceBuilder::new()
.concurrency_limit(5)
.make_service(MakeSvc);

let maker = ServiceBuilder::new().concurrency_limit(5).service(MakeSvc);
let server = Server::new(maker);

bind.incoming()
Expand All @@ -37,7 +34,7 @@ fn main() {
}

struct Svc;
impl Service<Request<LiftBody<Body>>> for Svc {
impl Service<Request<Body>> for Svc {
type Response = Response<&'static str>;
type Error = hyper::Error;
type Future = future::FutureResult<Self::Response, Self::Error>;
Expand All @@ -46,7 +43,7 @@ impl Service<Request<LiftBody<Body>>> for Svc {
Ok(().into())
}

fn call(&mut self, _req: Request<LiftBody<Body>>) -> Self::Future {
fn call(&mut self, _req: Request<Body>) -> Self::Future {
let res = Response::new("Hello World!");
future::ok(res)
}
Expand Down