Skip to content

Commit 7b78212

Browse files
zaharidichevolix0r
andauthored
fix(outbound): apply opaq filters in connection handler (linkerd#3378)
The initial implementation of opaq filters uses `push_filter` so that errors originate from `Service::poll_ready` rather than `Service::call`. This causes the transport metrics middleware to not count failed connections. By changing the filter contract so that filters are applied in `Service::call`, we ensure that middlewares see response failures rather than readiness failures. Signed-off-by: Zahari Dichev <zaharidichev@gmail.com> Co-authored-by: Oliver Gould <ver@buoyant.io>
1 parent 0a4ccac commit 7b78212

File tree

4 files changed

+170
-36
lines changed

4 files changed

+170
-36
lines changed

linkerd/app/outbound/src/opaq/logical/route.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,15 @@ where
9191
.push_map_target(|t| t)
9292
.push_map_target(|b: Backend<T>| b.concrete)
9393
// apply backend filters
94-
.push_filter(filters::apply)
94+
.push(filters::NewApplyFilters::layer())
9595
.lift_new()
9696
.push(NewDistribute::layer())
9797
// The router does not take the backend's availability into
9898
// consideration, so we must eagerly fail requests to prevent
9999
// leaking tasks onto the runtime.
100100
.push_on_service(svc::LoadShed::layer())
101101
// apply route level filters
102-
.push_filter(filters::apply)
102+
.push(filters::NewApplyFilters::layer())
103103
.push(svc::NewMapErr::layer_with(|rt: &Self| {
104104
let route = rt.params.route_ref.clone();
105105
move |source| RouteError {

linkerd/app/outbound/src/opaq/logical/route/filters.rs

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,97 @@
1-
use linkerd_app_core::{svc, Error};
1+
use futures::{future, TryFutureExt};
2+
use linkerd_app_core::{io, svc, Error};
23
use linkerd_proxy_client_policy::opaq;
3-
use std::{fmt::Debug, sync::Arc};
4+
use std::{
5+
fmt::Debug,
6+
sync::Arc,
7+
task::{Context, Poll},
8+
};
49

5-
pub(crate) fn apply<T>(t: T) -> Result<T, Error>
10+
#[derive(Clone, Debug)]
11+
pub struct NewApplyFilters<N> {
12+
inner: N,
13+
}
14+
15+
#[derive(Clone, Debug)]
16+
pub struct ApplyFilters<S> {
17+
inner: S,
18+
filters: Arc<[opaq::Filter]>,
19+
}
20+
21+
// === impl NewApplyFilters ===
22+
23+
impl<N> NewApplyFilters<N> {
24+
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> + Clone {
25+
svc::layer::mk(move |inner| Self { inner })
26+
}
27+
}
28+
29+
impl<T, N, S> svc::NewService<T> for NewApplyFilters<N>
630
where
7-
T: Clone,
31+
N: svc::NewService<T, Service = S>,
832
T: svc::Param<Arc<[opaq::Filter]>>,
933
{
10-
let filters: &[opaq::Filter] = &t.param();
11-
if let Some(filter) = filters.iter().next() {
12-
match filter {
13-
opaq::Filter::Forbidden => {
14-
return Err(errors::TCPForbiddenRoute.into());
15-
}
34+
type Service = ApplyFilters<S>;
1635

17-
opaq::Filter::Invalid(message) => {
18-
return Err(errors::TCPInvalidBackend(message.clone()).into());
19-
}
36+
fn new_service(&self, target: T) -> Self::Service {
37+
let filters: Arc<[opaq::Filter]> = target.param();
38+
let svc = self.inner.new_service(target);
39+
ApplyFilters {
40+
inner: svc,
41+
filters,
42+
}
43+
}
44+
}
2045

21-
opaq::Filter::InternalError(message) => {
22-
return Err(errors::TCPInvalidPolicy(message).into());
46+
// === impl ApplyFilters ===
47+
48+
impl<S> ApplyFilters<S> {
49+
fn apply_filters(&self) -> Result<(), Error> {
50+
if let Some(filter) = self.filters.iter().next() {
51+
match filter {
52+
opaq::Filter::Forbidden => {
53+
return Err(errors::TCPForbiddenRoute.into());
54+
}
55+
56+
opaq::Filter::Invalid(message) => {
57+
return Err(errors::TCPInvalidBackend(message.clone()).into());
58+
}
59+
60+
opaq::Filter::InternalError(message) => {
61+
return Err(errors::TCPInvalidPolicy(message).into());
62+
}
2363
}
2464
}
65+
66+
Ok(())
67+
}
68+
}
69+
70+
impl<I, S> svc::Service<I> for ApplyFilters<S>
71+
where
72+
I: io::AsyncRead + io::AsyncWrite + Send + 'static,
73+
S: svc::Service<I> + Send + Clone + 'static,
74+
S::Error: Into<Error>,
75+
S::Future: Send,
76+
{
77+
type Response = S::Response;
78+
type Error = Error;
79+
type Future = future::Either<
80+
future::ErrInto<S::Future, Error>,
81+
future::Ready<Result<S::Response, Error>>,
82+
>;
83+
84+
#[inline]
85+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86+
self.inner.poll_ready(cx).map_err(Into::into)
2587
}
2688

27-
Ok(t)
89+
fn call(&mut self, io: I) -> Self::Future {
90+
if let Err(e) = self.apply_filters() {
91+
return future::Either::Right(future::err(e));
92+
}
93+
future::Either::Left(self.inner.call(io).err_into())
94+
}
2895
}
2996

3097
pub mod errors {

linkerd/app/outbound/src/tls/logical/route.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ where
9595
.push_map_target(|t| t)
9696
.push_map_target(|b: Backend<T>| b.concrete)
9797
// apply backend filters
98-
.push_filter(filters::apply)
98+
.push(filters::NewApplyFilters::layer())
9999
.lift_new()
100100
.push(NewDistribute::layer())
101101
// The router does not take the backend's availability into
102102
// consideration, so we must eagerly fail requests to prevent
103103
// leaking tasks onto the runtime.
104104
.push_on_service(svc::LoadShed::layer())
105105
// apply route level filters
106-
.push_filter(filters::apply)
106+
.push(filters::NewApplyFilters::layer())
107107
.push(svc::NewMapErr::layer_with(|rt: &Self| {
108108
let route = rt.params.route_ref.clone();
109109
move |source| RouteError {

linkerd/app/outbound/src/tls/logical/route/filters.rs

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,97 @@
1-
use linkerd_app_core::{svc, Error};
1+
use futures::{future, TryFutureExt};
2+
use linkerd_app_core::{io, svc, Error};
23
use linkerd_proxy_client_policy::tls;
3-
use std::{fmt::Debug, sync::Arc};
4+
use std::{
5+
fmt::Debug,
6+
sync::Arc,
7+
task::{Context, Poll},
8+
};
49

5-
pub(crate) fn apply<T>(t: T) -> Result<T, Error>
10+
#[derive(Clone, Debug)]
11+
pub struct NewApplyFilters<N> {
12+
inner: N,
13+
}
14+
15+
#[derive(Clone, Debug)]
16+
pub struct ApplyFilters<S> {
17+
inner: S,
18+
filters: Arc<[tls::Filter]>,
19+
}
20+
21+
// === impl NewApplyFilters ===
22+
23+
impl<N> NewApplyFilters<N> {
24+
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> + Clone {
25+
svc::layer::mk(move |inner| Self { inner })
26+
}
27+
}
28+
29+
impl<T, N, S> svc::NewService<T> for NewApplyFilters<N>
630
where
7-
T: Clone,
31+
N: svc::NewService<T, Service = S>,
832
T: svc::Param<Arc<[tls::Filter]>>,
933
{
10-
let filters: &[tls::Filter] = &t.param();
11-
if let Some(filter) = filters.iter().next() {
12-
match filter {
13-
tls::Filter::Forbidden => {
14-
return Err(errors::TLSForbiddenRoute.into());
15-
}
34+
type Service = ApplyFilters<S>;
1635

17-
tls::Filter::Invalid(message) => {
18-
return Err(errors::TLSInvalidBackend(message.clone()).into());
19-
}
36+
fn new_service(&self, target: T) -> Self::Service {
37+
let filters: Arc<[tls::Filter]> = target.param();
38+
let svc = self.inner.new_service(target);
39+
ApplyFilters {
40+
inner: svc,
41+
filters,
42+
}
43+
}
44+
}
2045

21-
tls::Filter::InternalError(message) => {
22-
return Err(errors::TLSInvalidPolicy(message).into());
46+
// === impl ApplyFilters ===
47+
48+
impl<S> ApplyFilters<S> {
49+
fn apply_filters(&self) -> Result<(), Error> {
50+
if let Some(filter) = self.filters.iter().next() {
51+
match filter {
52+
tls::Filter::Forbidden => {
53+
return Err(errors::TLSForbiddenRoute.into());
54+
}
55+
56+
tls::Filter::Invalid(message) => {
57+
return Err(errors::TLSInvalidBackend(message.clone()).into());
58+
}
59+
60+
tls::Filter::InternalError(message) => {
61+
return Err(errors::TLSInvalidPolicy(message).into());
62+
}
2363
}
2464
}
65+
66+
Ok(())
67+
}
68+
}
69+
70+
impl<I, S> svc::Service<I> for ApplyFilters<S>
71+
where
72+
I: io::AsyncRead + io::AsyncWrite + Send + 'static,
73+
S: svc::Service<I> + Send + Clone + 'static,
74+
S::Error: Into<Error>,
75+
S::Future: Send,
76+
{
77+
type Response = S::Response;
78+
type Error = Error;
79+
type Future = future::Either<
80+
future::ErrInto<S::Future, Error>,
81+
future::Ready<Result<S::Response, Error>>,
82+
>;
83+
84+
#[inline]
85+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86+
self.inner.poll_ready(cx).map_err(Into::into)
2587
}
2688

27-
Ok(t)
89+
fn call(&mut self, io: I) -> Self::Future {
90+
if let Err(e) = self.apply_filters() {
91+
return future::Either::Right(future::err(e));
92+
}
93+
future::Either::Left(self.inner.call(io).err_into())
94+
}
2895
}
2996

3097
pub mod errors {

0 commit comments

Comments
 (0)