Skip to content

Commit 8b6f9a0

Browse files
authored
Make fallback HTTP-agnostic (#393)
The fallback module is currently HTTP-aware, as it must make response body types uniform. This change modifies fallback to instead require that both _service types_ are identical. This is accomplished by boxing the services, using a new http boxed service type. This has the side-effect of dramatically improving compilation times for the outbound proxy.
1 parent 8106280 commit 8b6f9a0

File tree

8 files changed

+294
-182
lines changed

8 files changed

+294
-182
lines changed

Cargo.lock

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -736,10 +736,7 @@ dependencies = [
736736
name = "linkerd2-fallback"
737737
version = "0.1.0"
738738
dependencies = [
739-
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
740739
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
741-
"http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
742-
"hyper 0.12.34 (registry+https://github.com/rust-lang/crates.io-index)",
743740
"linkerd2-error 0.1.0",
744741
"tower 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
745742
"tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",

linkerd/app/core/src/proxy/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Tools for building a transparent TCP/HTTP proxy.
22
3+
pub use linkerd2_fallback as fallback;
34
pub use linkerd2_proxy_api_resolve as api_resolve;
45
pub use linkerd2_proxy_core as core;
56
pub use linkerd2_proxy_detect as detect;

linkerd/app/core/src/svc.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use crate::proxy::{buffer, pending};
1+
use crate::proxy::{buffer, http, pending};
2+
use crate::Error;
23
pub use linkerd2_router::Make;
34
pub use linkerd2_stack::{self as stack, layer, map_target, Layer, LayerExt, Shared};
45
pub use linkerd2_timeout::stack as timeout;
@@ -53,6 +54,14 @@ impl<L> Layers<L> {
5354
pub fn push_spawn_ready(self) -> Layers<Pair<L, SpawnReadyLayer>> {
5455
self.push(SpawnReadyLayer::new())
5556
}
57+
58+
pub fn boxed<A, B>(self) -> Layers<Pair<L, http::boxed::Layer<A, B>>>
59+
where
60+
A: 'static,
61+
B: hyper::body::Payload<Data = http::boxed::Data, Error = Error> + 'static,
62+
{
63+
self.push(http::boxed::Layer::new())
64+
}
5665
}
5766

5867
impl<M, L: Layer<M>> Layer<M> for Layers<L> {
@@ -104,6 +113,15 @@ impl<S> Stack<S> {
104113
self.push(TimeoutLayer::new(timeout))
105114
}
106115

116+
pub fn boxed<T, A, B>(self) -> Stack<http::boxed::Make<S, A, B>>
117+
where
118+
A: 'static,
119+
S: tower::MakeService<T, http::Request<A>, Response = http::Response<B>>,
120+
B: hyper::body::Payload<Data = http::boxed::Data, Error = Error> + 'static,
121+
{
122+
self.push(http::boxed::Layer::new())
123+
}
124+
107125
/// Validates that this stack serves T-typed targets.
108126
pub fn makes<T>(self) -> Self
109127
where

linkerd/app/outbound/src/lib.rs

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,7 @@ use linkerd2_app_core::{
1515
http_request_l5d_override_dst_addr, http_request_orig_dst_addr,
1616
opencensus::proto::trace::v1 as oc,
1717
proxy::{
18-
self,
19-
core::resolve::Resolve,
20-
discover,
21-
http::{
22-
balance, canonicalize, client, fallback, header_from_target, insert,
23-
metrics as http_metrics, normalize_uri, profiles, retry, settings, strip_header,
24-
},
25-
identity,
26-
resolve::map_endpoint,
18+
self, core::resolve::Resolve, discover, fallback, http, identity, resolve::map_endpoint,
2719
tap, tcp, Server,
2820
},
2921
reconnect, router, serve,
@@ -132,15 +124,15 @@ impl<A: OrigDstAddr> Config<A> {
132124
// Instantiates an HTTP client for for a `client::Config`
133125
let client_stack = connect_stack
134126
.clone()
135-
.push(client::layer(connect.h2_settings))
127+
.push(http::client::layer(connect.h2_settings))
136128
.push(reconnect::layer({
137129
let backoff = connect.backoff.clone();
138130
move |_| Ok(backoff.stream())
139131
}))
140132
.push(trace_context::layer(span_sink.clone().map(|span_sink| {
141133
SpanConverter::client(span_sink, trace_labels())
142134
})))
143-
.push(normalize_uri::layer());
135+
.push(http::normalize_uri::layer());
144136

145137
// A per-`outbound::Endpoint` stack that:
146138
//
@@ -156,21 +148,22 @@ impl<A: OrigDstAddr> Config<A> {
156148
// the server, before we apply our own.
157149
let endpoint_stack = client_stack
158150
.serves::<Endpoint>()
159-
.push(strip_header::response::layer(L5D_REMOTE_IP))
160-
.push(strip_header::response::layer(L5D_SERVER_ID))
161-
.push(strip_header::request::layer(L5D_REQUIRE_ID))
151+
.push(http::strip_header::response::layer(L5D_REMOTE_IP))
152+
.push(http::strip_header::response::layer(L5D_SERVER_ID))
153+
.push(http::strip_header::request::layer(L5D_REQUIRE_ID))
162154
// disabled due to information leagkage
163155
//.push(add_remote_ip_on_rsp::layer())
164156
//.push(add_server_id_on_rsp::layer())
165157
.push(orig_proto_upgrade::layer())
166158
.push(tap_layer.clone())
167-
.push(http_metrics::layer::<_, classify::Response>(
159+
.push(http::metrics::layer::<_, classify::Response>(
168160
metrics.http_endpoint,
169161
))
170162
.push(require_identity_on_endpoint::layer())
171163
.push(trace::layer(|endpoint: &Endpoint| {
172164
info_span!("endpoint", peer.addr = %endpoint.addr, peer.id = ?endpoint.identity)
173-
}));
165+
}))
166+
.serves::<Endpoint>();
174167

175168
// A per-`dst::Route` layer that uses profile data to configure
176169
// a per-route layer.
@@ -184,13 +177,13 @@ impl<A: OrigDstAddr> Config<A> {
184177
// 3. Retries are optionally enabled depending on if the route
185178
// is retryable.
186179
let dst_route_layer = svc::layers()
187-
.push(insert::target::layer())
188-
.push(http_metrics::layer::<_, classify::Response>(
180+
.push(http::insert::target::layer())
181+
.push(http::metrics::layer::<_, classify::Response>(
189182
metrics.http_route_retry.clone(),
190183
))
191-
.push(retry::layer(metrics.http_route_retry))
192-
.push(proxy::http::timeout::layer())
193-
.push(http_metrics::layer::<_, classify::Response>(
184+
.push(http::retry::layer(metrics.http_route_retry))
185+
.push(http::timeout::layer())
186+
.push(http::metrics::layer::<_, classify::Response>(
194187
metrics.http_route,
195188
))
196189
.push(classify::layer())
@@ -215,16 +208,19 @@ impl<A: OrigDstAddr> Config<A> {
215208
.push_spawn_ready()
216209
.push(discover::Layer::new(
217210
DISCOVER_UPDATE_BUFFER_CAPACITY,
218-
map_endpoint::Resolve::new(endpoint::FromMetadata, resolve),
211+
map_endpoint::Resolve::new(endpoint::FromMetadata, resolve.clone()),
219212
))
220-
.push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
213+
.push(http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
221214

222215
// If the balancer fails to be created, i.e., because it is unresolvable,
223216
// fall back to using a router that dispatches request to the
224217
// application-selected original destination.
225218
let distributor = endpoint_stack
226-
.push(fallback::layer(balancer_layer, orig_dst_router_layer))
227-
.serves::<DstAddr>()
219+
.serves::<Endpoint>()
220+
.push(fallback::layer(
221+
balancer_layer.boxed(),
222+
orig_dst_router_layer.boxed(),
223+
))
228224
.push(trace::layer(
229225
|dst: &DstAddr| info_span!("concrete", dst.concrete = %dst.dst_concrete()),
230226
));
@@ -237,9 +233,14 @@ impl<A: OrigDstAddr> Config<A> {
237233
// 3. Creates a load balancer , configured by resolving the
238234
// `DstAddr` with a resolver.
239235
let dst_stack = distributor
236+
.serves::<DstAddr>()
240237
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
241-
.push(profiles::router::layer(profiles_client, dst_route_layer))
242-
.push(header_from_target::layer(CANONICAL_DST_HEADER));
238+
.makes::<DstAddr>()
239+
.push(http::profiles::router::layer(
240+
profiles_client,
241+
dst_route_layer,
242+
))
243+
.push(http::header_from_target::layer(CANONICAL_DST_HEADER));
243244

244245
// Routes request using the `DstAddr` extension.
245246
//
@@ -254,7 +255,7 @@ impl<A: OrigDstAddr> Config<A> {
254255
router::Config::new(router_capacity, router_max_idle_age),
255256
|req: &http::Request<_>| {
256257
req.extensions().get::<Addr>().cloned().map(|addr| {
257-
DstAddr::outbound(addr, settings::Settings::from_request(req))
258+
DstAddr::outbound(addr, http::settings::Settings::from_request(req))
258259
})
259260
},
260261
))
@@ -264,8 +265,9 @@ impl<A: OrigDstAddr> Config<A> {
264265
// Canonicalizes the request-specified `Addr` via DNS, and
265266
// annotates each request with a refined `Addr` so that it may be
266267
// routed by the dst_router.
267-
let addr_stack = svc::stack(svc::Shared::new(dst_router))
268-
.push(canonicalize::layer(dns_resolver, canonicalize_timeout));
268+
let addr_stack = svc::stack(svc::Shared::new(dst_router)).push(
269+
http::canonicalize::layer(dns_resolver, canonicalize_timeout),
270+
);
269271

270272
// Routes requests to an `Addr`:
271273
//
@@ -283,9 +285,9 @@ impl<A: OrigDstAddr> Config<A> {
283285
// 5. Finally, if the tls::accept::Meta had an SO_ORIGINAL_DST, this TCP
284286
// address is used.
285287
let addr_router = addr_stack
286-
.push(strip_header::request::layer(L5D_CLIENT_ID))
287-
.push(strip_header::request::layer(DST_OVERRIDE_HEADER))
288-
.push(insert::target::layer())
288+
.push(http::strip_header::request::layer(L5D_CLIENT_ID))
289+
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
290+
.push(http::insert::target::layer())
289291
.push(trace::layer(|addr: &Addr| info_span!("addr", %addr)))
290292
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
291293
.push(router::Layer::new(
@@ -315,17 +317,17 @@ impl<A: OrigDstAddr> Config<A> {
315317
// shared `addr_router`. The `tls::accept::Meta` is stored in the request's
316318
// extensions so that it can be used by the `addr_router`.
317319
let server_stack = svc::stack(svc::Shared::new(admission_control))
318-
.push(insert::layer(move || {
320+
.push(http::insert::layer(move || {
319321
DispatchDeadline::after(buffer.dispatch_timeout)
320322
}))
321-
.push(insert::target::layer())
323+
.push(http::insert::target::layer())
322324
.push(errors::layer())
323325
.push(trace::layer(
324326
|src: &tls::accept::Meta| info_span!("source", target.addr = %src.addrs.target_addr()),
325327
))
326-
.push(trace_context::layer(
327-
span_sink.map(|span_sink| SpanConverter::server(span_sink, trace_labels())),
328-
))
328+
.push(trace_context::layer(span_sink.map(|span_sink| {
329+
SpanConverter::server(span_sink, trace_labels())
330+
})))
329331
.push(metrics.http_handle_time.layer());
330332

331333
let forward_tcp = tcp::Forward::new(

linkerd/fallback/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ edition = "2018"
66
publish = false
77

88
[dependencies]
9-
bytes = "0.4"
109
futures = "0.1"
11-
http = "0.1"
12-
hyper = "0.12"
1310
linkerd2-error = { path = "../error" }
1411
tower= "0.1"
1512
tracing = "0.1"

0 commit comments

Comments
 (0)