Skip to content

Commit 9870faa

Browse files
authored
outbound: Return a default endpoint on reject (#690)
When the resolver rejects resolution, we currently propagate that error so that it can be handled via fallback. And due to recent HTTP router changes, these resolution errors can propagate up across splits, etc. This change simplifies this behavior by instead synthesizing a resolution with a default endpoint. The `not_http` reason has been removed, as it's no longer useful.
1 parent 86c71ea commit 9870faa

File tree

11 files changed

+134
-68
lines changed

11 files changed

+134
-68
lines changed

linkerd/app/core/src/lib.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -99,29 +99,6 @@ pub struct ProxyMetrics {
9999
pub transport: transport::Metrics,
100100
}
101101

102-
#[derive(Clone, Debug)]
103-
pub struct DiscoveryRejected(());
104-
105-
impl DiscoveryRejected {
106-
pub fn new() -> Self {
107-
DiscoveryRejected(())
108-
}
109-
}
110-
111-
impl std::fmt::Display for DiscoveryRejected {
112-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113-
write!(f, "discovery rejected")
114-
}
115-
}
116-
117-
impl std::error::Error for DiscoveryRejected {}
118-
119-
impl From<Addr> for DiscoveryRejected {
120-
fn from(_: Addr) -> Self {
121-
Self::new()
122-
}
123-
}
124-
125102
#[derive(Clone, Debug, Default)]
126103
pub struct SkipByPort(std::sync::Arc<indexmap::IndexSet<u16>>);
127104

