Skip to content

Commit 2057ba9

Browse files
authored
Implement reconnect as a NewService (#1032)
In order to make the reconnect module more flexible--so that it can more easily be used in different parts of the stack--this change modifies the `reconnect` crate to be implemented in terms of `NewService` rather than a `MakeService`. Furthermore, tests are added to exercise reconnect. This change sets up changes to reconnect behavior so reconnection is only used when service discovery is responsible for evicting endpoints from a balancer.
1 parent d94f6ca commit 2057ba9

File tree

11 files changed

+381
-196
lines changed

11 files changed

+381
-196
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,8 +1289,13 @@ dependencies = [
12891289
"futures",
12901290
"linkerd-error",
12911291
"linkerd-stack",
1292+
"linkerd-tracing",
12921293
"pin-project",
1294+
"tokio",
1295+
"tokio-stream",
1296+
"tokio-test",
12931297
"tower",
1298+
"tower-test",
12941299
"tracing",
12951300
]
12961301

linkerd/app/core/src/control.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
classify, config, control, dns, metrics, proxy::http, reconnect, svc, tls,
3-
transport::ConnectTcp, Addr, Error,
2+
classify, config, control, dns, metrics, proxy::http, svc, tls, transport::ConnectTcp, Addr,
3+
Error,
44
};
55
use futures::future::Either;
66
use std::fmt;
@@ -55,11 +55,6 @@ impl Config {
5555
{
5656
let addr = self.addr;
5757

58-
let connect_backoff = {
59-
let backoff = self.connect.backoff;
60-
move |_| Ok(backoff.stream())
61-
};
62-
6358
// When a DNS resolution fails, log the error and use the TTL, if there
6459
// is one, to drive re-resolution attempts.
6560
let resolve_backoff = {
@@ -89,7 +84,9 @@ impl Config {
8984
.push(tls::Client::layer(identity))
9085
.push_timeout(self.connect.timeout)
9186
.push(self::client::layer())
92-
.push(reconnect::layer(connect_backoff))
87+
.push_on_response(svc::MapErrLayer::new(Into::into))
88+
.into_new_service()
89+
.push_new_reconnect(self.connect.backoff)
9390
// Ensure individual endpoints are driven to readiness so that the balancer need not
9491
// drive them all directly.
9592
.push_on_response(svc::layer::mk(svc::SpawnReady::new))

linkerd/app/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ pub use linkerd_http_metrics as http_metrics;
2323
pub use linkerd_identity as identity;
2424
pub use linkerd_io as io;
2525
pub use linkerd_opencensus as opencensus;
26-
pub use linkerd_reconnect as reconnect;
2726
pub use linkerd_service_profiles as profiles;
2827
pub use linkerd_stack_metrics as stack_metrics;
2928
pub use linkerd_stack_tracing as stack_tracing;

linkerd/app/core/src/svc.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
pub use crate::proxy::http;
44
use crate::{cache, Error};
55
pub use linkerd_concurrency_limit::ConcurrencyLimit;
6+
use linkerd_error::Recover;
7+
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
8+
pub use linkerd_reconnect::NewReconnect;
69
pub use linkerd_stack::{
710
self as stack, layer, BoxNewService, BoxService, BoxServiceLayer, Fail, Filter, MapTargetLayer,
811
NewRouter, NewService, Param, Predicate, UnwrapOr,
@@ -26,6 +29,9 @@ pub use tower::{
2629
Service, ServiceExt,
2730
};
2831

32+
#[derive(Copy, Clone, Debug)]
33+
pub struct AlwaysReconnect(ExponentialBackoff);
34+
2935
pub type Buffer<Req, Rsp, E> = TowerBuffer<BoxService<Req, Rsp, E>, Req>;
3036

3137
pub type BoxHttp<B = http::BoxBody> =
@@ -47,6 +53,8 @@ pub fn stack<S>(inner: S) -> Stack<S> {
4753
Stack(inner)
4854
}
4955

56+
// === impl IdentityProxy ===
57+
5058
pub fn proxies() -> Stack<IdentityProxy> {
5159
Stack(IdentityProxy(()))
5260
}
@@ -59,6 +67,8 @@ impl<T> NewService<T> for IdentityProxy {
5967
fn new_service(&mut self, _: T) -> Self::Service {}
6068
}
6169

70+
// === impl Layers ===
71+
6272
#[allow(dead_code)]
6373
impl<L> Layers<L> {
6474
pub fn push<O>(self, outer: O) -> Layers<Pair<L, O>> {
@@ -69,13 +79,6 @@ impl<L> Layers<L> {
6979
self.push(stack::MapTargetLayer::new(map_target))
7080
}
7181

72-
/// Wraps an inner `MakeService` to be a `NewService`.
73-
pub fn push_into_new_service(
74-
self,
75-
) -> Layers<Pair<L, stack::new_service::FromMakeServiceLayer>> {
76-
self.push(stack::new_service::FromMakeServiceLayer::default())
77-
}
78-
7982
/// Buffers requests in an mpsc, spawning the inner service onto a dedicated task.
8083
pub fn push_spawn_buffer<Req>(
8184
self,
@@ -105,6 +108,8 @@ impl<M, L: Layer<M>> Layer<M> for Layers<L> {
105108
}
106109
}
107110

111+
// === impl Stack ===
112+
108113
#[allow(dead_code)]
109114
impl<S> Stack<S> {
110115
pub fn push<L: Layer<S>>(self, layer: L) -> Stack<L::Service> {
@@ -137,7 +142,14 @@ impl<S> Stack<S> {
137142

138143
/// Wraps an inner `MakeService` to be a `NewService`.
139144
pub fn into_new_service(self) -> Stack<stack::new_service::FromMakeService<S>> {
140-
self.push(stack::new_service::FromMakeServiceLayer::default())
145+
self.push(stack::new_service::FromMakeService::layer())
146+
}
147+
148+
pub fn push_new_reconnect(
149+
self,
150+
backoff: ExponentialBackoff,
151+
) -> Stack<NewReconnect<AlwaysReconnect, S>> {
152+
self.push(NewReconnect::layer(AlwaysReconnect(backoff)))
141153
}
142154

143155
/// Buffer requests when when the next layer is out of capacity.
@@ -331,3 +343,13 @@ where
331343
self.0.call(t)
332344
}
333345
}
346+
347+
// === impl AlwaysReconnect ===
348+
349+
impl<E: Into<Error>> Recover<E> for AlwaysReconnect {
350+
type Backoff = ExponentialBackoffStream;
351+
352+
fn recover(&self, _: E) -> Result<Self::Backoff, E> {
353+
Ok(self.0.stream())
354+
}
355+
}

linkerd/app/inbound/src/http/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use linkerd_app_core::{
1717
config::{ProxyConfig, ServerConfig},
1818
dst, errors, http_tracing, identity, io, profiles,
1919
proxy::{http, tap},
20-
reconnect,
2120
svc::{self, Param},
2221
Error,
2322
};
@@ -140,10 +139,9 @@ where
140139
config.proxy.connect.h1_settings,
141140
config.proxy.connect.h2_settings,
142141
))
143-
.push(reconnect::layer({
144-
let backoff = config.proxy.connect.backoff;
145-
move |_| Ok(backoff.stream())
146-
}))
142+
.push_on_response(svc::MapErrLayer::new(Into::into))
143+
.into_new_service()
144+
.push_new_reconnect(config.proxy.connect.backoff)
147145
.check_new_service::<HttpEndpoint, http::Request<_>>();
148146

149147
let target = endpoint

linkerd/app/outbound/src/http/endpoint.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::Outbound;
33
use linkerd_app_core::{
44
classify, config, http_tracing, metrics,
55
proxy::{http, tap},
6-
reconnect, svc, tls, Error, CANONICAL_DST_HEADER,
6+
svc, tls, Error, CANONICAL_DST_HEADER,
77
};
88
use tokio::io;
99

@@ -40,12 +40,10 @@ impl<C> Outbound<C> {
4040
// HTTP/1.x fallback is supported as needed.
4141
let stack = connect
4242
.push(http::client::layer(h1_settings, h2_settings))
43+
.push_on_response(svc::MapErrLayer::new(Into::<Error>::into))
4344
.check_service::<T>()
44-
// Re-establishes a connection when the client fails.
45-
.push(reconnect::layer({
46-
let backoff = backoff;
47-
move |_| Ok(backoff.stream())
48-
}))
45+
.into_new_service()
46+
.push_new_reconnect(backoff)
4947
.push(tap::NewTapHttp::layer(rt.tap.clone()))
5048
.push(rt.metrics.http_endpoint.to_layer::<classify::Response, _>())
5149
.push_on_response(http_tracing::client(

linkerd/reconnect/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,10 @@ futures = { version = "0.3", default-features = false }
1313
tower = { version = "0.4.7", default-features = false }
1414
tracing = "0.1.23"
1515
pin-project = "1"
16+
17+
[dev-dependencies]
18+
linkerd-tracing = { path = "../tracing" }
19+
tokio = { version = "1", features = ["macros", "rt", "time"] }
20+
tokio-stream = { version = "0.1", features = ["time"] }
21+
tokio-test = "0.4"
22+
tower-test = "0.4"

0 commit comments

Comments
 (0)