Skip to content

Commit ebf67d6

Browse files
authored
outbound: Decouple the resolver from concrete target types (#938)
The endpoint resolver module currently has a fixed dependency on the `Concrete` and `Endpoint` target types. This change extracts endpoint-type construction so that the HTTP logical stack is agnostic of its endpoint target type. It also updates the `EndpointFromMetadata` type to be responsible for marking identity as disabled when appropriate. Finally, we are able to re-enable the outbound TLS hinting test.
1 parent bf760fb commit ebf67d6

File tree

13 files changed

+162
-127
lines changed

13 files changed

+162
-127
lines changed

linkerd/app/gateway/src/lib.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use self::gateway::NewGateway;
88
use linkerd_app_core::{
99
config::ProxyConfig,
1010
detect, discovery_rejected, io, metrics, profiles,
11-
proxy::{api_resolve::Metadata, core::Resolve, http},
11+
proxy::{api_resolve::Metadata, core::Resolve, http, resolve::map_endpoint},
1212
svc::{self, Param},
1313
tls,
1414
transport::OrigDstAddr,
@@ -116,7 +116,10 @@ where
116116
let tcp = outbound
117117
.clone()
118118
.push_tcp_endpoint()
119-
.push_tcp_logical(resolve.clone())
119+
.push_tcp_logical(map_endpoint::Resolve::new(
120+
outbound::target::EndpointFromMetadata::default(),
121+
resolve.clone(),
122+
))
120123
.into_stack()
121124
.push_request_filter(|(p, _): (Option<profiles::Receiver>, _)| match p {
122125
Some(rx) if rx.borrow().name.is_some() => Ok(outbound::tcp::Logical {
@@ -160,7 +163,10 @@ where
160163
let http = outbound
161164
.push_tcp_endpoint()
162165
.push_http_endpoint()
163-
.push_http_logical(resolve)
166+
.push_http_logical(map_endpoint::Resolve::new(
167+
outbound::target::EndpointFromMetadata::default(),
168+
resolve,
169+
))
164170
.into_stack()
165171
.push(NewGateway::layer(local_id))
166172
.push(profiles::discover::layer(profiles, move |t: HttpTarget| {

linkerd/app/integration/src/tests/telemetry.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ impl Fixture {
5050
let labels = metrics::labels()
5151
.label("authority", "tele.test.svc.cluster.local")
5252
.label("direction", "inbound")
53-
.label("tls", "disabled")
5453
.label("target_addr", orig_dst);
5554
Fixture {
5655
client,
@@ -79,7 +78,6 @@ impl Fixture {
7978
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
8079
let labels = metrics::labels()
8180
.label("direction", "outbound")
82-
.label("tls", "disabled")
8381
.label("authority", authority)
8482
.label("target_addr", orig_dst);
8583
Fixture {
@@ -175,7 +173,6 @@ impl TcpFixture {
175173
let dst_labels = metrics::labels()
176174
.label("direction", "outbound")
177175
.label("peer", "dst")
178-
.label("tls", "disabled")
179176
.label("authority", orig_dst);
180177

181178
TcpFixture {
@@ -525,7 +522,6 @@ mod outbound_dst_labels {
525522
let client = client::new(proxy.outbound, dest);
526523
let labels = metrics::labels()
527524
.label("direction", "outbound")
528-
.label("tls", "disabled")
529525
.label("authority", dest_and_port)
530526
.label("target_addr", addr);
531527
let f = Fixture {

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ where
3737
runtime: rt,
3838
stack: connect,
3939
} = self;
40-
let identity_disabled = rt.identity.is_none();
4140
let config::ConnectConfig {
4241
h1_settings,
4342
h2_settings,
@@ -70,14 +69,7 @@ where
7069
]))
7170
.push_on_response(http::BoxResponse::layer())
7271
.check_new::<Endpoint>()
73-
.instrument(|e: &Endpoint| debug_span!("endpoint", peer.addr = %e.addr))
74-
.push_map_target(move |e: Endpoint| {
75-
if identity_disabled {
76-
e.identity_disabled()
77-
} else {
78-
e
79-
}
80-
});
72+
.instrument(|e: &Endpoint| debug_span!("endpoint", peer.addr = %e.addr));
8173

8274
Outbound {
8375
config,

linkerd/app/outbound/src/http/logical.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use super::{Concrete, Endpoint, Logical};
1+
use super::{Concrete, Logical};
22
use crate::{resolve, stack_labels, Outbound};
33
use linkerd_app_core::{
44
classify, config, profiles,
5-
proxy::{api_resolve::Metadata, core::Resolve, http},
6-
retry, svc, tls, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
5+
proxy::{core::Resolve, http},
6+
retry, svc, tls, Error, Never, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
77
};
88
use tracing::debug_span;
99

@@ -25,13 +25,14 @@ impl<E> Outbound<E> {
2525
where
2626
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
2727
B::Data: Send + 'static,
28-
E: svc::NewService<Endpoint, Service = ESvc> + Clone + Send + 'static,
28+
E: svc::NewService<R::Endpoint, Service = ESvc> + Clone + Send + 'static,
2929
ESvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
3030
+ Send
3131
+ 'static,
3232
ESvc::Error: Into<Error>,
3333
ESvc::Future: Send,
34-
R: Resolve<Concrete, Endpoint = Metadata, Error = Error> + Clone + Send + 'static,
34+
R: Resolve<Concrete, Error = Error> + Clone + Send + 'static,
35+
R::Endpoint: From<(tls::NoClientTls, Logical)> + Clone + Send,
3536
R::Resolution: Send,
3637
R::Future: Send + Unpin,
3738
{
@@ -50,6 +51,7 @@ impl<E> Outbound<E> {
5051

5152
let stack = endpoint
5253
.clone()
54+
.check_new_service::<R::Endpoint, http::Request<http::BoxBody>>()
5355
.push_on_response(
5456
svc::layers()
5557
.push(http::BoxRequest::layer())
@@ -62,6 +64,7 @@ impl<E> Outbound<E> {
6264
// the balancer need not drive them all directly.
6365
.push(svc::layer::mk(svc::SpawnReady::new)),
6466
)
67+
.check_new_service::<R::Endpoint, http::Request<_>>()
6568
// Resolve the service to its endpoints and balance requests over them.
6669
//
6770
// If the balancer has been empty/unavailable, eagerly fail requests.
@@ -79,6 +82,7 @@ impl<E> Outbound<E> {
7982
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout))
8083
.push(http::BoxResponse::layer()),
8184
)
85+
.check_make_service::<Concrete, http::Request<_>>()
8286
.push(svc::MapErrLayer::new(Into::into))
8387
// Drives the initial resolution via the service's readiness.
8488
.into_new_service()
@@ -96,9 +100,12 @@ impl<E> Outbound<E> {
96100
.push(http::BoxRequest::layer())
97101
.push(http::BoxResponse::layer()),
98102
)
99-
.push_map_target(Endpoint::from_logical(
100-
tls::NoClientTls::NotProvidedByServiceDiscovery,
101-
))
103+
.push_map_target(|logical: Logical| {
104+
R::Endpoint::from((
105+
tls::NoClientTls::NotProvidedByServiceDiscovery,
106+
logical,
107+
))
108+
})
102109
.into_inner(),
103110
))
104111
// Distribute requests over a distribution of balancers via a
@@ -147,7 +154,24 @@ impl<E> Outbound<E> {
147154
)
148155
.instrument(|l: &Logical| debug_span!("logical", dst = %l.addr()))
149156
.push_switch(
150-
Logical::or_endpoint(tls::NoClientTls::NotProvidedByServiceDiscovery),
157+
|logical: Logical| {
158+
let should_resolve = match logical.profile.as_ref() {
159+
Some(p) => {
160+
let p = p.borrow();
161+
p.endpoint.is_none() && (p.name.is_some() || !p.targets.is_empty())
162+
}
163+
None => false,
164+
};
165+
166+
if should_resolve {
167+
Ok::<_, Never>(svc::Either::A(logical))
168+
} else {
169+
Ok(svc::Either::B(R::Endpoint::from((
170+
tls::NoClientTls::NotProvidedByServiceDiscovery,
171+
logical,
172+
))))
173+
}
174+
},
151175
svc::stack(endpoint)
152176
.push_on_response(http::BoxRequest::layer())
153177
.into_inner(),

linkerd/app/outbound/src/http/tests.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::Endpoint;
22

33
use crate::{
4+
target,
45
test_util::{
56
support::{connect::Connect, http_util, profile, resolver, track},
67
*,
@@ -11,6 +12,7 @@ use bytes::Bytes;
1112
use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response};
1213
use linkerd_app_core::{
1314
io,
15+
proxy::resolve::map_endpoint,
1416
svc::{self, NewService},
1517
tls,
1618
transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
@@ -74,7 +76,10 @@ where
7476
out.clone().with_stack(NoTcpBalancer).push_detect_http(
7577
out.with_stack(connect)
7678
.push_http_endpoint()
77-
.push_http_logical(resolver)
79+
.push_http_logical(map_endpoint::Resolve::new(
80+
target::EndpointFromMetadata::default(),
81+
resolver,
82+
))
7883
.push_http_server()
7984
.into_inner(),
8085
)

linkerd/app/outbound/src/ingress.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ impl Outbound<()> {
6363

6464
let tcp = svc::stack(tcp)
6565
.push_on_response(drain::Retain::layer(self.runtime.drain.clone()))
66-
.push_map_target(tcp::Endpoint::from_accept(tls::NoClientTls::IngressNonHttp))
66+
.push_map_target(|a: tcp::Accept| {
67+
tcp::Endpoint::from((tls::NoClientTls::IngressNonHttp, a))
68+
})
6769
.into_inner();
6870

6971
svc::stack(http)

linkerd/app/outbound/src/lib.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub(crate) mod test_util;
1717
use linkerd_app_core::{
1818
config::ProxyConfig,
1919
io, metrics, profiles,
20-
proxy::{api_resolve::Metadata, core::Resolve},
20+
proxy::{api_resolve::Metadata, core::Resolve, resolve::map_endpoint},
2121
serve, svc, tls,
2222
transport::listen,
2323
AddrMatch, Error, ProxyRuntime,
@@ -121,16 +121,24 @@ impl<S> Outbound<S> {
121121
P::Error: Send,
122122
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
123123
{
124+
let identity_disabled = self.runtime.identity.is_none();
125+
124126
let http = self
125127
.clone()
126128
.push_tcp_endpoint()
127129
.push_http_endpoint()
128-
.push_http_logical(resolve.clone())
130+
.push_http_logical(map_endpoint::Resolve::new(
131+
target::EndpointFromMetadata { identity_disabled },
132+
resolve.clone(),
133+
))
129134
.push_http_server()
130135
.into_inner();
131136

132137
self.push_tcp_endpoint()
133-
.push_tcp_logical(resolve)
138+
.push_tcp_logical(map_endpoint::Resolve::new(
139+
target::EndpointFromMetadata { identity_disabled },
140+
resolve,
141+
))
134142
.push_detect_http(http)
135143
.push_discover(profiles)
136144
.into_inner()
@@ -162,6 +170,7 @@ impl Outbound<()> {
162170
let serve = async move {
163171
if self.config.ingress_mode {
164172
info!("Outbound routing in ingress-mode");
173+
let identity_disabled = self.runtime.identity.is_none();
165174
let tcp = self
166175
.to_tcp_connect()
167176
.push_tcp_endpoint()
@@ -171,7 +180,10 @@ impl Outbound<()> {
171180
.to_tcp_connect()
172181
.push_tcp_endpoint()
173182
.push_http_endpoint()
174-
.push_http_logical(resolve)
183+
.push_http_logical(map_endpoint::Resolve::new(
184+
target::EndpointFromMetadata { identity_disabled },
185+
resolve,
186+
))
175187
.into_inner();
176188
let stack = self.to_ingress(profiles, tcp, http);
177189
let shutdown = self.runtime.drain.signaled();
Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,30 @@
1-
#![allow(warnings)]
2-
3-
use crate::target::{Concrete, Endpoint, EndpointFromMetadata};
4-
use futures::{future, prelude::*, stream};
51
use linkerd_app_core::{
6-
discovery_rejected, is_discovery_rejected,
72
proxy::{
8-
api_resolve::Metadata,
9-
core::{Resolve, ResolveService, Update},
10-
discover::{self, Buffer, FromResolve, MakeEndpoint},
11-
resolve::map_endpoint,
12-
},
13-
svc::{
14-
layer,
15-
stack::{Filter, Param, Predicate},
16-
NewService,
3+
core::Resolve,
4+
discover::{self, Buffer},
175
},
18-
Addr, Error,
6+
svc::{layer, NewService},
197
};
20-
use std::{
21-
future::Future,
22-
pin::Pin,
23-
task::{Context, Poll},
24-
time::Duration,
25-
};
26-
27-
type Stack<P, R, N> =
28-
Buffer<discover::Stack<N, map_endpoint::Resolve<EndpointFromMetadata, R>, Endpoint<P>>>;
8+
use std::time::Duration;
299

30-
pub fn layer<P, R, N>(
10+
pub fn layer<T, R, N>(
3111
resolve: R,
3212
watchdog: Duration,
33-
) -> impl layer::Layer<N, Service = Stack<P, R, N>> + Clone
13+
) -> impl layer::Layer<N, Service = Buffer<discover::Stack<N, R, R::Endpoint>>> + Clone
3414
where
35-
P: Copy + Send + std::fmt::Debug,
36-
R: Resolve<Concrete<P>, Endpoint = Metadata> + Clone,
15+
T: Clone + Send + std::fmt::Debug,
16+
R: Resolve<T> + Clone,
3717
R::Resolution: Send,
3818
R::Future: Send,
39-
N: NewService<Endpoint<P>>,
19+
N: NewService<R::Endpoint>,
4020
{
4121
const ENDPOINT_BUFFER_CAPACITY: usize = 1_000;
4222

43-
let to_endpoint = EndpointFromMetadata;
4423
layer::mk(move |new_endpoint| {
45-
let endpoints = discover::resolve(
46-
new_endpoint,
47-
map_endpoint::Resolve::new(to_endpoint, resolve.clone()),
48-
);
49-
Buffer::new(ENDPOINT_BUFFER_CAPACITY, watchdog, endpoints)
24+
Buffer::new(
25+
ENDPOINT_BUFFER_CAPACITY,
26+
watchdog,
27+
discover::resolve(new_endpoint, resolve.clone()),
28+
)
5029
})
5130
}

0 commit comments

Comments
 (0)