Skip to content

Commit f5daf1f

Browse files
authored
outbound: configure failure accrual policies from stack targets (#2354)
Depends on #2353. PR #2353 adds middleware for implementing request-level circuit breaking. This branch adds the circuit breaking middleware to the outbound concrete stack, and adds plumbing for configuring a concrete stack's circuit breaker based on params provided by the target. A new `FailureAccrual` enum is added in `linkerd2-proxy-client-policy` to represent the failure accrual policy for a circuit breaker. Currently, no actual implementations of failure accrual policies exist in the proxy, so the only available variant of `FailureAccrual` is `FailureAccrual::None`, which disables circuit breaking. Circuit breaking middleware (a `Gate`/`BroadcastClassification` pair) is still constructed, but no failure accural task is spawned to actually open and shut the gate, so no circuit breaking is actually performed. Subsequent branches will actually implement failure accrual policies. Failure accrual policies are configured at the protocol level, rather than per-route or per-backend. This is because a given policy may contain multiple routes referencing the same backend, and a single concrete stack is constructed for that backend that's shared across all distributions that include it. If failure accrual policies were configured at the `RouteBackend` level, we would need to build separate client stacks if the same backend is referenced by `RouteBackend`s that have different failure accrual policies. Currently, failure accrual policies are not present in the proxy API, so all `ClientPolicy` instances have the default policy, `FailureAccrual::None`. Once the `OutboundPolicy` proxy API actually provides failure accrual configurations, a subsequent branch will populate this configuration from discovery.
1 parent 6d897d7 commit f5daf1f

File tree

14 files changed

+146
-31
lines changed

14 files changed

+146
-31
lines changed

linkerd/app/core/src/classify.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::profiles;
2+
pub use classify::gate;
23
use linkerd_error::Error;
34
use linkerd_proxy_client_policy as client_policy;
45
use linkerd_proxy_http::{classify, HasH2Reason, ResponseTimeoutError};

linkerd/app/outbound/src/discover.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,12 @@ pub fn synthesize_forward_policy(
238238
timeout,
239239
http1: policy::http::Http1 {
240240
routes: routes.clone(),
241+
failure_accrual: Default::default(),
242+
},
243+
http2: policy::http::Http2 {
244+
routes,
245+
failure_accrual: Default::default(),
241246
},
242-
http2: policy::http::Http2 { routes },
243247
opaque,
244248
};
245249

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use super::{balance, client, handle_proxy_error_headers};
55
use crate::{http, stack_labels, Outbound};
66
use linkerd_app_core::{
7-
metrics, profiles,
7+
classify, metrics, profiles,
88
proxy::{
99
api_resolve::{ConcreteAddr, Metadata, ProtocolHint},
1010
core::Resolve,
@@ -15,6 +15,7 @@ use linkerd_app_core::{
1515
transport::{self, addrs::*},
1616
Error, Infallible, NameAddr,
1717
};
18+
use linkerd_proxy_client_policy::FailureAccrual;
1819
use std::{fmt::Debug, net::SocketAddr, sync::Arc};
1920
use tracing::info_span;
2021

@@ -88,6 +89,8 @@ impl<N> Outbound<N> {
8889
// Concrete target type.
8990
T: svc::Param<Dispatch>,
9091
// TODO(ver) T: svc::Param<svc::queue::Capacity> + svc::Param<svc::queue::Timeout>,
92+
// Failure accrual policy.
93+
T: svc::Param<FailureAccrual>,
9194
T: Clone + Debug + Send + Sync + 'static,
9295
// Endpoint resolution.
9396
R: Resolve<ConcreteAddr, Error = Error, Endpoint = Metadata>,
@@ -125,6 +128,20 @@ impl<N> Outbound<N> {
125128
)
126129
.instrument(|e: &Endpoint<T>| info_span!("endpoint", addr = %e.addr));
127130

131+
let mk_breaker = |target: &Balance<T>| {
132+
match target.parent.param() {
133+
FailureAccrual::None => |_: &(SocketAddr, Metadata)| {
134+
// Construct a gate channel, dropping the controller
135+
// side of the channel such that response summaries
136+
// are never read. The failure accrual gate never
137+
// closes in this configuration.
138+
tracing::trace!("No failure accrual policy enabled");
139+
let (prms, _, _) = classify::gate::Params::channel(1);
140+
prms
141+
},
142+
}
143+
};
144+
128145
let balance = endpoint
129146
.push_map_target({
130147
let inbound_ips = inbound_ips.clone();
@@ -143,7 +160,11 @@ impl<N> Outbound<N> {
143160
}
144161
}
145162
})
163+
.push_on_service(svc::MapErr::layer_boxed())
146164
.lift_new_with_target()
165+
.push(
166+
http::NewClassifyGateSet::<classify::Response, _, _, _>::layer_via(mk_breaker),
167+
)
147168
.push(http::NewBalancePeakEwma::layer(resolve))
148169
.push(svc::NewMapErr::layer_from_target::<ConcreteError, _>())
149170
.push_on_service(http::BoxResponse::layer())

linkerd/app/outbound/src/http/logical.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct Concrete<T> {
3838
target: concrete::Dispatch,
3939
authority: Option<http::uri::Authority>,
4040
parent: T,
41+
failure_accrual: policy::FailureAccrual,
4142
}
4243

4344
#[derive(Debug, thiserror::Error)]
@@ -169,6 +170,7 @@ where
169170
target: concrete::Dispatch::Forward(remote, meta),
170171
authority: None,
171172
parent,
173+
failure_accrual: Default::default(),
172174
})
173175
}
174176
RouterParams::Profile(profile) => {
@@ -254,6 +256,12 @@ impl<T> svc::Param<concrete::Dispatch> for Concrete<T> {
254256
}
255257
}
256258

