Skip to content

Commit dd9372d

Browse files
olix0rhawkw
andauthored
Update routers to support per-request backend distributions (#2095)
The outbound logical stacks use a single `profiles::split` for all requests. Therefore, request routing can't influence a request's backend selection. This change replaces the service-profile-specific router and split implementations in favor of the newer, more general router (577ad52) and distribute (47678e6) modules. Unlike the service-profile-router, the generic router is lazy and only creates route services as a matching request is found. Buffers are moved into the concrete stack so that balancers may be shared by more than one route. This change also updates trace contexts so that we include logical/endpoint metadata as early as possible. Co-authored-by: Eliza Weisman <[email protected]>
1 parent 86feff9 commit dd9372d

File tree

18 files changed

+525
-540
lines changed

18 files changed

+525
-540
lines changed

Cargo.lock

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -939,14 +939,14 @@ dependencies = [
939939
"hyper",
940940
"linkerd-app-core",
941941
"linkerd-app-test",
942+
"linkerd-distribute",
942943
"linkerd-http-classify",
943944
"linkerd-http-retry",
944945
"linkerd-identity",
945946
"linkerd-io",
946947
"linkerd-meshtls",
947948
"linkerd-meshtls-rustls",
948949
"linkerd-retry",
949-
"linkerd-router",
950950
"linkerd-tracing",
951951
"parking_lot",
952952
"pin-project",
@@ -1547,7 +1547,6 @@ dependencies = [
15471547
"futures",
15481548
"http",
15491549
"http-body",
1550-
"indexmap",
15511550
"linkerd-addr",
15521551
"linkerd-dns-name",
15531552
"linkerd-error",
@@ -1557,10 +1556,8 @@ dependencies = [
15571556
"linkerd-tonic-watch",
15581557
"linkerd2-proxy-api",
15591558
"once_cell",
1560-
"pin-project",
15611559
"prost-types",
15621560
"quickcheck",
1563-
"rand",
15641561
"regex",
15651562
"thiserror",
15661563
"tokio",

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ bytes = "1"
1919
http = "0.2"
2020
futures = { version = "0.3", default-features = false }
2121
linkerd-app-core = { path = "../core" }
22+
linkerd-distribute = { path = "../../distribute" }
2223
linkerd-http-classify = { path = "../../http-classify" }
2324
linkerd-http-retry = { path = "../../http-retry" }
2425
linkerd-identity = { path = "../../identity" }
2526
linkerd-retry = { path = "../../retry" }
26-
linkerd-router = { path = "../../router" }
2727
parking_lot = "0.12"
2828
thiserror = "1"
2929
tokio = { version = "1", features = ["sync"] }

linkerd/app/outbound/src/endpoint.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{http, logical::Concrete, tcp, Outbound};
1+
use crate::{http, logical::Concrete, stack_labels, tcp, Outbound};
22
use linkerd_app_core::{
33
io, metrics,
44
profiles::LogicalAddr,
@@ -13,6 +13,7 @@ use std::{
1313
net::{IpAddr, SocketAddr},
1414
sync::Arc,
1515
};
16+
use tracing::info_span;
1617

1718
#[derive(Clone, Debug, PartialEq, Eq)]
1819
pub struct Endpoint<P> {
@@ -201,7 +202,14 @@ impl<P: Copy + std::fmt::Debug> MapEndpoint<Concrete<P>, Metadata> for FromMetad
201202
// === Outbound ===
202203

203204
impl<S> Outbound<S> {
204-
pub fn push_endpoint<I>(self) -> Outbound<svc::ArcNewTcp<tcp::Endpoint, I>>
205+
/// Builds a stack that handles forwarding a connection to a single endpoint
206+
/// (i.e. without routing and load balancing).
207+
///
208+
/// HTTP protocol detection may still be performed on the connection.
209+
///
210+
/// A service produced by this stack is used for a single connection (i.e.
211+
/// without any form of caching for reuse across connections).
212+
pub fn push_forward<I>(self) -> Outbound<svc::ArcNewTcp<tcp::Endpoint, I>>
205213
where
206214
Self: Clone + 'static,
207215
S: svc::MakeConnection<tcp::Connect, Metadata = Local<ClientAddr>, Error = io::Error>,
@@ -211,19 +219,38 @@ impl<S> Outbound<S> {
211219
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
212220
I: fmt::Debug + Send + Sync + Unpin + 'static,
213221
{
222+
// The forwarding stacks are **NOT** cached, since they don't do any
223+
// external discovery.
214224
let http = self
215225
.clone()
216226
.push_tcp_endpoint::<http::Connect>()
217227
.push_http_endpoint()
218-
.map_stack(|config, _, stk| {
219-
stk.push_buffer_on_service("HTTP Server", &config.http_request_buffer)
228+
.map_stack(|config, rt, stk| {
229+
stk.push_on_service(
230+
svc::layers()
231+
.push(
232+
rt.metrics
233+
.proxy
234+
.stack
235+
.layer(stack_labels("http", "forward")),
236+
)
237+
// TODO(ver): This buffer config should be distinct from
238+
// that in the concrete stack. It should probably be
239+
// derived from the target so that we can configure it
240+
// via the API.
241+
.push_buffer("HTTP Forward", &config.http_request_buffer),
242+
)
220243
})
221244
.push_http_server()
222245
.into_inner();
223246

224-
self.push_tcp_endpoint()
225-
.push_tcp_forward()
226-
.push_detect_http(http)
247+
let opaque = self.push_tcp_endpoint().push_tcp_forward();
248+
249+
opaque.push_detect_http(http).map_stack(|_, _, stk| {
250+
stk.instrument(|e: &tcp::Endpoint| info_span!("forward", endpoint = %e.addr))
251+
.push_on_service(svc::BoxService::layer())
252+
.push(svc::ArcNewService::layer())
253+
})
227254
}
228255
}
229256

@@ -256,7 +283,7 @@ pub mod tests {
256283
"i don't like you, go away",
257284
))
258285
}))
259-
.push_endpoint()
286+
.push_forward()
260287
.into_inner()
261288
.new_service(tcp::Endpoint::forward(
262289
OrigDstAddr(addr),

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,30 @@ use linkerd_app_core::{
99
},
1010
svc, Error, Infallible,
1111
};
12+
use tracing::info_span;
1213

1314
impl<N> Outbound<N> {
15+
/// Builds a [`svc::NewService`] stack that builds buffered HTTP load
16+
/// balancer services for [`Concrete`] targets.
17+
///
18+
/// When a balancer has no available inner services, it goes into
19+
/// 'failfast'. While in failfast, buffered requests are failed and the
20+
/// service becomes unavailable so callers may choose alternate concrete
21+
/// services.
22+
//
23+
// TODO(ver) make the outer target type generic/parameterized.
1424
pub fn push_http_concrete<NSvc, R>(
1525
self,
1626
resolve: R,
1727
) -> Outbound<
1828
svc::ArcNewService<
1929
Concrete,
2030
impl svc::Service<
21-
http::Request<http::BoxBody>,
22-
Response = http::Response<http::BoxBody>,
23-
Error = Error,
24-
Future = impl Send,
25-
>,
31+
http::Request<http::BoxBody>,
32+
Response = http::Response<http::BoxBody>,
33+
Error = Error,
34+
Future = impl Send,
35+
> + Clone,
2636
>,
2737
>
2838
where
@@ -38,14 +48,7 @@ impl<N> Outbound<N> {
3848
R::Future: Send + Unpin,
3949
{
4050
self.map_stack(|config, rt, endpoint| {
41-
let crate::Config {
42-
discovery_idle_timeout,
43-
..
44-
} = config;
45-
let watchdog = *discovery_idle_timeout * 2;
46-
4751
let resolve = svc::stack(resolve.into_service())
48-
.check_service::<ConcreteAddr>()
4952
.push_request_filter(|c: Concrete| Ok::<_, Infallible>(c.resolve))
5053
.push(svc::layer::mk(move |inner| {
5154
map_endpoint::Resolve::new(
@@ -59,51 +62,43 @@ impl<N> Outbound<N> {
5962
.into_inner();
6063

6164
endpoint
62-
.check_new_service::<Endpoint, http::Request<http::BoxBody>>()
6365
.push_on_service(
64-
svc::layers().push(http::BoxRequest::layer()).push(
65-
rt.metrics
66-
.proxy
67-
.stack
68-
.layer(stack_labels("http", "balance.endpoint")),
69-
),
66+
rt.metrics
67+
.proxy
68+
.stack
69+
.layer(stack_labels("http", "endpoint")),
7070
)
71-
.instrument(|t: &Endpoint| tracing::debug_span!("endpoint", addr = %t.addr))
72-
.check_new_service::<Endpoint, http::Request<_>>()
71+
.instrument(|e: &Endpoint| info_span!("endpoint", addr = %e.addr))
7372
// Resolve the service to its endpoints and balance requests over them.
7473
//
75-
// If the balancer has been empty/unavailable, eagerly fail requests.
76-
// When the balancer is in failfast, spawn the service in a background
77-
// task so it becomes ready without new requests.
78-
//
7974
// We *don't* ensure that the endpoint is driven to readiness here, because this
8075
// might cause us to continually attempt to reestablish connections without
8176
// consulting discovery to see whether the endpoint has been removed. Instead, the
82-
// endpoint layer spawns each _connection_ attempt on a background task, but the
77+
// endpoint stack spawns each _connection_ attempt on a background task, but the
8378
// decision to attempt the connection must be driven by the balancer.
84-
.push(resolve::layer(resolve, watchdog))
79+
//
80+
// TODO(ver) remove the watchdog timeout.
81+
.push(resolve::layer(resolve, config.discovery_idle_timeout * 2))
82+
.push_on_service(http::balance::layer(
83+
crate::EWMA_DEFAULT_RTT,
84+
crate::EWMA_DECAY,
85+
))
86+
.check_make_service::<Concrete, http::Request<http::BoxBody>>()
87+
.push(svc::MapErr::layer(Into::into))
88+
// Drives the initial resolution via the service's readiness.
89+
.into_new_service()
8590
.push_on_service(
8691
svc::layers()
87-
.push(http::balance::layer(
88-
crate::EWMA_DEFAULT_RTT,
89-
crate::EWMA_DECAY,
90-
))
92+
.push(http::BoxResponse::layer())
9193
.push(
9294
rt.metrics
9395
.proxy
9496
.stack
95-
.layer(stack_labels("http", "balancer")),
97+
.layer(stack_labels("http", "concrete")),
9698
)
97-
.push(http::BoxResponse::layer()),
99+
.push_buffer("HTTP Concrete", &config.http_request_buffer),
98100
)
99-
.check_make_service::<Concrete, http::Request<_>>()
100-
.push(svc::MapErr::layer(Into::into))
101-
// Drives the initial resolution via the service's readiness.
102-
.into_new_service()
103-
// The concrete address is only set when the profile could be
104-
// resolved. Endpoint resolution is skipped when there is no
105-
// concrete address.
106-
.instrument(|c: &Concrete| tracing::debug_span!("concrete", service = %c.resolve))
101+
.instrument(|c: &Concrete| info_span!("concrete", svc = %c.resolve))
107102
.push(svc::ArcNewService::layer())
108103
})
109104
}

linkerd/app/outbound/src/http/detect.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ use tracing::debug_span;
1111
pub struct Skip;
1212

1313
impl<N> Outbound<N> {
14+
/// Builds a `NewService` that produces services that optionally (depending
15+
/// on the target) perform HTTP protocol detection on sockets.
16+
///
17+
/// When HTTP is detected, an HTTP service is build from the provided HTTP
18+
/// stack. In either case, the inner service is built for each connection so
19+
/// inner services must implement caching as needed.
20+
//
21+
// TODO(ver) We can be smarter about reusing inner services across
22+
// connections by moving caching into this stack...
23+
//
24+
// TODO(ver) Let discovery influence whether we assume an HTTP protocol
25+
// without deteciton.
1426
pub fn push_detect_http<T, U, NSvc, H, HSvc, I>(self, http: H) -> Outbound<svc::ArcNewTcp<T, I>>
1527
where
1628
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
@@ -31,6 +43,8 @@ impl<N> Outbound<N> {
3143
self.map_stack(|config, rt, tcp| {
3244
let ServerConfig { h2_settings, .. } = config.proxy.server;
3345

46+
let tcp = tcp.instrument(|_: &_| debug_span!("opaque"));
47+
3448
svc::stack(http)
3549
.push_on_service(
3650
svc::layers()
@@ -46,7 +60,14 @@ impl<N> Outbound<N> {
4660
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
4761
.into_inner(),
4862
))
49-
.push_on_service(svc::BoxService::layer())
63+
.push_on_service(
64+
svc::layers()
65+
// `DetectService` oneshots the inner service, so we add
66+
// a loadshed to prevent leaking tasks if (for some
67+
// unexpected reason) the inner service is not ready.
68+
.push(svc::LoadShed::layer())
69+
.push(svc::BoxService::layer()),
70+
)
5071
.check_new_service::<(Option<http::Version>, T), _>()
5172
.push_map_target(detect::allow_timeout)
5273
.push(svc::ArcNewService::layer())

0 commit comments

Comments
 (0)