Skip to content

Commit ebb0496

Browse files
authored
outbound: Fix incorrect l5d-proxy-connection logs (#2344)
We've received reports of proxies inexplicibly emiting log lines including "Received unmeshed response with l5d-proxy-connection set". These messages may arise when the endpoint stack returns a synthesized response. Furthermore, we have a note in the code explaining that this connection closure logic should not apply to load balanced requests, though it currently is. This change fixes both of these issues: * The proxy_connection_close module is renamed to handle_proxy_error_headers. * It is now only applied in the endpoint stack. It doesn't make any sense for it to be applied in the server stack, since we'll already have cleared any headers set by peers. Removing this module prevents the application of teardown logic on synthetic responses. * `NewHandleProxyErrorHeaders` is now configured by a `Closable` parameter that determines whether teardown logic should be applied. This parameter is only enabled when forwarding to a single endpoint. No teardown logic is applied when load balancing. * In a future change, we should stop emitting l5d-proxy-connection when synthesizing outbound responses.
1 parent 818c17c commit ebb0496

File tree

8 files changed

+285
-273
lines changed

8 files changed

+285
-273
lines changed

linkerd/app/outbound/src/http.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use self::{
2-
proxy_connection_close::ProxyConnectionClose, require_id_header::NewRequireIdentity,
3-
strip_proxy_error::NewStripProxyError,
4-
};
1+
use self::require_id_header::NewRequireIdentity;
52
use crate::Outbound;
63
use linkerd_app_core::{
74
proxy::{
@@ -17,12 +14,11 @@ use tokio::sync::watch;
1714

1815
pub mod concrete;
1916
mod endpoint;
17+
mod handle_proxy_error_headers;
2018
pub mod logical;
21-
mod proxy_connection_close;
2219
mod require_id_header;
2320
mod retry;
2421
mod server;
25-
mod strip_proxy_error;
2622

2723
pub use self::logical::{policy, profile, LogicalAddr, Routes};
2824
pub(crate) use self::require_id_header::IdentityRequired;

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A stack that (optionally) resolves a service to a set of endpoint replicas
22
//! and distributes HTTP requests among them.
33
4-
use super::{balance, client};
4+
use super::{balance, client, handle_proxy_error_headers};
55
use crate::{http, stack_labels, Outbound};
66
use linkerd_app_core::{
77
metrics, profiles,
@@ -41,6 +41,7 @@ pub struct Endpoint<T> {
4141
is_local: bool,
4242
metadata: Metadata,
4343
parent: T,
44+
close_server_connection_on_remote_proxy_error: bool,
4445
}
4546

4647
/// A target configuring a load balancer stack.
@@ -129,6 +130,10 @@ impl<N> Outbound<N> {
129130
metadata,
130131
is_local,
131132
parent: target.parent,
133+
// We don't close server-side connections when we
134+
// get `l5d-proxy-connection: close` response headers
135+
// going through the balancer.
136+
close_server_connection_on_remote_proxy_error: false,
132137
}
133138
}
134139
})
@@ -158,6 +163,7 @@ impl<N> Outbound<N> {
158163
addr,
159164
metadata,
160165
parent,
166+
close_server_connection_on_remote_proxy_error: true,
161167
}
162168
}),
163169
})
@@ -220,6 +226,14 @@ impl<T> svc::Param<Option<http::AuthorityOverride>> for Endpoint<T> {
220226
}
221227
}
222228