259+
impl<T> svc::Param<policy::FailureAccrual> for Concrete<T> {
260+
fn param(&self) -> policy::FailureAccrual {
261+
self.failure_accrual
262+
}
263+
}
264+
257265
// === impl CanonicalDstHeader ===
258266

259267
impl From<CanonicalDstHeader> for http::HeaderPair {

linkerd/app/outbound/src/http/logical/policy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub use self::{
1111
route::errors,
1212
router::{GrpcParams, HttpParams},
1313
};
14-
pub use linkerd_proxy_client_policy::ClientPolicy;
14+
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
1515

1616
/// HTTP or gRPC policy route parameters.
1717
#[derive(Clone, Debug, PartialEq, Eq)]

linkerd/app/outbound/src/http/logical/policy/router.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub struct Params<M, F, E> {
1313
pub addr: Addr,
1414
pub routes: Arc<[http_route::Route<M, policy::RoutePolicy<F, E>>]>,
1515
pub backends: Arc<[policy::Backend]>,
16+
pub failure_accrual: policy::FailureAccrual,
1617
}
1718

1819
pub type HttpParams =
@@ -114,6 +115,7 @@ where
114115
addr,
115116
routes,
116117
backends,
118+
failure_accrual,
117119
} = rts;
118120

119121
let mk_concrete = {
@@ -130,6 +132,7 @@ where
130132
target,
131133
authority,
132134
parent: parent.clone(),
135+
failure_accrual,
133136
}
134137
}
135138
};

linkerd/app/outbound/src/http/logical/policy/tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ async fn header_based_route() {
7575
],
7676
}]),
7777
backends: std::iter::once(default).chain(Some(special)).collect(),
78+
failure_accrual: Default::default(),
7879
}
7980
});
8081

@@ -167,6 +168,7 @@ async fn http_filter_request_headers() {
167168
}],
168169
}]),
169170
backends: std::iter::once(backend).collect(),
171+
failure_accrual: Default::default(),
170172
}
171173
});
172174

linkerd/app/outbound/src/http/logical/profile.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ where
123123
target: concrete::Dispatch::Balance(addr.clone(), DEFAULT_EWMA),
124124
authority: Some(addr.as_http_authority()),
125125
parent: parent.clone(),
126+
failure_accrual: Default::default(),
126127
};
127128
let backends = std::iter::once(concrete.clone()).collect();
128129
let distribution = Distribution::first_available(std::iter::once(concrete));
@@ -134,6 +135,7 @@ where
134135
target: concrete::Dispatch::Balance(t.addr.clone(), DEFAULT_EWMA),
135136
authority: Some(t.addr.as_http_authority()),
136137
parent: parent.clone(),
138+
failure_accrual: Default::default(),
137139
})
138140
.collect();
139141
let distribution = Distribution::random_available(targets.iter().cloned().map(
@@ -142,6 +144,7 @@ where
142144
authority: Some(addr.as_http_authority()),
143145
target: concrete::Dispatch::Balance(addr, DEFAULT_EWMA),
144146
parent: parent.clone(),
147+
failure_accrual: Default::default(),
145148
};
146149
(concrete, weight)
147150
},

