Skip to content

Commit 9318075

Browse files
hawkwolix0r
andauthored
retry: only wrap bodies when a request can be retried (#1039)
Currently, the decision of whether or not a request can be retried is made in the `clone_request` implementation for the `Retry` type in `linkerd_app_core`. This is problematic, because by the time `clone_request` is called, the request body has _already_ been wrapped in a `ReplayBody`. The `ReplayBody` will buffer _regardless_ of whether or not it's cloned. This means that the maximum length to buffer doesn't _actually_ stop us from buffering --- it just determines whether we will actually attempt to retry on failure, and whether we will retain that buffer past when the request ends. The request will still buffer all body data, but it will be dropped when the single instance of the body is dropped. This means that for very long streaming bodies, we may buffer increasingly large amounts of data as long as that request is in flight. This branch fixes this by moving the determination of whether a request is retryable to *before* we wrap it in a `ReplayBody`, so that requests over the maximum buffered length are not buffered. We do this by introducing a new trait that serves as a predicate for whether or not a request is retryable, and checking that predicate before wrapping the body _or_ calling `clone_request`. Co-authored-by: Oliver Gould <[email protected]>
1 parent f4a2a6b commit 9318075

File tree

2 files changed

+47
-63
lines changed

2 files changed

+47
-63
lines changed

linkerd/app/core/src/retry.rs

Lines changed: 33 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ use super::http_metrics::retries::Handle;
44
use super::metrics::HttpRouteRetry;
55
use crate::profiles;
66
use futures::future;
7-
use http_body::Body;
87
use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse};
98
use linkerd_stack::Param;
10-
use std::marker::PhantomData;
119
use std::sync::Arc;
1210
use tower::retry::budget::Budget;
1311

@@ -17,44 +15,31 @@ pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer<NewRetry> {
1715
NewRetryLayer::new(NewRetry::new(metrics))
1816
}
1917

