Skip to content

Commit ed394d9

Browse files
authored
outbound: Fix ingress-mode proxy (#2240)
In 91bbbd4, we added a `LoadShed` onto the ingress proxy. This causes errors while discovery is pending. This change removes the ingress `LoadShed` layer and documents the back-pressure and error handling semantics of the ingress stack. This change also adds a descriptive error wrapper onto the inbound server stack.
1 parent 2c91038 commit ed394d9

File tree

4 files changed

+156
-48
lines changed

4 files changed

+156
-48
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,7 @@ dependencies = [
956956
"tokio",
957957
"tokio-test",
958958
"tower",
959+
"tower-test",
959960
"tracing",
960961
]
961962

linkerd/app/inbound/src/http/server.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ use tracing::debug_span;
2020
#[derive(Copy, Clone, Debug)]
2121
struct ServerRescue;
2222

23+
#[derive(Debug, thiserror::Error)]
24+
#[error("client {client}: server: {dst}: {source}")]
25+
struct ServerError {
26+
client: Remote<ClientAddr>,
27+
dst: OrigDstAddr,
28+
#[source]
29+
source: Error,
30+
}
31+
2332
impl<H> Inbound<H> {
2433
/// Fails requests when the `HSvc`-typed inner service is not ready.
2534
pub fn push_http_server<T, I, HSvc>(self) -> Inbound<svc::ArcNewTcp<T, I>>
@@ -71,6 +80,8 @@ impl<H> Inbound<H> {
7180
// limit is reached.
7281
.push(svc::LoadShed::layer()),
7382
)
83+
.push(svc::NewMapErr::layer_from_target::<ServerError, _>())
84+
.push_on_service(svc::MapErr::layer_boxed())
7485
.push(rt.metrics.http_errors.to_layer())
7586
.push(ServerRescue::layer())
7687
.push_on_service(
@@ -96,6 +107,20 @@ impl<H> Inbound<H> {
96107
}
97108
}
98109

110+
impl<T> From<(&T, Error)> for ServerError
111+
where
112+
T: Param<OrigDstAddr>,
113+
T: Param<Remote<ClientAddr>>,
114+
{
115+
fn from((t, source): (&T, Error)) -> Self {
116+
Self {
117+
client: t.param(),
118+
dst: t.param(),
119+
source,
120+
}
121+
}
122+
}
123+
99124
// === impl ServerRescue ===
100125

101126
impl ServerRescue {

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,4 @@ linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
4343
parking_lot = "0.12"
4444
tokio = { version = "1", features = ["macros", "sync", "time"] }
4545
tokio-test = "0.4"
46+
tower-test = "0.4"

linkerd/app/outbound/src/ingress.rs

Lines changed: 129 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,9 @@ impl<N> Outbound<N> {
9393
/// This is only intended for Http configurations, where we assume all
9494
/// outbound traffic is HTTP and HTTP detection is **always** performed. If
9595
/// HTTP detection fails, we revert to using the provided `fallback` stack.
96-
//
97-
// Profile-based stacks are cached so that they can be reused across
98-
// multiple requests to the same logical destination (even if the
99-
// connections target individual endpoints in a service). No other caching
100-
// is employed here: per-endpoint stacks are uncached, and fallback stacks
101-
// are expected to be cached elsewhere, if necessary.
96+
///
97+
/// The inner stack is used to create a service for each HTTP request. This
98+
/// stack must handle its own caching.
10299
fn push_ingress<T, I, F, FSvc, NSvc>(self, fallback: F) -> Outbound<svc::ArcNewTcp<T, I>>
103100
where
104101
// Target type describing an outbound connection.
@@ -122,7 +119,7 @@ impl<N> Outbound<N> {
122119
NSvc: Send + Unpin + 'static,
123120
NSvc::Future: Send,
124121
{
125-
self.map_stack(|config, rt, discover| {
122+
self.map_stack(|config, rt, inner| {
126123
let detect_http = config.proxy.detect_http();
127124
let Config {
128125
proxy:
@@ -138,20 +135,21 @@ impl<N> Outbound<N> {
138135
// stack. Route requests without the header through the endpoint
139136
// stack.
140137
//
141-
// Stacks with an override are cached and reused. Endpoint stacks
142-
// are not.
143-
let http = discover
138+
// Errors are not handled gracefully by this stack -- they hit the
139+
// Hyper server.
140+
//
141+
// This stack creates one-off services for each request--so it is
142+
// important that the inner stack caches any state that should be
143+
// shared across requests.
144+
let http = inner
144145
.check_new_service::<Http<RequestTarget>, http::Request<http::BoxBody>>()
145146
.push_on_service(
146147
svc::layers()
147148
.push(http::BoxRequest::layer())
148149
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
149-
.push(svc::LoadShed::layer()),
150150
)
151151
.lift_new()
152-
.push(svc::NewOneshotRoute::layer_via(
153-
|t: &Http<T>| SelectTarget(t.clone()),
154-
))
152+
.push(svc::NewOneshotRoute::layer_via(|t: &Http<T>| SelectTarget(t.clone())))
155153
.check_new_service::<Http<T>, http::Request<_>>();
156154

157155
// HTTP detection is **always** performed. If detection fails, then we
@@ -183,6 +181,40 @@ impl<N> Outbound<N> {
183181
}
184182
}
185183

184+
// === impl SelectTarget ===
185+
186+
impl<B, T> svc::router::SelectRoute<http::Request<B>> for SelectTarget<T>
187+
where
188+
T: svc::Param<OrigDstAddr>,
189+
{
190+
type Key = Http<RequestTarget>;
191+
type Error = InvalidOverrideHeader;
192+
193+
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Self::Error> {
194+
// Use either the override header or the original destination address.
195+
let target = http::authority_from_header(req, DST_OVERRIDE_HEADER)
196+
.map(|a| {
197+
NameAddr::from_authority_with_default_port(&a, 80)
198+
.map(RequestTarget::Named)
199+
.map_err(|_| InvalidOverrideHeader)
200+
})
201+
.transpose()?
202+
.unwrap_or_else(|| RequestTarget::Orig((*self.0).param()));
203+
204+
// Use the request's version.
205+
let version = match req.version() {
206+
::http::Version::HTTP_2 => http::Version::H2,
207+
::http::Version::HTTP_10 | ::http::Version::HTTP_11 => http::Version::Http1,
208+
_ => unreachable!("Only HTTP/1 and HTTP/2 are supported"),
209+
};
210+
211+
Ok(Http {
212+
version,
213+
parent: target,
214+
})
215+
}
216+
}
217+
186218
// === impl Http ===
187219

188220
impl<T> Param<http::Version> for Http<T> {
@@ -271,40 +303,6 @@ impl TryFrom<discover::Discovery<Http<RequestTarget>>> for Http<http::Logical> {
271303
}
272304
}
273305

274-
// === impl SelectTarget ===
275-
276-
impl<B, T> svc::router::SelectRoute<http::Request<B>> for SelectTarget<T>
277-
where
278-
T: svc::Param<OrigDstAddr>,
279-
{
280-
type Key = Http<RequestTarget>;
281-
type Error = InvalidOverrideHeader;
282-
283-
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Self::Error> {
284-
// Use either the override header or the original destination address.
285-
let target = http::authority_from_header(req, DST_OVERRIDE_HEADER)
286-
.map(|a| {
287-
NameAddr::from_authority_with_default_port(&a, 80)
288-
.map(RequestTarget::Named)
289-
.map_err(|_| InvalidOverrideHeader)
290-
})
291-
.transpose()?
292-
.unwrap_or_else(|| RequestTarget::Orig((*self.0).param()));
293-
294-
// Use the request's version.
295-
let version = match req.version() {
296-
::http::Version::HTTP_2 => http::Version::H2,
297-
::http::Version::HTTP_10 | ::http::Version::HTTP_11 => http::Version::Http1,
298-
_ => unreachable!("Only HTTP/1 and HTTP/2 are supported"),
299-
};
300-
301-
Ok(Http {
302-
version,
303-
parent: target,
304-
})
305-
}
306-
}
307-
308306
// === impl Opaq ===
309307

310308
impl<T> std::ops::Deref for Opaq<T> {
@@ -343,3 +341,86 @@ where
343341
opaq::Logical::Forward(self.param(), Default::default())
344342
}
345343
}
344+
345+
#[cfg(test)]
346+
mod tests {
347+
use super::*;
348+
use svc::{NewService, ServiceExt};
349+
use tokio::{io::AsyncReadExt, io::AsyncWriteExt, time};
350+
use tower_test::mock;
351+
352+
/// The ingress stack must not require that inner HTTP stack is immediately
353+
/// ready.
354+
#[tokio::test(flavor = "current_thread")]
355+
async fn http_backpressure_ok() {
356+
let _trace = linkerd_tracing::test::trace_init();
357+
358+
// Create mocked inner services that are not ready.
359+
let (not_ready_http, mut http) = mock::pair();
360+
http.allow(0);
361+
let (not_ready_opaq, mut opaq) = mock::pair();
362+
opaq.allow(0);
363+
364+
let config = crate::test_util::default_config();
365+
let (runtime, _drain) = crate::test_util::runtime();
366+
let svc = Outbound::new(config, runtime)
367+
.with_stack(move |_: _| not_ready_http.clone())
368+
.push_ingress(move |_: _| not_ready_opaq.clone())
369+
.into_inner()
370+
.new_service(OrigDstAddr(([127, 0, 0, 1], 80).into()));
371+
372+
// Create a mocked IO stream that will be used to drive the service.
373+
let (mut client, server) = tokio::io::duplex(1000);
374+
let mut task = svc.oneshot(server);
375+
376+
tokio::select! {
377+
_ = client.write(b"GET / HTTP/1.1\r\n\r\nl5d-dst-override: foo\r\n\r\n") => {}
378+
_ = time::sleep(time::Duration::from_secs(1)) => panic!("write timed out"),
379+
_ = &mut task => panic!("task should not complete"),
380+
}
381+
let mut buf = bytes::BytesMut::with_capacity(1000);
382+
tokio::select! {
383+
_ = time::sleep(time::Duration::from_secs(10)) => {}
384+
_ = client.read_buf(&mut buf) => panic!("unexpected read"),
385+
_ = &mut task => panic!("task should not complete"),
386+
}
387+
}
388+
389+
/// The ingress stack must not require that inner opaque stack is immediately
390+
/// ready.
391+
#[tokio::test(flavor = "current_thread")]
392+
async fn test_opaq_backpressure_ok() {
393+
let _trace = linkerd_tracing::test::trace_init();
394+
time::pause(); // Run the test with a mocked clock.
395+
396+
// Create mocked inner services that are not ready.
397+
let (not_ready_http, mut http) = mock::pair();
398+
http.allow(0);
399+
let (not_ready_opaq, mut opaq) = mock::pair();
400+
opaq.allow(0);
401+
402+
let config = crate::test_util::default_config();
403+
let (runtime, _drain) = crate::test_util::runtime();
404+
let svc = Outbound::new(config, runtime)
405+
.with_stack(move |_: _| not_ready_http.clone())
406+
.push_ingress(move |_: _| not_ready_opaq.clone())
407+
.into_inner()
408+
.new_service(OrigDstAddr(([127, 0, 0, 1], 80).into()));
409+
410+
// Create a mocked IO stream that will be used to drive the service.
411+
let (mut client, server) = tokio::io::duplex(1000);
412+
let mut task = svc.oneshot(server);
413+
414+
tokio::select! {
415+
_ = client.write(b"foo.bar.baz/v1\r\n") => {}
416+
_ = time::sleep(time::Duration::from_secs(1)) => panic!("write timed out"),
417+
_ = &mut task => panic!("task should not complete"),
418+
}
419+
let mut buf = bytes::BytesMut::with_capacity(1000);
420+
tokio::select! {
421+
_ = time::sleep(time::Duration::from_secs(10)) => {}
422+
_ = client.read_buf(&mut buf) => panic!("unexpected read"),
423+
_ = &mut task => panic!("task should not complete"),
424+
}
425+
}
426+
}

0 commit comments

Comments
 (0)