Skip to content

Commit 7e27e5d

Browse files
authored
tracing: Avoid high-cardinality client in INFO spans (#947)
Our default TCP server sets an info-level tracing span that includes the client IP and port. This client-level metadata is potentially high-cardinality, it's not helpful on the outbound side, and it is only rendered in rare situations. This change modifies the server to set its `accept` span on the debug level. The inbound and outbound proxies handle setting info-level spans with the relevant metadata -- the inbound proxy includes only a server port, and the outbound includes the original dst address. The error responder has been modified to include the client address in warning messages so that it need not be derived from span metadata.
1 parent e6aec6e commit 7e27e5d

File tree

7 files changed

+40
-51
lines changed

7 files changed

+40
-51
lines changed

linkerd/app/core/src/errors.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use linkerd_error::Error;
44
use linkerd_error_metrics as metrics;
55
use linkerd_error_respond as respond;
66
pub use linkerd_error_respond::RespondLayer;
7-
use linkerd_proxy_http::{client_handle::Close, ClientHandle, HasH2Reason};
7+
use linkerd_proxy_http::{ClientHandle, HasH2Reason};
88
use linkerd_timeout::{error::ResponseTimeout, FailFastError};
99
use linkerd_tls as tls;
1010
use pin_project::pin_project;
@@ -55,7 +55,7 @@ pub struct NewRespond(());
5555
pub struct Respond {
5656
version: http::Version,
5757
is_grpc: bool,
58-
close: Option<Close>,
58+
client: Option<ClientHandle>,
5959
}
6060

6161
#[pin_project(project = ResponseBodyProj)]
@@ -142,10 +142,9 @@ impl<ReqB, RspB: Default + hyper::body::HttpBody>
142142
type Respond = Respond;
143143

144144
fn new_respond(&self, req: &http::Request<ReqB>) -> Self::Respond {
145-
let close = req
146-
.extensions()
147-
.get::<ClientHandle>()
148-
.map(|h| h.close.clone());
145+
let client = req.extensions().get::<ClientHandle>().cloned();
146+
debug_assert!(client.is_some(), "Missing client handle");
147+
149148
match req.version() {
150149
http::Version::HTTP_2 => {
151150
let is_grpc = req
@@ -155,13 +154,13 @@ impl<ReqB, RspB: Default + hyper::body::HttpBody>
155154
.unwrap_or(false);
156155
Respond {
157156
is_grpc,
158-
close,
157+
client,
159158
version: http::Version::HTTP_2,
160159
}
161160
}
162161
version => Respond {
163162
version,
164-
close,
163+
client,
165164
is_grpc: false,
166165
},
167166
}
@@ -181,7 +180,15 @@ impl<RspB: Default + hyper::body::HttpBody> respond::Respond<http::Response<RspB
181180
_ => ResponseBody::NonGrpc(b),
182181
})),
183182
Err(error) => {
184-
warn!("Failed to proxy request: {}", error);
183+
let addr = self
184+
.client
185+
.as_ref()
186+
.map(|ClientHandle { ref addr, .. }| *addr)
187+
.unwrap_or_else(|| {
188+
debug!("Missing client address");
189+
([0, 0, 0, 0], 0).into()
190+
});
191+
warn!(client.addr = %addr, "Failed to proxy request: {}", error);
185192

186193
if self.version == http::Version::HTTP_2 {
187194
if let Some(reset) = error.h2_reason() {
@@ -191,9 +198,9 @@ impl<RspB: Default + hyper::body::HttpBody> respond::Respond<http::Response<RspB
191198
}
192199

193200
// Gracefully teardown the server-side connection.
194-
if let Some(c) = self.close.as_ref() {
201+
if let Some(ClientHandle { ref close, .. }) = self.client.as_ref() {
195202
debug!("Closing server-side connection");
196-
c.close();
203+
close.close();
197204
}
198205

199206
if self.is_grpc {

linkerd/app/core/src/serve.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd_error::Error;
55
use linkerd_proxy_transport::listen::Addrs;
66
use tower::util::ServiceExt;
77
use tracing::instrument::Instrument;
8-
use tracing::{debug, info, info_span, warn};
8+
use tracing::{debug, debug_span, info, warn};
99

1010
/// Spawns a task that binds an `L`-typed listener with an `A`-typed
1111
/// connection-accepting service.
@@ -38,13 +38,9 @@ pub async fn serve<M, A, I>(
3838
};
3939

4040
// The local addr should be instrumented from the listener's context.
41-
let span = info_span!(
42-
"accept",
43-
client.addr = %addrs.client(),
44-
target.addr = %addrs.target_addr(),
45-
);
41+
let span = debug_span!("accept", client.addr = %addrs.client());
4642

47-
let accept = new_accept.new_service(addrs);
43+
let accept = span.in_scope(|| new_accept.new_service(addrs));
4844

4945
// Dispatch all of the work for a given connection onto a connection-specific task.
5046
tokio::spawn(

linkerd/app/inbound/src/direct.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use linkerd_app_core::{
99
Conditional, Error, NameAddr, Never,
1010
};
1111
use std::{convert::TryFrom, fmt::Debug, net::SocketAddr};
12-
use tracing::debug_span;
12+
use tracing::{debug_span, info_span};
1313

1414
#[derive(Clone, Debug)]
1515
struct WithTransportHeaderAlpn(LocalCrtKey);
@@ -95,7 +95,6 @@ impl<T> Inbound<T> {
9595
let detect_timeout = config.proxy.detect_protocol_timeout;
9696

9797
let stack = tcp
98-
.check_new_service::<TcpEndpoint, FwdIo<I>>()
9998
.instrument(|_: &TcpEndpoint| debug_span!("opaque"))
10099
// When the transport header is present, it may be used for either local
101100
// TCP forwarding, or we may be processing an HTTP gateway connection.
@@ -112,13 +111,11 @@ impl<T> Inbound<T> {
112111
port,
113112
name: Some(name),
114113
protocol,
115-
} => Ok(svc::Either::B(GatewayConnection::TransportHeader(
116-
GatewayTransportHeader {
117-
target: NameAddr::from((name, port)),
118-
protocol,
119-
client,
120-
},
121-
))),
114+
} => Ok(svc::Either::B(GatewayTransportHeader {
115+
target: NameAddr::from((name, port)),
116+
protocol,
117+
client,
118+
})),
122119
TransportHeader {
123120
name: None,
124121
protocol: Some(_),
@@ -129,8 +126,8 @@ impl<T> Inbound<T> {
129126
// header indicates the connection's HTTP version.
130127
svc::stack(gateway.clone())
131128
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Left))
132-
.check_new_service::<GatewayConnection, FwdIo<I>>()
133-
.instrument(|_: &GatewayConnection| debug_span!("gateway"))
129+
.push_map_target(GatewayConnection::TransportHeader)
130+
.instrument(|g: &GatewayTransportHeader| info_span!("gateway", dst = %g.target))
134131
.into_inner(),
135132
)
136133
// Use ALPN to determine whether a transport header should be read.
@@ -150,13 +147,10 @@ impl<T> Inbound<T> {
150147
// with transport header support.
151148
svc::stack(gateway)
152149
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Right))
153-
.check_new_service::<GatewayConnection, SensorIo<tls::server::Io<I>>>()
154-
.instrument(|_: &GatewayConnection| debug_span!("legacy"))
150+
.instrument(|_: &GatewayConnection| info_span!("gateway", legacy = true))
155151
.into_inner(),
156152
)
157-
.check_new_service::<ClientInfo, SensorIo<tls::server::Io<I>>>()
158153
.push(rt.metrics.transport.layer_accept())
159-
.instrument(|_: &ClientInfo| debug_span!("direct"))
160154
// Build a ClientInfo target for each accepted connection. Refuse the
161155
// connection if it doesn't include an mTLS identity.
162156
.push_request_filter(ClientInfo::try_from)

linkerd/app/inbound/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use linkerd_app_core::{
3131
Error, NameMatch, ProxyRuntime,
3232
};
3333
use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
34-
use tracing::debug_span;
34+
use tracing::{debug_span, info_span};
3535

3636
#[derive(Clone, Debug)]
3737
pub struct Config {
@@ -258,7 +258,7 @@ where
258258
self.runtime.identity.clone(),
259259
config.detect_protocol_timeout,
260260
))
261-
.check_new_service::<listen::Addrs, I>()
261+
.instrument(|_: &_| debug_span!("proxy"))
262262
.push_switch(
263263
disable_detect,
264264
self.clone()
@@ -267,7 +267,7 @@ where
267267
.push_map_target(TcpEndpoint::from)
268268
.push(self.runtime.metrics.transport.layer_accept())
269269
.push_map_target(TcpAccept::port_skipped)
270-
.check_new_service::<listen::Addrs, I>()
270+
.instrument(|_: &_| debug_span!("forward"))
271271
.into_inner(),
272272
)
273273
.check_new_service::<listen::Addrs, I>()
@@ -276,9 +276,10 @@ where
276276
self.push_tcp_forward(server_port)
277277
.push_direct(gateway)
278278
.stack
279-
.check_new_service::<listen::Addrs, I>()
279+
.instrument(|_: &_| debug_span!("direct"))
280280
.into_inner(),
281281
)
282+
.instrument(|a: &listen::Addrs| info_span!("server", port = %a.target_addr().port()))
282283
.check_new_service::<listen::Addrs, I>()
283284
.into_inner()
284285
}

