Skip to content

Commit 048afd1

Browse files
authored
Make HTTP client stacks synchronous (#674)
This change implements `NewService` for `Reconnect` and other HTTP-client middlewares. HTTP clients are almost always coerced to `NewService`, so this reduces boilerplate and type complexity.
1 parent 2a19082 commit 048afd1

File tree

9 files changed

+66
-229
lines changed

9 files changed

+66
-229
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,7 @@ version = "0.1.0"
14441444
dependencies = [
14451445
"futures 0.3.5",
14461446
"linkerd2-error",
1447+
"linkerd2-stack",
14471448
"pin-project",
14481449
"tower",
14491450
"tracing",

linkerd/app/core/src/control.rs

Lines changed: 21 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,11 @@ impl Config {
6262
.push_timeout(self.connect.timeout)
6363
.push(self::client::layer())
6464
.push(reconnect::layer(backoff.clone()))
65-
.push_spawn_ready()
66-
.into_new_service()
6765
.push(self::resolve::layer(dns, backoff))
6866
.push_on_response(self::control::balance::layer())
69-
.push(metrics.into_layer::<classify::Response>())
70-
.push(self::add_origin::Layer::new())
7167
.into_new_service()
72-
.check_new_service()
68+
.push(metrics.into_layer::<classify::Response>())
69+
.push(self::add_origin::layer())
7370
.push_on_response(svc::layers().push_spawn_buffer(self.buffer_capacity))
7471
.new_service(self.addr)
7572
}
@@ -78,126 +75,54 @@ impl Config {
7875
/// Sets the request's URI from `Config`.
7976
mod add_origin {
8077
use super::ControlAddr;
81-
use futures::{ready, TryFuture};
82-
use linkerd2_error::Error;
83-
use pin_project::pin_project;
84-
use std::future::Future;
78+
use linkerd2_stack::{layer, NewService, ResultService};
8579
use std::marker::PhantomData;
86-
use std::pin::Pin;
87-
use std::task::{Context, Poll};
8880
use tower_request_modifier::{Builder, RequestModifier};
8981

90-
#[derive(Debug)]
91-
pub struct Layer<B> {
92-
_marker: PhantomData<fn(B)>,
82+
pub fn layer<M, B>() -> impl layer::Layer<M, Service = NewAddOrigin<M, B>> + Clone {
83+
layer::mk(|inner| NewAddOrigin {
84+
inner,
85+
_marker: PhantomData,
86+
})
9387
}
9488

9589
#[derive(Debug)]
96-
pub struct MakeAddOrigin<M, B> {
90+
pub struct NewAddOrigin<M, B> {
9791
inner: M,
9892
_marker: PhantomData<fn(B)>,
9993
}
10094

101-
#[pin_project]
102-
pub struct MakeFuture<F, B> {
103-
#[pin]
104-
inner: F,
105-
authority: http::uri::Authority,
106-
_marker: PhantomData<fn(B)>,
107-
}
95+
// === impl NewAddOrigin ===
10896

109-
// === impl Layer ===
110-
111-
impl<B> Layer<B> {
112-
pub fn new() -> Self {
113-
Layer {
114-
_marker: PhantomData,
115-
}
116-
}
117-
}
118-
119-
impl<B> Clone for Layer<B> {
97+
impl<M: Clone, B> Clone for NewAddOrigin<M, B> {
12098
fn clone(&self) -> Self {
12199
Self {
100+
inner: self.inner.clone(),
122101
_marker: self._marker,
123102
}
124103
}
125104
}
126105

127-
impl<M, B> tower::layer::Layer<M> for Layer<B> {
128-
type Service = MakeAddOrigin<M, B>;
129-
130-
fn layer(&self, inner: M) -> Self::Service {
131-
Self::Service {
132-
inner,
133-
_marker: PhantomData,
134-
}
135-
}
136-
}
137-
138-
// === impl MakeAddOrigin ===
139-
140-
impl<M, B> tower::Service<ControlAddr> for MakeAddOrigin<M, B>
106+
impl<M, B> NewService<ControlAddr> for NewAddOrigin<M, B>
141107
where
142-
M: tower::Service<ControlAddr>,
143-
M::Error: Into<Error>,
108+
M: NewService<ControlAddr>,
144109
{
145-
type Response = RequestModifier<M::Response, B>;
146-
type Error = Error;
147-
type Future = MakeFuture<M::Future, B>;
110+
type Service = ResultService<RequestModifier<M::Service, B>, BuildError>;
148111

149-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150-
self.inner.poll_ready(cx).map_err(Into::into)
151-
}
152-
153-
fn call(&mut self, target: ControlAddr) -> Self::Future {
112+
fn new_service(&mut self, target: ControlAddr) -> Self::Service {
154113
let authority = target.addr.to_http_authority();
155-
let inner = self.inner.call(target);
156-
MakeFuture {
157-
inner,
158-
authority,
159-
_marker: PhantomData,
160-
}
161-
}
162-
}
163-
164-
impl<M, B> Clone for MakeAddOrigin<M, B>
165-
where
166-
M: tower::Service<ControlAddr> + Clone,
167-
{
168-
fn clone(&self) -> Self {
169-
Self {
170-
inner: self.inner.clone(),
171-
_marker: PhantomData,
172-
}
173-
}
174-
}
175-
176-
// === impl MakeFuture ===
177-
178-
impl<F, B> Future for MakeFuture<F, B>
179-
where
180-
F: TryFuture,
181-
F::Error: Into<Error>,
182-
{
183-
type Output = Result<RequestModifier<F::Ok, B>, Error>;
184-
185-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186-
let this = self.project();
187-
let inner = ready!(this.inner.try_poll(cx).map_err(Into::into))?;
188-
189-
Poll::Ready(
114+
ResultService::from(
190115
Builder::new()
191-
.set_origin(format!("http://{}", this.authority))
192-
.build(inner)
193-
.map_err(|_| BuildError.into()),
116+
.set_origin(format!("http://{}", authority))
117+
.build(self.inner.new_service(target))
118+
.map_err(|_| BuildError(())),
194119
)
195120
}
196121
}
197122

198123
// XXX the request_modifier build error does not implement Error...
199124
#[derive(Debug)]
200-
struct BuildError;
125+
pub struct BuildError(());
201126

202127
impl std::error::Error for BuildError {}
203128
impl std::fmt::Display for BuildError {

linkerd/app/inbound/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl Config {
203203
let backoff = connect.backoff.clone();
204204
move |_| Ok(backoff.stream())
205205
}))
206-
.check_make_service::<HttpEndpoint, http::Request<_>>();
206+
.check_new_service::<HttpEndpoint, http::Request<_>>();
207207

208208
let observe = svc::layers()
209209
// Registers the stack to be tapped.
@@ -220,7 +220,6 @@ impl Config {
220220
.push_map_target(HttpEndpoint::from)
221221
.push(observe)
222222
.push_on_response(svc::layers().box_http_response())
223-
.into_new_service()
224223
.check_new_service::<Target, http::Request<_>>();
225224

226225
// Attempts to discover a service profile for each logical target (as

linkerd/app/outbound/src/lib.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use linkerd2_app_core::{
2020
spans::SpanConverter,
2121
svc::{self},
2222
transport::{self, listen, tls},
23-
Addr, Conditional, DiscoveryRejected, Error, Never, ProxyMetrics, StackMetrics,
24-
TraceContextLayer, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
23+
Addr, Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
24+
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
2525
};
2626
use std::{
2727
collections::HashMap,
@@ -173,18 +173,15 @@ impl Config {
173173
tap_layer: tap::Layer,
174174
metrics: ProxyMetrics,
175175
span_sink: Option<mpsc::Sender<oc::Span>>,
176-
) -> impl tower::Service<
176+
) -> impl svc::NewService<
177177
HttpEndpoint,
178-
Error = Never,
179-
Future = impl Unpin + Send,
180-
Response = impl tower::Service<
178+
Service = impl tower::Service<
181179
http::Request<B>,
182180
Response = http::Response<http::boxed::Payload>,
183181
Error = Error,
184182
Future = impl Send,
185183
> + Send,
186-
> + Unpin
187-
+ Clone
184+
> + Clone
188185
+ Send
189186
where
190187
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
@@ -226,14 +223,15 @@ impl Config {
226223
}
227224
}
228225
}))
226+
.check_new::<HttpEndpoint>()
229227
.push(observability.clone())
230228
.push(identity_headers.clone())
231229
.push(http::override_authority::Layer::new(vec![
232230
::http::header::HOST.as_str(),
233231
CANONICAL_DST_HEADER,
234232
]))
235233
.push_on_response(svc::layers().box_http_response())
236-
.check_service::<HttpEndpoint>()
234+
.check_new::<HttpEndpoint>()
237235
.instrument(|e: &HttpEndpoint| info_span!("endpoint", peer.addr = %e.addr))
238236
.into_inner()
239237
}
@@ -260,9 +258,7 @@ impl Config {
260258
where
261259
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
262260
B::Data: Send + 'static,
263-
E: tower::Service<HttpEndpoint, Response = S> + Unpin + Clone + Send + Sync + 'static,
264-
E::Error: Into<Error>,
265-
E::Future: Unpin + Send,
261+
E: svc::NewService<HttpEndpoint, Service = S> + Clone + Send + Sync + 'static,
266262
S: tower::Service<
267263
http::Request<http::boxed::Payload>,
268264
Response = http::Response<http::boxed::Payload>,
@@ -302,14 +298,12 @@ impl Config {
302298

303299
// Builds a balancer for each concrete destination.
304300
let concrete = svc::stack(endpoint.clone())
305-
.check_make_service::<HttpEndpoint, http::Request<http::boxed::Payload>>()
301+
.check_new_service::<HttpEndpoint, http::Request<http::boxed::Payload>>()
306302
.push_on_response(
307303
svc::layers()
308304
.push(metrics.stack.layer(stack_labels("balance.endpoint")))
309305
.box_http_request(),
310306
)
311-
.push_spawn_ready()
312-
.into_new_service()
313307
.check_new_service::<HttpEndpoint, http::Request<_>>()
314308
.push(discover)
315309
.check_service::<HttpConcrete>()
@@ -365,7 +359,6 @@ impl Config {
365359

366360
// Caches clients that bypass discovery/balancing.
367361
let forward = svc::stack(endpoint)
368-
.into_new_service()
369362
.instrument(|t: &HttpEndpoint| debug_span!("forward", peer.id = ?t.identity))
370363
.check_new_service::<HttpEndpoint, http::Request<_>>();
371364

linkerd/proxy/http/src/override_authority.rs

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use super::h1;
2-
use futures::{ready, TryFuture};
32
use http::{self, header::AsHeaderName, uri::Authority};
4-
use pin_project::pin_project;
3+
use linkerd2_stack::NewService;
54
use std::fmt;
6-
use std::future::Future;
7-
use std::pin::Pin;
85
use std::task::{Context, Poll};
96
use tracing::debug;
107

@@ -23,14 +20,6 @@ pub struct MakeSvc<H, M> {
2320
inner: M,
2421
}
2522

26-
#[pin_project]
27-
pub struct MakeSvcFut<M, H> {
28-
authority: Option<Authority>,
29-
headers_to_strip: Vec<H>,
30-
#[pin]
31-
inner: M,
32-
}
33-
3423
#[derive(Clone, Debug)]
3524
pub struct Service<S, H> {
3625
authority: Option<Authority>,
@@ -71,49 +60,25 @@ where
7160
}
7261
}
7362

74-
impl<H, T, M> tower::Service<T> for MakeSvc<H, M>
63+
impl<H, T, M> NewService<T> for MakeSvc<H, M>
7564
where
7665
T: CanOverrideAuthority + Clone + Send + Sync + 'static,
77-
M: tower::Service<T>,
66+
M: NewService<T>,
7867
H: AsHeaderName + Clone,
7968
{
80-
type Response = Service<M::Response, H>;
81-
type Error = M::Error;
82-
type Future = MakeSvcFut<M::Future, H>;
83-
84-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), M::Error>> {
85-
self.inner.poll_ready(cx)
86-
}
69+
type Service = Service<M::Service, H>;
8770

88-
fn call(&mut self, t: T) -> Self::Future {
71+
fn new_service(&mut self, t: T) -> Self::Service {
8972
let authority = t.override_authority();
90-
let inner = self.inner.call(t);
91-
MakeSvcFut {
73+
let inner = self.inner.new_service(t);
74+
Service {
9275
authority,
9376
headers_to_strip: self.headers_to_strip.clone(),
9477
inner,
9578
}
9679
}
9780
}
9881

99-
impl<F, H> Future for MakeSvcFut<F, H>
100-
where
101-
F: TryFuture,
102-
H: AsHeaderName + Clone,
103-
{
104-
type Output = Result<Service<F::Ok, H>, F::Error>;
105-
106-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107-
let this = self.project();
108-
let inner = ready!(this.inner.try_poll(cx))?;
109-
Poll::Ready(Ok(Service {
110-
authority: this.authority.clone(),
111-
headers_to_strip: this.headers_to_strip.clone(),
112-
inner,
113-
}))
114-
}
115-
}
116-
11782
// === impl Service ===
11883

11984
impl<S, H, B> tower::Service<http::Request<B>> for Service<S, H>

linkerd/reconnect/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ publish = false
77

88
[dependencies]
99
linkerd2-error = { path = "../error" }
10+
linkerd2-stack = { path = "../stack" }
1011
futures = "0.3"
1112
tower = { version = "0.3", default-features = false }
1213
tracing = "0.1.19"

0 commit comments

Comments
 (0)