Skip to content

Commit 7859b9a

Browse files
authored
outbound: Configure balancers with service metadata (#2374)
The OutboundPolicies API includes resource references with its responses. These references allow us to accurately identify an arbitrary (Kubernetes style) resource by its group, kind, namespace, and name. To support new metrics that include these resource coordinates as labels, we need access to the parent and backend references from the balancer stack. This change makes the following changes to accommodate this: 1. We introduce `ParentRef`, `RouteRef`, `BackendRef`, and `EndpointRef` new-types to serve as explicit markers for different types of metadata we may encounter. 3. The profile stack is updated to parse out metadata from service names in the form <name>.<ns>.svc.*. 4. We also support discovering pod metadata from the endpoint labels `dst_pod` and `dst_namespace`. 5. When we can't infer any metadata from a profile response, we use an "unknown" metadata. 6. When using policy, we use control-plane provided metadata. In practice, pod metadata won't be surfaced by the existing code path; but we have a relatively simple path forward to using it. This change splits the balancer stack into its own layer so it's distinct from the rest of the concrete stack configuration. There are only minor functional changes in this branch: 1. The `balance{addr=...}` tracing context is replace by `service{ns=..,name=...}` for clarity; 2. Errors returned from the balance stack are now rendered as: `Service {name}.{ns}: {error}`
1 parent 4cced58 commit 7859b9a

File tree

18 files changed

+409
-134
lines changed

18 files changed

+409
-134
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1882,7 +1882,7 @@ dependencies = [
18821882
[[package]]
18831883
name = "linkerd2-proxy-api"
18841884
version = "0.8.0"
1885-
source = "git+https://github.com/linkerd/linkerd2-proxy-api?branch=main#afd13fc7d5a592acd31b4ba822f854e5590e9600"
1885+
source = "git+https://github.com/linkerd/linkerd2-proxy-api?branch=main#ad750d839ceb18294406e0f118527122be56cf66"
18861886
dependencies = [
18871887
"h2",
18881888
"http",

linkerd/app/integration/src/policy.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ pub fn outbound_default(dst: impl ToString) -> outbound::OutboundPolicy {
111111
let dst = dst.to_string();
112112
let route = outbound_default_http_route(dst.clone());
113113
outbound::OutboundPolicy {
114+
metadata: Some(api::meta::Metadata {
115+
kind: Some(api::meta::metadata::Kind::Default("default".to_string())),
116+
}),
114117
protocol: Some(outbound::ProxyProtocol {
115118
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
116119
timeout: Some(Duration::from_secs(10).try_into().unwrap()),

linkerd/app/integration/src/tests/client_policy.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ async fn empty_http1_route() {
5151
.outbound(
5252
srv.addr,
5353
outbound::OutboundPolicy {
54+
metadata: Some(api::meta::Metadata {
55+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
56+
}),
5457
protocol: Some(outbound::ProxyProtocol {
5558
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
5659
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -137,6 +140,9 @@ async fn empty_http2_route() {
137140
.outbound(
138141
srv.addr,
139142
outbound::OutboundPolicy {
143+
metadata: Some(api::meta::Metadata {
144+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
145+
}),
140146
protocol: Some(outbound::ProxyProtocol {
141147
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
142148
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -250,6 +256,9 @@ async fn header_based_routing() {
250256
.outbound(
251257
srv.addr,
252258
outbound::OutboundPolicy {
259+
metadata: Some(api::meta::Metadata {
260+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
261+
}),
253262
protocol: Some(outbound::ProxyProtocol {
254263
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
255264
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -427,6 +436,9 @@ async fn path_based_routing() {
427436
.outbound(
428437
srv.addr,
429438
outbound::OutboundPolicy {
439+
metadata: Some(api::meta::Metadata {
440+
kind: Some(api::meta::metadata::Kind::Default("test".to_string())),
441+
}),
430442
protocol: Some(outbound::ProxyProtocol {
431443
kind: Some(proxy_protocol::Kind::Detect(proxy_protocol::Detect {
432444
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
@@ -490,6 +502,7 @@ fn httproute_meta(name: impl ToString) -> api::meta::Metadata {
490502
name: name.to_string(),
491503
namespace: "test".to_string(),
492504
section: "".to_string(),
505+
port: 0,
493506
})),
494507
}
495508
}

linkerd/app/outbound/src/discover.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ pub fn synthesize_forward_policy(
248248
};
249249

250250
ClientPolicy {
251+
parent: meta.clone(),
251252
protocol: detect,
252253
backends: Arc::new([backend]),
253254
}

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 122 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! and distributes HTTP requests among them.
33
44
use super::{balance, breaker, client, handle_proxy_error_headers};
5-
use crate::{http, stack_labels, Outbound};
5+
use crate::{http, stack_labels, BackendRef, Outbound, ParentRef};
66
use linkerd_app_core::{
77
classify, metrics, profiles,
88
proxy::{
@@ -34,9 +34,9 @@ pub struct DispatcherFailed(Arc<str>);
3434

3535
/// Wraps errors encountered in this module.
3636
#[derive(Debug, thiserror::Error)]
37-
#[error("concrete service {addr}: {source}")]
38-
pub struct ConcreteError {
39-
addr: NameAddr,
37+
#[error("{} {}.{}: {source}", backend.0.kind(), backend.0.name(), backend.0.namespace())]
38+
pub struct BalanceError {
39+
backend: BackendRef,
4040
#[source]
4141
source: Error,
4242
}
@@ -87,9 +87,9 @@ impl<N> Outbound<N> {
8787
>
8888
where
8989
// Concrete target type.
90+
T: svc::Param<ParentRef>,
91+
T: svc::Param<BackendRef>,
9092
T: svc::Param<Dispatch>,
91-
// TODO(ver) T: svc::Param<svc::queue::Capacity> + svc::Param<svc::queue::Timeout>,
92-
// Failure accrual policy.
9393
T: svc::Param<FailureAccrual>,
9494
T: Clone + Debug + Send + Sync + 'static,
9595
// Endpoint resolution.
@@ -102,10 +102,6 @@ impl<N> Outbound<N> {
102102
NSvc::Error: Into<Error>,
103103
NSvc::Future: Send,
104104
{
105-
let resolve =
106-
svc::MapTargetLayer::new(|t: Balance<T>| -> ConcreteAddr { ConcreteAddr(t.addr) })
107-
.layer(resolve.into_service());
108-
109105
self.map_stack(|config, rt, inner| {
110106
let inbound_ips = config.inbound_ips.clone();
111107

@@ -119,71 +115,15 @@ impl<N> Outbound<N> {
119115
)
120116
.instrument(|e: &Endpoint<T>| info_span!("forward", addr = %e.addr));
121117

122-
let endpoint = inner
123-
.push_map_target({
124-
let inbound_ips = inbound_ips.clone();
125-
move |((addr, metadata), target): ((SocketAddr, Metadata), Balance<T>)| {
126-
tracing::trace!(%addr, ?metadata, ?target, "Resolved endpoint");
127-
let is_local = inbound_ips.contains(&addr.ip());
128-
Endpoint {
129-
addr: Remote(ServerAddr(addr)),
130-
metadata,
131-
is_local,
132-
parent: target.parent,
133-
// We don't close server-side connections when we
134-
// get `l5d-proxy-connection: close` response headers
135-
// going through the balancer.
136-
close_server_connection_on_remote_proxy_error: false,
137-
}
138-
}
139-
})
140-
.push_on_service(svc::MapErr::layer_boxed())
141-
.lift_new_with_target()
142-
.push(
143-
http::NewClassifyGateSet::<classify::Response, _, _, _>::layer_via({
144-
// This sets the capacity of the channel used to send
145-
// response classifications to the failure accrual task.
146-
// Since the number of messages in this channel should
147-
// be roughly the same as the number of requests, size
148-
// it to the request queue capacity.
149-
// TODO(eliza): when queue capacity is configured
150-
// per-target, extract this from the target instead.
151-
let channel_capacity = config.http_request_queue.capacity;
152-
move |target: &Balance<T>| breaker::Params {
153-
accrual: target.parent.param(),
154-
channel_capacity,
155-
}
156-
}),
157-
)
158-
.push_on_service(svc::OnServiceLayer::new(
159-
rt.metrics
160-
.proxy
161-
.stack
162-
.layer(stack_labels("http", "endpoint")),
163-
))
164-
.push_on_service(svc::NewInstrumentLayer::new(
165-
|(addr, _): &(SocketAddr, _)| info_span!("endpoint", %addr),
166-
));
167-
168-
let balance = endpoint
169-
.push(http::NewBalancePeakEwma::layer(resolve))
170-
.push(svc::NewMapErr::layer_from_target::<ConcreteError, _>())
171-
.push_on_service(http::BoxResponse::layer())
172-
.push_on_service(
173-
rt.metrics
174-
.proxy
175-
.stack
176-
.layer(stack_labels("http", "balance")),
177-
)
178-
.instrument(|t: &Balance<T>| info_span!("balance", addr = %t.addr));
179-
180118
let fail = svc::ArcNewService::new(|message: Arc<str>| {
181119
svc::mk(move |_| {
182120
let error = DispatcherFailed(message.clone());
183121
futures::future::ready(Err(error))
184122
})
185123
});
186-
balance
124+
125+
inner
126+
.push(Balance::layer(config, rt, resolve))
187127
.push_switch(Ok::<_, Infallible>, forward.into_inner())
188128
.push_switch(
189129
move |parent: T| -> Result<_, Infallible> {
@@ -206,23 +146,14 @@ impl<N> Outbound<N> {
206146
},
207147
fail,
208148
)
149+
// TODO(ver) Configure this queue from the target (i.e. from
150+
// discovery).
209151
.push(svc::NewQueue::layer_via(config.http_request_queue))
210152
.push(svc::ArcNewService::layer())
211153
})
212154
}
213155
}
214156

215-
// === impl ConcreteError ===
216-
217-
impl<T> From<(&Balance<T>, Error)> for ConcreteError {
218-
fn from((target, source): (&Balance<T>, Error)) -> Self {
219-
Self {
220-
addr: target.addr.clone(),
221-
source,
222-
}
223-
}
224-
}
225-
226157
// === impl Balance ===
227158

228159
impl<T> svc::Param<http::balance::EwmaConfig> for Balance<T> {
@@ -231,6 +162,117 @@ impl<T> svc::Param<http::balance::EwmaConfig> for Balance<T> {
231162
}
232163
}
233164

165+
impl<T> Balance<T>
166+
where
167+
// Parent target.
168+
T: svc::Param<ParentRef>,
169+
T: svc::Param<BackendRef>,
170+
T: svc::Param<FailureAccrual>,
171+
T: Clone + Debug + Send + Sync + 'static,
172+
{
173+
pub fn layer<N, NSvc, R>(
174+
config: &crate::Config,
175+
rt: &crate::Runtime,
176+
resolve: R,
177+
) -> impl svc::Layer<
178+
N,
179+
Service = svc::ArcNewService<
180+
Self,
181+
impl svc::Service<
182+
http::Request<http::BoxBody>,
183+
Response = http::Response<http::BoxBody>,
184+
Error = BalanceError,
185+
Future = impl Send,
186+
>,
187+
>,
188+
> + Clone
189+
where
190+
// Endpoint resolution.
191+
R: Resolve<ConcreteAddr, Error = Error, Endpoint = Metadata> + 'static,
192+
// Endpoint stack.
193+
N: svc::NewService<Endpoint<T>, Service = NSvc> + Clone + Send + Sync + 'static,
194+
NSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
195+
+ Send
196+
+ 'static,
197+
NSvc::Error: Into<Error>,
198+
NSvc::Future: Send,
199+
{
200+
let classify_channel_capacity = config.http_request_queue.capacity;
201+
let inbound_ips = config.inbound_ips.clone();
202+
let metrics = rt.metrics.clone();
203+
204+
let resolve = svc::MapTargetLayer::new(|t: Self| -> ConcreteAddr { ConcreteAddr(t.addr) })
205+
.layer(resolve.into_service());
206+
207+
svc::layer::mk(move |inner: N| {
208+
let endpoint = svc::stack(inner)
209+
.push_map_target({
210+
let inbound_ips = inbound_ips.clone();
211+
move |((addr, metadata), target): ((SocketAddr, Metadata), Self)| {
212+
tracing::trace!(%addr, ?metadata, ?target, "Resolved endpoint");
213+
let is_local = inbound_ips.contains(&addr.ip());
214+
Endpoint {
215+
addr: Remote(ServerAddr(addr)),
216+
metadata,
217+
is_local,
218+
parent: target.parent,
219+
// We don't close server-side connections when we
220+
// get `l5d-proxy-connection: close` response headers
221+
// going through the balancer.
222+
close_server_connection_on_remote_proxy_error: false,
223+
}
224+
}
225+
})
226+
.push_on_service(svc::MapErr::layer_boxed())
227+
.lift_new_with_target()
228+
.push(
229+
http::NewClassifyGateSet::<classify::Response, _, _, _>::layer_via({
230+
move |target: &Self| breaker::Params {
231+
accrual: target.parent.param(),
232+
// TODO configure channel capacities from target.
233+
channel_capacity: classify_channel_capacity,
234+
}
235+
}),
236+
)
237+
.push_on_service(svc::OnServiceLayer::new(
238+
metrics.proxy.stack.layer(stack_labels("http", "endpoint")),
239+
))
240+
.push_on_service(svc::NewInstrumentLayer::new(
241+
|(addr, _): &(SocketAddr, _)| info_span!("endpoint", %addr),
242+
));
243+
244+
endpoint
245+
.push(http::NewBalancePeakEwma::layer(resolve.clone()))
246+
.push(svc::NewMapErr::layer_from_target::<BalanceError, _>())
247+
.push_on_service(http::BoxResponse::layer())
248+
.push_on_service(metrics.proxy.stack.layer(stack_labels("http", "balance")))
249+
.instrument(|t: &Self| {
250+
let BackendRef(meta) = t.parent.param();
251+
info_span!(
252+
"service",
253+
ns = %meta.namespace(),
254+
name = %meta.name(),
255+
port = %meta.port().map(u16::from).unwrap_or(0),
256+
)
257+
})
258+
.push(svc::ArcNewService::layer())
259+
.into_inner()
260+
})
261+
}
262+
}
263+
264+
// === impl BalanceError ===
265+
266+
impl<T> From<(&Balance<T>, Error)> for BalanceError
267+
where
268+
T: svc::Param<BackendRef>,
269+
{
270+
fn from((target, source): (&Balance<T>, Error)) -> Self {
271+
let backend = target.parent.param();
272+
Self { backend, source }
273+
}
274+
}
275+
234276
// === impl Endpoint ===
235277

236278
impl<T> svc::Param<Remote<ServerAddr>> for Endpoint<T> {

0 commit comments

Comments
 (0)