linkerd/app/inbound/src/prevent_loop.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,7 @@ impl Predicate<Addrs> for PreventLoop {
5050

5151
impl std::fmt::Display for LoopPrevented {
5252
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53-
write!(
54-
f,
55-
"inbound requests must not target localhost:{}",
56-
self.port
57-
)
53+
write!(f, "inbound connection must not target port {}", self.port)
5854
}
5955
}
6056

linkerd/app/outbound/src/discover.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use linkerd_app_core::{
55
Error, IpMatch,
66
};
77
use std::convert::TryFrom;
8+
use tracing::info_span;
89

910
impl<N> Outbound<N> {
1011
/// Discovers the profile for a TCP endpoint.
@@ -36,11 +37,8 @@ impl<N> Outbound<N> {
3637
let allow = AllowProfile(config.allow_discovery.clone().into());
3738

3839
let stack = accept
39-
.check_new::<tcp::Logical>()
40-
.check_new_service::<tcp::Logical, SensorIo<I>>()
4140
.push_map_target(tcp::Logical::from)
4241
.push(profiles::discover::layer(profiles, allow))
43-
.check_new_service::<tcp::Accept, SensorIo<I>>()
4442
.push_on_response(
4543
svc::layers()
4644
// If the traffic split is empty/unavailable, eagerly fail
@@ -55,10 +53,9 @@ impl<N> Outbound<N> {
5553
))
5654
.push_spawn_buffer(config.proxy.buffer_capacity),
5755
)
58-
.check_new_service::<tcp::Accept, SensorIo<I>>()
5956
.push(rt.metrics.transport.layer_accept())
6057
.push_cache(config.proxy.cache_max_idle_age)
61-
.check_new_service::<tcp::Accept, I>()
58+
.instrument(|a: &tcp::Accept| info_span!("server", orig_dst = %a.orig_dst))
6259
.push_request_filter(tcp::Accept::try_from)
6360
.check_new_service::<listen::Addrs, I>();
6461

linkerd/app/outbound/src/ingress.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,7 @@ impl Outbound<()> {
118118
))
119119
.push(http::BoxResponse::layer()),
120120
)
121-
.check_new_service::<http::Accept, http::Request<_>>()
122121
.instrument(|a: &http::Accept| debug_span!("http", v = %a.protocol))
123-
.check_new_service::<http::Accept, http::Request<_>>()
124122
.push(http::NewServeHttp::layer(
125123
h2_settings,
126124
self.runtime.drain.clone(),
@@ -133,13 +131,13 @@ impl Outbound<()> {
133131
detect_protocol_timeout,
134132
http::DetectHttp::default(),
135133
))
136-
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()
137134
.push(self.runtime.metrics.transport.layer_accept())
135+
.instrument(|a: &tcp::Accept| info_span!("ingress", orig_dst = %a.orig_dst))
138136
.push_request_filter(tcp::Accept::try_from)
139-
.check_new_service::<listen::Addrs, I>()
140137
// Boxing is necessary purely to limit the link-time overhead of
141138
// having enormous types.
142139
.push(svc::BoxNewService::layer())
140+
.check_new_service::<listen::Addrs, I>()
143141
.into_inner()
144142
}
145143
}

0 commit comments

Comments
 (0)