Skip to content

Commit 1009645

Browse files
authored
outbound: Configure endpoint construction in logical stack (#949)
ebf67d6 moved endpoint construction logic out of the logical stacks. In doing so, the gateway was incorrectly configured to mark endpoints with disabled identity, preventing mTLS between the gateway and destination service. This change moves endpoint construction into each logical stack so that it need not be configured on each instantiation.
1 parent 5e9d673 commit 1009645

File tree

7 files changed

+121
-158
lines changed

7 files changed

+121
-158
lines changed

linkerd/app/gateway/src/lib.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ use self::gateway::NewGateway;
88
use linkerd_app_core::{
99
config::ProxyConfig,
1010
detect, discovery_rejected, io, metrics, profiles,
11-
proxy::{api_resolve::Metadata, core::Resolve, http, resolve::map_endpoint},
11+
proxy::{
12+
api_resolve::{ConcreteAddr, Metadata},
13+
core::Resolve,
14+
http,
15+
},
1216
svc::{self, Param},
1317
tls,
1418
transport::OrigDstAddr,
@@ -87,13 +91,10 @@ where
8791
P: profiles::GetProfile<profiles::LogicalAddr> + Clone + Send + Sync + Unpin + 'static,
8892
P::Future: Send + 'static,
8993
P::Error: Send,
90-
R: Resolve<outbound::http::Concrete, Endpoint = Metadata, Error = Error>
91-
+ Resolve<outbound::tcp::Concrete, Endpoint = Metadata, Error = Error>,
9294
R: Clone + Send + Sync + Unpin + 'static,
93-
<R as Resolve<outbound::http::Concrete>>::Resolution: Send,
94-
<R as Resolve<outbound::http::Concrete>>::Future: Send + Unpin,
95-
<R as Resolve<outbound::tcp::Concrete>>::Resolution: Send,
96-
<R as Resolve<outbound::tcp::Concrete>>::Future: Send + Unpin,
95+
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
96+
R::Resolution: Send,
97+
R::Future: Send + Unpin,
9798
{
9899
let ProxyConfig {
99100
buffer_capacity,
@@ -116,10 +117,7 @@ where
116117
let tcp = outbound
117118
.clone()
118119
.push_tcp_endpoint()
119-
.push_tcp_logical(map_endpoint::Resolve::new(
120-
outbound::target::EndpointFromMetadata::default(),
121-
resolve.clone(),
122-
))
120+
.push_tcp_logical(resolve.clone())
123121
.into_stack()
124122
.push_request_filter(|(p, _): (Option<profiles::Receiver>, _)| match p {
125123
Some(rx) if rx.borrow().name.is_some() => Ok(outbound::tcp::Logical {
@@ -163,10 +161,7 @@ where
163161
let http = outbound
164162
.push_tcp_endpoint()
165163
.push_http_endpoint()
166-
.push_http_logical(map_endpoint::Resolve::new(
167-
outbound::target::EndpointFromMetadata::default(),
168-
resolve,
169-
))
164+
.push_http_logical(resolve)
170165
.into_stack()
171166
.push(NewGateway::layer(local_id))
172167
.push(profiles::discover::layer(profiles, move |t: HttpTarget| {

linkerd/app/outbound/src/http/logical.rs

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
use super::{CanonicalDstHeader, Concrete, Logical};
2-
use crate::{resolve, stack_labels, Outbound};
1+
use super::{CanonicalDstHeader, Concrete, Endpoint, Logical};
2+
use crate::{resolve, stack_labels, target, Outbound};
33
use linkerd_app_core::{
44
classify, config, profiles,
5-
proxy::{core::Resolve, http},
5+
proxy::{
6+
api_resolve::{ConcreteAddr, Metadata},
7+
core::Resolve,
8+
http,
9+
resolve::map_endpoint,
10+
},
611
retry, svc, tls, Error, Never, DST_OVERRIDE_HEADER,
712
};
813
use tracing::debug_span;
@@ -25,14 +30,13 @@ impl<E> Outbound<E> {
2530
where
2631
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
2732
B::Data: Send + 'static,
28-
E: svc::NewService<R::Endpoint, Service = ESvc> + Clone + Send + 'static,
33+
E: svc::NewService<Endpoint, Service = ESvc> + Clone + Send + 'static,
2934
ESvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
3035
+ Send
3136
+ 'static,
3237
ESvc::Error: Into<Error>,
3338
ESvc::Future: Send,
34-
R: Resolve<Concrete, Error = Error> + Clone + Send + 'static,
35-
R::Endpoint: From<(tls::NoClientTls, Logical)> + Clone + Send,
39+
R: Resolve<ConcreteAddr, Error = Error, Endpoint = Metadata> + Clone + Send + 'static,
3640
R::Resolution: Send,
3741
R::Future: Send + Unpin,
3842
{
@@ -49,9 +53,27 @@ impl<E> Outbound<E> {
4953
} = config.proxy;
5054
let watchdog = cache_max_idle_age * 2;
5155

56+
let identity_disabled = rt.identity.is_none();
57+
let no_tls_reason = if identity_disabled {
58+
tls::NoClientTls::Disabled
59+
} else {
60+
tls::NoClientTls::NotProvidedByServiceDiscovery
61+
};
62+
let resolve = svc::stack(resolve.into_service())
63+
.check_service::<ConcreteAddr>()
64+
.push_request_filter(|c: Concrete| Ok::<_, Never>(c.resolve))
65+
.push(svc::layer::mk(move |inner| {
66+
map_endpoint::Resolve::new(
67+
target::EndpointFromMetadata { identity_disabled },
68+
inner,
69+
)
70+
}))
71+
.check_service::<Concrete>()
72+
.into_inner();
73+
5274
let stack = endpoint
5375
.clone()
54-
.check_new_service::<R::Endpoint, http::Request<http::BoxBody>>()
76+
.check_new_service::<Endpoint, http::Request<http::BoxBody>>()
5577
.push_on_response(
5678
svc::layers()
5779
.push(http::BoxRequest::layer())
@@ -64,7 +86,7 @@ impl<E> Outbound<E> {
6486
// the balancer need not drive them all directly.
6587
.push(svc::layer::mk(svc::SpawnReady::new)),
6688
)
67-
.check_new_service::<R::Endpoint, http::Request<_>>()
89+
.check_new_service::<Endpoint, http::Request<_>>()
6890
// Resolve the service to its endpoints and balance requests over them.
6991
//
7092
// If the balancer has been empty/unavailable, eagerly fail requests.
@@ -100,12 +122,7 @@ impl<E> Outbound<E> {
100122
.push(http::BoxRequest::layer())
101123
.push(http::BoxResponse::layer()),
102124
)
103-
.push_map_target(|logical: Logical| {
104-
R::Endpoint::from((
105-
tls::NoClientTls::NotProvidedByServiceDiscovery,
106-
logical,
107-
))
108-
})
125+
.push_map_target(move |l: Logical| Endpoint::from((no_tls_reason, l)))
109126
.into_inner(),
110127
))
111128
// Distribute requests over a distribution of balancers via a
@@ -154,7 +171,7 @@ impl<E> Outbound<E> {
154171
)
155172
.instrument(|l: &Logical| debug_span!("logical", dst = %l.addr()))
156173
.push_switch(
157-
|logical: Logical| {
174+
move |logical: Logical| {
158175
let should_resolve = match logical.profile.as_ref() {
159176
Some(p) => {
160177
let p = p.borrow();
@@ -166,10 +183,7 @@ impl<E> Outbound<E> {
166183
if should_resolve {
167184
Ok::<_, Never>(svc::Either::A(logical))
168185
} else {
169-
Ok(svc::Either::B(R::Endpoint::from((
170-
tls::NoClientTls::NotProvidedByServiceDiscovery,
171-
logical,
172-
))))
186+
Ok(svc::Either::B(Endpoint::from((no_tls_reason, logical))))
173187
}
174188
},
175189
svc::stack(endpoint)

linkerd/app/outbound/src/http/tests.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use super::Endpoint;
22

33
use crate::{
4-
target,
54
test_util::{
65
support::{connect::Connect, http_util, profile, resolver, track},
76
*,
@@ -12,7 +11,6 @@ use bytes::Bytes;
1211
use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response};
1312
use linkerd_app_core::{
1413
io,
15-
proxy::resolve::map_endpoint,
1614
svc::{self, NewService},
1715
tls,
1816
transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
@@ -76,10 +74,7 @@ where
7674
out.clone().with_stack(NoTcpBalancer).push_detect_http(
7775
out.with_stack(connect)
7876
.push_http_endpoint()
79-
.push_http_logical(map_endpoint::Resolve::new(
80-
target::EndpointFromMetadata::default(),
81-
resolver,
82-
))
77+
.push_http_logical(resolver)
8378
.push_http_server()
8479
.into_inner(),
8580
)

linkerd/app/outbound/src/lib.rs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ pub(crate) mod test_util;
1717
use linkerd_app_core::{
1818
config::ProxyConfig,
1919
io, metrics, profiles,
20-
proxy::{api_resolve::Metadata, core::Resolve, resolve::map_endpoint},
20+
proxy::{
21+
api_resolve::{ConcreteAddr, Metadata},
22+
core::Resolve,
23+
},
2124
serve, svc, tls,
2225
transport::listen,
2326
AddrMatch, Error, ProxyRuntime,
@@ -105,40 +108,29 @@ impl<S> Outbound<S> {
105108
<S as svc::Service<http::Endpoint>>::Response:
106109
tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
107110
<S as svc::Service<http::Endpoint>>::Future: Send + Unpin,
108-
R: Resolve<http::Concrete, Endpoint = Metadata, Error = Error>,
109-
<R as Resolve<http::Concrete>>::Resolution: Send,
110-
<R as Resolve<http::Concrete>>::Future: Send + Unpin,
111111
<S as svc::Service<tcp::Endpoint>>::Response: tls::HasNegotiatedProtocol,
112112
<S as svc::Service<tcp::Endpoint>>::Response:
113113
tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
114114
<S as svc::Service<tcp::Endpoint>>::Future: Send,
115-
R: Resolve<tcp::Concrete, Endpoint = Metadata, Error = Error>,
116-
<R as Resolve<tcp::Concrete>>::Resolution: Send,
117-
<R as Resolve<tcp::Concrete>>::Future: Send + Unpin,
118115
R: Clone + Send + 'static,
116+
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
117+
R::Resolution: Send,
118+
R::Future: Send + Unpin,
119119
P: profiles::GetProfile<profiles::LogicalAddr> + Clone + Send + 'static,
120120
P::Future: Send,
121121
P::Error: Send,
122122
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
123123
{
124-
let identity_disabled = self.runtime.identity.is_none();
125-
126124
let http = self
127125
.clone()
128126
.push_tcp_endpoint()
129127
.push_http_endpoint()
130-
.push_http_logical(map_endpoint::Resolve::new(
131-
target::EndpointFromMetadata { identity_disabled },
132-
resolve.clone(),
133-
))
128+
.push_http_logical(resolve.clone())
134129
.push_http_server()
135130
.into_inner();
136131

137132
self.push_tcp_endpoint()
138-
.push_tcp_logical(map_endpoint::Resolve::new(
139-
target::EndpointFromMetadata { identity_disabled },
140-
resolve,
141-
))
133+
.push_tcp_logical(resolve)
142134
.push_detect_http(http)
143135
.push_discover(profiles)
144136
.into_inner()
@@ -148,13 +140,10 @@ impl<S> Outbound<S> {
148140
impl Outbound<()> {
149141
pub fn serve<P, R>(self, profiles: P, resolve: R) -> (SocketAddr, impl Future<Output = ()>)
150142
where
151-
R: Resolve<http::Concrete, Endpoint = Metadata, Error = Error>,
152-
<R as Resolve<http::Concrete>>::Resolution: Send,
153-
<R as Resolve<http::Concrete>>::Future: Send + Unpin,
154-
R: Resolve<tcp::Concrete, Endpoint = Metadata, Error = Error>,
155-
<R as Resolve<tcp::Concrete>>::Resolution: Send,
156-
<R as Resolve<tcp::Concrete>>::Future: Send + Unpin,
157143
R: Clone + Send + Sync + Unpin + 'static,
144+
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
145+
R::Resolution: Send,
146+
R::Future: Send + Unpin,
158147
P: profiles::GetProfile<profiles::LogicalAddr> + Clone + Send + Sync + Unpin + 'static,
159148
P::Future: Send,
160149
P::Error: Send,
@@ -170,7 +159,6 @@ impl Outbound<()> {
170159
let serve = async move {
171160
if self.config.ingress_mode {
172161
info!("Outbound routing in ingress-mode");
173-
let identity_disabled = self.runtime.identity.is_none();
174162
let tcp = self
175163
.to_tcp_connect()
176164
.push_tcp_endpoint()
@@ -180,10 +168,7 @@ impl Outbound<()> {
180168
.to_tcp_connect()
181169
.push_tcp_endpoint()
182170
.push_http_endpoint()
183-
.push_http_logical(map_endpoint::Resolve::new(
184-
target::EndpointFromMetadata { identity_disabled },
185-
resolve,
186-
))
171+
.push_http_logical(resolve)
187172
.into_inner();
188173
let stack = self.to_ingress(profiles, tcp, http);
189174
let shutdown = self.runtime.drain.signaled();

linkerd/app/outbound/src/target.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::net::SocketAddr;
1313
use tracing::debug;
1414

1515
#[derive(Copy, Clone)]
16-
pub struct EndpointFromMetadata {
16+
pub(crate) struct EndpointFromMetadata {
1717
pub identity_disabled: bool,
1818
}
1919

@@ -191,7 +191,7 @@ impl<P> From<(tls::NoClientTls, Logical<P>)> for Endpoint<P> {
191191
},
192192
Some((addr, metadata)) => Self {
193193
addr: Remote(ServerAddr(addr)),
194-
tls: EndpointFromMetadata::client_tls(&metadata),
194+
tls: EndpointFromMetadata::client_tls(&metadata, reason),
195195
metadata,
196196
logical_addr: logical.addr(),
197197
protocol: logical.protocol,
@@ -252,16 +252,8 @@ impl<P: std::hash::Hash> std::hash::Hash for Endpoint<P> {
252252

253253
// === EndpointFromMetadata ===
254254

255-
impl Default for EndpointFromMetadata {
256-
fn default() -> Self {
257-
Self {
258-
identity_disabled: false,
259-
}
260-
}
261-
}
262-
263255
impl EndpointFromMetadata {
264-
fn client_tls(metadata: &Metadata) -> tls::ConditionalClientTls {
256+
fn client_tls(metadata: &Metadata, reason: tls::NoClientTls) -> tls::ConditionalClientTls {
265257
// If we're transporting an opaque protocol OR we're communicating with
266258
// a gateway, then set an ALPN value indicating support for a transport
267259
// header.
@@ -283,9 +275,7 @@ impl EndpointFromMetadata {
283275
},
284276
})
285277
})
286-
.unwrap_or(Conditional::None(
287-
tls::NoClientTls::NotProvidedByServiceDiscovery,
288-
))
278+
.unwrap_or(Conditional::None(reason))
289279
}
290280
}
291281

@@ -302,7 +292,7 @@ impl<P: Copy + std::fmt::Debug> MapEndpoint<Concrete<P>, Metadata> for EndpointF
302292
let tls = if self.identity_disabled {
303293
tls::ConditionalClientTls::None(tls::NoClientTls::Disabled)
304294
} else {
305-
Self::client_tls(&metadata)
295+
Self::client_tls(&metadata, tls::NoClientTls::NotProvidedByServiceDiscovery)
306296
};
307297
Endpoint {
308298
addr: Remote(ServerAddr(addr)),

0 commit comments

Comments
 (0)