229+
impl<T> svc::Param<handle_proxy_error_headers::CloseServerConnection> for Endpoint<T> {
230+
fn param(&self) -> handle_proxy_error_headers::CloseServerConnection {
231+
handle_proxy_error_headers::CloseServerConnection(
232+
self.close_server_connection_on_remote_proxy_error,
233+
)
234+
}
235+
}
236+
223237
impl<T> svc::Param<transport::labels::Key> for Endpoint<T>
224238
where
225239
T: svc::Param<Option<http::uri::Authority>>,

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
//! A stack that sends requests to an HTTP endpoint.
22
3-
use super::{NewRequireIdentity, NewStripProxyError, ProxyConnectionClose};
3+
use super::{
4+
handle_proxy_error_headers::{self, NewHandleProxyErrorHeaders},
5+
NewRequireIdentity,
6+
};
47
use crate::{tcp::tagged_transport, Outbound};
58
use linkerd_app_core::{
69
classify, config, errors, http_tracing, metrics,
@@ -87,9 +90,10 @@ impl<N> Outbound<N> {
8790
pub fn push_http_endpoint<T, B, NSvc>(self) -> Outbound<svc::ArcNewHttp<T, B>>
8891
where
8992
// Http endpoint target.
90-
T: svc::Param<http::client::Settings>,
9193
T: svc::Param<Remote<ServerAddr>>,
94+
T: svc::Param<http::client::Settings>,
9295
T: svc::Param<Option<http::AuthorityOverride>>,
96+
T: svc::Param<handle_proxy_error_headers::CloseServerConnection>,
9397
T: svc::Param<metrics::EndpointLabels>,
9498
T: svc::Param<tls::ConditionalClientTls>,
9599
T: tap::Inspect,
@@ -119,17 +123,13 @@ impl<N> Outbound<N> {
119123
// actively polled.
120124
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
121125
.push_new_reconnect(backoff)
122-
// Set the TLS status on responses so that the stack can detect whether the request
123-
// was sent over a meshed connection.
124-
.push_http_response_insert_target::<tls::ConditionalClientTls>()
125126
.push(svc::NewMapErr::layer_from_target::<EndpointError, _>())
126-
// If the outbound proxy is not configured to emit headers, then strip the
127-
// `l5d-proxy-errors` header if set by the peer.
128-
.push(NewStripProxyError::layer(config.emit_headers))
129-
// Tear down server connections when a peer proxy generates an error.
130-
// TODO(ver) this should only be honored when forwarding and not when the connection
131-
// is part of a balancer.
132-
.push(ProxyConnectionClose::layer())
127+
.push_on_service(svc::MapErr::layer_boxed())
128+
// Tear down server connections when a peer proxy generates a
129+
// response with the `l5d-proxy-connection: close` header. This
130+
// is only done when the `Closable` parameter is set to true.
131+
// This module always strips error headers from responses.
132+
.push(NewHandleProxyErrorHeaders::layer())
133133
// Handle connection-level errors eagerly so that we can report 5XX failures in tap
134134
// and metrics. HTTP error metrics are not incremented here so that errors are not
135135
// double-counted--i.e., endpoint metrics track these responses and error metrics

linkerd/app/outbound/src/http/endpoint/tests.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,12 @@ impl svc::Param<Remote<ServerAddr>> for Endpoint {
256256
}
257257
}
258258

259+
impl svc::Param<http::handle_proxy_error_headers::CloseServerConnection> for Endpoint {
260+
fn param(&self) -> http::handle_proxy_error_headers::CloseServerConnection {
261+
http::handle_proxy_error_headers::CloseServerConnection(true)
262+
}
263+
}
264+
259265
impl svc::Param<tls::ConditionalClientTls> for Endpoint {
260266
fn param(&self) -> tls::ConditionalClientTls {
261267
tls::ConditionalClientTls::None(tls::NoClientTls::Disabled)
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
use futures::prelude::*;
2+
use linkerd_app_core::{
3+
errors::respond::{L5D_PROXY_CONNECTION, L5D_PROXY_ERROR},
4+
proxy::http::ClientHandle,
5+
svc, tls,
6+
};
7+
use std::{
8+
future::Future,
9+
pin::Pin,
10+
task::{Context, Poll},
11+
};
12+
use tracing::debug;
13+
14+
#[derive(Copy, Clone, Debug)]
15+
pub struct CloseServerConnection(pub bool);
16+
17+
/// Close the accepted connection if the response from a peer proxy has the
18+
/// `l5d-proxy-connection: close` header. This means the peer proxy encountered
19+
/// an inbound connection error with its application and therefore the accepted
20+
/// connection should be torn down.
21+
#[derive(Clone, Debug)]
22+
pub struct NewHandleProxyErrorHeaders<X, N> {
23+
inner: N,
24+
extract: X,
25+
}
26+
27+
#[derive(Clone, Debug)]
28+
pub struct HandleProxyErrorHeaders<S> {
29+
inner: S,
30+
closable: bool,
31+
}
32+
33+
#[pin_project::pin_project]
34+
#[derive(Debug)]
35+
pub struct ResponseFuture<F> {
36+
#[pin]
37+
inner: F,
38+
client: ClientHandle,
39+
closable: bool,
40+
}
41+
42+
impl<X: Clone, N> NewHandleProxyErrorHeaders<X, N> {
43+
fn new(extract: X, inner: N) -> Self {
44+
Self { extract, inner }
45+
}
46+
47+
pub fn layer_via(extract: X) -> impl svc::layer::Layer<N, Service = Self> + Clone {
48+
svc::layer::mk(move |inner| Self::new(extract.clone(), inner))
49+
}
50+
}
51+
52+
impl<N> NewHandleProxyErrorHeaders<(), N> {
53+
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> + Clone {
54+
Self::layer_via(())
55+
}
56+
}
57+
58+
impl<T, X, N> svc::NewService<T> for NewHandleProxyErrorHeaders<X, N>
59+
where
60+
X: svc::ExtractParam<CloseServerConnection, T>,
61+
X: svc::ExtractParam<tls::ConditionalClientTls, T>,
62+
N: svc::NewService<T>,
63+
{
64+
type Service = HandleProxyErrorHeaders<N::Service>;
65+
66+
#[inline]
67+
fn new_service(&self, target: T) -> Self::Service {
68+
let CloseServerConnection(closable) = self.extract.extract_param(&target);
69+
let is_meshed = matches!(
70+
self.extract.extract_param(&target),
71+
tls::ConditionalClientTls::Some(_)
72+
);
73+
let inner = self.inner.new_service(target);
74+
HandleProxyErrorHeaders {
75+
inner,
76+
closable: closable && is_meshed,
77+
}
78+
}
79+
}
80+
81+
impl<S, A, B> svc::Service<http::Request<A>> for HandleProxyErrorHeaders<S>
82+
where
83+
S: svc::Service<http::Request<A>, Response = http::Response<B>>,
84+
S::Response: Send,
85+
S::Future: Send + 'static,
86+
B: Send + 'static,
87+
{
88+
type Response = S::Response;
89+
type Error = S::Error;
90+
type Future = ResponseFuture<S::Future>;
91+
92+
#[inline]
93+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94+
self.inner.poll_ready(cx)
95+
}
96+
97+
#[inline]
98+
fn call(&mut self, req: http::Request<A>) -> Self::Future {
99+
let client = req
100+
.extensions()
101+
.get::<ClientHandle>()
102+
.cloned()
103+
.expect("missing client handle");
104+
let inner = self.inner.call(req);
105+
ResponseFuture {
106+
inner,
107+
client,
108+
closable: self.closable,
109+
}
110+
}
111+
}
112+
113+
impl<B, F: TryFuture<Ok = http::Response<B>>> Future for ResponseFuture<F> {
114+
type Output = Result<http::Response<B>, F::Error>;
115+
116+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117+
let this = self.project();
118+
let mut rsp = futures::ready!(this.inner.try_poll(cx))?;
119+
if update_response(&mut rsp, *this.closable) {
120+
// Signal that the proxy's server-side connection should be
121+
// terminated. This handles the remote error as if the local proxy
122+
// encountered an error.
123+
debug!("Closing server connection");
124+
this.client.close.close();
125+
}
126+
Poll::Ready(Ok(rsp))
127+
}
128+
}
129+
130+
fn update_response<B>(rsp: &mut http::Response<B>, closable: bool) -> bool {
131+
// Clear the headers.
132+
let hdr = rsp.headers_mut().remove(L5D_PROXY_CONNECTION);
133+
let err = rsp.headers_mut().get(L5D_PROXY_ERROR);
134+
debug!(
135+
error = err
136+
.and_then(|v| v.to_str().ok())
137+
.map(tracing::field::display),
138+
"Remote proxy error"
139+
);
140+
141+
if !closable {
142+
return false;
143+
}
144+
145+
static CLOSE: http::HeaderValue = http::HeaderValue::from_static("close");
146+
if hdr.as_ref() != Some(&CLOSE) {
147+
return false;
148+
}
149+
150+
if rsp.version() == http::Version::HTTP_11 {
151+
// If the response is HTTP/1.1, we need to send a Connection: close
152+
// header to tell the application this connection is being closed.
153+
rsp.headers_mut().insert(
154+
http::header::CONNECTION,
155+
http::HeaderValue::from_static("close"),
156+
);
157+
}
158+
true
159+
}
160+
161+
#[cfg(test)]
162+
mod test {
163+
use super::*;
164+
use futures::future;
165+
use linkerd_app_core::{
166+
svc::{self, ServiceExt},
167+
Infallible,
168+
};
169+
use linkerd_tracing::test;
170+
use tokio::time;
171+
172+
impl<S> HandleProxyErrorHeaders<S> {
173+
fn for_test(closable: bool, inner: S) -> Self {
174+
Self { closable, inner }
175+
}
176+
}
177+
178+
#[tokio::test(flavor = "current_thread")]
179+
async fn connection_closes_after_meshed_response_header() {
180+
let _trace = test::trace_init();
181+
182+
// Build the client that should be closed after receiving a response
183+
// with the l5d-proxy-error header.
184+
let mut req = http::Request::builder()
185+
.uri("http://foo.example.com")
186+
.body(hyper::Body::default())
187+
.unwrap();
188+
let (handle, closed) = ClientHandle::new(([192, 0, 2, 3], 50000).into());
189+
req.extensions_mut().insert(handle);
190+
191+
let svc = HandleProxyErrorHeaders::for_test(
192+
true,
193+
svc::mk(|_: http::Request<hyper::Body>| {
194+
future::ok::<_, Infallible>(
195+
http::Response::builder()
196+
.status(http::StatusCode::BAD_GATEWAY)
197+
.header(L5D_PROXY_CONNECTION, "close")
198+
.body(hyper::Body::default())
199+
.unwrap(),
200+
)
201+
}),
202+
);
203+
204+
let rsp = svc.oneshot(req).await.expect("request must succeed");
205+
assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY);
206+
207+
// The client handle close future should fire.
208+
time::timeout(time::Duration::from_secs(10), closed)
209+
.await
210+
.expect("client handle must close");
211+
}
212+
213+
#[tokio::test(flavor = "current_thread")]
214+
async fn header_ignored_in_unmeshed_response_header() {
215+
let _trace = test::trace_init();
216+
217+
// Build the client that should be closed after receiving a response
218+
// with the l5d-proxy-error header.
219+
let mut req = http::Request::builder()
220+
.uri("http://foo.example.com")
221+
.body(hyper::Body::default())
222+
.unwrap();
223+
let (handle, closed) = ClientHandle::new(([192, 0, 2, 3], 50000).into());
224+
req.extensions_mut().insert(handle);
225+
226+
let svc = HandleProxyErrorHeaders::for_test(
227+
false,
228+
svc::mk(|_: http::Request<hyper::Body>| {
229+
future::ok::<_, Infallible>(
230+
http::Response::builder()
231+
.status(http::StatusCode::BAD_GATEWAY)
232+
.header(L5D_PROXY_CONNECTION, "close")
233+
.body(hyper::Body::default())
234+
.unwrap(),
235+
)
236+
}),
237+
);
238+
239+
let rsp = svc.oneshot(req).await.expect("request must succeed");
240+
assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY);
241+
242+
// The client handle close future should fire.
243+
tokio::select! {
244+
_ = time::sleep(time::Duration::from_secs(10)) => {},
245+
_ = closed => panic!("connection shouldn't close"),
246+
}
247+
}
248+
}

0 commit comments

Comments
 (0)