Skip to content

Commit 6962794

Browse files
authored
Load balance requests to the control plane (#594)
Control plane clients currently establish only a single connection, using the service's load balancer IP. This presents problems in the face of pod or node failure. This change modifies these clients to balance requests across all available control plane pods. The client-side balancer discovers its endpoints by resolving SRV records to find each pod. If SRV records are not present, the client falls back to A-record lookup (as it did previously).
1 parent e6beab7 commit 6962794

File tree

17 files changed

+317
-214
lines changed

17 files changed

+317
-214
lines changed

Cargo.lock

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -795,7 +795,6 @@ dependencies = [
795795
name = "linkerd2-app-core"
796796
version = "0.1.0"
797797
dependencies = [
798-
"async-trait",
799798
"bytes 0.5.4",
800799
"futures 0.3.5",
801800
"html-escape",
@@ -826,6 +825,7 @@ dependencies = [
826825
"linkerd2-proxy-api-resolve",
827826
"linkerd2-proxy-core",
828827
"linkerd2-proxy-discover",
828+
"linkerd2-proxy-dns-resolve",
829829
"linkerd2-proxy-http",
830830
"linkerd2-proxy-identity",
831831
"linkerd2-proxy-resolve",
@@ -850,7 +850,6 @@ dependencies = [
850850
"regex 1.3.9",
851851
"serde_json",
852852
"tokio",
853-
"tokio-test",
854853
"tokio-timer",
855854
"tokio-trace",
856855
"tonic",
@@ -998,6 +997,7 @@ version = "0.1.0"
998997
dependencies = [
999998
"futures 0.3.5",
1000999
"linkerd2-dns-name",
1000+
"linkerd2-error",
10011001
"linkerd2-stack",
10021002
"pin-project",
10031003
"tokio",
@@ -1258,6 +1258,22 @@ dependencies = [
12581258
"tracing-futures",
12591259
]
12601260

1261+
[[package]]
1262+
name = "linkerd2-proxy-dns-resolve"
1263+
version = "0.1.0"
1264+
dependencies = [
1265+
"futures 0.3.5",
1266+
"linkerd2-addr",
1267+
"linkerd2-dns",
1268+
"linkerd2-error",
1269+
"linkerd2-proxy-core",
1270+
"tokio",
1271+
"tokio-test",
1272+
"tower",
1273+
"tracing",
1274+
"tracing-futures",
1275+
]
1276+
12611277
[[package]]
12621278
name = "linkerd2-proxy-http"
12631279
version = "0.1.0"

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ members = [
3131
"linkerd/metrics",
3232
"linkerd/opencensus",
3333
"linkerd/proxy/api-resolve",
34+
"linkerd/proxy/dns-resolve",
3435
"linkerd/proxy/core",
3536
"linkerd/proxy/discover",
3637
"linkerd/proxy/http",

linkerd/app/core/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ independently of the inbound and outbound proxy logic.
1515
mock-orig-dst = ["linkerd2-proxy-transport/mock-orig-dst"]
1616

1717
[dependencies]
18-
async-trait = "0.1"
1918
bytes = "0.5"
2019
http = "0.2"
2120
http-body = "0.3"
@@ -47,6 +46,7 @@ linkerd2-proxy-discover = { path = "../../proxy/discover" }
4746
linkerd2-proxy-identity = { path = "../../proxy/identity" }
4847
linkerd2-proxy-http = { path = "../../proxy/http" }
4948
linkerd2-proxy-resolve = { path = "../../proxy/resolve" }
49+
linkerd2-proxy-dns-resolve = { path = "../../proxy/dns-resolve" }
5050
linkerd2-proxy-tap = { path = "../../proxy/tap" }
5151
linkerd2-proxy-tcp = { path = "../../proxy/tcp" }
5252
linkerd2-proxy-transport = { path = "../../proxy/transport" }
@@ -102,5 +102,3 @@ procinfo = "0.4.2"
102102
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13", features = ["arbitrary"] }
103103
prost-types = "0.6.0"
104104
quickcheck = { version = "0.9", default-features = false }
105-
tokio = { version = "0.2", features = ["time"] }
106-
tokio-test = "0.2"

linkerd/app/core/src/control.rs

Lines changed: 51 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ pub struct ControlAddr {
77
pub identity: crate::transport::tls::PeerIdentity,
88
}
99

10+
impl Into<Addr> for ControlAddr {
11+
fn into(self) -> Addr {
12+
self.addr
13+
}
14+
}
15+
1016
impl fmt::Display for ControlAddr {
1117
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1218
fmt::Display::fmt(&self.addr, f)
@@ -145,174 +151,74 @@ pub mod add_origin {
145151
}
146152
}
147153

148-
/// Resolves the controller's `addr` once before building a client.
149154
pub mod resolve {
150-
use super::{client, ControlAddr};
151-
use crate::svc;
152-
use futures::{ready, TryFuture};
153-
use linkerd2_addr::Addr;
154-
use linkerd2_dns as dns;
155-
use pin_project::pin_project;
156-
use std::future::Future;
155+
use super::client::Target;
156+
use crate::{
157+
dns,
158+
proxy::{discover, dns_resolve::DnsResolve, resolve::map_endpoint},
159+
svc,
160+
};
157161
use std::net::SocketAddr;
158-
use std::pin::Pin;
159-
use std::task::{Context, Poll};
160-
use std::{error, fmt};
161162

162-
#[derive(Clone, Debug)]
163-
pub struct Layer {
163+
pub fn layer<M>(
164164
dns: dns::Resolver,
165+
) -> impl svc::Layer<
166+
M,
167+
Service = discover::MakeEndpoint<
168+
discover::FromResolve<map_endpoint::Resolve<IntoTarget, DnsResolve>, Target>,
169+
M,
170+
>,
171+
> {
172+
discover::resolve(map_endpoint::Resolve::new(
173+
IntoTarget(()),
174+
DnsResolve::new(dns),
175+
))
165176
}
166177

167-
#[derive(Clone, Debug)]
168-
pub struct Resolve<M> {
169-
dns: dns::Resolver,
170-
inner: M,
171-
}
172-
173-
#[pin_project]
174-
pub struct Init<M>
175-
where
176-
M: tower::Service<client::Target>,
177-
{
178-
#[pin]
179-
state: State<M>,
180-
}
181-
182-
#[pin_project(project = StateProj)]
183-
enum State<M>
184-
where
185-
M: tower::Service<client::Target>,
186-
{
187-
Resolve(#[pin] dns::IpAddrFuture, Option<(M, ControlAddr)>),
188-
NotReady(M, Option<(SocketAddr, ControlAddr)>),
189-
Inner(#[pin] M::Future),
190-
}
191-
192-
#[derive(Debug)]
193-
pub enum Error<I> {
194-
Dns(dns::Error),
195-
Inner(I),
196-
}
197-
198-
// === impl Layer ===
199-
200-
pub fn layer<M>(dns: dns::Resolver) -> impl svc::Layer<M, Service = Resolve<M>> + Clone
201-
where
202-
M: tower::Service<client::Target> + Clone,
203-
{
204-
svc::layer::mk(move |inner| Resolve {
205-
dns: dns.clone(),
206-
inner,
207-
})
208-
}
178+
#[derive(Copy, Clone, Debug)]
179+
pub struct IntoTarget(());
209180

210-
// === impl Resolve ===
181+
impl map_endpoint::MapEndpoint<super::ControlAddr, ()> for IntoTarget {
182+
type Out = Target;
211183

212-
impl<M> tower::Service<ControlAddr> for Resolve<M>
213-
where
214-
M: tower::Service<client::Target> + Clone,
215-
{
216-
type Response = M::Response;
217-
type Error = <Init<M> as TryFuture>::Error;
218-
type Future = Init<M>;
219-
220-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
221-
self.inner.poll_ready(cx).map_err(Error::Inner)
222-
}
223-
224-
fn call(&mut self, target: ControlAddr) -> Self::Future {
225-
let state = match target.addr {
226-
Addr::Socket(sa) => State::make_inner(sa, &target, &mut self.inner),
227-
Addr::Name(ref na) => {
228-
// The inner service is ready, but we are going to do
229-
// additional work before using it. In case the inner
230-
// service has acquired resources (like a lock), we
231-
// relinquish our claim on the service by replacing it.
232-
self.inner = self.inner.clone();
233-
234-
let future = self.dns.resolve_one_ip(na.name());
235-
State::Resolve(future, Some((self.inner.clone(), target.clone())))
236-
}
237-
};
238-
239-
Init { state }
240-
}
241-
}
242-
243-
// === impl Init ===
244-
245-
impl<M> Future for Init<M>
246-
where
247-
M: tower::Service<client::Target>,
248-
{
249-
type Output = Result<M::Response, Error<M::Error>>;
250-
251-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
252-
let mut this = self.project();
253-
loop {
254-
match this.state.as_mut().project() {
255-
StateProj::Resolve(fut, stack) => {
256-
let ip = ready!(fut.poll(cx).map_err(Error::Dns))?;
257-
let (svc, config) = stack.take().unwrap();
258-
let addr = SocketAddr::from((ip, config.addr.port()));
259-
this.state
260-
.as_mut()
261-
.set(State::NotReady(svc, Some((addr, config))));
262-
}
263-
StateProj::NotReady(svc, cfg) => {
264-
ready!(svc.poll_ready(cx).map_err(Error::Inner))?;
265-
let (addr, config) = cfg.take().unwrap();
266-
let state = State::make_inner(addr, &config, svc);
267-
this.state.as_mut().set(state);
268-
}
269-
StateProj::Inner(fut) => return fut.poll(cx).map_err(Error::Inner),
270-
};
271-
}
184+
fn map_endpoint(&self, control: &super::ControlAddr, addr: SocketAddr, _: ()) -> Self::Out {
185+
Target::new(addr, control.identity.clone())
272186
}
273187
}
188+
}
274189

275-
impl<M> State<M>
276-
where
277-
M: tower::Service<client::Target>,
278-
{
279-
fn make_inner(addr: SocketAddr, dst: &ControlAddr, mk_svc: &mut M) -> Self {
280-
let target = client::Target {
281-
addr,
282-
server_name: dst.identity.clone(),
283-
};
284-
285-
State::Inner(mk_svc.call(target))
286-
}
287-
}
190+
pub mod balance {
191+
use crate::proxy::http;
192+
use std::time::Duration;
288193

289-
// === impl Error ===
194+
const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30);
195+
const EWMA_DECAY: Duration = Duration::from_secs(10);
290196

291-
impl<I: fmt::Display> fmt::Display for Error<I> {
292-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293-
match self {
294-
Error::Dns(dns::Error::NoAddressesFound) => write!(f, "no addresses found"),
295-
Error::Dns(e) => fmt::Display::fmt(&e, f),
296-
Error::Inner(ref e) => fmt::Display::fmt(&e, f),
297-
}
298-
}
197+
pub fn layer<A, B>() -> http::balance::Layer<A, B> {
198+
http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)
299199
}
300-
301-
impl<I: fmt::Debug + fmt::Display> error::Error for Error<I> {}
302200
}
303201

304202
/// Creates a client suitable for gRPC.
305203
pub mod client {
306204
use crate::transport::{connect, tls};
307205
use crate::{proxy::http, svc};
308206
use linkerd2_proxy_http::h2::Settings as H2Settings;
309-
use std::net::SocketAddr;
310-
use std::task::{Context, Poll};
207+
use std::{
208+
net::SocketAddr,
209+
task::{Context, Poll},
210+
};
311211

312-
#[derive(Clone, Debug)]
212+
#[derive(Clone, Hash, Debug, Eq, PartialEq)]
313213
pub struct Target {
314-
pub(super) addr: SocketAddr,
315-
pub(super) server_name: tls::PeerIdentity,
214+
addr: SocketAddr,
215+
server_name: tls::PeerIdentity,
216+
}
217+
218+
impl Target {
219+
pub(super) fn new(addr: SocketAddr, server_name: tls::PeerIdentity) -> Self {
220+
Self { addr, server_name }
221+
}
316222
}
317223

318224
#[derive(Debug)]
@@ -356,12 +262,10 @@ pub mod client {
356262
type Error = <http::h2::Connect<C, B> as tower::Service<Target>>::Error;
357263
type Future = <http::h2::Connect<C, B> as tower::Service<Target>>::Future;
358264

359-
#[inline]
360265
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
361266
self.inner.poll_ready(cx)
362267
}
363268

364-
#[inline]
365269
fn call(&mut self, target: Target) -> Self::Future {
366270
self.inner.call(target)
367271
}

linkerd/app/core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
//! - Admin interfaces
77
//! - Tap
88
//! - Metric labeling
9-
#![type_length_limit = "1586225"]
9+
1010
#![deny(warnings, rust_2018_idioms)]
1111

1212
pub use linkerd2_addr::{self as addr, Addr, NameAddr};
1313
pub use linkerd2_admit as admit;
1414
pub use linkerd2_cache as cache;
1515
pub use linkerd2_conditional::Conditional;
16+
pub use linkerd2_dns;
1617
pub use linkerd2_drain as drain;
1718
pub use linkerd2_error::{Error, Never, Recover};
1819
pub use linkerd2_exp_backoff as exp_backoff;

linkerd/app/core/src/proxy/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub use linkerd2_proxy_api_resolve as api_resolve;
44
pub use linkerd2_proxy_core as core;
55
pub use linkerd2_proxy_discover as discover;
6+
pub use linkerd2_proxy_dns_resolve as dns_resolve;
67
pub use linkerd2_proxy_http as http;
78
pub use linkerd2_proxy_identity as identity;
89
pub use linkerd2_proxy_resolve as resolve;

linkerd/app/src/identity.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl Config {
5555
.push_timeout(control.connect.timeout)
5656
.push(control::client::layer())
5757
.push(control::resolve::layer(dns))
58+
.push_on_response(control::balance::layer())
5859
.push(reconnect::layer(Recover(control.connect.backoff)))
5960
.push(metrics.into_layer::<classify::Response>())
6061
.push(control::add_origin::Layer::new())

0 commit comments

Comments
 (0)