linkerd/app/outbound/src/ingress.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -417,40 +417,53 @@ fn policy_routes(
417417
ref http2,
418418
..
419419
} => {
420-
let routes = match version {
421-
http::Version::Http1 => http1.routes.clone(),
422-
http::Version::H2 => http2.routes.clone(),
420+
let (routes, failure_accrual) = match version {
421+
http::Version::Http1 => (http1.routes.clone(), http1.failure_accrual),
422+
http::Version::H2 => (http2.routes.clone(), http2.failure_accrual),
423423
};
424424
Some(http::Routes::Policy(http::policy::Params::Http(
425425
http::policy::HttpParams {
426426
addr,
427427
backends: policy.backends.clone(),
428428
routes,
429+
failure_accrual,
429430
},
430431
)))
431432
}
432433
// TODO(eliza): what do we do here if the configured
433434
// protocol doesn't match the actual protocol for the
434435
// target? probably should make an error route instead?
435-
policy::Protocol::Http1(ref http1) => Some(http::Routes::Policy(
436-
http::policy::Params::Http(http::policy::HttpParams {
436+
policy::Protocol::Http1(policy::http::Http1 {
437+
ref routes,
438+
failure_accrual,
439+
}) => Some(http::Routes::Policy(http::policy::Params::Http(
440+
http::policy::HttpParams {
437441
addr,
438442
backends: policy.backends.clone(),
439-
routes: http1.routes.clone(),
440-
}),
441-
)),
442-
policy::Protocol::Http2(ref http2) => Some(http::Routes::Policy(
443-
http::policy::Params::Http(http::policy::HttpParams {
443+
routes: routes.clone(),
444+
failure_accrual,
445+
},
446+
))),
447+
policy::Protocol::Http2(policy::http::Http2 {
448+
ref routes,
449+
failure_accrual,
450+
}) => Some(http::Routes::Policy(http::policy::Params::Http(
451+
http::policy::HttpParams {
444452
addr,
445453
backends: policy.backends.clone(),
446-
routes: http2.routes.clone(),
447-
}),
448-
)),
449-
policy::Protocol::Grpc(ref grpc) => Some(http::Routes::Policy(http::policy::Params::Grpc(
454+
routes: routes.clone(),
455+
failure_accrual,
456+
},
457+
))),
458+
policy::Protocol::Grpc(policy::grpc::Grpc {
459+
ref routes,
460+
failure_accrual,
461+
}) => Some(http::Routes::Policy(http::policy::Params::Grpc(
450462
http::policy::GrpcParams {
451463
addr,
452464
backends: policy.backends.clone(),
453-
routes: grpc.routes.clone(),
465+
routes: routes.clone(),
466+
failure_accrual,
454467
},
455468
))),
456469
_ => None,

linkerd/app/outbound/src/sidecar.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,23 +212,33 @@ impl HttpSidecar {
212212
// protocol changes but remains HTTP-ish, we propagate those
213213
// changes. If the protocol flips to an opaque protocol, we ignore
214214
// the protocol update.
215-
let routes = match policy.protocol {
215+
let (routes, failure_accrual) = match policy.protocol {
216216
policy::Protocol::Detect {
217217
ref http1,
218218
ref http2,
219219
..
220220
} => match version {
221-
http::Version::Http1 => http1.routes.clone(),
222-
http::Version::H2 => http2.routes.clone(),
221+
http::Version::Http1 => (http1.routes.clone(), http1.failure_accrual),
222+
http::Version::H2 => (http2.routes.clone(), http2.failure_accrual),
223223
},
224-
policy::Protocol::Http1(ref http1) => http1.routes.clone(),
225-
policy::Protocol::Http2(ref http2) => http2.routes.clone(),
226-
policy::Protocol::Grpc(ref grpc) => {
224+
policy::Protocol::Http1(policy::http::Http1 {
225+
ref routes,
226+
failure_accrual,
227+
}) => (routes.clone(), failure_accrual),
228+
policy::Protocol::Http2(policy::http::Http2 {
229+
ref routes,
230+
failure_accrual,
231+
}) => (routes.clone(), failure_accrual),
232+
policy::Protocol::Grpc(policy::grpc::Grpc {
233+
ref routes,
234+
failure_accrual,
235+
}) => {
227236
return Some(http::Routes::Policy(http::policy::Params::Grpc(
228237
http::policy::GrpcParams {
229238
addr: orig_dst.into(),
230239
backends: policy.backends.clone(),
231-
routes: grpc.routes.clone(),
240+
routes: routes.clone(),
241+
failure_accrual,
232242
},
233243
)))
234244
}
@@ -245,6 +255,7 @@ impl HttpSidecar {
245255
addr: orig_dst.into(),
246256
routes,
247257
backends: policy.backends.clone(),
258+
failure_accrual,
248259
},
249260
)))
250261
}

0 commit comments

Comments
 (0)