Skip to content

Commit 7a2f2b2

Browse files
authored
outbound: add stack tests for http (#765)
This branch adds some new stack tests for simple HTTP/1 and HTTP/2 meshed and unmeshed outbound requests. These tests also assert that TCP load balancers are not constructed when HTTP traffic is detected. The hope was that the panic on constructing TCP balancers would catch a bug that exists in #744, but that issue appears to be related to the cache instead. Still, it's probably worth having these tests. Signed-off-by: Eliza Weisman <[email protected]>
1 parent 72f0e3d commit 7a2f2b2

File tree

4 files changed

+309
-18
lines changed

4 files changed

+309
-18
lines changed

linkerd/app/outbound/src/http/tests.rs

Lines changed: 240 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,31 @@ use crate::test_util::{
44
*,
55
};
66
use crate::Config;
7+
use hyper::{
8+
body::Buf,
9+
client::conn::{Builder as ClientBuilder, SendRequest},
10+
Body, Request, Response,
11+
};
712
use linkerd2_app_core::{
813
drain, metrics,
914
proxy::{identity::Name, tap},
1015
svc::{self, NewService},
11-
transport::{io, listen},
16+
transport::{
17+
io::{self, BoxedIo},
18+
listen,
19+
},
1220
Addr, Error,
1321
};
14-
use std::{net::SocketAddr, str::FromStr, time::Duration};
22+
use std::{
23+
net::SocketAddr,
24+
pin::Pin,
25+
str::FromStr,
26+
task::{Context, Poll},
27+
time::Duration,
28+
};
1529
use tokio::time;
1630
use tower::{Service, ServiceExt};
31+
use tracing::Instrument;
1732

1833
fn build_server<I>(
1934
cfg: Config,
@@ -53,11 +68,11 @@ where
5368
resolver.clone(),
5469
metrics.outbound.clone(),
5570
);
56-
let svc = crate::server::stack(
71+
let svc = crate::server::stack_with_tcp_balancer(
5772
&cfg,
5873
profiles,
59-
resolver,
6074
support::connect::NoRawTcp,
75+
NoTcpBalancer,
6176
router,
6277
metrics.outbound,
6378
None,
@@ -66,6 +81,32 @@ where
6681
(svc, drain_tx)
6782
}
6883

84+
#[derive(Clone, Debug)]
85+
struct NoTcpBalancer;
86+
87+
impl svc::NewService<crate::tcp::Concrete> for NoTcpBalancer {
88+
type Service = Self;
89+
fn new_service(&mut self, target: crate::tcp::Concrete) -> Self::Service {
90+
panic!(
91+
"no TCP load balancer should be created in this test!\n\ttarget = {:?}",
92+
target
93+
);
94+
}
95+
}
96+
97+
impl<I> svc::Service<I> for NoTcpBalancer {
98+
type Response = ();
99+
type Error = Error;
100+
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
101+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102+
unreachable!("no TCP load balancer should be created in this test!");
103+
}
104+
105+
fn call(&mut self, _: I) -> Self::Future {
106+
unreachable!("no TCP load balancer should be created in this test!");
107+
}
108+
}
109+
69110
#[tokio::test(flavor = "current_thread")]
70111
async fn profile_endpoint_propagates_conn_errors() {
71112
// This test asserts that when profile resolution returns an endpoint, and
@@ -120,7 +161,7 @@ async fn profile_endpoint_propagates_conn_errors() {
120161
tracing::info!(?res, "Server complete");
121162
res
122163
});
123-
let (mut client, conn) = hyper::client::conn::Builder::new()
164+
let (mut client, conn) = ClientBuilder::new()
124165
.handshake(client_io)
125166
.await
126167
.expect("Client must connect");
@@ -135,7 +176,7 @@ async fn profile_endpoint_propagates_conn_errors() {
135176
.await
136177
.expect("Client must not fail")
137178
.call(
138-
hyper::Request::builder()
179+
Request::builder()
139180
.header("Host", "foo.ns1.service.cluster.local")
140181
.body(hyper::Body::default())
141182
.unwrap(),
@@ -159,3 +200,196 @@ async fn profile_endpoint_propagates_conn_errors() {
159200
drop(client);
160201
drop(shutdown);
161202
}
203+
204+
#[tokio::test(flavor = "current_thread")]
205+
async fn unmeshed_http1_hello_world() {
206+
let mut server = hyper::server::conn::Http::new();
207+
server.http1_only(true);
208+
let client = ClientBuilder::new();
209+
unmeshed_hello_world(server, client).await;
210+
}
211+
212+
#[tokio::test(flavor = "current_thread")]
213+
async fn unmeshed_http2_hello_world() {
214+
let mut server = hyper::server::conn::Http::new();
215+
server.http2_only(true);
216+
let mut client = ClientBuilder::new();
217+
client.http2_only(true);
218+
unmeshed_hello_world(server, client).await;
219+
}
220+
221+
#[tokio::test(flavor = "current_thread")]
222+
async fn meshed_hello_world() {
223+
let _trace = support::trace_init();
224+
225+
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
226+
let addrs = listen::Addrs::new(
227+
([127, 0, 0, 1], 4140).into(),
228+
([127, 0, 0, 1], 666).into(),
229+
Some(ep1),
230+
);
231+
232+
let cfg = default_config(ep1);
233+
let id_name = Name::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
234+
.expect("hostname is invalid");
235+
let svc_name = profile::Name::from_str("foo.ns1.svc.example.com").unwrap();
236+
let meta = support::resolver::Metadata::new(
237+
Default::default(),
238+
support::resolver::ProtocolHint::Http2,
239+
Some(id_name.clone()),
240+
10_000,
241+
None,
242+
);
243+
244+
// Pretend the upstream is a proxy that supports proto upgrades...
245+
let mut server_settings = hyper::server::conn::Http::new();
246+
server_settings.http2_only(true);
247+
let connect = support::connect().endpoint_fn_boxed(ep1, hello_server(server_settings));
248+
249+
let profiles = profile::resolver().profile(
250+
ep1,
251+
profile::Profile {
252+
name: Some(svc_name.clone()),
253+
..Default::default()
254+
},
255+
);
256+
257+
let resolver = support::resolver::<Addr, support::resolver::Metadata>();
258+
let mut dst = resolver.endpoint_tx((svc_name, ep1.port()));
259+
dst.add(Some((ep1, meta.clone())))
260+
.expect("still listening to resolution");
261+
262+
// Build the outbound server
263+
let (mut s, _shutdown) = build_server(cfg, profiles, resolver, connect);
264+
let server = s.new_service(addrs);
265+
let (mut client, bg) = connect_client(&mut ClientBuilder::new(), server).await;
266+
267+
let rsp = http_request(&mut client, Request::default()).await;
268+
assert_eq!(rsp.status(), http::StatusCode::OK);
269+
let mut body = hyper::body::aggregate(rsp.into_body())
270+
.await
271+
.expect("body shouldn't error");
272+
let mut buf = vec![0u8; body.remaining()];
273+
body.copy_to_slice(&mut buf[..]);
274+
assert_eq!(std::str::from_utf8(&buf[..]), Ok("Hello world!"));
275+
276+
drop(client);
277+
bg.await.unwrap();
278+
}
279+
280+
async fn connect_client<S>(
281+
client_settings: &mut ClientBuilder,
282+
server: S,
283+
) -> (
284+
hyper::client::conn::SendRequest<Body>,
285+
tokio::task::JoinHandle<()>,
286+
)
287+
where
288+
S: svc::Service<support::io::DuplexStream> + Send + Sync + 'static,
289+
S::Error: Into<Error>,
290+
S::Response: std::fmt::Debug + Send + Sync + 'static,
291+
S::Future: Send,
292+
{
293+
tracing::info!(settings = ?client_settings, "connecting client with");
294+
let (client_io, server_io) = support::io::duplex(4096);
295+
let proxy = server
296+
.oneshot(server_io)
297+
.map(|res| {
298+
let res = res.map_err(Into::into);
299+
tracing::info!(?res, "Server complete");
300+
res.expect("proxy failed");
301+
})
302+
.instrument(tracing::info_span!("proxy"));
303+
let (client, conn) = client_settings
304+
.handshake(client_io)
305+
.await
306+
.expect("Client must connect");
307+
let client_bg = conn
308+
.map(|res| {
309+
tracing::info!(?res, "Client background complete");
310+
res.expect("client bg task failed");
311+
})
312+
.instrument(tracing::info_span!("client_bg"));
313+
let bg = tokio::spawn(async move {
314+
tokio::join! {
315+
proxy,
316+
client_bg,
317+
};
318+
});
319+
(client, bg)
320+
}
321+
322+
#[tracing::instrument(skip(client))]
323+
async fn http_request(client: &mut SendRequest<Body>, request: Request<Body>) -> Response<Body> {
324+
let rsp = client
325+
.ready_and()
326+
.await
327+
.expect("Client must not fail")
328+
.call(request)
329+
.await
330+
.expect("Request must succeed");
331+
332+
tracing::info!(?rsp);
333+
334+
rsp
335+
}
336+
337+
async fn unmeshed_hello_world(
338+
server_settings: hyper::server::conn::Http,
339+
mut client_settings: ClientBuilder,
340+
) {
341+
let _trace = support::trace_init();
342+
343+
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
344+
let addrs = listen::Addrs::new(
345+
([127, 0, 0, 1], 4140).into(),
346+
([127, 0, 0, 1], 666).into(),
347+
Some(ep1),
348+
);
349+
350+
let cfg = default_config(ep1);
351+
// Build a mock "connector" that returns the upstream "server" IO.
352+
let connect = support::connect().endpoint_fn_boxed(ep1, hello_server(server_settings));
353+
354+
let profiles = profile::resolver();
355+
let profile_tx = profiles.profile_tx(ep1);
356+
profile_tx.send(profile::Profile::default()).unwrap();
357+
358+
let resolver = support::resolver::<Addr, support::resolver::Metadata>();
359+
360+
// Build the outbound server
361+
let (mut s, _shutdown) = build_server(cfg, profiles, resolver, connect);
362+
let server = s.new_service(addrs);
363+
let (mut client, bg) = connect_client(&mut client_settings, server).await;
364+
365+
let rsp = http_request(&mut client, Request::default()).await;
366+
assert_eq!(rsp.status(), http::StatusCode::OK);
367+
let mut body = hyper::body::aggregate(rsp.into_body())
368+
.await
369+
.expect("body shouldn't error");
370+
let mut buf = vec![0u8; body.remaining()];
371+
body.copy_to_slice(&mut buf[..]);
372+
assert_eq!(std::str::from_utf8(&buf[..]), Ok("Hello world!"));
373+
374+
drop(client);
375+
bg.await.unwrap();
376+
}
377+
378+
#[tracing::instrument]
379+
fn hello_server(http: hyper::server::conn::Http) -> impl Fn(Endpoint) -> Result<BoxedIo, Error> {
380+
move |endpoint| {
381+
let span = tracing::info_span!("hello_server", ?endpoint);
382+
let _e = span.enter();
383+
tracing::info!("mock connecting");
384+
let (client_io, server_io) = support::io::duplex(4096);
385+
let hello_svc = hyper::service::service_fn(|request: Request<Body>| async move {
386+
tracing::info!(?request);
387+
Ok::<_, Error>(Response::new(Body::from("Hello world!")))
388+
});
389+
tokio::spawn(
390+
http.serve_connection(server_io, hello_svc)
391+
.in_current_span(),
392+
);
393+
Ok(BoxedIo::new(client_io))
394+
}
395+
}

linkerd/app/outbound/src/server.rs

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,59 @@ where
5353
P: profiles::GetProfile<SocketAddr> + Unpin + Clone + Send + Sync + 'static,
5454
P::Future: Unpin + Send,
5555
P::Error: Send,
56+
{
57+
let tcp_balance = tcp::balance::stack(&config.proxy, tcp_connect.clone(), resolve);
58+
stack_with_tcp_balancer(
59+
config,
60+
profiles,
61+
tcp_connect,
62+
tcp_balance,
63+
http_router,
64+
metrics,
65+
span_sink,
66+
drain,
67+
)
68+
}
69+
70+
pub fn stack_with_tcp_balancer<P, C, T, H, S, I>(
71+
config: &Config,
72+
profiles: P,
73+
tcp_connect: C,
74+
tcp_balance: T,
75+
http_router: H,
76+
metrics: metrics::Proxy,
77+
span_sink: Option<mpsc::Sender<oc::Span>>,
78+
drain: drain::Watch,
79+
) -> impl svc::NewService<
80+
listen::Addrs,
81+
Service = impl tower::Service<
82+
I,
83+
Response = (),
84+
Error = impl Into<Error>,
85+
Future = impl Send + 'static,
86+
> + Send
87+
+ 'static,
88+
> + Send
89+
+ 'static
90+
where
91+
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Unpin + Send + 'static,
92+
C: tower::Service<tcp::Endpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
93+
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
94+
C::Future: Unpin + Send,
95+
T: svc::NewService<tcp::Concrete> + Clone + Unpin + Send + 'static,
96+
T::Service: tower::Service<transport::io::PrefixedIo<transport::metrics::SensorIo<I>>, Response = (), Error = Error> + Unpin + Send + 'static,
97+
<T::Service as tower::Service<transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>>::Future: Unpin + Send + 'static,
98+
H: svc::NewService<http::Logical, Service = S> + Unpin + Clone + Send + Sync + 'static,
99+
S: tower::Service<
100+
http::Request<http::boxed::BoxBody>,
101+
Response = http::Response<http::boxed::BoxBody>,
102+
Error = Error,
103+
> + Send
104+
+ 'static,
105+
S::Future: Send,
106+
P: profiles::GetProfile<SocketAddr> + Unpin + Clone + Send + Sync + 'static,
107+
P::Future: Unpin + Send,
108+
P::Error: Send,
56109
{
57110
let ProxyConfig {
58111
server: ServerConfig { h2_settings, .. },
@@ -106,11 +159,7 @@ where
106159
.into_inner();
107160

108161
// Load balances TCP streams that cannot be decoded as HTTP.
109-
let tcp_balance = svc::stack(tcp::balance::stack(
110-
&config.proxy,
111-
tcp_connect.clone(),
112-
resolve,
113-
))
162+
let tcp_balance = svc::stack(tcp_balance)
114163
.push_map_target(tcp::Concrete::from)
115164
.push(profiles::split::layer())
116165
.check_new_service::<tcp::Logical, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()

linkerd/app/outbound/src/tcp/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ impl Into<Box<dyn FnMut(Endpoint) -> ConnectFuture + Send + 'static>> for Connec
804804
.read(b"world")
805805
.read_error(std::io::ErrorKind::ConnectionReset.into())
806806
.build();
807-
Box::pin(async move { Ok(io) })
807+
Box::pin(async move { Ok(io::BoxedIo::new(io)) })
808808
})
809809
}
810810
}

0 commit comments

Comments
 (0)