Skip to content

Commit e06982b

Browse files
authored
outbound: Reorganize outbound stacks (#662)
There's no functional change here, just some general cleanup after recent changes, including: * The TCP balancer is no longer responsible for fallback forwarding. This makes the server responsible for all TCP forwarding. The metrics have been removed from the TCP balancer stack (for now). * Loop detection is moved to the connect stack, rather than the accept stack. * The TCP balancer tests have been simplified somewhat. * `tcp::Forward` no longer has a maker. `OnResponse` does the job just fine.
1 parent 6719486 commit e06982b

File tree

6 files changed

+139
-179
lines changed

6 files changed

+139
-179
lines changed

linkerd/app/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl From<Addr> for DiscoveryRejected {
122122
}
123123
}
124124

125-
#[derive(Clone, Debug)]
125+
#[derive(Clone, Debug, Default)]
126126
pub struct SkipByPort(std::sync::Arc<indexmap::IndexSet<u16>>);
127127

128128
impl From<indexmap::IndexSet<u16>> for SkipByPort {

linkerd/app/inbound/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ impl Config {
366366
// Forwards TCP streams that cannot be decoded as HTTP.
367367
let tcp_forward = svc::stack(tcp_connect)
368368
.push_make_thunk()
369-
.push(svc::layer::mk(tcp::Forward::new));
369+
.push_on_response(svc::layer::mk(tcp::Forward::new));
370370

371371
let http = DetectHttp::new(
372372
h2_settings,

linkerd/app/outbound/src/lib.rs

Lines changed: 102 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#![deny(warnings, rust_2018_idioms)]
77

88
pub use self::endpoint::{HttpConcrete, HttpEndpoint, HttpLogical, LogicalPerRequest, TcpEndpoint};
9-
use ::http::header::HOST;
109
use futures::{future, prelude::*};
1110
use linkerd2_app_core::{
1211
admit, classify,
@@ -21,8 +20,8 @@ use linkerd2_app_core::{
2120
spans::SpanConverter,
2221
svc::{self, NewService},
2322
transport::{self, listen, tls},
24-
Addr, Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
25-
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
23+
Addr, Conditional, DiscoveryRejected, Error, Never, ProxyMetrics, StackMetrics,
24+
TraceContextLayer, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
2625
};
2726
use std::{
2827
collections::HashMap,
@@ -53,6 +52,7 @@ pub struct Config {
5352
impl Config {
5453
pub fn build_tcp_connect(
5554
&self,
55+
prevent_loop: impl Into<PreventLoop>,
5656
local_identity: tls::Conditional<identity::Local>,
5757
metrics: &ProxyMetrics,
5858
) -> impl tower::Service<
@@ -76,9 +76,75 @@ impl Config {
7676
// Limits the time we wait for a connection to be established.
7777
.push_timeout(self.proxy.connect.timeout)
7878
.push(metrics.transport.layer_connect(TransportLabels))
79+
.push(admit::AdmitLayer::new(prevent_loop.into()))
7980
.into_inner()
8081
}
8182

83+
/// Constructs a TCP load balancer.
84+
pub fn build_tcp_balance<C, E, I>(
85+
&self,
86+
connect: C,
87+
resolve: E,
88+
) -> impl tower::Service<
89+
SocketAddr,
90+
Error = impl Into<Error>,
91+
Future = impl Unpin + Send + 'static,
92+
Response = impl tower::Service<
93+
I,
94+
Response = (),
95+
Future = impl Unpin + Send + 'static,
96+
Error = impl Into<Error>,
97+
> + Unpin
98+
+ Clone
99+
+ Send
100+
+ 'static,
101+
> + Unpin
102+
+ Clone
103+
+ Send
104+
+ 'static
105+
where
106+
C: tower::Service<TcpEndpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
107+
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
108+
C::Future: Unpin + Send,
109+
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
110+
E::Future: Unpin + Send,
111+
E::Resolution: Unpin + Send,
112+
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
113+
{
114+
let ProxyConfig {
115+
dispatch_timeout,
116+
cache_max_idle_age,
117+
buffer_capacity,
118+
..
119+
} = self.proxy;
120+
121+
svc::stack(connect)
122+
.push_make_thunk()
123+
.instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
124+
.check_make_service::<TcpEndpoint, ()>()
125+
.push(discover::resolve(map_endpoint::Resolve::new(
126+
endpoint::FromMetadata,
127+
resolve,
128+
)))
129+
.push(discover::buffer(1_000, cache_max_idle_age))
130+
.push_map_target(Addr::from)
131+
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
132+
.push_on_response(svc::layer::mk(tcp::Forward::new))
133+
.into_new_service()
134+
.check_new_service::<SocketAddr, I>()
135+
.cache(
136+
svc::layers().push_on_response(
137+
svc::layers()
138+
.push_failfast(dispatch_timeout)
139+
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
140+
),
141+
)
142+
.spawn_buffer(buffer_capacity)
143+
.push_make_ready()
144+
.instrument(|_: &_| info_span!("tcp"))
145+
.check_make_service::<SocketAddr, I>()
146+
}
147+
82148
pub fn build_dns_refine(
83149
&self,
84150
dns_resolver: dns::Resolver,
@@ -119,14 +185,13 @@ impl Config {
119185

120186
pub fn build_http_endpoint<B, C>(
121187
&self,
122-
prevent_loop: impl Into<PreventLoop>,
123188
tcp_connect: C,
124189
tap_layer: tap::Layer,
125190
metrics: ProxyMetrics,
126191
span_sink: Option<mpsc::Sender<oc::Span>>,
127192
) -> impl tower::Service<
128193
HttpEndpoint,
129-
Error = Error,
194+
Error = Never,
130195
Future = impl Unpin + Send,
131196
Response = impl tower::Service<
132197
http::Request<B>,
@@ -169,18 +234,24 @@ impl Config {
169234
// Re-establishes a connection when the client fails.
170235
.push(reconnect::layer({
171236
let backoff = self.proxy.connect.backoff.clone();
172-
move |_| Ok(backoff.stream())
237+
move |e: Error| {
238+
if is_loop(&*e) {
239+
Err(e)
240+
} else {
241+
Ok(backoff.stream())
242+
}
243+
}
173244
}))
174-
.push(admit::AdmitLayer::new(prevent_loop.into()))
175245
.push(observability.clone())
176246
.push(identity_headers.clone())
177247
.push(http::override_authority::Layer::new(vec![
178-
HOST.as_str(),
248+
::http::header::HOST.as_str(),
179249
CANONICAL_DST_HEADER,
180250
]))
181251
.push_on_response(svc::layers().box_http_response())
182252
.check_service::<HttpEndpoint>()
183253
.instrument(|e: &HttpEndpoint| info_span!("endpoint", peer.addr = %e.addr))
254+
.into_inner()
184255
}
185256

186257
pub fn build_http_router<B, E, S, R, P>(
@@ -205,12 +276,8 @@ impl Config {
205276
where
206277
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
207278
B::Data: Send + 'static,
208-
E: tower::Service<HttpEndpoint, Error = Error, Response = S>
209-
+ Unpin
210-
+ Clone
211-
+ Send
212-
+ Sync
213-
+ 'static,
279+
E: tower::Service<HttpEndpoint, Response = S> + Unpin + Clone + Send + Sync + 'static,
280+
E::Error: Into<Error>,
214281
E::Future: Unpin + Send,
215282
S: tower::Service<
216283
http::Request<http::boxed::Payload>,
@@ -319,6 +386,7 @@ impl Config {
319386
),
320387
)
321388
.spawn_buffer(buffer_capacity)
389+
.push_make_ready()
322390
.check_make_service::<HttpLogical, http::Request<_>>();
323391

324392
// Caches clients that bypass discovery/balancing.
@@ -343,7 +411,6 @@ impl Config {
343411
// `forward` stack is used instead, bypassing load balancing, etc.
344412
logical
345413
.push_on_response(svc::layers().box_http_response())
346-
.push_make_ready()
347414
.push_fallback_with_predicate(
348415
forward
349416
.push_map_target(HttpEndpoint::from)
@@ -359,82 +426,6 @@ impl Config {
359426
.into_inner()
360427
}
361428

362-
/// Constructs a TCP load balancer.
363-
pub fn build_tcp_balance<C, E, I>(
364-
&self,
365-
tcp_connect: &C,
366-
resolve: E,
367-
prevent_loop: PreventLoop,
368-
metrics: &ProxyMetrics,
369-
) -> impl tower::Service<
370-
SocketAddr,
371-
Error = Error,
372-
Future = impl Unpin + Send + 'static,
373-
Response = impl tower::Service<
374-
I,
375-
Response = (),
376-
Future = impl Unpin + Send + 'static,
377-
Error = Error,
378-
> + Unpin
379-
+ Clone
380-
+ Send
381-
+ 'static,
382-
> + Unpin
383-
+ Clone
384-
+ Send
385-
+ Sync
386-
+ 'static
387-
where
388-
C: tower::Service<TcpEndpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
389-
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
390-
C::Future: Unpin + Send,
391-
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
392-
E::Future: Unpin + Send,
393-
E::Resolution: Unpin + Send,
394-
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
395-
{
396-
let ProxyConfig {
397-
dispatch_timeout,
398-
cache_max_idle_age,
399-
buffer_capacity,
400-
..
401-
} = self.proxy;
402-
svc::stack(tcp_connect.clone())
403-
.push_make_thunk()
404-
.instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
405-
.push(admit::AdmitLayer::new(prevent_loop))
406-
.check_make_service::<TcpEndpoint, ()>()
407-
.push(discover::resolve(map_endpoint::Resolve::new(
408-
endpoint::FromMetadata,
409-
resolve,
410-
)))
411-
.push(discover::buffer(1_000, cache_max_idle_age))
412-
.push_map_target(Addr::from)
413-
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
414-
.push_fallback_with_predicate(
415-
svc::stack(tcp_connect.clone())
416-
.push_make_thunk()
417-
.push(admit::AdmitLayer::new(prevent_loop))
418-
.push_map_target(TcpEndpoint::from)
419-
.instrument(|_: &SocketAddr| debug_span!("forward")),
420-
is_discovery_rejected,
421-
)
422-
.into_new_service()
423-
.check_new_service::<SocketAddr, ()>()
424-
.cache(
425-
svc::layers().push_on_response(
426-
svc::layers()
427-
.push_failfast(dispatch_timeout)
428-
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
429-
.push(metrics.stack.layer(stack_labels("tcp"))),
430-
),
431-
)
432-
.spawn_buffer(buffer_capacity)
433-
.check_make_service::<SocketAddr, ()>()
434-
.push(svc::layer::mk(tcp::Forward::new))
435-
.instrument(|_: &SocketAddr| info_span!("tcp"))
436-
}
437-
438429
pub async fn build_server<E, R, C, H, S>(
439430
self,
440431
listen_addr: std::net::SocketAddr,
@@ -483,13 +474,6 @@ impl Config {
483474
..
484475
} = self.proxy;
485476
let canonicalize_timeout = self.canonicalize_timeout;
486-
let prevent_loop = PreventLoop::from(listen_addr.port());
487-
488-
// Load balances TCP streams that cannot be decoded as HTTP.
489-
let tcp_balance =
490-
svc::stack(self.build_tcp_balance(&tcp_connect, resolve, prevent_loop, &metrics))
491-
.push_map_target(|a: listen::Addrs| a.target_addr())
492-
.into_inner();
493477

494478
let http_admit_request = svc::layers()
495479
// Limits the number of in-flight requests.
@@ -510,7 +494,6 @@ impl Config {
510494
// its canonical FQDN to use for routing.
511495
.push(http::canonicalize::Layer::new(refine, canonicalize_timeout))
512496
.check_make_service::<HttpLogical, http::Request<_>>()
513-
.push_make_ready()
514497
.push_timeout(dispatch_timeout)
515498
.push(router::Layer::new(LogicalPerRequest::from))
516499
.check_new_service::<listen::Addrs, http::Request<_>>()
@@ -529,6 +512,21 @@ impl Config {
529512
.into_inner()
530513
.into_make_service();
531514

515+
let tcp_forward = svc::stack(tcp_connect.clone())
516+
.push_make_thunk()
517+
.push_on_response(svc::layer::mk(tcp::Forward::new))
518+
.instrument(|_: &TcpEndpoint| info_span!("forward"))
519+
.check_service::<TcpEndpoint>();
520+
521+
// Load balances TCP streams that cannot be decoded as HTTP.
522+
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve))
523+
.push_fallback_with_predicate(
524+
svc::stack(tcp_forward.clone()).push_map_target(TcpEndpoint::from),
525+
is_discovery_rejected,
526+
)
527+
.push_map_target(|a: listen::Addrs| a.target_addr())
528+
.into_inner();
529+
532530
let http = http::DetectHttp::new(
533531
h2_settings,
534532
detect_protocol_timeout,
@@ -537,16 +535,10 @@ impl Config {
537535
drain.clone(),
538536
);
539537

540-
let tcp_forward = svc::stack(tcp_connect)
541-
.push_make_thunk()
542-
.push(svc::layer::mk(tcp::Forward::new))
543-
.push(admit::AdmitLayer::new(prevent_loop))
544-
.push_map_target(TcpEndpoint::from);
545-
546538
let accept = svc::stack(svc::stack::MakeSwitch::new(
547539
skip_detect.clone(),
548540
http,
549-
tcp_forward,
541+
tcp_forward.push_map_target(TcpEndpoint::from),
550542
))
551543
.push(metrics.transport.layer_accept(TransportLabels));
552544

@@ -604,3 +596,7 @@ fn is_discovery_rejected(err: &Error) -> bool {
604596
tracing::debug!(rejected, %err);
605597
rejected
606598
}
599+
600+
fn is_loop(err: &(dyn std::error::Error + 'static)) -> bool {
601+
err.is::<prevent_loop::LoopPrevented>() || err.source().map(is_loop).unwrap_or(false)
602+
}

0 commit comments

Comments
 (0)