Skip to content

Commit 095c199

Browse files
authored
Unify control plane client construction (#638)
We construct three control plane clients. All of the clients are basically the same (at least with regard to the stack/configuration). Rather than duplicate this logic throughout the code, we can unify it in the `control` module (hiding the details of how the client is built). This change modifies how the OpenCensus collector client is instantiated. The `SpanExporter` no longer constructs a new client each time it wishes to initiate a stream, opting to just clone a single client instead. This change also fixes reconnect logic for these clients. When the control client balancer was introduced, the reconnect layer incorrectly wrapped the balancer (instead of the endpoint). This has been corrected.
1 parent 6962794 commit 095c199

File tree

11 files changed

+118
-184
lines changed

11 files changed

+118
-184
lines changed

linkerd/app/core/src/config.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
pub use super::control::ControlAddr;
21
pub use crate::exp_backoff::ExponentialBackoff;
32
pub use crate::proxy::http::h2;
43
pub use crate::transport::{Bind, DefaultOrigDstAddr, NoOrigDstAddr, OrigDstAddr};
@@ -32,13 +31,6 @@ pub struct ProxyConfig {
3231
pub detect_protocol_timeout: Duration,
3332
}
3433

35-
#[derive(Clone, Debug)]
36-
pub struct ControlConfig {
37-
pub addr: ControlAddr,
38-
pub connect: ConnectConfig,
39-
pub buffer_capacity: usize,
40-
}
41-
4234
// === impl ServerConfig ===
4335

4436
impl<A: OrigDstAddr> ServerConfig<A> {

linkerd/app/core/src/control.rs

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
1-
use linkerd2_addr::Addr;
1+
use crate::{
2+
classify, config, control, dns,
3+
proxy::http,
4+
reconnect,
5+
svc::{self, NewService},
6+
transport::tls,
7+
Addr, ControlHttpMetrics, Error,
8+
};
29
use std::fmt;
310

11+
#[derive(Clone, Debug)]
12+
pub struct Config {
13+
pub addr: ControlAddr,
14+
pub connect: config::ConnectConfig,
15+
pub buffer_capacity: usize,
16+
}
17+
418
#[derive(Clone, Debug)]
519
pub struct ControlAddr {
620
pub addr: Addr,
7-
pub identity: crate::transport::tls::PeerIdentity,
21+
pub identity: tls::PeerIdentity,
822
}
923

1024
impl Into<Addr> for ControlAddr {
@@ -19,8 +33,48 @@ impl fmt::Display for ControlAddr {
1933
}
2034
}
2135

36+
type BalanceBody =
37+
http::balance::PendingUntilFirstDataBody<tower::load::peak_ewma::Handle, http::glue::Body>;
38+
39+
type RspBody = linkerd2_http_metrics::requests::ResponseBody<BalanceBody, classify::Eos>;
40+
41+
pub type Client<B> = linkerd2_buffer::Buffer<http::Request<B>, http::Response<RspBody>>;
42+
43+
impl Config {
44+
pub fn build<B, I>(
45+
self,
46+
dns: dns::Resolver,
47+
metrics: ControlHttpMetrics,
48+
identity: tls::Conditional<I>,
49+
) -> Client<B>
50+
where
51+
B: http::HttpBody + Send + 'static,
52+
B::Data: Send,
53+
B::Error: Into<Error> + Send + Sync,
54+
I: Clone + tls::client::HasConfig + Send + 'static,
55+
{
56+
svc::connect(self.connect.keepalive)
57+
.push(tls::ConnectLayer::new(identity))
58+
.push_timeout(self.connect.timeout)
59+
.push(self::client::layer())
60+
.push(reconnect::layer({
61+
let backoff = self.connect.backoff;
62+
move |_| Ok(backoff.stream())
63+
}))
64+
.push_spawn_ready()
65+
.push(self::resolve::layer(dns))
66+
.push_on_response(self::control::balance::layer())
67+
.push(metrics.into_layer::<classify::Response>())
68+
.push(self::add_origin::Layer::new())
69+
.into_new_service()
70+
.check_new_service()
71+
.push_on_response(svc::layers().push_spawn_buffer(self.buffer_capacity))
72+
.new_service(self.addr)
73+
}
74+
}
75+
2276
/// Sets the request's URI from `Config`.
23-
pub mod add_origin {
77+
mod add_origin {
2478
use super::ControlAddr;
2579
use futures::{ready, TryFuture};
2680
use linkerd2_error::Error;
@@ -151,7 +205,7 @@ pub mod add_origin {
151205
}
152206
}
153207

154-
pub mod resolve {
208+
mod resolve {
155209
use super::client::Target;
156210
use crate::{
157211
dns,
@@ -187,7 +241,7 @@ pub mod resolve {
187241
}
188242
}
189243

190-
pub mod balance {
244+
mod balance {
191245
use crate::proxy::http;
192246
use std::time::Duration;
193247

@@ -200,7 +254,7 @@ pub mod balance {
200254
}
201255

202256
/// Creates a client suitable for gRPC.
203-
pub mod client {
257+
mod client {
204258
use crate::transport::{connect, tls};
205259
use crate::{proxy::http, svc};
206260
use linkerd2_proxy_http::h2::Settings as H2Settings;

linkerd/app/core/src/svc.rs

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -49,42 +49,6 @@ impl<T> NewService<T> for IdentityProxy {
4949
}
5050
}
5151

52-
#[derive(Clone, Debug)]
53-
pub struct WithTarget<S, T> {
54-
inner: S,
55-
target: T,
56-
}
57-
58-
impl<S, T> Service<()> for WithTarget<S, T>
59-
where
60-
S: Service<T>,
61-
T: Clone,
62-
{
63-
type Response = S::Response;
64-
type Future = S::Future;
65-
type Error = S::Error;
66-
67-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68-
self.inner.poll_ready(cx)
69-
}
70-
71-
fn call(&mut self, _: ()) -> Self::Future {
72-
self.inner.call(self.target.clone())
73-
}
74-
}
75-
76-
impl<S, T> NewService<()> for WithTarget<S, T>
77-
where
78-
S: NewService<T>,
79-
T: Clone,
80-
{
81-
type Service = S::Service;
82-
83-
fn new_service(&self, _: ()) -> Self::Service {
84-
self.inner.new_service(self.target.clone())
85-
}
86-
}
87-
8852
#[allow(dead_code)]
8953
impl<L> Layers<L> {
9054
pub fn push<O>(self, outer: O) -> Layers<Pair<L, O>> {
@@ -403,15 +367,6 @@ impl<S> Stack<S> {
403367
pub fn into_inner(self) -> S {
404368
self.0
405369
}
406-
407-
/// Transforms a `Service<T>` or `NewService<T>` into a `Service<()>` or
408-
/// `NewService<()>` by calling it with a fixed instance of `T`.
409-
pub fn with_fixed_target<T: Clone>(self, target: T) -> Stack<WithTarget<S, T>> {
410-
Stack(WithTarget {
411-
target,
412-
inner: self.0,
413-
})
414-
}
415370
}
416371

417372
impl<T, N> NewService<T> for Stack<N>

linkerd/app/src/dst/mod.rs

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
mod permit;
22
mod resolve;
33

4-
use http_body::Body as HttpBody;
54
use indexmap::IndexSet;
65
use linkerd2_app_core::{
7-
config::{ControlAddr, ControlConfig},
8-
dns, profiles, request_filter, svc, Error,
6+
control, dns, profiles, proxy::identity, request_filter, svc, transport::tls,
7+
ControlHttpMetrics, Error,
98
};
109
use permit::PermitConfiguredDsts;
1110
use std::time::Duration;
12-
use tonic::{
13-
body::{Body, BoxBody},
14-
client::GrpcService,
15-
};
11+
use tonic::body::BoxBody;
1612

1713
#[derive(Clone, Debug)]
1814
pub struct Config {
19-
pub control: ControlConfig,
15+
pub control: control::Config,
2016
pub context: String,
2117
pub get_suffixes: IndexSet<dns::Suffix>,
2218
pub get_networks: IndexSet<ipnet::IpNet>,
@@ -28,40 +24,36 @@ pub struct Config {
2824
/// Handles to destination service clients.
2925
///
3026
/// The addr is preserved for logging.
31-
pub struct Dst<S> {
32-
pub addr: ControlAddr,
27+
pub struct Dst {
28+
pub addr: control::ControlAddr,
3329
pub profiles: request_filter::Service<
3430
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
35-
profiles::Client<S, resolve::BackoffUnlessInvalidArgument>,
31+
profiles::Client<control::Client<BoxBody>, resolve::BackoffUnlessInvalidArgument>,
3632
>,
37-
pub resolve: request_filter::Service<PermitConfiguredDsts, resolve::Resolve<S>>,
33+
pub resolve:
34+
request_filter::Service<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
3835
}
3936

4037
impl Config {
41-
// XXX This is unfortunate -- the service should be built here, but it's annoying to name.
42-
pub fn build<S>(self, svc: S) -> Result<Dst<S>, Error>
43-
where
44-
S: GrpcService<BoxBody> + Clone + Send + 'static,
45-
S::Error: Into<Error> + Send,
46-
S::ResponseBody: Send,
47-
<S::ResponseBody as Body>::Data: Send,
48-
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
49-
S::Future: Send,
50-
{
51-
let resolve = svc::stack(resolve::new(
52-
svc.clone(),
53-
&self.context,
54-
self.control.connect.backoff,
55-
))
56-
.push_request_filter(PermitConfiguredDsts::new(
57-
self.get_suffixes,
58-
self.get_networks,
59-
))
60-
.into_inner();
38+
pub fn build(
39+
self,
40+
dns: dns::Resolver,
41+
metrics: ControlHttpMetrics,
42+
identity: tls::Conditional<identity::Local>,
43+
) -> Result<Dst, Error> {
44+
let addr = self.control.addr.clone();
45+
let backoff = self.control.connect.backoff.clone();
46+
let svc = self.control.build(dns, metrics, identity);
47+
let resolve = svc::stack(resolve::new(svc.clone(), &self.context, backoff))
48+
.push_request_filter(PermitConfiguredDsts::new(
49+
self.get_suffixes,
50+
self.get_networks,
51+
))
52+
.into_inner();
6153

6254
let profiles = svc::stack(profiles::Client::new(
6355
svc,
64-
resolve::BackoffUnlessInvalidArgument::from(self.control.connect.backoff),
56+
resolve::BackoffUnlessInvalidArgument::from(backoff),
6557
self.initial_profile_timeout,
6658
self.context,
6759
))
@@ -72,7 +64,7 @@ impl Config {
7264
.into_inner();
7365

7466
Ok(Dst {
75-
addr: self.control.addr,
67+
addr,
7668
resolve,
7769
profiles,
7870
})

linkerd/app/src/env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::core::{
22
addr,
33
config::*,
4+
control::{Config as ControlConfig, ControlAddr},
45
proxy::http::h2,
56
transport::{listen, tls},
67
Addr,

linkerd/app/src/identity.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,8 @@ pub use linkerd2_app_core::proxy::identity::{
22
certify, Crt, CrtKey, Csr, InvalidName, Key, Local, Name, TokenSource, TrustAnchors,
33
};
44
use linkerd2_app_core::{
5-
classify,
6-
config::{ControlAddr, ControlConfig},
75
control, dns,
86
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
9-
reconnect,
10-
svc::{self, NewService},
117
transport::tls,
128
ControlHttpMetrics as Metrics, Error,
139
};
@@ -19,15 +15,15 @@ use tracing::debug;
1915
pub enum Config {
2016
Disabled,
2117
Enabled {
22-
control: ControlConfig,
18+
control: control::Config,
2319
certify: certify::Config,
2420
},
2521
}
2622

2723
pub enum Identity {
2824
Disabled,
2925
Enabled {
30-
addr: ControlAddr,
26+
addr: control::ControlAddr,
3127
local: Local,
3228
task: Task,
3329
},
@@ -47,20 +43,12 @@ impl Config {
4743
Config::Enabled { control, certify } => {
4844
let (local, crt_store) = Local::new(&certify);
4945

50-
let addr = control.addr;
51-
let svc = svc::connect(control.connect.keepalive)
52-
.push(tls::ConnectLayer::new(tls::Conditional::Some(
53-
certify.trust_anchors.clone(),
54-
)))
55-
.push_timeout(control.connect.timeout)
56-
.push(control::client::layer())
57-
.push(control::resolve::layer(dns))
58-
.push_on_response(control::balance::layer())
59-
.push(reconnect::layer(Recover(control.connect.backoff)))
60-
.push(metrics.into_layer::<classify::Response>())
61-
.push(control::add_origin::Layer::new())
62-
.into_new_service()
63-
.new_service(addr.clone());
46+
let addr = control.addr.clone();
47+
let svc = control.build(
48+
dns,
49+
metrics,
50+
tls::Conditional::Some(certify.trust_anchors.clone()),
51+
);
6452

6553
// Save to be spawned on an auxiliary runtime.
6654
let task = {

0 commit comments

Comments
 (0)