Skip to content

Commit 5a97236

Browse files
authored
outbound: Use per-route services in routing stack (#1380)
The outbound proxy currently maintains a `Proxy` instance for each profile-defined `Route`. This allows the router to use a single underlying logical service, but this prevents per-route logic from influencing the logical target to which the request is dispatched. This change modifies the outbound stack to use an alternate profile router that dispatches over per-route `Service`s instead of per-route `Proxy`s. This is possible because the outbound logical stack is buffered (and implements `Clone`), while the inbound stack is not. To support this, the following changes have been made: * The `linkerd_app_core::dst` module has been eliminated in favor of inbound- and outbound-specific Route target types; * `linkerd_http_classify::Classify` now implements `Service` in addition to `Proxy`; * `linkerd_retry::Retry` now implements `Service` instead of `Proxy`; * `linkerd_service_profiles`now implements two caching routers: `NewServiceRouter` for `Service`s and `NewProxyRouter` for `Proxy`s; * The `linkerd_stack::ProxyService` helper has been removed, as it's not used; and * Various unneeded `http::BoxResponse` layers have been removed from the outound stack.
1 parent 2f19d9c commit 5a97236

File tree

19 files changed

+498
-385
lines changed

19 files changed

+498
-385
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1478,9 +1478,9 @@ dependencies = [
14781478
name = "linkerd-retry"
14791479
version = "0.1.0"
14801480
dependencies = [
1481+
"futures",
14811482
"linkerd-error",
14821483
"linkerd-stack",
1483-
"pin-project",
14841484
"tower",
14851485
"tracing",
14861486
]

linkerd/app/core/src/dst.rs

Lines changed: 0 additions & 28 deletions
This file was deleted.

linkerd/app/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ pub mod classify;
3636
pub mod config;
3737
pub mod control;
3838
pub mod dns;
39-
pub mod dst;
4039
pub mod errors;
4140
pub mod http_tracing;
4241
pub mod metrics;

linkerd/app/core/src/metrics.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub use crate::transport::labels::{TargetAddr, TlsAccept};
22
use crate::{
33
classify::{Class, SuccessOrFailure},
4-
control, dst, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics,
4+
control, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics,
55
svc::Param,
66
telemetry, tls,
77
transport::{self, labels::TlsConnect},
@@ -216,12 +216,22 @@ impl FmtLabels for ControlLabels {
216216

217217
// === impl RouteLabels ===
218218

219-
impl Param<RouteLabels> for dst::Route {
220-
fn param(&self) -> RouteLabels {
221-
RouteLabels {
222-
addr: self.addr.clone(),
223-
direction: self.direction,
224-
labels: prefix_labels("rt", self.route.labels().iter()),
219+
impl RouteLabels {
220+
pub fn inbound(addr: profiles::LogicalAddr, route: &profiles::http::Route) -> Self {
221+
let labels = prefix_labels("rt", route.labels().iter());
222+
Self {
223+
addr,
224+
labels,
225+
direction: Direction::In,
226+
}
227+
}
228+
229+
pub fn outbound(addr: profiles::LogicalAddr, route: &profiles::http::Route) -> Self {
230+
let labels = prefix_labels("rt", route.labels().iter());
231+
Self {
232+
addr,
233+
labels,
234+
direction: Direction::Out,
225235
}
226236
}
227237
}

linkerd/app/core/src/svc.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use linkerd_error::Recover;
66
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
77
pub use linkerd_reconnect::NewReconnect;
88
pub use linkerd_stack::{
9-
self as stack, layer, ArcNewService, BoxService, BoxServiceLayer, Either, ExtractParam, Fail,
10-
FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter, NewService, Param, Predicate,
11-
UnwrapOr,
9+
self as stack, layer, ArcNewService, BoxCloneService, BoxService, BoxServiceLayer, Either,
10+
ExtractParam, Fail, FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter,
11+
NewService, Param, Predicate, UnwrapOr,
1212
};
1313
pub use linkerd_stack_tracing::{NewInstrument, NewInstrumentLayer};
1414
use std::{

linkerd/app/inbound/src/http/router.rs

Lines changed: 64 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use crate::{policy, stack_labels, Inbound};
22
use linkerd_app_core::{
3-
classify, dst, errors, http_tracing, io, metrics,
3+
classify, errors, http_tracing, io, metrics,
44
profiles::{self, DiscoveryRejected},
55
proxy::{http, tap},
66
svc::{self, ExtractParam, Param},
77
tls,
88
transport::{self, ClientAddr, Remote, ServerAddr},
99
Error, Infallible, NameAddr, Result,
1010
};
11-
use std::{borrow::Borrow, net::SocketAddr};
11+
use std::net::SocketAddr;
1212
use tracing::{debug, debug_span};
1313

1414
/// Describes an HTTP client target.
@@ -48,6 +48,12 @@ struct Profile {
4848
profiles: profiles::Receiver,
4949
}
5050

51+
#[derive(Clone, Debug)]
52+
struct Route {
53+
profile: Profile,
54+
route: profiles::http::Route,
55+
}
56+
5157
#[derive(Copy, Clone, Debug)]
5258
struct ClientRescue;
5359

@@ -112,49 +118,42 @@ impl<C> Inbound<C> {
112118
.http_endpoint
113119
.to_layer::<classify::Response, _, _>(),
114120
)
115-
.push_on_service(http_tracing::client(
116-
rt.span_sink.clone(),
117-
super::trace_labels(),
118-
))
119-
.push_on_service(svc::layers()
120-
.push(http::BoxResponse::layer())
121-
// This box is needed to reduce compile times on recent (2021-10-17) nightlies,
122-
// though this may be fixed by https://github.com/rust-lang/rust/pull/89831. It
123-
// should be removed when possible.
124-
.push(svc::BoxService::layer())
121+
.push_on_service(
122+
svc::layers()
123+
.push(http_tracing::client(
124+
rt.span_sink.clone(),
125+
super::trace_labels(),
126+
))
127+
.push(http::BoxResponse::layer())
128+
// This box is needed to reduce compile times on recent
129+
// (2021-10-17) nightlies, though this may be fixed by
130+
// https://github.com/rust-lang/rust/pull/89831. It should
131+
// be removed when possible.
132+
.push(svc::BoxService::layer())
125133
);
126134

127135
// Attempts to discover a service profile for each logical target (as
128136
// informed by the request's headers). The stack is cached until a
129137
// request has not been received for `cache_max_idle_age`.
130138
http.clone()
131139
.check_new_service::<Logical, http::Request<http::BoxBody>>()
132-
// The HTTP stack doesn't use the profile resolution, so drop it.
133-
.push_map_target(Logical::from)
134-
.push_on_service(http::BoxResponse::layer())
135-
.push(profiles::http::route_request::layer(
140+
.push_map_target(|p: Profile| p.logical)
141+
.push(profiles::http::NewProxyRouter::layer(
142+
// If the request matches a route, use a per-route proxy to
143+
// wrap the inner service.
136144
svc::proxies()
145+
.push_map_target(|r: Route| r.profile.logical)
137146
.push_on_service(http::BoxRequest::layer())
138-
// Records per-route metrics.
139147
.push(
140-
rt.metrics.proxy
148+
rt.metrics
149+
.proxy
141150
.http_route
142-
.to_layer::<classify::Response, _, dst::Route>(),
151+
.to_layer::<classify::Response, _, _>(),
143152
)
144-
// Sets the per-route response classifier as a request
145-
// extension.
146-
.push(classify::NewClassify::layer())
147-
// Sets the route as a request extension so that it can be used
148-
// by tap.
149-
.push_http_insert_target::<dst::Route>()
150-
.push_map_target(|(route, logical): (profiles::http::Route, Profile)| {
151-
dst::Route {
152-
route,
153-
addr: logical.addr,
154-
direction: metrics::Direction::In,
155-
}
156-
})
157153
.push_on_service(http::BoxResponse::layer())
154+
.push(classify::NewClassify::layer())
155+
.push_http_insert_target::<profiles::http::Route>()
156+
.push_map_target(|(route, profile)| Route { route, profile })
158157
.into_inner(),
159158
))
160159
.push_switch(
@@ -163,7 +162,7 @@ impl<C> Inbound<C> {
163162
// the underlying target stack directly.
164163
|(rx, logical): (Option<profiles::Receiver>, Logical)| -> Result<_, Infallible> {
165164
if let Some(rx) = rx {
166-
if let Some(addr) = rx.borrow().logical_addr() {
165+
if let Some(addr) = rx.logical_addr() {
167166
return Ok(svc::Either::A(Profile {
168167
addr,
169168
logical,
@@ -174,7 +173,6 @@ impl<C> Inbound<C> {
174173
Ok(svc::Either::B(logical))
175174
},
176175
http.clone()
177-
.push_on_service(http::BoxResponse::layer())
178176
.check_new_service::<Logical, http::Request<_>>()
179177
.into_inner(),
180178
)
@@ -196,19 +194,13 @@ impl<C> Inbound<C> {
196194
Ok(profiles::LookupAddr(addr.into()))
197195
}))
198196
.instrument(|_: &Logical| debug_span!("profile"))
199-
.push_on_service(
200-
svc::layers()
201-
.push(http::BoxResponse::layer())
202-
.push(svc::layer::mk(svc::SpawnReady::new)),
203-
)
197+
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
204198
// Skip the profile stack if it takes too long to become ready.
205199
.push_when_unready(
206200
config.profile_idle_timeout,
207-
http.clone()
208-
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
201+
http.push_on_service(svc::layer::mk(svc::SpawnReady::new))
209202
.into_inner(),
210203
)
211-
.check_new_service::<Logical, http::Request<http::BoxBody>>()
212204
.push_on_service(
213205
svc::layers()
214206
.push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical")))
@@ -311,6 +303,34 @@ impl<A> svc::stack::RecognizeRoute<http::Request<A>> for LogicalPerRequest {
311303
}
312304
}
313305

306+
// === impl Route ===
307+
308+
impl Param<profiles::http::Route> for Route {
309+
fn param(&self) -> profiles::http::Route {
310+
self.route.clone()
311+
}
312+
}
313+
314+
impl Param<metrics::RouteLabels> for Route {
315+
fn param(&self) -> metrics::RouteLabels {
316+
metrics::RouteLabels::inbound(self.profile.addr.clone(), &self.route)
317+
}
318+
}
319+
320+
impl classify::CanClassify for Route {
321+
type Classify = classify::Request;
322+
323+
fn classify(&self) -> classify::Request {
324+
self.route.response_classes().clone().into()
325+
}
326+
}
327+
328+
impl Param<http::ResponseTimeout> for Route {
329+
fn param(&self) -> http::ResponseTimeout {
330+
http::ResponseTimeout(self.route.timeout())
331+
}
332+
}
333+
314334
// === impl Profile ===
315335

316336
impl Param<profiles::Receiver> for Profile {
@@ -387,8 +407,8 @@ impl tap::Inspect for Logical {
387407

388408
fn route_labels<B>(&self, req: &http::Request<B>) -> Option<tap::Labels> {
389409
req.extensions()
390-
.get::<dst::Route>()
391-
.map(|r| r.route.labels().clone())
410+
.get::<profiles::http::Route>()
411+
.map(|r| r.labels().clone())
392412
}
393413

394414
fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {

linkerd/app/outbound/src/http.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub(crate) use self::{require_id_header::IdentityRequired, server::ServerRescue}
1515
use crate::tcp;
1616
pub use linkerd_app_core::proxy::http::*;
1717
use linkerd_app_core::{
18-
dst,
18+
classify, metrics,
1919
profiles::{self, LogicalAddr},
2020
proxy::{api_resolve::ProtocolHint, tap},
2121
svc::Param,
@@ -30,6 +30,12 @@ pub type Logical = crate::logical::Logical<Version>;
3030
pub type Concrete = crate::logical::Concrete<Version>;
3131
pub type Endpoint = crate::endpoint::Endpoint<Version>;
3232

33+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
34+
struct Route {
35+
logical: Logical,
36+
route: profiles::http::Route,
37+
}
38+
3339
#[derive(Clone, Debug)]
3440
pub struct CanonicalDstHeader(pub Addr);
3541

@@ -85,17 +91,6 @@ impl Param<Version> for Logical {
8591
}
8692
}
8793

88-
impl Logical {
89-
pub fn mk_route((route, logical): (profiles::http::Route, Self)) -> dst::Route {
90-
use linkerd_app_core::metrics::Direction;
91-
dst::Route {
92-
route,
93-
addr: logical.logical_addr,
94-
direction: Direction::Out,
95-
}
96-
}
97-
}
98-
9994
impl Param<normalize_uri::DefaultAuthority> for Logical {
10095
fn param(&self) -> normalize_uri::DefaultAuthority {
10196
normalize_uri::DefaultAuthority(Some(
@@ -190,11 +185,39 @@ impl tap::Inspect for Endpoint {
190185

191186
fn route_labels<B>(&self, req: &Request<B>) -> Option<tap::Labels> {
192187
req.extensions()
193-
.get::<dst::Route>()
194-
.map(|r| r.route.labels().clone())
188+
.get::<profiles::http::Route>()
189+
.map(|r| r.labels().clone())
195190
}
196191

197192
fn is_outbound<B>(&self, _: &Request<B>) -> bool {
198193
true
199194
}
200195
}
196+
197+
// === impl Route ===
198+
199+
impl Param<profiles::http::Route> for Route {
200+
fn param(&self) -> profiles::http::Route {
201+
self.route.clone()
202+
}
203+
}
204+
205+
impl Param<metrics::RouteLabels> for Route {
206+
fn param(&self) -> metrics::RouteLabels {
207+
metrics::RouteLabels::outbound(self.logical.logical_addr.clone(), &self.route)
208+
}
209+
}
210+
211+
impl Param<ResponseTimeout> for Route {
212+
fn param(&self) -> ResponseTimeout {
213+
ResponseTimeout(self.route.timeout())
214+
}
215+
}
216+
217+
impl classify::CanClassify for Route {
218+
type Classify = classify::Request;
219+
220+
fn classify(&self) -> classify::Request {
221+
self.route.response_classes().clone().into()
222+
}
223+
}

0 commit comments

Comments
 (0)