Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,7 @@ dependencies = [
"linkerd-io",
"linkerd-stack",
"linkerd-tracing",
"prometheus-client",
"thiserror 2.0.12",
"tokio",
"tokio-test",
Expand Down
10 changes: 7 additions & 3 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl Config {
.push_on_service(http::BoxResponse::layer())
.arc_new_clone_http();

let inbound::DetectMetrics(detect_metrics) = metrics.detect.clone();
let tcp = http
.unlift_new()
.push(http::NewServeHttp::layer({
Expand Down Expand Up @@ -177,9 +178,12 @@ impl Config {
)
.arc_new_tcp()
.lift_new_with_target()
.push(http::NewDetect::layer(svc::stack::CloneParam::from(
http::DetectParams { read_timeout: DETECT_TIMEOUT }
)))
.push(http::NewDetect::layer(move |tcp: &Tcp| {
http::DetectParams {
read_timeout: DETECT_TIMEOUT,
metrics: detect_metrics.metrics(tcp.policy.server_label())
}
}))
.push(transport::metrics::NewServer::layer(metrics.proxy.transport))
.push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
Tcp {
Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ mod tests {
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
Inbound::new(
test_util::default_config(),
test_util::runtime().0,
&mut Default::default(),
)
}

fn new_panic<T>(msg: &'static str) -> svc::ArcNewTcp<T, io::DuplexStream> {
Expand Down
39 changes: 31 additions & 8 deletions linkerd/app/inbound/src/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use linkerd_app_core::{
identity, io,
metrics::ServerLabel,
metrics::{prom, ServerLabel},
proxy::http,
svc, tls,
transport::{
Expand All @@ -20,6 +20,10 @@ use tracing::info;
#[cfg(test)]
mod tests;

#[derive(Clone, Debug)]
pub struct MetricsFamilies(pub HttpDetectMetrics);
pub type HttpDetectMetrics = http::DetectMetricsFamilies<ServerLabel>;

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Forward {
client_addr: Remote<ClientAddr>,
Expand Down Expand Up @@ -61,7 +65,11 @@ type TlsIo<I> = tls::server::Io<identity::ServerIo<tls::server::DetectIo<I>>, I>
impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
/// Builds a stack that terminates mesh TLS and detects whether the traffic is HTTP (as hinted
/// by policy).
pub(crate) fn push_detect<T, I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<T, I>>
pub(crate) fn push_detect<T, I, F, FSvc>(
self,
MetricsFamilies(metrics): MetricsFamilies,
forward: F,
) -> Inbound<svc::ArcNewTcp<T, I>>
where
T: svc::Param<OrigDstAddr> + svc::Param<Remote<ClientAddr>> + svc::Param<AllowPolicy>,
T: Clone + Send + 'static,
Expand All @@ -72,14 +80,18 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
FSvc::Error: Into<Error>,
FSvc::Future: Send,
{
self.push_detect_http(forward.clone())
self.push_detect_http(metrics, forward.clone())
.push_detect_tls(forward)
}

/// Builds a stack that handles HTTP detection once TLS detection has been performed. If the
/// connection is determined to be HTTP, the inner stack is used; otherwise the connection is
/// passed to the provided 'forward' stack.
fn push_detect_http<I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<Tls, I>>
fn push_detect_http<I, F, FSvc>(
self,
metrics: HttpDetectMetrics,
forward: F,
) -> Inbound<svc::ArcNewTcp<Tls, I>>
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
I: Debug + Send + Sync + Unpin + 'static,
Expand Down Expand Up @@ -153,11 +165,12 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
forward.into_inner(),
)
.lift_new_with_target()
.push(http::NewDetect::layer(|Detect { timeout, .. }: &Detect| {
http::DetectParams {
.push(http::NewDetect::layer(
move |Detect { timeout, tls }: &Detect| http::DetectParams {
read_timeout: *timeout,
}
}))
metrics: metrics.metrics(tls.policy.server_label()),
},
))
.arc_new_tcp();

http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
Expand Down Expand Up @@ -445,3 +458,13 @@ impl<T> svc::InsertParam<tls::ConditionalServerTls, T> for TlsParams {
(tls, target)
}
}

// === impl MetricsFamilies ===

impl MetricsFamilies {
pub fn register(reg: &mut prom::Registry) -> Self {
Self(http::DetectMetricsFamilies::register(
reg.sub_registry_with_prefix("http"),
))
}
}
104 changes: 98 additions & 6 deletions linkerd/app/inbound/src/detect/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,46 @@ fn allow(protocol: Protocol) -> AllowPolicy {
allow
}

macro_rules! assert_contains_metric {
($registry:expr, $metric:expr) => {{
let mut buf = String::new();
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
assert!(
lines.iter().any(|l| *l.starts_with($metric)),
"metric '{}' not found in:\n{:?}",
$metric,
buf
);
}};
($registry:expr, $metric:expr, $value:expr) => {{
let mut buf = String::new();
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
assert_eq!(
lines.iter().find(|l| l.starts_with($metric)),
Some(&&*format!("{} {}", $metric, $value)),
"metric '{}' not found in:\n{:?}",
$metric,
buf
);
}};
}

macro_rules! assert_not_contains_metric {
($registry:expr, $pattern:expr) => {{
let mut buf = String::new();
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
assert!(
!lines.iter().any(|l| l.starts_with($pattern)),
"metric '{}' found in:\n{:?}",
$pattern,
buf
);
}};
}

#[tokio::test(flavor = "current_thread")]
async fn detect_tls_opaque() {
let _trace = trace::test::trace_init();
Expand Down Expand Up @@ -77,14 +117,21 @@ async fn detect_http_non_http() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(NOT_HTTP).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_panic("http stack must not be used"))
.push_detect_http(new_ok())
.push_detect_http(super::HttpDetectMetrics::register(&mut registry), new_ok())
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 1);
assert_contains_metric!(&registry, "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -108,14 +155,24 @@ async fn detect_http() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(HTTP1).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 1);
assert_contains_metric!(&registry, "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -134,14 +191,24 @@ async fn hinted_http1() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(HTTP1).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 1);
assert_contains_metric!(&registry, "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -160,14 +227,24 @@ async fn hinted_http1_supports_http2() {
let (ior, mut iow) = io::duplex(100);
iow.write_all(HTTP2).await.unwrap();

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

assert_contains_metric!(&registry, "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 1);
assert_contains_metric!(&registry, "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
assert_contains_metric!(&registry, "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}", 0);
}

#[tokio::test(flavor = "current_thread")]
Expand All @@ -185,14 +262,25 @@ async fn hinted_http2() {

let (ior, _) = io::duplex(100);

let mut registry = prom::Registry::default();
inbound()
.with_stack(new_ok())
.push_detect_http(new_panic("tcp stack must not be used"))
.push_detect_http(
super::HttpDetectMetrics::register(&mut registry),
new_panic("tcp stack must not be used"),
)
.into_inner()
.new_service(target)
.oneshot(ior)
.await
.expect("should succeed");

// No detection is performed when HTTP/2 is hinted, so no metrics are recorded.
assert_not_contains_metric!(&registry, "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}");
assert_not_contains_metric!(&registry, "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}");
assert_not_contains_metric!(&registry, "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}");
assert_not_contains_metric!(&registry, "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}");
assert_not_contains_metric!(&registry, "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}");
}

fn client_id() -> tls::ClientId {
Expand All @@ -210,7 +298,11 @@ fn orig_dst_addr() -> OrigDstAddr {
}

fn inbound() -> Inbound<()> {
Inbound::new(test_util::default_config(), test_util::runtime().0)
Inbound::new(
test_util::default_config(),
test_util::runtime().0,
&mut Default::default(),
)
}

fn new_panic<T, I: 'static>(msg: &'static str) -> svc::ArcNewTcp<T, I> {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn build_server<I>(
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
{
Inbound::new(cfg, rt)
Inbound::new(cfg, rt, &mut Default::default())
.with_stack(connect)
.map_stack(|cfg, _, s| {
s.push_map_target(|t| Param::<Remote<ServerAddr>>::param(&t))
Expand Down
15 changes: 11 additions & 4 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ pub mod test_util;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
pub use self::{
detect::MetricsFamilies as DetectMetrics, metrics::InboundMetrics, policy::DefaultPolicy,
};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig, QueueConfig},
drain,
http_tracing::SpanSink,
identity, io,
metrics::prom,
proxy::{tap, tcp},
svc,
transport::{self, Remote, ServerAddr},
Expand Down Expand Up @@ -148,9 +151,9 @@ impl<S> Inbound<S> {
}

impl Inbound<()> {
pub fn new(config: Config, runtime: ProxyRuntime) -> Self {
pub fn new(config: Config, runtime: ProxyRuntime, prom: &mut prom::Registry) -> Self {
let runtime = Runtime {
metrics: InboundMetrics::new(runtime.metrics),
metrics: InboundMetrics::new(runtime.metrics, prom),
identity: runtime.identity,
tap: runtime.tap,
span_sink: runtime.span_sink,
Expand All @@ -166,7 +169,11 @@ impl Inbound<()> {
#[cfg(any(test, feature = "test-util"))]
pub fn for_test() -> (Self, drain::Signal) {
let (rt, drain) = test_util::runtime();
let this = Self::new(test_util::default_config(), rt);
let this = Self::new(
test_util::default_config(),
rt,
&mut prom::Registry::default(),
);
(this, drain)
}

Expand Down
8 changes: 7 additions & 1 deletion linkerd/app/inbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@ pub struct InboundMetrics {
/// Holds metrics that are common to both inbound and outbound proxies. These metrics are
/// reported separately
pub proxy: Proxy,

pub detect: crate::detect::MetricsFamilies,
}

impl InboundMetrics {
pub(crate) fn new(proxy: Proxy) -> Self {
pub(crate) fn new(proxy: Proxy, reg: &mut prom::Registry) -> Self {
let detect =
crate::detect::MetricsFamilies::register(reg.sub_registry_with_prefix("tcp_detect"));

Self {
http_authz: authz::HttpAuthzMetrics::default(),
http_errors: error::HttpErrorMetrics::default(),
tcp_authz: authz::TcpAuthzMetrics::default(),
tcp_errors: error::TcpErrorMetrics::default(),
proxy,
detect,
}
}
}
Expand Down
Loading
Loading