20-
pub trait CloneRequest<Req> {
21-
fn clone_request(req: &Req) -> Option<Req>;
22-
}
23-
2418
#[derive(Clone, Debug)]
25-
pub struct NewRetry<C = WithBody> {
19+
pub struct NewRetry {
2620
metrics: HttpRouteRetry,
27-
_clone_request: PhantomData<C>,
2821
}
2922

30-
pub struct Retry<C = WithBody> {
23+
#[derive(Clone, Debug)]
24+
pub struct Retry {
3125
metrics: Handle,
3226
budget: Arc<Budget>,
3327
response_classes: profiles::http::ResponseClasses,
34-
_clone_request: PhantomData<C>,
3528
}
3629

37-
#[derive(Clone, Debug)]
38-
pub struct WithBody;
30+
/// Allow buffering requests up to 64 kb
31+
const MAX_BUFFERED_BYTES: usize = 64 * 1024;
32+
33+
// === impl NewRetry ===
3934

4035
impl NewRetry {
4136
pub fn new(metrics: HttpRouteRetry) -> Self {
42-
Self {
43-
metrics,
44-
_clone_request: PhantomData,
45-
}
46-
}
47-
48-
pub fn clone_requests_via<C>(self) -> NewRetry<C> {
49-
NewRetry {
50-
metrics: self.metrics,
51-
_clone_request: PhantomData,
52-
}
37+
Self { metrics }
5338
}
5439
}
5540

56-
impl<C> NewPolicy<Route> for NewRetry<C> {
57-
type Policy = Retry<C>;
41+
impl NewPolicy<Route> for NewRetry {
42+
type Policy = Retry;
5843

5944
fn new_policy(&self, route: &Route) -> Option<Self::Policy> {
6045
let retries = route.route.retries().cloned()?;
@@ -64,14 +49,15 @@ impl<C> NewPolicy<Route> for NewRetry<C> {
6449
metrics,
6550
budget: retries.budget().clone(),
6651
response_classes: route.route.response_classes().clone(),
67-
_clone_request: self._clone_request,
6852
})
6953
}
7054
}
7155

72-
impl<C, A, B, E> Policy<http::Request<A>, http::Response<B>, E> for Retry<C>
56+
// === impl Retry ===
57+
58+
impl<A, B, E> Policy<http::Request<A>, http::Response<B>, E> for Retry
7359
where
74-
C: CloneRequest<http::Request<A>>,
60+
A: http_body::Body + Clone,
7561
{
7662
type Future = future::Ready<Self>;
7763

@@ -104,30 +90,27 @@ where
10490
}
10591

10692
fn clone_request(&self, req: &http::Request<A>) -> Option<http::Request<A>> {
107-
C::clone_request(req)
108-
}
109-
}
110-
111-
impl<C> Clone for Retry<C> {
112-
fn clone(&self) -> Self {
113-
Self {
114-
metrics: self.metrics.clone(),
115-
budget: self.budget.clone(),
116-
response_classes: self.response_classes.clone(),
117-
_clone_request: self._clone_request,
93+
let can_retry = self.can_retry(&req);
94+
debug_assert!(
95+
can_retry,
96+
"The retry policy attempted to clone an un-retryable request. This is unexpected."
97+
);
98+
if !can_retry {
99+
return None;
118100
}
119-
}
120-
}
121101

122-
// === impl WithBody ===
102+
let mut clone = http::Request::new(req.body().clone());
103+
*clone.method_mut() = req.method().clone();
104+
*clone.uri_mut() = req.uri().clone();
105+
*clone.headers_mut() = req.headers().clone();
106+
*clone.version_mut() = req.version();
123107

124-
impl WithBody {
125-
/// Allow buffering requests up to 64 kb
126-
pub const MAX_BUFFERED_BYTES: usize = 64 * 1024;
108+
Some(clone)
109+
}
127110
}
128111

129-
impl<B: Body + Unpin> CloneRequest<http::Request<ReplayBody<B>>> for WithBody {
130-
fn clone_request(req: &http::Request<ReplayBody<B>>) -> Option<http::Request<ReplayBody<B>>> {
112+
impl<A: http_body::Body> CanRetry<A> for Retry {
113+
fn can_retry(&self, req: &http::Request<A>) -> bool {
131114
let content_length = |req: &http::Request<_>| {
132115
req.headers()
133116
.get(http::header::CONTENT_LENGTH)
@@ -139,27 +122,20 @@ impl<B: Body + Unpin> CloneRequest<http::Request<ReplayBody<B>>> for WithBody {
139122
// only if the request contains a `content-length` header and the
140123
// content length is >= 64 kb.
141124
let has_body = !req.body().is_end_stream();
142-
if has_body && content_length(&req).unwrap_or(usize::MAX) > Self::MAX_BUFFERED_BYTES {
125+
if has_body && content_length(&req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES {
143126
tracing::trace!(
144127
req.has_body = has_body,
145128
req.content_length = ?content_length(&req),
146129
"not retryable",
147130
);
148-
return None;
131+
return false;
149132
}
150133

151134
tracing::trace!(
152135
req.has_body = has_body,
153136
req.content_length = ?content_length(&req),
154137
"retryable",
155138
);
156-
157-
let mut clone = http::Request::new(req.body().clone());
158-
*clone.method_mut() = req.method().clone();
159-
*clone.uri_mut() = req.uri().clone();
160-
*clone.headers_mut() = req.headers().clone();
161-
*clone.version_mut() = req.version();
162-
163-
Some(clone)
139+
true
164140
}
165141
}

linkerd/http-retry/src/lib.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ pub struct Retry<P, S> {
4343
policy: Option<P>,
4444
inner: S,
4545
}
46+
47+
pub trait CanRetry<B> {
48+
/// Returns `true` if a request can be retried.
49+
fn can_retry(&self, req: &http::Request<B>) -> bool;
50+
}
51+
4652
#[pin_project(project = ResponseFutureProj)]
4753
pub enum ResponseFuture<R, P, S, B>
4854
where
@@ -108,7 +114,7 @@ where
108114

109115
impl<R, P, B, S> Proxy<http::Request<B>, S> for Retry<R, P>
110116
where
111-
R: tower::retry::Policy<http::Request<ReplayBody<B>>, P::Response, Error> + Clone,
117+
R: tower::retry::Policy<http::Request<ReplayBody<B>>, P::Response, Error> + CanRetry<B> + Clone,
112118
P: Proxy<http::Request<BoxBody>, S> + Clone,
113119
S: tower::Service<P::Request> + Clone,
114120
S::Error: Into<Error>,
@@ -125,11 +131,13 @@ where
125131
trace!(retryable = %self.policy.is_some());
126132

127133
if let Some(policy) = self.policy.as_ref() {
128-
let inner = self.inner.clone().wrap_service(svc.clone()).map_request(
129-
(|req: http::Request<ReplayBody<B>>| req.map(BoxBody::new)) as fn(_) -> _,
130-
);
131-
let retry = tower::retry::Retry::new(policy.clone(), inner);
132-
return ResponseFuture::Retry(retry.oneshot(req.map(ReplayBody::new)));
134+
if policy.can_retry(&req) {
135+
let inner = self.inner.clone().wrap_service(svc.clone()).map_request(
136+
(|req: http::Request<ReplayBody<B>>| req.map(BoxBody::new)) as fn(_) -> _,
137+
);
138+
let retry = tower::retry::Retry::new(policy.clone(), inner);
139+
return ResponseFuture::Retry(retry.oneshot(req.map(ReplayBody::new)));
140+
}
133141
}
134142

135143
ResponseFuture::Disabled(self.inner.proxy(svc, req.map(BoxBody::new)))

0 commit comments

Comments
 (0)