linkerd/app/integration/src/tests/telemetry.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ mod transport {
932932
// Connection to the server should be a failure with the EXFULL error
933933
// code.
934934
assert_eventually_contains!(metrics.get("/metrics").await,
935-
"tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1");
935+
"tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1");
936936
// Connection from the client should have closed cleanly.
937937
assert_eventually_contains!(
938938
metrics.get("/metrics").await,
@@ -963,7 +963,7 @@ mod transport {
963963
// Connection to the server should be a failure with the EXFULL error
964964
// code.
965965
assert_eventually_contains!(metrics.get("/metrics").await,
966-
"tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1");
966+
"tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1");
967967
// Connection from the client should have closed cleanly.
968968
assert_eventually_contains!(metrics.get("/metrics").await,
969969
"tcp_close_total{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
@@ -1121,7 +1121,7 @@ mod transport {
11211121
tcp_client.write(TcpFixture::HELLO_MSG).await;
11221122
assert_eq!(tcp_client.read().await, TcpFixture::BYE_MSG.as_bytes());
11231123
let expected = format!(
1124-
"tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} 1",
1124+
"tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} 1",
11251125
proxy.outbound_server.as_ref().unwrap().addr,
11261126
);
11271127
assert_eventually_contains!(metrics.get("/metrics").await, &expected);
@@ -1184,7 +1184,7 @@ mod transport {
11841184
assert_eventually_contains!(out,
11851185
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
11861186
assert_eventually_contains!(out,
1187-
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1");
1187+
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1");
11881188

11891189
let tcp_client = client.connect().await;
11901190

@@ -1194,14 +1194,14 @@ mod transport {
11941194
assert_eventually_contains!(out,
11951195
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
11961196
assert_eventually_contains!(out,
1197-
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1");
1197+
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1");
11981198

11991199
tcp_client.shutdown().await;
12001200
let out = metrics.get("/metrics").await;
12011201
assert_eventually_contains!(out,
12021202
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 2");
12031203
assert_eventually_contains!(out,
1204-
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 2");
1204+
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 2");
12051205
}
12061206

12071207
#[tokio::test]
@@ -1217,7 +1217,7 @@ mod transport {
12171217
TcpFixture::BYE_MSG.len()
12181218
);
12191219
let dst_expected = format!(
1220-
"tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}",
1220+
"tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}",
12211221
proxy.outbound_server.as_ref().unwrap().addr,
12221222
TcpFixture::HELLO_MSG.len()
12231223
);
@@ -1246,7 +1246,7 @@ mod transport {
12461246
TcpFixture::HELLO_MSG.len()
12471247
);
12481248
let dst_expected = format!(
1249-
"tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}",
1249+
"tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}",
12501250
proxy.outbound_server.as_ref().unwrap().addr,
12511251
TcpFixture::BYE_MSG.len()
12521252
);

linkerd/app/outbound/src/endpoint.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ impl Into<Addr> for &'_ HttpConcrete {
136136
}
137137
}
138138

139+
impl Into<SocketAddr> for &'_ HttpConcrete {
140+
fn into(self) -> SocketAddr {
141+
self.dst
142+
.socket_addr()
143+
.unwrap_or_else(|| self.logical.orig_dst)
144+
}
145+
}
146+
139147
impl std::fmt::Display for HttpConcrete {
140148
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141149
self.dst.fmt(f)
@@ -201,7 +209,7 @@ impl From<HttpLogical> for HttpEndpoint {
201209
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(),
202210
),
203211
concrete: logical.into(),
204-
metadata: Metadata::empty(),
212+
metadata: Metadata::default(),
205213
}
206214
}
207215
}
@@ -331,7 +339,7 @@ impl From<SocketAddr> for TcpEndpoint {
331339
Self {
332340
addr,
333341
dst: addr.into(),
334-
identity: Conditional::None(tls::ReasonForNoPeerName::NotHttp.into()),
342+
identity: Conditional::None(tls::ReasonForNoPeerName::PortSkipped.into()),
335343
labels: None,
336344
}
337345
}

linkerd/app/outbound/src/lib.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use linkerd2_app_core::{
2020
spans::SpanConverter,
2121
svc::{self},
2222
transport::{self, listen, tls},
23-
Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
24-
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
23+
Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER,
24+
DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
2525
};
2626
use std::{collections::HashMap, net, time::Duration};
2727
use tokio::sync::mpsc;
@@ -500,13 +500,6 @@ impl Config {
500500

501501
// Load balances TCP streams that cannot be decoded as HTTP.
502502
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve))
503-
.push_fallback_with_predicate(
504-
tcp_forward
505-
.clone()
506-
.push_map_target(TcpEndpoint::from)
507-
.into_inner(),
508-
is_discovery_rejected,
509-
)
510503
.push_on_response(
511504
svc::layers()
512505
.push_failfast(dispatch_timeout)
@@ -591,9 +584,7 @@ pub fn trace_labels() -> HashMap<String, String> {
591584

592585
fn is_discovery_rejected(err: &Error) -> bool {
593586
fn is_rejected(err: &(dyn std::error::Error + 'static)) -> bool {
594-
err.is::<DiscoveryRejected>()
595-
|| err.is::<profiles::InvalidProfileAddr>()
596-
|| err.source().map(is_rejected).unwrap_or(false)
587+
err.is::<profiles::InvalidProfileAddr>() || err.source().map(is_rejected).unwrap_or(false)
597588
}
598589

599590
let rejected = is_rejected(&**err);

linkerd/app/outbound/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async fn plaintext_tcp() {
6767
let resolver = test_support::resolver().endpoint_exists(
6868
logical.clone(),
6969
target_addr,
70-
test_support::resolver::Metadata::empty(),
70+
test_support::resolver::Metadata::default(),
7171
);
7272

7373
// Build the outbound TCP balancer stack.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use super::Rejected;
2+
use futures::{future, prelude::*, stream};
3+
use linkerd2_app_core::{
4+
proxy::core::{Resolve, Update},
5+
svc, Error,
6+
};
7+
use std::{
8+
future::Future,
9+
pin::Pin,
10+
task::{Context, Poll},
11+
};
12+
13+
pub fn layer<S>() -> impl svc::Layer<S, Service = RecoverDefaultResolve<S>> + Clone {
14+
svc::layer::mk(RecoverDefaultResolve)
15+
}
16+
17+
#[derive(Clone, Debug)]
18+
pub struct RecoverDefaultResolve<S>(S);
19+
20+
impl<T, S> tower::Service<T> for RecoverDefaultResolve<S>
21+
where
22+
for<'t> &'t T: Into<std::net::SocketAddr>,
23+
S: Resolve<T, Error = Error>,
24+
S::Endpoint: Default + Send + 'static,
25+
S::Resolution: Send + 'static,
26+
S::Future: Send + 'static,
27+
stream::Once<future::Ready<Result<Update<S::Endpoint>, S::Error>>>:
28+
stream::TryStream<Ok = Update<S::Endpoint>, Error = S::Error>,
29+
{
30+
type Response = future::Either<
31+
S::Resolution,
32+
stream::Once<future::Ready<Result<Update<S::Endpoint>, Error>>>,
33+
>;
34+
type Error = Error;
35+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send + 'static>>;
36+
37+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38+
self.0.poll_ready(cx)
39+
}
40+
41+
fn call(&mut self, t: T) -> Self::Future {
42+
let addr = (&t).into();
43+
Box::pin(
44+
self.0
45+
.resolve(t)
46+
.map_ok(future::Either::Left)
47+
.or_else(move |error| {
48+
if Rejected::matches(&*error) {
49+
tracing::debug!(%error, %addr, "Synthesizing endpoint");
50+
let endpoint = (addr, S::Endpoint::default());
51+
let res = stream::once(future::ok(Update::Reset(vec![endpoint])));
52+
future::ok(future::Either::Right(res))
53+
} else {
54+
future::err(error)
55+
}
56+
}),
57+
)
58+
}
59+
}

linkerd/app/src/dst/mod.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
mod default_resolve;
12
mod permit;
23
mod resolve;
34

5+
use self::default_resolve::RecoverDefaultResolve;
46
use indexmap::IndexSet;
57
use linkerd2_app_core::{
68
control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls,
7-
ControlHttpMetrics, Error,
9+
Addr, ControlHttpMetrics, Error,
810
};
911
use permit::PermitConfiguredDsts;
1012
use std::time::Duration;
@@ -21,6 +23,9 @@ pub struct Config {
2123
pub initial_profile_timeout: Duration,
2224
}
2325

26+
#[derive(Clone, Debug)]
27+
pub struct Rejected(());
28+
2429
/// Handles to destination service clients.
2530
///
2631
/// The addr is preserved for logging.
@@ -30,7 +35,9 @@ pub struct Dst {
3035
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
3136
profiles::Client<control::Client<BoxBody>, resolve::BackoffUnlessInvalidArgument>,
3237
>,
33-
pub resolve: RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
38+
pub resolve: RecoverDefaultResolve<
39+
RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
40+
>,
3441
}
3542

3643
impl Config {
@@ -48,6 +55,7 @@ impl Config {
4855
self.get_suffixes,
4956
self.get_networks,
5057
)))
58+
.push(default_resolve::layer())
5159
.into_inner();
5260

5361
let profiles = svc::stack(profiles::Client::new(
@@ -69,3 +77,31 @@ impl Config {
6977
})
7078
}
7179
}
80+
81+
impl std::fmt::Display for Rejected {
82+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83+
write!(f, "rejected discovery")
84+
}
85+
}
86+
87+
impl std::error::Error for Rejected {}
88+
89+
impl From<Addr> for Rejected {
90+
fn from(_: Addr) -> Self {
91+
Rejected(())
92+
}
93+
}
94+
95+
impl Rejected {
96+
fn matches(err: &(dyn std::error::Error + 'static)) -> bool {
97+
if err.is::<Self>() {
98+
return true;
99+
}
100+
101+
if let Some(status) = err.downcast_ref::<tonic::Status>() {
102+
return status.code() == tonic::Code::InvalidArgument;
103+
}
104+
105+
err.source().map(Self::matches).unwrap_or(false)
106+
}
107+
}

linkerd/app/src/dst/permit.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1+
use super::Rejected;
12
use ipnet::{Contains, IpNet};
2-
use linkerd2_app_core::{
3-
dns::Suffix, request_filter::FilterRequest, Addr, DiscoveryRejected, Error,
4-
};
3+
use linkerd2_app_core::{dns::Suffix, request_filter::FilterRequest, Addr, Error};
54
use std::marker::PhantomData;
65
use std::net::IpAddr;
76
use std::sync::Arc;
87

9-
pub struct PermitConfiguredDsts<E = DiscoveryRejected> {
8+
pub struct PermitConfiguredDsts<E = Rejected> {
109
name_suffixes: Arc<Vec<Suffix>>,
1110
networks: Arc<Vec<IpNet>>,
1211
_error: PhantomData<fn(E)>,
@@ -74,7 +73,7 @@ where
7473
if permitted {
7574
Ok(t)
7675
} else {
77-
Err(E::from(addr.clone()).into())
76+
Err(E::from(addr).into())
7877
}
7978
}
8079
}

linkerd/app/src/dst/resolve.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use linkerd2_app_core::{
66
api_resolve as api,
77
resolve::{self, recover},
88
},
9-
DiscoveryRejected, Error, Recover,
9+
Error, Recover,
1010
};
1111
use tonic::{
1212
body::{Body, BoxBody},
@@ -46,16 +46,15 @@ impl From<ExponentialBackoff> for BackoffUnlessInvalidArgument {
4646
impl Recover<Error> for BackoffUnlessInvalidArgument {
4747
type Backoff = ExponentialBackoffStream;
4848

49-
fn recover(&self, err: Error) -> Result<Self::Backoff, Error> {
50-
match err.downcast::<Status>() {
51-
Ok(ref status) if status.code() == Code::InvalidArgument => {
52-
tracing::debug!(message = "cannot recover", %status);
53-
return Err(DiscoveryRejected::new().into());
49+
fn recover(&self, error: Error) -> Result<Self::Backoff, Error> {
50+
if let Some(status) = error.downcast_ref::<Status>() {
51+
if status.code() == Code::InvalidArgument {
52+
tracing::debug!(%status, "Cannot recover");
53+
return Err(error);
5454
}
55-
Ok(status) => tracing::trace!(message = "recovering", %status),
56-
Err(error) => tracing::trace!(message = "recovering", %error),
5755
}
5856

57+
tracing::trace!(%error, "Recovering");
5958
Ok(self.0.stream())
6059
}
6160
}

linkerd/proxy/api-resolve/src/metadata.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ pub enum ProtocolHint {
4141

4242
// === impl Metadata ===
4343

44-
impl Metadata {
45-
pub fn empty() -> Self {
44+
impl Default for Metadata {
45+
fn default() -> Self {
4646
Self {
4747
labels: IndexMap::default(),
4848
protocol_hint: ProtocolHint::Unknown,
@@ -51,7 +51,9 @@ impl Metadata {
5151
authority_override: None,
5252
}
5353
}
54+
}
5455

56+
impl Metadata {
5557
pub fn new(
5658
labels: IndexMap<String, String>,
5759
protocol_hint: ProtocolHint,

0 commit comments

Comments
 (0)