Skip to content

Commit d17b239

Browse files
authored
retry: retry requests with <=64KB bodies (#1017)
Linkerd's proxy does not currently support retries on requests with payloads. This precludes Linkerd from retrying gRPC requests, which is a substantial limitation. This PR adds support for retries on requests with bodies, if and only if the request has a `Content-length` header and the content length is <= 64 KB. In order to retry requests with payloads, we need to buffer the body data for that request so that it can be sent again if a retry is necessary. This is implemented by wrapping profile requests in a new `Body` type which lazily buffers each chunk of data polled from the inner `Body`. The buffered data is shared with a clone of the request, and when the original body is dropped, ownership of the buffered data is transferred to the clone. When the cloned request is sent, polling its body will yield the buffered data. If the server returns an error _before_ an entire streaming body has been read, the replay body will continue reading from the initial request body after playing back the buffered portion. Data is buffered by calling `Buf::copy_to_bytes` on each chunk of data. Although we call this method on an arbitrary `Buf` type, all data chunks in the proxy are actually `Bytes`, and their `copy_to_bytes` methods are therefore a cheap reference-count bump on the `Bytes`. After calling `copy_to_bytes`, we can clone the returned `Bytes` and store it in a vector. This allows us to buffer the body without actually copying the bytes --- we just increase the reference count on the original buffer. This buffering strategy also has the advantage of allowing us to write out the entire buffered body in one big `writev` call. Because we store the buffered body as a list of distinct buffers for each chunk, we can expand the buffered body to a large number of scatter-gather buffers in `Buf::bytes_vectored`. This should make replaying the body more efficient, as we don't have to make a separate `write` call for each chunk. I've also added several tests for the new buffering body. In particular, there are tests for a number of potential edge cases, including: - when a retry is started before the entire initial body has been read (i.e. if the server returns an error before the request completes), - when a retry body is cloned multiple times, including if the client's body has not yet completed, - dropping clones prior to completion Closes linkerd/linkerd2#6130. Signed-off-by: Eliza Weisman <[email protected]>
1 parent c188444 commit d17b239

File tree

11 files changed

+995
-32
lines changed

11 files changed

+995
-32
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,9 +1262,17 @@ dependencies = [
12621262
name = "linkerd-retry"
12631263
version = "0.1.0"
12641264
dependencies = [
1265+
"bytes",
1266+
"futures",
1267+
"http",
1268+
"http-body",
1269+
"hyper",
12651270
"linkerd-error",
12661271
"linkerd-stack",
1272+
"linkerd-tracing",
1273+
"parking_lot",
12671274
"pin-project",
1275+
"tokio",
12681276
"tower",
12691277
"tracing",
12701278
]

linkerd/app/core/src/retry.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use super::classify;
22
use super::dst::Route;
3-
// use super::handle_time;
43
use super::http_metrics::retries::Handle;
54
use super::metrics::HttpRouteRetry;
65
use crate::profiles;
76
use futures::future;
8-
use hyper::body::HttpBody;
7+
use http_body::Body;
98
use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse};
109
use linkerd_retry::NewRetryLayer;
1110
use linkerd_stack::Param;
1211
use std::marker::PhantomData;
1312
use std::sync::Arc;
1413
use tower::retry::budget::Budget;
1514

15+
pub use linkerd_retry::*;
16+
1617
pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer<NewRetry> {
1718
NewRetryLayer::new(NewRetry::new(metrics))
1819
}
@@ -22,18 +23,21 @@ pub trait CloneRequest<Req> {
2223
}
2324

2425
#[derive(Clone, Debug)]
25-
pub struct NewRetry<C = ()> {
26+
pub struct NewRetry<C = WithBody> {
2627
metrics: HttpRouteRetry,
2728
_clone_request: PhantomData<C>,
2829
}
2930

30-
pub struct Retry<C = ()> {
31+
pub struct Retry<C = WithBody> {
3132
metrics: Handle,
3233
budget: Arc<Budget>,
3334
response_classes: profiles::http::ResponseClasses,
3435
_clone_request: PhantomData<C>,
3536
}
3637

38+
#[derive(Clone, Debug)]
39+
pub struct WithBody;
40+
3741
impl NewRetry {
3842
pub fn new(metrics: HttpRouteRetry) -> Self {
3943
Self {
@@ -116,23 +120,47 @@ impl<C> Clone for Retry<C> {
116120
}
117121
}
118122

119-
impl<B: Default + HttpBody> CloneRequest<http::Request<B>> for () {
120-
fn clone_request(req: &http::Request<B>) -> Option<http::Request<B>> {
121-
if !req.body().is_end_stream() {
123+
// === impl WithBody ===
124+
125+
impl WithBody {
126+
/// Allow buffering requests up to 64 kb
127+
pub const MAX_BUFFERED_BYTES: usize = 64 * 1024;
128+
}
129+
130+
impl<B: Body + Unpin> CloneRequest<http::Request<ReplayBody<B>>> for WithBody {
131+
fn clone_request(req: &http::Request<ReplayBody<B>>) -> Option<http::Request<ReplayBody<B>>> {
132+
let content_length = |req: &http::Request<_>| {
133+
req.headers()
134+
.get(http::header::CONTENT_LENGTH)
135+
.and_then(|value| value.to_str().ok()?.parse::<usize>().ok())
136+
};
137+
138+
// Requests without bodies can always be retried, as we will not need to
139+
// buffer the body. If the request *does* have a body, retry it if and
140+
// only if the request contains a `content-length` header and the
141+
// content length is >= 64 kb.
142+
let has_body = !req.body().is_end_stream();
143+
if has_body && content_length(&req).unwrap_or(usize::MAX) > Self::MAX_BUFFERED_BYTES {
144+
tracing::trace!(
145+
req.has_body = has_body,
146+
req.content_length = ?content_length(&req),
147+
"not retryable",
148+
);
122149
return None;
123150
}
124151

125-
let mut clone = http::Request::new(B::default());
152+
tracing::trace!(
153+
req.has_body = has_body,
154+
req.content_length = ?content_length(&req),
155+
"retryable",
156+
);
157+
158+
let mut clone = http::Request::new(req.body().clone());
126159
*clone.method_mut() = req.method().clone();
127160
*clone.uri_mut() = req.uri().clone();
128161
*clone.headers_mut() = req.headers().clone();
129162
*clone.version_mut() = req.version();
130163

131-
// // Count retries toward the request's total handle time.
132-
// if let Some(ext) = req.extensions().get::<handle_time::Tracker>() {
133-
// clone.extensions_mut().insert(ext.clone());
134-
// }
135-
136164
Some(clone)
137165
}
138166
}

linkerd/app/inbound/fuzz/Cargo.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,8 +1120,13 @@ dependencies = [
11201120
name = "linkerd-retry"
11211121
version = "0.1.0"
11221122
dependencies = [
1123+
"bytes",
1124+
"futures",
1125+
"http",
1126+
"http-body",
11231127
"linkerd-error",
11241128
"linkerd-stack",
1129+
"parking_lot",
11251130
"pin-project",
11261131
"tower",
11271132
"tracing",

linkerd/app/integration/src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tracing::instrument::Instrument;
1111
use webpki::{DNSName, DNSNameRef};
1212

1313
type ClientError = hyper::Error;
14-
type Request = http::Request<Bytes>;
14+
type Request = http::Request<hyper::Body>;
1515
type Response = http::Response<hyper::Body>;
1616
type Sender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Response, ClientError>>)>;
1717

@@ -138,7 +138,7 @@ impl Client {
138138
&self,
139139
builder: http::request::Builder,
140140
) -> impl Future<Output = Result<Response, ClientError>> + Send + Sync + 'static {
141-
self.send_req(builder.body(Bytes::new()).unwrap())
141+
self.send_req(builder.body(Bytes::new().into()).unwrap())
142142
}
143143

144144
pub async fn request_body(&self, req: Request) -> Response {
@@ -175,7 +175,7 @@ impl Client {
175175
}
176176
tracing::debug!(headers = ?req.headers(), "request");
177177
let (tx, rx) = oneshot::channel();
178-
let _ = self.tx.send((req, tx));
178+
let _ = self.tx.send((req.map(Into::into), tx));
179179
async { rx.await.expect("request cancelled") }.in_current_span()
180180
}
181181

linkerd/app/integration/src/tap.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ where
215215
};
216216
http::Request::from_parts(parts, body)
217217
});
218-
Box::pin(self.0.send_req(req).map_err(|err| err.to_string()))
218+
Box::pin(
219+
self.0
220+
.send_req(req.map(Into::into))
221+
.map_err(|err| err.to_string()),
222+
)
219223
}
220224
}

linkerd/app/integration/src/tests/profiles.rs

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,23 @@ macro_rules! profile_test {
4343
.body("slept".into())
4444
.unwrap()
4545
})
46-
.route_fn("/0.5", move |_req| {
47-
if counter.fetch_add(1, Ordering::Relaxed) % 2 == 0 {
48-
Response::builder()
49-
.status(533)
50-
.body("nope".into())
51-
.unwrap()
52-
} else {
53-
Response::builder()
54-
.status(200)
55-
.body("retried".into())
56-
.unwrap()
46+
.route_async("/0.5", move |req| {
47+
let fail = counter.fetch_add(1, Ordering::Relaxed) % 2 == 0;
48+
async move {
49+
// Read the entire body before responding, so that the
50+
// client doesn't fail when writing it out.
51+
let _body = hyper::body::aggregate(req.into_body()).await;
52+
Ok::<_, Error>(if fail {
53+
Response::builder()
54+
.status(533)
55+
.body("nope".into())
56+
.unwrap()
57+
} else {
58+
Response::builder()
59+
.status(200)
60+
.body("retried".into())
61+
.unwrap()
62+
})
5763
}
5864
})
5965
.route_fn("/0.5/sleep", move |_req| {
@@ -174,6 +180,48 @@ async fn retry_uses_budget() {
174180
}
175181
}
176182

183+
#[tokio::test]
184+
async fn retry_with_small_post_body() {
185+
profile_test! {
186+
routes: [
187+
controller::route()
188+
.request_any()
189+
.response_failure(500..600)
190+
.retryable(true)
191+
],
192+
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
193+
with_client: |client: client::Client| async move {
194+
let req = client.request_builder("/0.5")
195+
.method("POST")
196+
.body("req has a body".into())
197+
.unwrap();
198+
let res = client.request_body(req).await;
199+
assert_eq!(res.status(), 200);
200+
}
201+
}
202+
}
203+
204+
#[tokio::test]
205+
async fn retry_with_small_put_body() {
206+
profile_test! {
207+
routes: [
208+
controller::route()
209+
.request_any()
210+
.response_failure(500..600)
211+
.retryable(true)
212+
],
213+
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
214+
with_client: |client: client::Client| async move {
215+
let req = client.request_builder("/0.5")
216+
.method("PUT")
217+
.body("req has a body".into())
218+
.unwrap();
219+
let res = client.request_body(req).await;
220+
assert_eq!(res.status(), 200);
221+
}
222+
}
223+
}
224+
177225
#[tokio::test]
178226
async fn does_not_retry_if_request_does_not_match() {
179227
profile_test! {
@@ -211,7 +259,7 @@ async fn does_not_retry_if_earlier_response_class_is_success() {
211259
}
212260

213261
#[tokio::test]
214-
async fn does_not_retry_if_request_has_body() {
262+
async fn does_not_retry_if_body_is_too_long() {
215263
profile_test! {
216264
routes: [
217265
controller::route()
@@ -223,14 +271,40 @@ async fn does_not_retry_if_request_has_body() {
223271
with_client: |client: client::Client| async move {
224272
let req = client.request_builder("/0.5")
225273
.method("POST")
226-
.body("req has a body".into())
274+
.body(hyper::Body::from(&[1u8; 64 * 1024 + 1][..]))
227275
.unwrap();
228276
let res = client.request_body(req).await;
229277
assert_eq!(res.status(), 533);
230278
}
231279
}
232280
}
233281

282+
#[tokio::test]
283+
async fn does_not_retry_if_body_lacks_known_length() {
284+
profile_test! {
285+
routes: [
286+
controller::route()
287+
.request_any()
288+
.response_failure(500..600)
289+
.retryable(true)
290+
],
291+
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
292+
with_client: |client: client::Client| async move {
293+
let (mut tx, body) = hyper::body::Body::channel();
294+
let req = client.request_builder("/0.5")
295+
.method("POST")
296+
.body(body)
297+
.unwrap();
298+
let res = tokio::spawn(async move { client.request_body(req).await });
299+
let _ = tx.send_data(Bytes::from_static(b"hello"));
300+
let _ = tx.send_data(Bytes::from_static(b"world"));
301+
drop(tx);
302+
let res = res.await.unwrap();
303+
assert_eq!(res.status(), 533);
304+
}
305+
}
306+
}
307+
234308
#[tokio::test]
235309
async fn does_not_retry_if_missing_retry_budget() {
236310
profile_test! {

linkerd/app/outbound/src/http/logical.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tracing::debug_span;
1515
impl<E> Outbound<E> {
1616
pub fn push_http_logical<B, ESvc, R>(self, resolve: R) -> Outbound<svc::BoxNewHttp<Logical, B>>
1717
where
18-
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
18+
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Unpin + Send + 'static,
1919
B::Data: Send + 'static,
2020
E: svc::NewService<Endpoint, Service = ESvc> + Clone + Send + Sync + 'static,
2121
ESvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
@@ -122,6 +122,7 @@ impl<E> Outbound<E> {
122122
)
123123
// Sets an optional retry policy.
124124
.push(retry::layer(rt.metrics.http_route_retry.clone()))
125+
.push_on_response(retry::replay::layer())
125126
// Sets an optional request timeout.
126127
.push(http::MakeTimeoutLayer::default())
127128
// Records per-route metrics.

linkerd/retry/Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,19 @@ edition = "2018"
77
publish = false
88

99
[dependencies]
10+
bytes = "1"
11+
futures = { version = "0.3", default-features = false }
12+
http-body = "0.4"
13+
http = "0.2"
1014
linkerd-error = { path = "../error" }
1115
linkerd-stack = { path = "../stack" }
16+
pin-project = "1"
17+
parking_lot = "0.11"
1218
tower = { version = "0.4.7", default-features = false, features = ["retry", "util"] }
1319
tracing = "0.1.23"
14-
pin-project = "1"
20+
21+
[dev-dependencies]
22+
hyper = "0.14"
23+
linkerd-tracing = { path = "../tracing", features = ["ansi"] }
24+
tokio = { version = "1", features = ["macros", "rt"]}
25+

linkerd/retry/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ pub use tower::retry::{budget::Budget, Policy};
1212
use tower::util::{Oneshot, ServiceExt};
1313
use tracing::trace;
1414

15+
pub mod replay;
16+
pub use self::replay::ReplayBody;
17+
1518
/// A strategy for obtaining per-target retry polices.
1619
pub trait NewPolicy<T> {
1720
type Policy;

0 commit comments

Comments
 (0)