Skip to content

Commit 99e3c06

Browse files
authored
retry: only buffer request bodies when retries are enabled (#1020)
PR #1017 added support for retries on requests with bodies. This requires buffering the body data so that it can be sent as part of a retry request. In #1017, all requests in the profile HTTP route stack have their bodies wrapped in a `ReplayBody` type that buffers the request data. Although the buffering strategy is lazy (the request can be sent before the client finishes sending the body, so we don't introduce the latency of waiting to buffer the entire body) and fairly lightweight (the buffered body data is not copied an additional time, and is cloned only by increasing the reference count on the buffer it was read into from the wire), there is still some additional overhead that could be avoided when retries are not configured on a route. This branch updates the `Retry` service so that buffering is only added on routes which have a retry policy. If there is no retry policy, we avoid wrapping the request's body in a `ReplayBody`. The `retry` crate is now called `http-retry`.
1 parent d17b239 commit 99e3c06

File tree

9 files changed

+84
-63
lines changed

9 files changed

+84
-63
lines changed

Cargo.lock

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,7 @@ dependencies = [
660660
"linkerd-exp-backoff",
661661
"linkerd-http-classify",
662662
"linkerd-http-metrics",
663+
"linkerd-http-retry",
663664
"linkerd-identity",
664665
"linkerd-io",
665666
"linkerd-metrics",
@@ -675,7 +676,6 @@ dependencies = [
675676
"linkerd-proxy-tcp",
676677
"linkerd-proxy-transport",
677678
"linkerd-reconnect",
678-
"linkerd-retry",
679679
"linkerd-service-profiles",
680680
"linkerd-stack",
681681
"linkerd-stack-metrics",
@@ -781,9 +781,9 @@ dependencies = [
781781
"ipnet",
782782
"linkerd-app-core",
783783
"linkerd-app-test",
784+
"linkerd-http-retry",
784785
"linkerd-identity",
785786
"linkerd-io",
786-
"linkerd-retry",
787787
"linkerd-tracing",
788788
"pin-project",
789789
"thiserror",
@@ -995,6 +995,26 @@ dependencies = [
995995
"tracing",
996996
]
997997

998+
[[package]]
999+
name = "linkerd-http-retry"
1000+
version = "0.1.0"
1001+
dependencies = [
1002+
"bytes",
1003+
"futures",
1004+
"http",
1005+
"http-body",
1006+
"hyper",
1007+
"linkerd-error",
1008+
"linkerd-http-box",
1009+
"linkerd-stack",
1010+
"linkerd-tracing",
1011+
"parking_lot",
1012+
"pin-project",
1013+
"tokio",
1014+
"tower",
1015+
"tracing",
1016+
]
1017+
9981018
[[package]]
9991019
name = "linkerd-identity"
10001020
version = "0.1.0"
@@ -1258,25 +1278,6 @@ dependencies = [
12581278
"tracing",
12591279
]
12601280

1261-
[[package]]
1262-
name = "linkerd-retry"
1263-
version = "0.1.0"
1264-
dependencies = [
1265-
"bytes",
1266-
"futures",
1267-
"http",
1268-
"http-body",
1269-
"hyper",
1270-
"linkerd-error",
1271-
"linkerd-stack",
1272-
"linkerd-tracing",
1273-
"parking_lot",
1274-
"pin-project",
1275-
"tokio",
1276-
"tower",
1277-
"tracing",
1278-
]
1279-
12801281
[[package]]
12811282
name = "linkerd-service-profiles"
12821283
version = "0.1.0"

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ members = [
3030
"linkerd/http-box",
3131
"linkerd/http-classify",
3232
"linkerd/http-metrics",
33+
"linkerd/http-retry",
3334
"linkerd/identity",
3435
"linkerd/io",
3536
"linkerd/metrics",
@@ -45,7 +46,6 @@ members = [
4546
"linkerd/proxy/tcp",
4647
"linkerd/proxy/transport",
4748
"linkerd/reconnect",
48-
"linkerd/retry",
4949
"linkerd/service-profiles",
5050
"linkerd/signal",
5151
"linkerd/stack",

linkerd/app/core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ linkerd-error-respond = { path = "../../error-respond" }
3535
linkerd-exp-backoff = { path = "../../exp-backoff" }
3636
linkerd-http-classify = { path = "../../http-classify" }
3737
linkerd-http-metrics = { path = "../../http-metrics" }
38+
39+
linkerd-http-retry = { path = "../../http-retry" }
3840
linkerd-identity = { path = "../../identity" }
3941
linkerd-io = { path = "../../io" }
4042
linkerd-metrics = { path = "../../metrics" }
@@ -52,7 +54,6 @@ linkerd-proxy-tap = { path = "../../proxy/tap" }
5254
linkerd-proxy-tcp = { path = "../../proxy/tcp" }
5355
linkerd-proxy-transport = { path = "../../proxy/transport" }
5456
linkerd-reconnect = { path = "../../reconnect" }
55-
linkerd-retry = { path = "../../retry" }
5657
linkerd-timeout = { path = "../../timeout" }
5758
linkerd-tracing = { path = "../../tracing" }
5859
linkerd-service-profiles = { path = "../../service-profiles" }

linkerd/app/core/src/retry.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@ use crate::profiles;
66
use futures::future;
77
use http_body::Body;
88
use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse};
9-
use linkerd_retry::NewRetryLayer;
109
use linkerd_stack::Param;
1110
use std::marker::PhantomData;
1211
use std::sync::Arc;
1312
use tower::retry::budget::Budget;
1413

15-
pub use linkerd_retry::*;
14+
pub use linkerd_http_retry::*;
1615

1716
pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer<NewRetry> {
1817
NewRetryLayer::new(NewRetry::new(metrics))
@@ -54,7 +53,7 @@ impl NewRetry {
5453
}
5554
}
5655

57-
impl<C> linkerd_retry::NewPolicy<Route> for NewRetry<C> {
56+
impl<C> NewPolicy<Route> for NewRetry<C> {
5857
type Policy = Retry<C>;
5958

6059
fn new_policy(&self, route: &Route) -> Option<Self::Policy> {
@@ -70,7 +69,7 @@ impl<C> linkerd_retry::NewPolicy<Route> for NewRetry<C> {
7069
}
7170
}
7271

73-
impl<C, A, B, E> linkerd_retry::Policy<http::Request<A>, http::Response<B>, E> for Retry<C>
72+
impl<C, A, B, E> Policy<http::Request<A>, http::Response<B>, E> for Retry<C>
7473
where
7574
C: CloneRequest<http::Request<A>>,
7675
{

linkerd/app/inbound/fuzz/Cargo.lock

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,7 @@ dependencies = [
590590
"linkerd-exp-backoff",
591591
"linkerd-http-classify",
592592
"linkerd-http-metrics",
593+
"linkerd-http-retry",
593594
"linkerd-identity",
594595
"linkerd-io",
595596
"linkerd-metrics",
@@ -605,7 +606,6 @@ dependencies = [
605606
"linkerd-proxy-tcp",
606607
"linkerd-proxy-transport",
607608
"linkerd-reconnect",
608-
"linkerd-retry",
609609
"linkerd-service-profiles",
610610
"linkerd-stack",
611611
"linkerd-stack-metrics",
@@ -860,6 +860,23 @@ dependencies = [
860860
"tracing",
861861
]
862862

863+
[[package]]
864+
name = "linkerd-http-retry"
865+
version = "0.1.0"
866+
dependencies = [
867+
"bytes",
868+
"futures",
869+
"http",
870+
"http-body",
871+
"linkerd-error",
872+
"linkerd-http-box",
873+
"linkerd-stack",
874+
"parking_lot",
875+
"pin-project",
876+
"tower",
877+
"tracing",
878+
]
879+
863880
[[package]]
864881
name = "linkerd-identity"
865882
version = "0.1.0"
@@ -1116,22 +1133,6 @@ dependencies = [
11161133
"tracing",
11171134
]
11181135

1119-
[[package]]
1120-
name = "linkerd-retry"
1121-
version = "0.1.0"
1122-
dependencies = [
1123-
"bytes",
1124-
"futures",
1125-
"http",
1126-
"http-body",
1127-
"linkerd-error",
1128-
"linkerd-stack",
1129-
"parking_lot",
1130-
"pin-project",
1131-
"tower",
1132-
"tracing",
1133-
]
1134-
11351136
[[package]]
11361137
name = "linkerd-service-profiles"
11371138
version = "0.1.0"

linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ http = "0.2"
2020
futures = "0.3.9"
2121
indexmap = "1.0"
2222
linkerd-app-core = { path = "../core" }
23+
linkerd-http-retry = { path = "../../http-retry" }
2324
linkerd-identity = { path = "../../identity" }
24-
linkerd-retry = { path = "../../retry" }
2525
thiserror = "1.0"
2626
tokio = { version = "1", features = ["sync"] }
2727
tower = { version = "0.4.7", features = ["util"] }

linkerd/retry/Cargo.toml renamed to linkerd/http-retry/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "linkerd-retry"
2+
name = "linkerd-http-retry"
33
version = "0.1.0"
44
authors = ["Linkerd Developers <[email protected]>"]
55
license = "Apache-2.0"
@@ -12,6 +12,7 @@ futures = { version = "0.3", default-features = false }
1212
http-body = "0.4"
1313
http = "0.2"
1414
linkerd-error = { path = "../error" }
15+
linkerd-http-box = { path = "../http-box" }
1516
linkerd-stack = { path = "../stack" }
1617
pin-project = "1"
1718
parking_lot = "0.11"

linkerd/retry/src/lib.rs renamed to linkerd/http-retry/src/lib.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#![deny(warnings, rust_2018_idioms)]
22
#![forbid(unsafe_code)]
33
#![allow(clippy::inconsistent_struct_constructor)]
4+
#![allow(clippy::type_complexity)]
45

56
use linkerd_error::Error;
7+
use linkerd_http_box::BoxBody;
68
use linkerd_stack::{NewService, Proxy, ProxyService};
79
use pin_project::pin_project;
810
use std::future::Future;
@@ -41,17 +43,28 @@ pub struct Retry<P, S> {
4143
policy: Option<P>,
4244
inner: S,
4345
}
44-
4546
#[pin_project(project = ResponseFutureProj)]
46-
pub enum ResponseFuture<R, P, S, Req>
47+
pub enum ResponseFuture<R, P, S, B>
4748
where
48-
R: tower::retry::Policy<Req, P::Response, Error> + Clone,
49-
P: Proxy<Req, S> + Clone,
49+
R: tower::retry::Policy<http::Request<ReplayBody<B>>, P::Response, Error> + Clone,
50+
P: Proxy<http::Request<BoxBody>, S> + Clone,
5051
S: tower::Service<P::Request> + Clone,
5152
S::Error: Into<Error>,
5253
{
5354
Disabled(#[pin] P::Future),
54-
Retry(#[pin] Oneshot<tower::retry::Retry<R, ProxyService<P, S>>, Req>),
55+
Retry(
56+
#[pin]
57+
Oneshot<
58+
tower::retry::Retry<
59+
R,
60+
tower::util::MapRequest<
61+
ProxyService<P, S>,
62+
fn(http::Request<ReplayBody<B>>) -> http::Request<BoxBody>,
63+
>,
64+
>,
65+
http::Request<ReplayBody<B>>,
66+
>,
67+
),
5568
}
5669

5770
// === impl NewRetryLayer ===
@@ -93,35 +106,40 @@ where
93106

94107
// === impl Retry ===
95108

96-
impl<R, P, Req, S> Proxy<Req, S> for Retry<R, P>
109+
impl<R, P, B, S> Proxy<http::Request<B>, S> for Retry<R, P>
97110
where
98-
R: tower::retry::Policy<Req, P::Response, Error> + Clone,
99-
P: Proxy<Req, S> + Clone,
111+
R: tower::retry::Policy<http::Request<ReplayBody<B>>, P::Response, Error> + Clone,
112+
P: Proxy<http::Request<BoxBody>, S> + Clone,
100113
S: tower::Service<P::Request> + Clone,
101114
S::Error: Into<Error>,
115+
B: http_body::Body + Unpin + Send + 'static,
116+
B::Data: Send,
117+
B::Error: Into<Error>,
102118
{
103119
type Request = P::Request;
104120
type Response = P::Response;
105121
type Error = Error;
106-
type Future = ResponseFuture<R, P, S, Req>;
122+
type Future = ResponseFuture<R, P, S, B>;
107123

108-
fn proxy(&self, svc: &mut S, req: Req) -> Self::Future {
124+
fn proxy(&self, svc: &mut S, req: http::Request<B>) -> Self::Future {
109125
trace!(retryable = %self.policy.is_some());
110126

111127
if let Some(policy) = self.policy.as_ref() {
112-
let inner = self.inner.clone().wrap_service(svc.clone());
128+
let inner = self.inner.clone().wrap_service(svc.clone()).map_request(
129+
(|req: http::Request<ReplayBody<B>>| req.map(BoxBody::new)) as fn(_) -> _,
130+
);
113131
let retry = tower::retry::Retry::new(policy.clone(), inner);
114-
return ResponseFuture::Retry(retry.oneshot(req));
132+
return ResponseFuture::Retry(retry.oneshot(req.map(ReplayBody::new)));
115133
}
116134

117-
ResponseFuture::Disabled(self.inner.proxy(svc, req))
135+
ResponseFuture::Disabled(self.inner.proxy(svc, req.map(BoxBody::new)))
118136
}
119137
}
120138

121-
impl<R, P, S, Req> Future for ResponseFuture<R, P, S, Req>
139+
impl<R, P, S, B> Future for ResponseFuture<R, P, S, B>
122140
where
123-
R: tower::retry::Policy<Req, P::Response, Error> + Clone,
124-
P: Proxy<Req, S> + Clone,
141+
R: tower::retry::Policy<http::Request<ReplayBody<B>>, P::Response, Error> + Clone,
142+
P: Proxy<http::Request<BoxBody>, S> + Clone,
125143
S: tower::Service<P::Request> + Clone,
126144
S::Error: Into<Error>,
127145
{

linkerd/retry/src/replay.rs renamed to linkerd/http-retry/src/replay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ mod tests {
754754
tx: Tx(tx),
755755
initial,
756756
replay,
757-
_trace: linkerd_tracing::test::with_default_filter("linkerd_retry=debug"),
757+
_trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"),
758758
}
759759
}
760760
}

0 commit comments

Comments
 (0)