Skip to content

Commit cac988e

Browse files
authored
outbound: Make HTTP endpoint stack generic on its target (#952)
The HTTP endpoint stack is coupled to the Endpoint target type. In order to make the outbound stack more flexible so that the endpoint stack can be used without the logical stack, this change updates the outbound HTTP endpoint stack to be generic over its target type.
1 parent 827762d commit cac988e

File tree

3 files changed

+28
-24
lines changed

3 files changed

+28
-24
lines changed

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
use super::{require_identity_on_endpoint::NewRequireIdentity, Endpoint};
1+
use super::require_identity_on_endpoint::NewRequireIdentity;
22
use crate::Outbound;
33
use linkerd_app_core::{
4-
classify, config, http_tracing,
4+
classify, config, http_tracing, metrics,
55
proxy::{http, tap},
6-
reconnect, svc, Error, CANONICAL_DST_HEADER, L5D_REQUIRE_ID,
6+
reconnect, svc, tls, Error, CANONICAL_DST_HEADER, L5D_REQUIRE_ID,
77
};
88
use tokio::io;
9-
use tracing::debug_span;
109

1110
impl<C> Outbound<C> {
12-
pub fn push_http_endpoint<B>(
11+
pub fn push_http_endpoint<T, B>(
1312
self,
1413
) -> Outbound<
1514
impl svc::NewService<
16-
Endpoint,
15+
T,
1716
Service = impl svc::Service<
1817
http::Request<B>,
1918
Response = http::Response<http::BoxBody>,
@@ -23,9 +22,15 @@ impl<C> Outbound<C> {
2322
> + Clone,
2423
>
2524
where
25+
T: Clone + Send + Sync + 'static,
26+
T: svc::Param<http::client::Settings>
27+
+ svc::Param<Option<http::AuthorityOverride>>
28+
+ svc::Param<metrics::EndpointLabels>
29+
+ svc::Param<tls::ConditionalClientTls>
30+
+ tap::Inspect,
2631
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
2732
B::Data: Send + 'static,
28-
C: svc::Service<Endpoint> + Clone + Send + Sync + Unpin + 'static,
33+
C: svc::Service<T> + Clone + Send + Sync + Unpin + 'static,
2934
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
3035
C::Error: Into<Error>,
3136
C::Future: Send + Unpin,
@@ -52,7 +57,6 @@ impl<C> Outbound<C> {
5257
let backoff = backoff;
5358
move |_| Ok(backoff.stream())
5459
}))
55-
.check_new::<Endpoint>()
5660
.push(tap::NewTapHttp::layer(rt.tap.clone()))
5761
.push(rt.metrics.http_endpoint.to_layer::<classify::Response, _>())
5862
.push_on_response(http_tracing::client(
@@ -65,9 +69,7 @@ impl<C> Outbound<C> {
6569
"host",
6670
CANONICAL_DST_HEADER,
6771
]))
68-
.push_on_response(http::BoxResponse::layer())
69-
.check_new::<Endpoint>()
70-
.instrument(|e: &Endpoint| debug_span!("endpoint", peer.addr = %e.addr));
72+
.push_on_response(http::BoxResponse::layer());
7173

7274
Outbound {
7375
config,

linkerd/app/outbound/src/http/logical.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ impl<E> Outbound<E> {
5353
} = config.proxy;
5454
let watchdog = cache_max_idle_age * 2;
5555

56+
let endpoint =
57+
endpoint.instrument(|e: &Endpoint| debug_span!("endpoint", server.addr = %e.addr));
58+
5659
let identity_disabled = rt.identity.is_none();
5760
let no_tls_reason = if identity_disabled {
5861
tls::NoClientTls::Disabled

linkerd/app/outbound/src/http/require_identity_on_endpoint.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
1-
use crate::http::Endpoint;
21
use futures::{
32
future::{self, Either},
43
TryFutureExt,
54
};
65
use linkerd_app_core::{
7-
errors::IdentityRequired,
8-
proxy::http::identity_from_header,
9-
svc::{layer, NewService, Service},
10-
tls, Conditional, Error, L5D_REQUIRE_ID,
6+
errors::IdentityRequired, proxy::http::identity_from_header, svc, tls, Conditional, Error,
7+
L5D_REQUIRE_ID,
118
};
129
use std::task::{Context, Poll};
1310
use tracing::debug;
@@ -30,19 +27,20 @@ impl<N> NewRequireIdentity<N> {
3027
Self { inner }
3128
}
3229

33-
pub fn layer() -> impl layer::Layer<N, Service = Self> + Clone + Copy {
34-
layer::mk(Self::new)
30+
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> + Clone + Copy {
31+
svc::layer::mk(Self::new)
3532
}
3633
}
3734

38-
impl<N> NewService<Endpoint> for NewRequireIdentity<N>
35+
impl<T, N> svc::NewService<T> for NewRequireIdentity<N>
3936
where
40-
N: NewService<Endpoint>,
37+
T: svc::Param<tls::ConditionalClientTls>,
38+
N: svc::NewService<T>,
4139
{
4240
type Service = RequireIdentity<N::Service>;
4341

44-
fn new_service(&mut self, target: Endpoint) -> Self::Service {
45-
let tls = target.tls.clone();
42+
fn new_service(&mut self, target: T) -> Self::Service {
43+
let tls = target.param();
4644
let inner = self.inner.new_service(target);
4745
RequireIdentity { tls, inner }
4846
}
@@ -53,15 +51,16 @@ where
5351
type ResponseFuture<F, T, E> =
5452
Either<future::Ready<Result<T, Error>>, future::MapErr<F, fn(E) -> Error>>;
5553

56-
impl<S, A> Service<http::Request<A>> for RequireIdentity<S>
54+
impl<S, A> svc::Service<http::Request<A>> for RequireIdentity<S>
5755
where
58-
S: Service<http::Request<A>>,
56+
S: svc::Service<http::Request<A>>,
5957
S::Error: Into<Error>,
6058
{
6159
type Response = S::Response;
6260
type Error = Error;
6361
type Future = ResponseFuture<S::Future, S::Response, S::Error>;
6462

63+
#[inline]
6564
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
6665
self.inner.poll_ready(cx).map_err(Into::into)
6766
}

0 commit comments

Comments
 (0)