Skip to content

Commit c7918cf

Browse files
authored
outbound: handle Opaque protocol hints on endpoints (#2237)
Currently, when the outbound proxy makes a direct connection prefixed with a `TransportHeader` in order to send HTTP traffic, it will always send a `SessionProtocol` hint with the HTTP version as part of the header. This instructs the inbound proxy to use that protocol, even if the target port has a ServerPolicy that marks that port as opaque, which can result in incorrect handling of that connection. See linkerd/linkerd2#9888 for details. In order to prevent this, linkerd/linkerd2-proxy-api#197 adds a new `ProtocolHint` value to the protobuf endpoint metadata message. This will allow the Destination controller to explicitly indicate to the outbound proxy that a given endpoint is known to handle all connections to a port as an opaque TCP stream, and that the proxy should not perform a protocol upgrade or send a `SessionProtocol` in the transport header. This branch updates the proxy to handle this new hint value, and adds tests that the outbound proxy behaves as expected. Along with linkerd/linkerd2#10301, this will fix linkerd/linkerd2#9888. I opened a new PR for this change rather than attempting to rebase my previous PR #2209, as it felt a bit easier to start with a new branch and just make the changes that were still relevant. Therefore, this closes #2209.
1 parent abab7fd commit c7918cf

File tree

8 files changed

+202
-78
lines changed

8 files changed

+202
-78
lines changed

linkerd/app/integration/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod client_policy;
2+
mod direct;
23
mod discovery;
34
mod identity;
45
mod orig_proto;

linkerd/app/integration/src/tests/direct.rs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::*;
22

33
#[tokio::test]
4-
async fn tagged_transport_http() {
4+
async fn h2_hinted() {
55
let _trace = trace_init();
66

77
// identity is always required for direct connections
@@ -39,6 +39,56 @@ async fn tagged_transport_http() {
3939
assert_eq!(client.get("/").await, "hello");
4040
}
4141

42+
/// Reproduces linkerd/linkerd2#9888. A proxy receives HTTP traffic direct
43+
/// traffic with a transport header, and the port is also in the
44+
/// `INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` env var.
45+
/// TODO(eliza): add a similar test where the policy on the opaque port is
46+
/// discovered from the policy controller.
47+
#[tokio::test]
48+
async fn opaque_hinted() {
49+
let _trace = trace_init();
50+
51+
// identity is always required for direct connections
52+
let in_svc_acct = "foo.ns1.serviceaccount.identity.linkerd.cluster.local";
53+
let in_identity = identity::Identity::new("foo-ns1", in_svc_acct.to_string());
54+
55+
let out_svc_acct = "bar.ns1.serviceaccount.identity.linkerd.cluster.local";
56+
let out_identity = identity::Identity::new("bar-ns1", out_svc_acct.to_string());
57+
58+
let srv = server::http1().route("/", "hello").run().await;
59+
let srv_addr = srv.addr;
60+
let dst = format!("opaque.test.svc.cluster.local:{}", srv_addr.port());
61+
62+
let (inbound, _profile_in) = {
63+
let id_svc = in_identity.service();
64+
let mut env = in_identity.env;
65+
env.put(
66+
app::env::ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION,
67+
srv_addr.port().to_string(),
68+
);
69+
let (proxy, profile) = mk_inbound(srv, id_svc, &dst).await;
70+
let proxy = proxy.run_with_test_env(env).await;
71+
(proxy, profile)
72+
};
73+
74+
let (outbound, _profile_out, _dst) = {
75+
let ctrl = controller::new();
76+
let dst = ctrl.destination_tx(dst);
77+
dst.send(
78+
controller::destination_add(srv_addr)
79+
.hint(controller::Hint::Opaque)
80+
.opaque_port(inbound.inbound.port())
81+
.identity(in_svc_acct),
82+
);
83+
let (proxy, profile) = mk_outbound(srv_addr, ctrl, out_identity).await;
84+
(proxy, profile, dst)
85+
};
86+
87+
let client = client::http1(outbound.outbound, "opaque.test.svc.cluster.local");
88+
89+
assert_eq!(client.get("/").await, "hello");
90+
}
91+
4292
async fn mk_inbound(
4393
srv: server::Listening,
4494
id: identity::Controller,
@@ -54,8 +104,7 @@ async fn mk_inbound(
54104
.controller(ctrl)
55105
.identity(id.run().await)
56106
.inbound(srv)
57-
.inbound_direct()
58-
.named("inbound");
107+
.inbound_direct();
59108
(proxy, profile)
60109
}
61110

@@ -73,7 +122,6 @@ async fn mk_outbound(
73122
.controller(ctrl)
74123
.identity(out_identity.service().run().await)
75124
.outbound_ip(srv_addr)
76-
.named("outbound")
77125
.run_with_test_env(out_identity.env)
78126
.await;
79127
(proxy, profile)

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,13 +408,22 @@ where
408408
match self.param() {
409409
http::Version::H2 => client::Settings::H2,
410410
http::Version::Http1 => match self.metadata.protocol_hint() {
411-
ProtocolHint::Unknown => client::Settings::Http1,
411+
// If the protocol hint is unknown or indicates that the
412+
// endpoint's proxy will treat connections as opaque, do not
413+
// perform a protocol upgrade to HTTP/2.
414+
ProtocolHint::Unknown | ProtocolHint::Opaque => client::Settings::Http1,
412415
ProtocolHint::Http2 => client::Settings::OrigProtoUpgrade,
413416
},
414417
}
415418
}
416419
}
417420

421+
impl<T> svc::Param<ProtocolHint> for Endpoint<T> {
422+
fn param(&self) -> ProtocolHint {
423+
self.metadata.protocol_hint()
424+
}
425+
}
426+
418427
// TODO(ver) move this into the endpoint stack?
419428
impl<T> tap::Inspect for Endpoint<T> {
420429
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use super::{
77
use crate::{tcp::tagged_transport, Outbound};
88
use linkerd_app_core::{
99
classify, config, errors, http_tracing, metrics,
10-
proxy::{http, tap},
10+
proxy::{api_resolve::ProtocolHint, http, tap},
1111
svc::{self, ExtractParam},
1212
tls,
1313
transport::{self, Remote, ServerAddr},
@@ -206,9 +206,19 @@ impl errors::HttpRescue<Error> for ClientRescue {
206206

207207
// === impl Connect ===
208208

209-
impl<T> svc::Param<Option<SessionProtocol>> for Connect<T> {
209+
impl<T> svc::Param<Option<SessionProtocol>> for Connect<T>
210+
where
211+
T: svc::Param<ProtocolHint>,
212+
{
210213
#[inline]
211214
fn param(&self) -> Option<SessionProtocol> {
215+
// The discovered protocol hint indicates that this endpoint will treat
216+
// all connections as opaque TCP streams. Don't send our detected
217+
// session protocol as part of a transport header.
218+
if self.inner.param() == ProtocolHint::Opaque {
219+
return None;
220+
}
221+
212222
match self.version {
213223
http::Version::Http1 => Some(SessionProtocol::Http1),
214224
http::Version::H2 => Some(SessionProtocol::Http2),

linkerd/app/outbound/src/http/endpoint/tests.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,13 +314,19 @@ impl svc::Param<http::client::Settings> for Endpoint {
314314
match self.version {
315315
http::Version::H2 => http::client::Settings::H2,
316316
http::Version::Http1 => match self.hint {
317-
ProtocolHint::Unknown => http::client::Settings::Http1,
317+
ProtocolHint::Unknown | ProtocolHint::Opaque => http::client::Settings::Http1,
318318
ProtocolHint::Http2 => http::client::Settings::OrigProtoUpgrade,
319319
},
320320
}
321321
}
322322
}
323323

324+
impl svc::Param<ProtocolHint> for Endpoint {
325+
fn param(&self) -> ProtocolHint {
326+
self.hint
327+
}
328+
}
329+
324330
impl tap::Inspect for Endpoint {
325331
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
326332
req.extensions().get::<http::ClientHandle>().map(|c| c.addr)

linkerd/app/outbound/src/tcp/tagged_transport.rs

Lines changed: 113 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ mod test {
155155
port_override: Option<u16>,
156156
authority: Option<http::uri::Authority>,
157157
server_id: Option<tls::ServerId>,
158+
proto: Option<SessionProtocol>,
158159
}
159160

160161
impl svc::Param<tls::ConditionalClientTls> for Endpoint {
@@ -197,7 +198,28 @@ mod test {
197198

198199
impl svc::Param<Option<SessionProtocol>> for Endpoint {
199200
fn param(&self) -> Option<SessionProtocol> {
200-
None
201+
self.proto.clone()
202+
}
203+
}
204+
205+
fn expect_header(
206+
header: TransportHeader,
207+
) -> impl Fn(Connect) -> futures::future::Ready<Result<(tokio_test::io::Mock, ConnectMeta), io::Error>>
208+
{
209+
move |ep| {
210+
let Remote(ServerAddr(sa)) = ep.addr;
211+
assert_eq!(sa.port(), 4143);
212+
assert!(ep.tls.is_some());
213+
let buf = header.encode_prefaced_buf().expect("Must encode");
214+
let io = tokio_test::io::Builder::new()
215+
.write(&buf[..])
216+
.write(b"hello")
217+
.build();
218+
let meta = tls::ConnectMeta {
219+
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
220+
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
221+
};
222+
future::ready(Ok::<_, io::Error>((io, meta)))
201223
}
202224
}
203225

@@ -230,26 +252,11 @@ mod test {
230252
let _trace = linkerd_tracing::test::trace_init();
231253

232254
let svc = TaggedTransport {
233-
inner: service_fn(|ep: Connect| {
234-
let Remote(ServerAddr(sa)) = ep.addr;
235-
assert_eq!(sa.port(), 4143);
236-
assert!(ep.tls.is_some());
237-
let hdr = TransportHeader {
238-
port: 4321,
239-
name: None,
240-
protocol: None,
241-
};
242-
let buf = hdr.encode_prefaced_buf().expect("Must encode");
243-
let io = tokio_test::io::Builder::new()
244-
.write(&buf[..])
245-
.write(b"hello")
246-
.build();
247-
let meta = tls::ConnectMeta {
248-
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
249-
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
250-
};
251-
future::ready(Ok::<_, io::Error>((io, meta)))
252-
}),
255+
inner: service_fn(expect_header(TransportHeader {
256+
port: 4321,
257+
name: None,
258+
protocol: None,
259+
})),
253260
};
254261

255262
let e = Endpoint {
@@ -258,6 +265,7 @@ mod test {
258265
identity::Name::from_str("server.id").unwrap(),
259266
)),
260267
authority: None,
268+
proto: None,
261269
};
262270
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
263271
io.write_all(b"hello").await.expect("Write must succeed");
@@ -268,26 +276,11 @@ mod test {
268276
let _trace = linkerd_tracing::test::trace_init();
269277

270278
let svc = TaggedTransport {
271-
inner: service_fn(|ep: Connect| {
272-
let Remote(ServerAddr(sa)) = ep.addr;
273-
assert_eq!(sa.port(), 4143);
274-
assert!(ep.tls.is_some());
275-
let hdr = TransportHeader {
276-
port: 5555,
277-
name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()),
278-
protocol: None,
279-
};
280-
let buf = hdr.encode_prefaced_buf().expect("Must encode");
281-
let io = tokio_test::io::Builder::new()
282-
.write(&buf[..])
283-
.write(b"hello")
284-
.build();
285-
let meta = tls::ConnectMeta {
286-
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
287-
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
288-
};
289-
future::ready(Ok::<_, io::Error>((io, meta)))
290-
}),
279+
inner: service_fn(expect_header(TransportHeader {
280+
port: 5555,
281+
name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()),
282+
protocol: None,
283+
})),
291284
};
292285

293286
let e = Endpoint {
@@ -296,6 +289,7 @@ mod test {
296289
identity::Name::from_str("server.id").unwrap(),
297290
)),
298291
authority: Some(http::uri::Authority::from_str("foo.bar.example.com:5555").unwrap()),
292+
proto: None,
299293
};
300294
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
301295
io.write_all(b"hello").await.expect("Write must succeed");
@@ -306,26 +300,83 @@ mod test {
306300
let _trace = linkerd_tracing::test::trace_init();
307301

308302
let svc = TaggedTransport {
309-
inner: service_fn(|ep: Connect| {
310-
let Remote(ServerAddr(sa)) = ep.addr;
311-
assert_eq!(sa.port(), 4143);
312-
assert!(ep.tls.is_some());
313-
let hdr = TransportHeader {
314-
port: 4321,
315-
name: None,
316-
protocol: None,
317-
};
318-
let buf = hdr.encode_prefaced_buf().expect("Must encode");
319-
let io = tokio_test::io::Builder::new()
320-
.write(&buf[..])
321-
.write(b"hello")
322-
.build();
323-
let meta = tls::ConnectMeta {
324-
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
325-
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
326-
};
327-
future::ready(Ok::<_, io::Error>((io, meta)))
328-
}),
303+
inner: service_fn(expect_header(TransportHeader {
304+
port: 4321,
305+
name: None,
306+
protocol: None,
307+
})),
308+
};
309+
310+
let e = Endpoint {
311+
port_override: Some(4143),
312+
server_id: Some(tls::ServerId(
313+
identity::Name::from_str("server.id").unwrap(),
314+
)),
315+
authority: None,
316+
proto: None,
317+
};
318+
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
319+
io.write_all(b"hello").await.expect("Write must succeed");
320+
}
321+
322+
#[tokio::test(flavor = "current_thread")]
323+
async fn http_no_name() {
324+
let _trace = linkerd_tracing::test::trace_init();
325+
326+
let svc = TaggedTransport {
327+
inner: service_fn(expect_header(TransportHeader {
328+
port: 4321,
329+
name: None,
330+
protocol: Some(SessionProtocol::Http1),
331+
})),
332+
};
333+
334+
let e = Endpoint {
335+
port_override: Some(4143),
336+
server_id: Some(tls::ServerId(
337+
identity::Name::from_str("server.id").unwrap(),
338+
)),
339+
authority: None,
340+
proto: Some(SessionProtocol::Http1),
341+
};
342+
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
343+
io.write_all(b"hello").await.expect("Write must succeed");
344+
}
345+
346+
#[tokio::test(flavor = "current_thread")]
347+
async fn http_named_with_port() {
348+
let _trace = linkerd_tracing::test::trace_init();
349+
350+
let svc = TaggedTransport {
351+
inner: service_fn(expect_header(TransportHeader {
352+
port: 5555,
353+
name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()),
354+
protocol: Some(SessionProtocol::Http1),
355+
})),
356+
};
357+
358+
let e = Endpoint {
359+
port_override: Some(4143),
360+
server_id: Some(tls::ServerId(
361+
identity::Name::from_str("server.id").unwrap(),
362+
)),
363+
authority: Some(http::uri::Authority::from_str("foo.bar.example.com:5555").unwrap()),
364+
proto: Some(SessionProtocol::Http1),
365+
};
366+
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
367+
io.write_all(b"hello").await.expect("Write must succeed");
368+
}
369+
370+
#[tokio::test(flavor = "current_thread")]
371+
async fn http_named_no_port() {
372+
let _trace = linkerd_tracing::test::trace_init();
373+
374+
let svc = TaggedTransport {
375+
inner: service_fn(expect_header(TransportHeader {
376+
port: 4321,
377+
name: None,
378+
protocol: Some(SessionProtocol::Http1),
379+
})),
329380
};
330381

331382
let e = Endpoint {
@@ -334,6 +385,7 @@ mod test {
334385
identity::Name::from_str("server.id").unwrap(),
335386
)),
336387
authority: None,
388+
proto: Some(SessionProtocol::Http1),
337389
};
338390
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
339391
io.write_all(b"hello").await.expect("Write must succeed");

0 commit comments

Comments
 (0)