Skip to content

Commit f7a8ee9

Browse files
olix0r and hawkw authored
http: Teardown serverside connections on error (#747)
When the proxy encounters an error, we usually want to gracefully tear down the server-side connection so that the application's client has an opportunity to re-resolve the endpoint before reconnecting. This change configures HTTP servers so that the error-handling layer initiates server shutdown when an error is not a request timeout. Socket errors, failfast errors, etc., are still met with a 502 Bad Gateway, as they were before; but after in-flight requests are responded to, the proxy's server closes its connection with the application client. Addresses linkerd/linkerd2#5209 Co-authored-by: Eliza Weisman <[email protected]>
1 parent 605fce9 commit f7a8ee9

File tree

15 files changed

+408
-167
lines changed

15 files changed

+408
-167
lines changed

Cargo.lock

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -732,9 +732,9 @@ dependencies = [
732732

733733
[[package]]
734734
name = "lazy_static"
735-
version = "1.3.0"
735+
version = "1.4.0"
736736
source = "registry+https://github.com/rust-lang/crates.io-index"
737-
checksum = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14"
737+
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
738738

739739
[[package]]
740740
name = "libc"
@@ -916,6 +916,7 @@ dependencies = [
916916
"bytes 0.5.4",
917917
"futures 0.3.5",
918918
"http 0.2.1",
919+
"hyper",
919920
"indexmap",
920921
"ipnet 1.0.0",
921922
"linkerd2-app-core",
@@ -979,7 +980,7 @@ dependencies = [
979980
"futures 0.3.5",
980981
"linkerd2-error",
981982
"linkerd2-stack",
982-
"parking_lot 0.11.0",
983+
"parking_lot",
983984
"tokio",
984985
"tower",
985986
"tracing",
@@ -1548,15 +1549,6 @@ dependencies = [
15481549
"tracing-subscriber",
15491550
]
15501551

1551-
[[package]]
1552-
name = "lock_api"
1553-
version = "0.3.4"
1554-
source = "registry+https://github.com/rust-lang/crates.io-index"
1555-
checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
1556-
dependencies = [
1557-
"scopeguard",
1558-
]
1559-
15601552
[[package]]
15611553
name = "lock_api"
15621554
version = "0.4.1"
@@ -1808,39 +1800,15 @@ dependencies = [
18081800
"tonic-build",
18091801
]
18101802

1811-
[[package]]
1812-
name = "parking_lot"
1813-
version = "0.10.2"
1814-
source = "registry+https://github.com/rust-lang/crates.io-index"
1815-
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
1816-
dependencies = [
1817-
"lock_api 0.3.4",
1818-
"parking_lot_core 0.7.2",
1819-
]
1820-
18211803
[[package]]
18221804
name = "parking_lot"
18231805
version = "0.11.0"
18241806
source = "registry+https://github.com/rust-lang/crates.io-index"
18251807
checksum = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733"
18261808
dependencies = [
18271809
"instant",
1828-
"lock_api 0.4.1",
1829-
"parking_lot_core 0.8.0",
1830-
]
1831-
1832-
[[package]]
1833-
name = "parking_lot_core"
1834-
version = "0.7.2"
1835-
source = "registry+https://github.com/rust-lang/crates.io-index"
1836-
checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
1837-
dependencies = [
1838-
"cfg-if 0.1.10",
1839-
"cloudabi 0.0.3",
1840-
"libc",
1841-
"redox_syscall",
1842-
"smallvec",
1843-
"winapi 0.3.8",
1810+
"lock_api",
1811+
"parking_lot_core",
18441812
]
18451813

18461814
[[package]]
@@ -2522,9 +2490,9 @@ dependencies = [
25222490

25232491
[[package]]
25242492
name = "tokio"
2525-
version = "0.2.22"
2493+
version = "0.2.23"
25262494
source = "registry+https://github.com/rust-lang/crates.io-index"
2527-
checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd"
2495+
checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff"
25282496
dependencies = [
25292497
"bytes 0.5.4",
25302498
"fnv",
@@ -2537,7 +2505,7 @@ dependencies = [
25372505
"mio-named-pipes",
25382506
"mio-uds",
25392507
"num_cpus",
2540-
"parking_lot 0.10.2",
2508+
"parking_lot",
25412509
"pin-project-lite",
25422510
"signal-hook-registry",
25432511
"slab",
@@ -2578,9 +2546,9 @@ dependencies = [
25782546

25792547
[[package]]
25802548
name = "tokio-macros"
2581-
version = "0.2.5"
2549+
version = "0.2.6"
25822550
source = "registry+https://github.com/rust-lang/crates.io-index"
2583-
checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
2551+
checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a"
25842552
dependencies = [
25852553
"proc-macro2",
25862554
"quote",
@@ -2825,7 +2793,7 @@ dependencies = [
28252793
"chrono",
28262794
"lazy_static",
28272795
"matchers",
2828-
"parking_lot 0.11.0",
2796+
"parking_lot",
28292797
"regex 1.3.9",
28302798
"serde",
28312799
"serde_json",

linkerd/app/core/src/admin/mod.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
//! * `/ready` -- returns 200 when the proxy is ready to participate in meshed traffic.
55
66
use crate::{
7-
proxy::http::{ClientAddr, SetClientAddr},
7+
proxy::http::{ClientHandle, SetClientHandle},
88
svc, trace,
99
transport::{io, tls},
1010
};
11-
use futures::{future, TryFutureExt};
11+
use futures::future;
1212
use http::StatusCode;
1313
use hyper::{Body, Request, Response};
1414
use linkerd2_error::{Error, Never};
@@ -98,8 +98,8 @@ impl<M> Admin<M> {
9898

9999
fn client_is_localhost<B>(req: &Request<B>) -> bool {
100100
req.extensions()
101-
.get::<ClientAddr>()
102-
.map(|a| a.as_ref().ip().is_loopback())
101+
.get::<ClientHandle>()
102+
.map(|a| a.addr.ip().is_loopback())
103103
.unwrap_or(false)
104104
}
105105
}
@@ -178,9 +178,20 @@ impl<M: FmtMetrics + Clone + Send + 'static> svc::Service<io::BoxedIo> for Serve
178178
// Since the `/proxy-log-level` controls access based on the
179179
// client's IP address, we wrap the service with a new service
180180
// that adds the remote IP as a request extension.
181-
let svc = SetClientAddr::new(meta.addrs.peer(), svc.clone());
181+
let (svc, closed) = SetClientHandle::new(meta.addrs.peer(), svc.clone());
182+
let mut conn = server.serve_connection(io, svc);
183+
184+
Box::pin(async move {
185+
tokio::select! {
186+
res = &mut conn => res?,
187+
() = closed => {
188+
Pin::new(&mut conn).graceful_shutdown();
189+
conn.await?;
190+
}
191+
}
182192

183-
Box::pin(server.serve_connection(io, svc).map_err(Into::into))
193+
Ok(())
194+
})
184195
}
185196
}
186197

linkerd/app/core/src/errors.rs

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd2_error::Error;
55
use linkerd2_error_metrics as metrics;
66
use linkerd2_error_respond as respond;
77
pub use linkerd2_error_respond::RespondLayer;
8-
use linkerd2_proxy_http::HasH2Reason;
8+
use linkerd2_proxy_http::{client_handle::Close, ClientHandle, HasH2Reason};
99
use linkerd2_timeout::{error::ResponseTimeout, FailFastError};
1010
use pin_project::pin_project;
1111
use std::pin::Pin;
@@ -51,10 +51,11 @@ pub enum Reason {
5151
#[derive(Copy, Clone, Debug)]
5252
pub struct NewRespond(());
5353

54-
#[derive(Copy, Clone, Debug)]
55-
pub enum Respond {
56-
Http1(http::Version),
57-
Http2 { is_grpc: bool },
54+
#[derive(Clone, Debug)]
55+
pub struct Respond {
56+
version: http::Version,
57+
is_grpc: bool,
58+
close: Option<Close>,
5859
}
5960

6061
#[pin_project(project = ResponseBodyProj)]
@@ -139,30 +140,39 @@ impl<ReqB, RspB: Default + hyper::body::HttpBody>
139140
type Respond = Respond;
140141

141142
fn new_respond(&self, req: &http::Request<ReqB>) -> Self::Respond {
143+
let close = req
144+
.extensions()
145+
.get::<ClientHandle>()
146+
.map(|h| h.close.clone());
142147
match req.version() {
143148
http::Version::HTTP_2 => {
144149
let is_grpc = req
145150
.headers()
146151
.get(http::header::CONTENT_TYPE)
147152
.and_then(|v| v.to_str().ok().map(|s| s.starts_with("application/grpc")))
148153
.unwrap_or(false);
149-
Respond::Http2 { is_grpc }
154+
Respond {
155+
is_grpc,
156+
close,
157+
version: http::Version::HTTP_2,
158+
}
150159
}
151-
version => Respond::Http1(version),
160+
version => Respond {
161+
version,
162+
close,
163+
is_grpc: false,
164+
},
152165
}
153166
}
154167
}
155168

156169
impl<RspB: Default + hyper::body::HttpBody> respond::Respond<http::Response<RspB>> for Respond {
157170
type Response = http::Response<ResponseBody<RspB>>;
158171

159-
fn respond(
160-
&self,
161-
reseponse: Result<http::Response<RspB>, Error>,
162-
) -> Result<Self::Response, Error> {
163-
match reseponse {
172+
fn respond(&self, res: Result<http::Response<RspB>, Error>) -> Result<Self::Response, Error> {
173+
match res {
164174
Ok(response) => Ok(response.map(|b| match *self {
165-
Respond::Http2 { is_grpc } if is_grpc == true => ResponseBody::Grpc {
175+
Respond { is_grpc: true, .. } => ResponseBody::Grpc {
166176
inner: b,
167177
trailers: None,
168178
},
@@ -171,33 +181,36 @@ impl<RspB: Default + hyper::body::HttpBody> respond::Respond<http::Response<RspB
171181
Err(error) => {
172182
warn!("Failed to proxy request: {}", error);
173183

174-
if let Respond::Http2 { is_grpc } = self {
184+
if self.version == http::Version::HTTP_2 {
175185
if let Some(reset) = error.h2_reason() {
176186
debug!(%reset, "Propagating HTTP2 reset");
177187
return Err(error);
178188
}
189+
}
179190

180-
if *is_grpc {
181-
let mut rsp = http::Response::builder()
182-
.version(http::Version::HTTP_2)
183-
.header(http::header::CONTENT_LENGTH, "0")
184-
.body(ResponseBody::default())
185-
.expect("app::errors response is valid");
186-
let code = set_grpc_status(&*error, rsp.headers_mut());
187-
debug!(?code, "Handling error with gRPC status");
188-
return Ok(rsp);
191+
// Gracefully teardown the server-side connection.
192+
if should_teardown_connection(&*error) {
193+
if let Some(c) = self.close.as_ref() {
194+
debug!("Closing server-side connection");
195+
c.close();
189196
}
190197
}
191198

192-
let version = match self {
193-
Respond::Http1(ref version) => version.clone(),
194-
Respond::Http2 { .. } => http::Version::HTTP_2,
195-
};
199+
if self.is_grpc {
200+
let mut rsp = http::Response::builder()
201+
.version(http::Version::HTTP_2)
202+
.header(http::header::CONTENT_LENGTH, "0")
203+
.body(ResponseBody::default())
204+
.expect("app::errors response is valid");
205+
let code = set_grpc_status(&*error, rsp.headers_mut());
206+
debug!(?code, "Handling error with gRPC status");
207+
return Ok(rsp);
208+
}
196209

197210
let status = http_status(&*error);
198-
debug!(%status, ?version, "Handling error with HTTP response");
211+
debug!(%status, version = ?self.version, "Handling error with HTTP response");
199212
Ok(http::Response::builder()
200-
.version(version)
213+
.version(self.version)
201214
.status(status)
202215
.header(http::header::CONTENT_LENGTH, "0")
203216
.body(ResponseBody::default())
@@ -207,6 +220,18 @@ impl<RspB: Default + hyper::body::HttpBody> respond::Respond<http::Response<RspB
207220
}
208221
}
209222

223+
fn should_teardown_connection(error: &(dyn std::error::Error + 'static)) -> bool {
224+
if error.is::<ResponseTimeout>() {
225+
false
226+
} else if error.is::<tower::timeout::error::Elapsed>() {
227+
false
228+
} else if let Some(e) = error.source() {
229+
should_teardown_connection(e)
230+
} else {
231+
true
232+
}
233+
}
234+
210235
fn http_status(error: &(dyn std::error::Error + 'static)) -> StatusCode {
211236
if let Some(HttpError { http, .. }) = error.downcast_ref::<HttpError>() {
212237
*http

linkerd/app/outbound/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ features = [
3030
]
3131

3232
[dev-dependencies]
33+
hyper = "0.13"
3334
ipnet = "1.0"
34-
linkerd2-io = { path = "../../io", features = ["tokio-test"] }
3535
linkerd2-app-test = { path = "../test" }
36+
linkerd2-io = { path = "../../io", features = ["tokio-test"] }
3637
tokio = { version = "0.2", features = ["full", "macros"]}
3738
tracing-futures = "0.2"

linkerd/app/outbound/src/http/logical.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,7 @@ where
123123
.push_switch(
124124
Logical::should_resolve,
125125
svc::stack(endpoint)
126-
.push_on_response(
127-
svc::layers()
128-
.push(svc::layer::mk(
129-
svc::stack::FailOnError::<std::io::Error, S>::new,
130-
))
131-
.box_http_request(),
132-
)
126+
.push_on_response(svc::layers().box_http_request())
133127
.push_map_target(Endpoint::from_logical(
134128
ReasonForNoPeerName::NotProvidedByServiceDiscovery,
135129
))

linkerd/app/outbound/src/http/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ pub mod endpoint;
22
pub mod logical;
33
mod require_identity_on_endpoint;
44

5+
#[cfg(test)]
6+
mod tests;
7+
58
use crate::tcp;
69
use indexmap::IndexMap;
710
pub use linkerd2_app_core::proxy::http::*;
811
use linkerd2_app_core::{
912
dst, profiles,
1013
proxy::{
1114
api_resolve::ProtocolHint,
12-
http::{self, CanOverrideAuthority, ClientAddr},
15+
http::{self, CanOverrideAuthority, ClientHandle},
1316
identity, tap,
1417
},
1518
transport::tls,
@@ -71,9 +74,7 @@ impl CanOverrideAuthority for Endpoint {
7174

7275
impl tap::Inspect for Endpoint {
7376
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
74-
req.extensions()
75-
.get::<ClientAddr>()
76-
.map(|c| c.as_ref().clone())
77+
req.extensions().get::<ClientHandle>().map(|c| c.addr)
7778
}
7879

7980
fn src_tls<'a, B>(

0 commit comments

Comments
 (0)