Skip to content

Commit 91bbbd4

Browse files
authored
outbound: Refactor stack target types (#2210)
To support a new configuration API, we want to update the outbound stack to compose the following stacks: * Routing discovery; * Protocol switching; * Protocol-specific logical routing; * Concrete forwarding / load balancing; * Endpoint dispatching. To do this, we update the existing stacks so that: * each stack accepts generic input targets (with enumerated `Param` requirements); and * each module defines all necessary internal target types, including those passed to inner stacks. This decouples stack configuration from the specifics of discovery: tests can satisfy stack parameters without having to stub out all of the discovery types and we are setup to change discovery implementation details moving forward. This requires updating the gateway and ingress stacks in addition to the (default) sidecar stack. Generally, we prefer duplicating similar target types to coupling modules. In followup changes, we can introduce macros to reduce `Param` boilerplate if necessary. Changes include: * A new `Gateway` builder type (analogous to `Inbound` and `Outbound`); * A new `Gateway::server` stack handles discovery and protocol switching; * Service cacheing is performed for the http and opaq stacks (to preserve load balancer state, etc) using only the information needed for those stacks; * Concrete stacks provide the only data-plane queue. This enables outer stacks to be cloneable. This also means that the concrete stacks are the only stacks with stack metrics; * The `switch_logical` module is removed; * The `http::detect` module is moved into the `protocol` stack, which now supports discovery-hinted protocols to bypass detection. * The `logical` and `endpoint` modules are removed in favor of protocol-specific modules.
1 parent 922d9dd commit 91bbbd4

File tree

32 files changed

+2488
-2523
lines changed

32 files changed

+2488
-2523
lines changed

linkerd/app/gateway/src/http.rs

Lines changed: 158 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,169 @@
1-
use self::gateway::NewHttpGateway;
2-
use super::OutboundHttp;
3-
use linkerd_app_core::{
4-
identity, io,
5-
proxy::{api_resolve::Metadata, core::Resolve, http},
6-
svc,
7-
transport::{ClientAddr, Local},
8-
Error, Infallible,
1+
use super::{server::Http, Gateway};
2+
use inbound::{GatewayAddr, GatewayDomainInvalid};
3+
use linkerd_app_core::{identity, io, profiles, proxy::http, svc, tls, transport::addrs::*, Error};
4+
use linkerd_app_inbound as inbound;
5+
use linkerd_app_outbound as outbound;
6+
use std::{
7+
cmp::{Eq, PartialEq},
8+
fmt::Debug,
9+
hash::Hash,
910
};
10-
use linkerd_app_outbound::{self as outbound, Outbound};
1111

1212
mod gateway;
1313
#[cfg(test)]
1414
mod tests;
1515

16-
pub use linkerd_app_core::proxy::http::*;
16+
pub(crate) use self::gateway::NewHttpGateway;
1717

18-
/// Builds an outbound HTTP stack.
18+
/// Target for outbound HTTP gateway stacks.
19+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
20+
pub struct Target<T = ()> {
21+
addr: GatewayAddr,
22+
target: outbound::http::Logical,
23+
version: http::Version,
24+
parent: T,
25+
}
26+
27+
/// Implements `svc::router::SelectRoute` for outbound HTTP requests. An
28+
/// `OutboundHttp` target is returned for each request using the request's HTTP
29+
/// version.
1930
///
20-
/// A gateway-specififc module is inserted to requests from looping through
21-
/// gateways. Discovery errors are lifted into the HTTP stack so that individual
22-
/// requests are failed with an HTTP-level error repsonse.
23-
pub(super) fn stack<O, R>(
24-
local_id: identity::LocalId,
25-
outbound: Outbound<O>,
26-
resolve: R,
27-
) -> svc::ArcNewService<
28-
OutboundHttp,
29-
impl svc::Service<
30-
http::Request<http::BoxBody>,
31-
Response = http::Response<http::BoxBody>,
32-
Error = Error,
33-
Future = impl Send,
34-
>,
35-
>
31+
/// The request's HTTP version may not match the target's original HTTP version
32+
/// when proxies use HTTP/2 to transport HTTP/1 requests.
33+
#[derive(Clone, Debug)]
34+
struct ByRequestVersion<T>(Target<T>);
35+
36+
impl Gateway {
37+
/// Wrap the provided outbound HTTP stack with an HTTP server, inbound
38+
/// authorization, and gateway request routing.
39+
pub fn http<T, I, N, NSvc>(&self, inner: N) -> svc::Stack<svc::ArcNewTcp<Http<T>, I>>
40+
where
41+
// Target describing an inbound gateway connection.
42+
T: svc::Param<GatewayAddr>,
43+
T: svc::Param<OrigDstAddr>,
44+
T: svc::Param<Remote<ClientAddr>>,
45+
T: svc::Param<tls::ConditionalServerTls>,
46+
T: svc::Param<tls::ClientId>,
47+
T: svc::Param<inbound::policy::AllowPolicy>,
48+
T: svc::Param<profiles::LookupAddr>,
49+
T: Clone + Send + Sync + Unpin + 'static,
50+
// Server-side socket.
51+
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
52+
I: Send + Unpin + 'static,
53+
// HTTP outbound stack.
54+
N: svc::NewService<Target, Service = NSvc>,
55+
N: Clone + Send + Sync + Unpin + 'static,
56+
NSvc: svc::Service<
57+
http::Request<http::BoxBody>,
58+
Response = http::Response<http::BoxBody>,
59+
Error = Error,
60+
>,
61+
NSvc: Send + Unpin + 'static,
62+
NSvc::Future: Send + 'static,
63+
{
64+
let http = svc::stack(inner)
65+
// Discard `T` and its associated client-specific metadata.
66+
.push_map_target(Target::discard_parent)
67+
// Add headers to prevent loops.
68+
.push(NewHttpGateway::layer(identity::LocalId(
69+
self.inbound.identity().name().clone(),
70+
)))
71+
.push_on_service(svc::LoadShed::layer())
72+
.lift_new()
73+
// After protocol-downgrade, we need to build an inner stack for
74+
// each request-level HTTP version.
75+
.push(svc::NewOneshotRoute::layer_via(|t: &Target<T>| {
76+
ByRequestVersion(t.clone())
77+
}))
78+
// Only permit gateway traffic to endpoints for which we have
79+
// discovery information.
80+
.push_filter(
81+
|(_, parent): (_, Http<T>)| -> Result<_, GatewayDomainInvalid> {
82+
let target = {
83+
let profile = svc::Param::<Option<profiles::Receiver>>::param(&parent)
84+
.ok_or(GatewayDomainInvalid)?;
85+
86+
if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() {
87+
outbound::http::Logical::Route(addr, profile)
88+
} else if let Some((addr, metadata)) = profile.endpoint() {
89+
outbound::http::Logical::Forward(Remote(ServerAddr(addr)), metadata)
90+
} else {
91+
return Err(GatewayDomainInvalid);
92+
}
93+
};
94+
95+
Ok(Target {
96+
target,
97+
addr: (*parent).param(),
98+
version: svc::Param::param(&parent),
99+
parent: (**parent).clone(),
100+
})
101+
},
102+
)
103+
// Authorize requests to the gateway.
104+
.push(self.inbound.authorize_http());
105+
106+
self.inbound
107+
.clone()
108+
.with_stack(http.into_inner())
109+
// Teminates HTTP connections.
110+
// XXX Sets an identity header -- this should probably not be done
111+
// in the gateway, though the value will be stripped by meshed
112+
// servers.
113+
.push_http_server()
114+
.into_stack()
115+
}
116+
}
117+
118+
// === impl ByRequestVersion ===
119+
120+
impl<B, T: Clone> svc::router::SelectRoute<http::Request<B>> for ByRequestVersion<T> {
121+
type Key = Target<T>;
122+
type Error = http::version::Unsupported;
123+
124+
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Self::Error> {
125+
let mut t = self.0.clone();
126+
t.version = req.version().try_into()?;
127+
Ok(t)
128+
}
129+
}
130+
131+
// === impl Target ===
132+
133+
impl<T> Target<T> {
134+
fn discard_parent(self) -> Target {
135+
Target {
136+
addr: self.addr,
137+
target: self.target,
138+
version: self.version,
139+
parent: (),
140+
}
141+
}
142+
}
143+
144+
impl<T> svc::Param<GatewayAddr> for Target<T> {
145+
fn param(&self) -> GatewayAddr {
146+
self.addr.clone()
147+
}
148+
}
149+
150+
impl<T> svc::Param<http::Version> for Target<T> {
151+
fn param(&self) -> http::Version {
152+
self.version
153+
}
154+
}
155+
156+
impl<T> svc::Param<tls::ClientId> for Target<T>
36157
where
37-
O: Clone + Send + Sync + Unpin + 'static,
38-
O: svc::MakeConnection<outbound::tcp::Connect, Metadata = Local<ClientAddr>, Error = io::Error>,
39-
O::Connection: Send + Unpin,
40-
O::Future: Send + Unpin + 'static,
41-
R: Resolve<outbound::http::Concrete, Endpoint = Metadata, Error = Error>,
158+
T: svc::Param<tls::ClientId>,
42159
{
43-
let endpoint = outbound.push_tcp_endpoint().push_http_endpoint();
44-
endpoint
45-
.clone()
46-
.push_http_concrete(resolve)
47-
.push_http_logical()
48-
.into_stack()
49-
.push_switch(Ok::<_, Infallible>, endpoint.into_stack())
50-
.push(NewHttpGateway::layer(local_id))
51-
.push(svc::ArcNewService::layer())
52-
.check_new::<OutboundHttp>()
53-
.into_inner()
160+
fn param(&self) -> tls::ClientId {
161+
self.parent.param()
162+
}
163+
}
164+
165+
impl<T> svc::Param<outbound::http::Logical> for Target<T> {
166+
fn param(&self) -> outbound::http::Logical {
167+
self.target.clone()
168+
}
54169
}

0 commit comments

Comments
 (0)