Skip to content

Commit cbb3390

Browse files
authored
Use the connection's HTTP version in transport header (#1533)
Currently, when the outbound proxy communicates with a meshed endpoint that uses opaque transport (i.e. multi-cluster gateways), it *always* sets the gateway header's session protocol to HTTP/2, since the meshed endpoint supports HTTP/2 protocol upgrading. But the HTTP client may choose not to use HTTP/2 if the request includes the `Upgrade` header, as it does for WebSocket connections. In these cases, the transport header should indicate that the connection is HTTP/1. This change modifies the HTTP client to pass the used protocol version when building a connection. This value is then used to set the session protocol header when it is required.. Signed-off-by: Oliver Gould <[email protected]>
1 parent 28f176b commit cbb3390

File tree

12 files changed

+156
-49
lines changed

12 files changed

+156
-49
lines changed

linkerd/app/core/src/control.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl Config {
8383
svc::stack(ConnectTcp::new(self.connect.keepalive))
8484
.push(tls::Client::layer(identity))
8585
.push_connect_timeout(self.connect.timeout)
86+
.push_map_target(|(_version, target)| target)
8687
.push(self::client::layer())
8788
.push_on_service(svc::MapErr::layer(Into::into))
8889
.into_new_service()

linkerd/app/inbound/src/http/router.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl<C> Inbound<C> {
100100
.check_service::<Http>()
101101
.push(transport::metrics::Client::layer(rt.metrics.proxy.transport.clone()))
102102
.check_service::<Http>()
103+
.push_map_target(|(_version, target)| target)
103104
.push(http::client::layer(
104105
config.proxy.connect.h1_settings,
105106
config.proxy.connect.h2_settings,

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,5 @@ linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
3838
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = ["test-util"] }
3939
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
4040
parking_lot = "0.12"
41-
tokio = { version = "1", features = ["time", "macros"] }
41+
tokio = { version = "1", features = ["macros", "sync", "time"] }
4242
tokio-test = "0.4"

linkerd/app/outbound/src/endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ impl<S> Outbound<S> {
218218
{
219219
let http = self
220220
.clone()
221-
.push_tcp_endpoint::<http::Endpoint>()
221+
.push_tcp_endpoint::<http::Connect>()
222222
.push_http_endpoint()
223223
.push_http_server()
224224
.into_inner();

linkerd/app/outbound/src/http.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ use linkerd_app_core::{
1919
profiles::{self, LogicalAddr},
2020
proxy::{api_resolve::ProtocolHint, tap},
2121
svc::Param,
22-
tls,
23-
transport_header::SessionProtocol,
24-
Addr, Conditional, CANONICAL_DST_HEADER,
22+
tls, Addr, Conditional, CANONICAL_DST_HEADER,
2523
};
2624
use std::{net::SocketAddr, str::FromStr};
2725

@@ -30,6 +28,8 @@ pub type Logical = crate::logical::Logical<Version>;
3028
pub type Concrete = crate::logical::Concrete<Version>;
3129
pub type Endpoint = crate::endpoint::Endpoint<Version>;
3230

31+
pub type Connect = self::endpoint::Connect<Endpoint>;
32+
3333
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
3434
struct Route {
3535
logical: Logical,
@@ -150,18 +150,6 @@ impl Param<client::Settings> for Endpoint {
150150
}
151151
}
152152

153-
impl Param<Option<SessionProtocol>> for Endpoint {
154-
fn param(&self) -> Option<SessionProtocol> {
155-
match self.protocol {
156-
Version::H2 => Some(SessionProtocol::Http2),
157-
Version::Http1 => match self.metadata.protocol_hint() {
158-
ProtocolHint::Http2 => Some(SessionProtocol::Http2),
159-
ProtocolHint::Unknown => Some(SessionProtocol::Http1),
160-
},
161-
}
162-
}
163-
}
164-
165153
impl tap::Inspect for Endpoint {
166154
fn src_addr<B>(&self, req: &Request<B>) -> Option<SocketAddr> {
167155
req.extensions().get::<ClientHandle>().map(|c| c.addr)

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 134 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
use super::{NewRequireIdentity, NewStripProxyError, ProxyConnectionClose};
2-
use crate::Outbound;
2+
use crate::{tcp::opaque_transport, Outbound};
33
use linkerd_app_core::{
44
classify, config, errors, http_tracing, metrics,
55
proxy::{http, tap},
66
svc::{self, ExtractParam},
7-
tls, Error, Result, CANONICAL_DST_HEADER,
7+
tls,
8+
transport::{self, Remote, ServerAddr},
9+
transport_header::SessionProtocol,
10+
Error, Result, CANONICAL_DST_HEADER,
811
};
912

1013
#[derive(Copy, Clone, Debug)]
1114
struct ClientRescue {
1215
emit_headers: bool,
1316
}
1417

18+
#[derive(Clone, Debug)]
19+
pub struct Connect<T> {
20+
version: http::Version,
21+
inner: T,
22+
}
23+
1524
impl<C> Outbound<C> {
1625
pub fn push_http_endpoint<T, B>(self) -> Outbound<svc::ArcNewHttp<T, B>>
1726
where
@@ -23,7 +32,7 @@ impl<C> Outbound<C> {
2332
+ tap::Inspect,
2433
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
2534
B::Data: Send + 'static,
26-
C: svc::MakeConnection<T> + Clone + Send + Sync + Unpin + 'static,
35+
C: svc::MakeConnection<Connect<T>> + Clone + Send + Sync + Unpin + 'static,
2736
C::Connection: Send + Unpin,
2837
C::Metadata: Send + Unpin,
2938
C::Future: Send + Unpin + 'static,
@@ -39,7 +48,9 @@ impl<C> Outbound<C> {
3948
// Initiates an HTTP client on the underlying transport. Prior-knowledge HTTP/2
4049
// is typically used (i.e. when communicating with other proxies); though
4150
// HTTP/1.x fallback is supported as needed.
42-
connect
51+
svc::stack(connect.into_inner().into_service())
52+
.check_service::<Connect<T>>()
53+
.push_map_target(|(version, inner)| Connect { version, inner })
4354
.push(http::client::layer(h1_settings, h2_settings))
4455
.push_on_service(svc::MapErr::layer(Into::<Error>::into))
4556
.check_service::<T>()
@@ -132,17 +143,72 @@ impl errors::HttpRescue<Error> for ClientRescue {
132143
}
133144
}
134145

146+
// === impl Connect ===
147+
148+
impl<T> svc::Param<Option<SessionProtocol>> for Connect<T> {
149+
#[inline]
150+
fn param(&self) -> Option<SessionProtocol> {
151+
match self.version {
152+
http::Version::Http1 => Some(SessionProtocol::Http1),
153+
http::Version::H2 => Some(SessionProtocol::Http2),
154+
}
155+
}
156+
}
157+
158+
impl<T: svc::Param<Remote<ServerAddr>>> svc::Param<Remote<ServerAddr>> for Connect<T> {
159+
#[inline]
160+
fn param(&self) -> Remote<ServerAddr> {
161+
self.inner.param()
162+
}
163+
}
164+
165+
impl<T: svc::Param<tls::ConditionalClientTls>> svc::Param<tls::ConditionalClientTls>
166+
for Connect<T>
167+
{
168+
#[inline]
169+
fn param(&self) -> tls::ConditionalClientTls {
170+
self.inner.param()
171+
}
172+
}
173+
174+
impl<T: svc::Param<Option<opaque_transport::PortOverride>>>
175+
svc::Param<Option<opaque_transport::PortOverride>> for Connect<T>
176+
{
177+
#[inline]
178+
fn param(&self) -> Option<opaque_transport::PortOverride> {
179+
self.inner.param()
180+
}
181+
}
182+
183+
impl<T: svc::Param<Option<http::AuthorityOverride>>> svc::Param<Option<http::AuthorityOverride>>
184+
for Connect<T>
185+
{
186+
#[inline]
187+
fn param(&self) -> Option<http::AuthorityOverride> {
188+
self.inner.param()
189+
}
190+
}
191+
192+
impl<T: svc::Param<transport::labels::Key>> svc::Param<transport::labels::Key> for Connect<T> {
193+
#[inline]
194+
fn param(&self) -> transport::labels::Key {
195+
self.inner.param()
196+
}
197+
}
198+
135199
#[cfg(test)]
136200
mod test {
137201
use super::*;
138-
use crate::{http, test_util::*, transport::addrs::*};
202+
use crate::{http, test_util::*};
203+
use ::http::header::{CONNECTION, UPGRADE};
139204
use linkerd_app_core::{
140205
io,
141206
proxy::api_resolve::Metadata,
142207
svc::{NewService, ServiceExt},
143208
Infallible,
144209
};
145210
use std::net::SocketAddr;
211+
use support::resolver::ProtocolHint;
146212

147213
static WAS_ORIG_PROTO: &str = "request-orig-proto";
148214

@@ -154,7 +220,7 @@ mod test {
154220
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2041);
155221

156222
let connect = support::connect()
157-
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_11));
223+
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_11));
158224

159225
// Build the outbound server
160226
let (rt, _shutdown) = runtime();
@@ -191,7 +257,7 @@ mod test {
191257
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2042);
192258

193259
let connect = support::connect()
194-
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
260+
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));
195261

196262
// Build the outbound server
197263
let (rt, _shutdown) = runtime();
@@ -230,7 +296,7 @@ mod test {
230296

231297
// Pretend the upstream is a proxy that supports proto upgrades...
232298
let connect = support::connect()
233-
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
299+
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));
234300

235301
// Build the outbound server
236302
let (rt, _shutdown) = runtime();
@@ -245,13 +311,7 @@ mod test {
245311
logical_addr: None,
246312
opaque_protocol: false,
247313
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
248-
metadata: Metadata::new(
249-
None,
250-
support::resolver::ProtocolHint::Http2,
251-
None,
252-
None,
253-
None,
254-
),
314+
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
255315
});
256316

257317
let req = http::Request::builder()
@@ -270,6 +330,63 @@ mod test {
270330
);
271331
}
272332

333+
#[tokio::test(flavor = "current_thread")]
334+
async fn orig_proto_skipped_on_http_upgrade() {
335+
let _trace = linkerd_tracing::test::trace_init();
336+
337+
let addr = SocketAddr::new([192, 0, 2, 41].into(), 2041);
338+
339+
// Pretend the upstream is a proxy that supports proto upgrades. The service needs to
340+
// support both HTTP/1 and HTTP/2 because an HTTP/2 connection is maintained by default and
341+
// HTTP/1 connections are created as-needed.
342+
let connect = support::connect().endpoint_fn_boxed(addr, |c: http::Connect| {
343+
serve(match svc::Param::param(&c) {
344+
Some(SessionProtocol::Http1) => ::http::Version::HTTP_11,
345+
Some(SessionProtocol::Http2) => ::http::Version::HTTP_2,
346+
None => unreachable!(),
347+
})
348+
});
349+
350+
// Build the outbound server
351+
let (rt, _shutdown) = runtime();
352+
let drain = rt.drain.clone();
353+
let stack = Outbound::new(default_config(), rt)
354+
.with_stack(connect)
355+
.push_http_endpoint::<_, http::BoxBody>()
356+
.into_stack()
357+
.push_on_service(http::BoxRequest::layer())
358+
// We need the server-side upgrade layer to annotate the request so that the client
359+
// knows that an HTTP upgrade is in progress.
360+
.push_on_service(svc::layer::mk(|svc| {
361+
http::upgrade::Service::new(svc, drain.clone())
362+
}))
363+
.into_inner();
364+
365+
let svc = stack.new_service(http::Endpoint {
366+
addr: Remote(ServerAddr(addr)),
367+
protocol: http::Version::Http1,
368+
logical_addr: None,
369+
opaque_protocol: false,
370+
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
371+
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
372+
});
373+
374+
let req = http::Request::builder()
375+
.version(::http::Version::HTTP_11)
376+
.uri("http://foo.example.com")
377+
.extension(http::ClientHandle::new(([192, 0, 2, 101], 40200).into()).0)
378+
// The request has upgrade headers
379+
.header(CONNECTION, "upgrade")
380+
.header(UPGRADE, "linkerdrocks")
381+
.body(hyper::Body::default())
382+
.unwrap();
383+
let rsp = svc.oneshot(req).await.unwrap();
384+
assert_eq!(rsp.status(), http::StatusCode::NO_CONTENT);
385+
// The request did NOT get a linkerd upgrade header.
386+
assert!(rsp.headers().get(WAS_ORIG_PROTO).is_none());
387+
assert_eq!(rsp.version(), ::http::Version::HTTP_11);
388+
}
389+
273390
/// Tests that the the HTTP endpoint stack ignores protocol upgrade hinting for HTTP/2 traffic.
274391
#[tokio::test(flavor = "current_thread")]
275392
async fn orig_proto_http2_noop() {
@@ -279,7 +396,7 @@ mod test {
279396

280397
// Pretend the upstream is a proxy that supports proto upgrades...
281398
let connect = support::connect()
282-
.endpoint_fn_boxed(addr, |_: http::Endpoint| serve(::http::Version::HTTP_2));
399+
.endpoint_fn_boxed(addr, |_: http::Connect| serve(::http::Version::HTTP_2));
283400

284401
// Build the outbound server
285402
let (rt, _shutdown) = runtime();
@@ -294,13 +411,7 @@ mod test {
294411
logical_addr: None,
295412
opaque_protocol: false,
296413
tls: tls::ConditionalClientTls::None(tls::NoClientTls::Disabled),
297-
metadata: Metadata::new(
298-
None,
299-
support::resolver::ProtocolHint::Http2,
300-
None,
301-
None,
302-
None,
303-
),
414+
metadata: Metadata::new(None, ProtocolHint::Http2, None, None, None),
304415
});
305416

306417
let req = http::Request::builder()

linkerd/proxy/http/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl<C, T, B> tower::Service<T> for MakeClient<C, B>
6767
where
6868
T: Clone + Send + Sync + 'static,
6969
T: Param<Settings>,
70-
C: MakeConnection<T> + Clone + Unpin + Send + Sync + 'static,
70+
C: MakeConnection<(crate::Version, T)> + Clone + Unpin + Send + Sync + 'static,
7171
C::Connection: Unpin + Send,
7272
C::Metadata: Send,
7373
C::Future: Unpin + Send + 'static,
@@ -133,7 +133,7 @@ type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> +
133133
impl<C, T, B> Service<http::Request<B>> for Client<C, T, B>
134134
where
135135
T: Clone + Send + Sync + 'static,
136-
C: MakeConnection<T> + Clone + Send + Sync + 'static,
136+
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
137137
C::Connection: Unpin + Send,
138138
C::Future: Unpin + Send + 'static,
139139
C::Error: Into<Error>,

linkerd/proxy/http/src/glue.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,15 @@ impl<C, T> HyperConnect<C, T> {
163163
pub(super) fn new(connect: C, target: T, absolute_form: bool) -> Self {
164164
HyperConnect {
165165
connect,
166-
absolute_form,
167166
target,
167+
absolute_form,
168168
}
169169
}
170170
}
171171

172172
impl<C, T> Service<hyper::Uri> for HyperConnect<C, T>
173173
where
174-
C: MakeConnection<T> + Clone + Send + Sync,
174+
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync,
175175
C::Connection: Unpin + Send,
176176
C::Future: Unpin + Send + 'static,
177177
T: Clone + Send + Sync,
@@ -186,7 +186,9 @@ where
186186

187187
fn call(&mut self, _dst: hyper::Uri) -> Self::Future {
188188
HyperConnectFuture {
189-
inner: self.connect.connect(self.target.clone()),
189+
inner: self
190+
.connect
191+
.connect((crate::Version::Http1, self.target.clone())),
190192
absolute_form: self.absolute_form,
191193
}
192194
}

linkerd/proxy/http/src/h1.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> +
6666
impl<C, T, B> Client<C, T, B>
6767
where
6868
T: Clone + Send + Sync + 'static,
69-
C: MakeConnection<T> + Clone + Send + Sync + 'static,
69+
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
7070
C::Connection: Unpin + Send,
7171
C::Future: Unpin + Send + 'static,
7272
B: hyper::body::HttpBody + Send + 'static,

linkerd/proxy/http/src/h2.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Sen
6262

6363
impl<C, B, T> Service<T> for Connect<C, B>
6464
where
65-
C: MakeConnection<T>,
65+
C: MakeConnection<(crate::Version, T)>,
6666
C::Connection: Send + Unpin + 'static,
6767
C::Metadata: Send,
6868
C::Future: Send + 'static,
@@ -88,7 +88,7 @@ where
8888

8989
let connect = self
9090
.connect
91-
.connect(target)
91+
.connect((crate::Version::H2, target))
9292
.instrument(trace_span!("connect"));
9393

9494
Box::pin(

0 commit comments

Comments
 (0)