From ed7ac98a7903986438e1ca06310a342f36d2de73 Mon Sep 17 00:00:00 2001 From: Alex Gaynor Date: Tue, 19 Aug 2025 19:24:46 -0400 Subject: [PATCH] Added support for the protobuf format for prometheus this is a pre-req for #601, because native histograms are only supported with protobufs --- Cargo.lock | 122 +++++-- metrics-exporter-prometheus/Cargo.toml | 9 + metrics-exporter-prometheus/build.rs | 6 + .../proto/metrics.proto | 157 +++++++++ .../src/exporter/http_listener.rs | 72 ++++- metrics-exporter-prometheus/src/lib.rs | 12 +- metrics-exporter-prometheus/src/protobuf.rs | 301 ++++++++++++++++++ metrics-exporter-prometheus/src/recorder.rs | 15 + .../tests/http_listener_integration_test.rs | 97 +++++- 9 files changed, 749 insertions(+), 42 deletions(-) create mode 100644 metrics-exporter-prometheus/build.rs create mode 100644 metrics-exporter-prometheus/proto/metrics.proto create mode 100644 metrics-exporter-prometheus/src/protobuf.rs diff --git a/Cargo.lock b/Cargo.lock index 6e2d8be7..75b723f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,7 +138,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -1020,7 +1020,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -1138,7 +1138,11 @@ dependencies = [ "ipnet", "metrics", "metrics-util", + "mime", "proptest", + "prost", + "prost-build", + "prost-types", "quanta", "rand 0.9.0", "thiserror", @@ -1232,6 +1236,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1449,7 +1459,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -1919,15 +1929,15 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.5" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ "bitflags 2.9.0", "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2243,7 +2253,7 @@ dependencies = [ "fastrand", "getrandom 0.3.2", "once_cell", - "rustix 1.0.5", + "rustix 1.0.8", "windows-sys 0.59.0", ] @@ -2682,9 +2692,9 @@ dependencies = [ [[package]] name = "windows-link" -version = "0.1.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" [[package]] name = "windows-result" @@ -2710,7 +2720,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2719,7 +2729,16 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + 
"windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.3", ] [[package]] @@ -2728,14 +2747,31 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.0", + "windows_aarch64_msvc 0.53.0", + "windows_i686_gnu 0.53.0", + "windows_i686_gnullvm 0.53.0", + "windows_i686_msvc 0.53.0", + "windows_x86_64_gnu 0.53.0", + "windows_x86_64_gnullvm 0.53.0", + "windows_x86_64_msvc 0.53.0", ] [[package]] @@ -2744,48 +2780,96 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" + [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" + [[package]] name = 
"windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + [[package]] name = "winnow" version = "0.7.6" diff --git a/metrics-exporter-prometheus/Cargo.toml b/metrics-exporter-prometheus/Cargo.toml index 5a105724..f351d627 100644 --- a/metrics-exporter-prometheus/Cargo.toml +++ b/metrics-exporter-prometheus/Cargo.toml @@ -21,6 +21,7 @@ async-runtime = ["tokio", "hyper-util/tokio"] http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"] uds-listener = ["http-listener"] push-gateway = ["async-runtime", "tracing", "_hyper-client"] +protobuf = ["mime", "prost", "prost-types", "prost-build"] _hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"] _hyper-client = [ "http-body-util", @@ -52,12 +53,20 @@ thiserror = { workspace = true } tokio = { workspace = true, optional = true } tracing = { workspace = true, optional = true } +# Protobuf support +mime = { version = "0.3", optional = true } +prost = { workspace = true, optional = true } +prost-types = { workspace = true, optional = true } + [dev-dependencies] proptest = { workspace = true } rand = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["fmt"] } +[build-dependencies] +prost-build = { workspace = true, optional = true } + [[example]] name = "prometheus_push_gateway" required-features = ["push-gateway"] diff --git a/metrics-exporter-prometheus/build.rs b/metrics-exporter-prometheus/build.rs new file mode 100644 index 00000000..cbe17e43 --- /dev/null +++ b/metrics-exporter-prometheus/build.rs @@ -0,0 +1,6 @@ +fn main() { + #[cfg(feature = "protobuf")] + { + prost_build::compile_protos(&["proto/metrics.proto"], &["proto/"]).unwrap(); + } +} diff --git a/metrics-exporter-prometheus/proto/metrics.proto b/metrics-exporter-prometheus/proto/metrics.proto new file mode 100644 index 00000000..3e9168e6 --- /dev/null +++ b/metrics-exporter-prometheus/proto/metrics.proto @@ -0,0 +1,157 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; + +package io.prometheus.client; +option java_package = "io.prometheus.client"; +option go_package = "github.com/prometheus/client_model/go;io_prometheus_client"; + +import "google/protobuf/timestamp.proto"; + +message LabelPair { + optional string name = 1; + optional string value = 2; +} + +enum MetricType { + // COUNTER must use the Metric field "counter". + COUNTER = 0; + // GAUGE must use the Metric field "gauge". + GAUGE = 1; + // SUMMARY must use the Metric field "summary". + SUMMARY = 2; + // UNTYPED must use the Metric field "untyped". + UNTYPED = 3; + // HISTOGRAM must use the Metric field "histogram". + HISTOGRAM = 4; + // GAUGE_HISTOGRAM must use the Metric field "histogram". + GAUGE_HISTOGRAM = 5; +} + +message Gauge { + optional double value = 1; +} + +message Counter { + optional double value = 1; + optional Exemplar exemplar = 2; + + optional google.protobuf.Timestamp created_timestamp = 3; +} + +message Quantile { + optional double quantile = 1; + optional double value = 2; +} + +message Summary { + optional uint64 sample_count = 1; + optional double sample_sum = 2; + repeated Quantile quantile = 3; + + optional google.protobuf.Timestamp created_timestamp = 4; +} + +message Untyped { + optional double value = 1; +} + +message Histogram { + optional uint64 sample_count = 1; + optional double sample_count_float = 4; // Overrides sample_count if > 0. + optional double sample_sum = 2; + // Buckets for the conventional histogram. + repeated Bucket bucket = 3; // Ordered in increasing order of upper_bound, +Inf bucket is optional. + + optional google.protobuf.Timestamp created_timestamp = 15; + + // Everything below here is for native histograms (also known as sparse histograms). + // Native histograms are an experimental feature without stability guarantees. + + // schema defines the bucket schema. Currently, valid numbers are -4 <= n <= 8. + // They are all for base-2 bucket schemas, where 1 is a bucket boundary in each case, and + // then each power of two is divided into 2^n logarithmic buckets. + // Or in other words, each bucket boundary is the previous boundary times 2^(2^-n). + // In the future, more bucket schemas may be added using numbers < -4 or > 8. + optional sint32 schema = 5; + optional double zero_threshold = 6; // Breadth of the zero bucket. + optional uint64 zero_count = 7; // Count in zero bucket. + optional double zero_count_float = 8; // Overrides sb_zero_count if > 0. + + // Negative buckets for the native histogram. + repeated BucketSpan negative_span = 9; + // Use either "negative_delta" or "negative_count", the former for + // regular histograms with integer counts, the latter for float + // histograms. + repeated sint64 negative_delta = 10; // Count delta of each bucket compared to previous one (or to zero for 1st bucket). + repeated double negative_count = 11; // Absolute count of each bucket. + + // Positive buckets for the native histogram. + // Use a no-op span (offset 0, length 0) for a native histogram without any + // observations yet and with a zero_threshold of 0. 
Otherwise, it would be + // indistinguishable from a classic histogram. + repeated BucketSpan positive_span = 12; + // Use either "positive_delta" or "positive_count", the former for + // regular histograms with integer counts, the latter for float + // histograms. + repeated sint64 positive_delta = 13; // Count delta of each bucket compared to previous one (or to zero for 1st bucket). + repeated double positive_count = 14; // Absolute count of each bucket. + + // Only used for native histograms. These exemplars MUST have a timestamp. + repeated Exemplar exemplars = 16; +} + +// A Bucket of a conventional histogram, each of which is treated as +// an individual counter-like time series by Prometheus. +message Bucket { + optional uint64 cumulative_count = 1; // Cumulative in increasing order. + optional double cumulative_count_float = 4; // Overrides cumulative_count if > 0. + optional double upper_bound = 2; // Inclusive. + optional Exemplar exemplar = 3; +} + +// A BucketSpan defines a number of consecutive buckets in a native +// histogram with their offset. Logically, it would be more +// straightforward to include the bucket counts in the Span. However, +// the protobuf representation is more compact in the way the data is +// structured here (with all the buckets in a single array separate +// from the Spans). +message BucketSpan { + optional sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative). + optional uint32 length = 2; // Length of consecutive buckets. +} + +message Exemplar { + repeated LabelPair label = 1; + optional double value = 2; + optional google.protobuf.Timestamp timestamp = 3; // OpenMetrics-style. +} + +message Metric { + repeated LabelPair label = 1; + optional Gauge gauge = 2; + optional Counter counter = 3; + optional Summary summary = 4; + optional Untyped untyped = 5; + optional Histogram histogram = 7; + optional int64 timestamp_ms = 6; +} + +message MetricFamily { + optional string name = 1; + optional string help = 2; + optional MetricType type = 3; + repeated Metric metric = 4; + optional string unit = 5; +} diff --git a/metrics-exporter-prometheus/src/exporter/http_listener.rs b/metrics-exporter-prometheus/src/exporter/http_listener.rs index 0185fa0d..c65b2a64 100644 --- a/metrics-exporter-prometheus/src/exporter/http_listener.rs +++ b/metrics-exporter-prometheus/src/exporter/http_listener.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use http_body_util::Full; +#[cfg(feature = "protobuf")] +use hyper::header::ACCEPT; use hyper::{ body::{Bytes, Incoming}, header::{HeaderValue, CONTENT_TYPE}, @@ -129,25 +131,69 @@ impl HttpListeningExporter { handle: PrometheusHandle, req: Request, ) -> Result>, hyper::Error> { - if is_allowed { - let mut response = Response::new(match req.uri().path() { - "/health" => "OK".into(), - _ => tokio::task::spawn_blocking(move || handle.render()).await.unwrap().into(), - }); - response.headers_mut().append(CONTENT_TYPE, HeaderValue::from_static("text/plain")); - Ok(response) - } else { - // This unwrap should not fail because we don't use any function that - // can assign an Err to it's inner such as `Builder::header``. A unit test - // will have to suffice to detect if this fails to hold true. 
- Ok(Response::builder() + if !is_allowed { + return Ok(Response::builder() .status(StatusCode::FORBIDDEN) .body(Full::::default()) - .unwrap()) + .unwrap()); + } + + if req.uri().path() == "/health" { + let mut response = Response::new("OK".into()); + response.headers_mut().append(CONTENT_TYPE, HeaderValue::from_static("text/plain")); + return Ok(response); + } + + // Check content negotiation for metrics endpoint + let response_format = Self::negotiate_content_type(&req); + let (body, content_type) = match response_format { + #[cfg(feature = "protobuf")] + ResponseFormat::Protobuf => { + let data = + tokio::task::spawn_blocking(move || handle.render_protobuf()).await.unwrap(); + (data.into(), crate::protobuf::PROTOBUF_CONTENT_TYPE) + } + ResponseFormat::Text => { + let data = tokio::task::spawn_blocking(move || handle.render()).await.unwrap(); + (data.into(), "text/plain") + } + }; + + let mut response = Response::new(body); + response.headers_mut().append(CONTENT_TYPE, HeaderValue::from_static(content_type)); + Ok(response) + } + + fn negotiate_content_type(req: &Request) -> ResponseFormat { + #[cfg(feature = "protobuf")] + { + let accept_header = + req.headers().get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or(""); + + for mime_type in mime::MimeIter::new(accept_header).flatten() { + if mime_type.type_() == "application" + && (mime_type.subtype() == "vnd.google.protobuf" + || mime_type.subtype() == "x-protobuf") + { + return ResponseFormat::Protobuf; + } + } } + + #[cfg(not(feature = "protobuf"))] + let _ = req; + + ResponseFormat::Text } } +#[derive(Debug, Clone, Copy)] +enum ResponseFormat { + Text, + #[cfg(feature = "protobuf")] + Protobuf, +} + /// Creates an `ExporterFuture` implementing a http listener that serves prometheus metrics. /// /// # Errors diff --git a/metrics-exporter-prometheus/src/lib.rs b/metrics-exporter-prometheus/src/lib.rs index 2240808c..b96d7b9f 100644 --- a/metrics-exporter-prometheus/src/lib.rs +++ b/metrics-exporter-prometheus/src/lib.rs @@ -14,6 +14,7 @@ //! quantiles/buckets //! - ability to control bucket configuration on a per-metric basis //! - configurable global labels (applied to all metrics, overridden by metric's own labels if present) +//! - protobuf format support with automatic content negotiation //! //! ## Behavior //! @@ -81,11 +82,16 @@ //! //! ## Features //! -//! Two main feature flags control which modes that exporter can run in: +//! Three main feature flags control the capabilities of the exporter: //! - **`http-listener`**: allows running the exporter as a scrape endpoint (_enabled by default_) //! - **`push-gateway`**: allows running the exporter in push gateway mode (_enabled by default_) +//! - **`protobuf`**: enables Prometheus protobuf format support with automatic content negotiation //! -//! Neither of these flags are required to create, or install, only a recorder. However, in order to create or build an +//! For the HTTP listener mode, the exporter automatically detects the requested format based on the `Accept` header: +//! - Text format (default): `text/plain` +//! - Protobuf format: `application/vnd.google.protobuf` or `application/x-protobuf` +//! +//! Neither of the mode flags are required to create, or install, only a recorder. However, in order to create or build an //! exporter, at least one of these feature flags must be enabled. Builder methods that require certain feature flags //! will be documented as such. //! 
@@ -125,6 +131,8 @@ pub use self::exporter::builder::PrometheusBuilder; pub use self::exporter::ExporterFuture; pub mod formatting; +#[cfg(feature = "protobuf")] +pub mod protobuf; mod recorder; mod registry; diff --git a/metrics-exporter-prometheus/src/protobuf.rs b/metrics-exporter-prometheus/src/protobuf.rs new file mode 100644 index 00000000..99feba69 --- /dev/null +++ b/metrics-exporter-prometheus/src/protobuf.rs @@ -0,0 +1,301 @@ +//! Protobuf serialization support for Prometheus metrics. + +use indexmap::IndexMap; +use metrics::Unit; +use prost::Message; +use std::collections::HashMap; + +use crate::common::Snapshot; +use crate::distribution::Distribution; +use crate::formatting::sanitize_metric_name; + +// Include the generated protobuf code +mod pb { + #![allow(missing_docs, clippy::trivially_copy_pass_by_ref, clippy::doc_markdown)] + include!(concat!(env!("OUT_DIR"), "/io.prometheus.client.rs")); +} + +#[cfg(feature = "http-listener")] +pub(crate) const PROTOBUF_CONTENT_TYPE: &str = + "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited"; + +/// Renders a snapshot of metrics into protobuf format using length-delimited encoding. +/// +/// This function takes a snapshot of metrics and converts them into the Prometheus +/// protobuf wire format, where each `MetricFamily` message is prefixed with a varint +/// length header. +#[allow(clippy::too_many_lines)] +pub(crate) fn render_protobuf( + snapshot: &Snapshot, + descriptions: &HashMap)>, + global_labels: &IndexMap, + counter_suffix: Option<&'static str>, +) -> Vec { + let mut output = Vec::new(); + + // Process counters + for (name, by_labels) in &snapshot.counters { + let sanitized_name = sanitize_metric_name(name); + let help = + descriptions.get(name.as_str()).map(|(desc, _)| desc.to_string()).unwrap_or_default(); + + let mut metrics = Vec::new(); + for (labels, value) in by_labels { + let label_pairs = parse_labels(labels, global_labels); + + metrics.push(pb::Metric { + label: label_pairs, + counter: Some(pb::Counter { + #[allow(clippy::cast_precision_loss)] + value: Some(*value as f64), + + ..Default::default() + }), + + ..Default::default() + }); + } + + let metric_family = pb::MetricFamily { + name: Some(add_suffix_to_name(&sanitized_name, counter_suffix)), + help: if help.is_empty() { None } else { Some(help) }, + r#type: Some(pb::MetricType::Counter as i32), + metric: metrics, + unit: None, + }; + + metric_family.encode_length_delimited(&mut output).unwrap(); + } + + // Process gauges + for (name, by_labels) in &snapshot.gauges { + let sanitized_name = sanitize_metric_name(name); + let help = + descriptions.get(name.as_str()).map(|(desc, _)| desc.to_string()).unwrap_or_default(); + + let mut metrics = Vec::new(); + for (labels, value) in by_labels { + let label_pairs = parse_labels(labels, global_labels); + + metrics.push(pb::Metric { + label: label_pairs, + gauge: Some(pb::Gauge { value: Some(*value) }), + + ..Default::default() + }); + } + + let metric_family = pb::MetricFamily { + name: Some(sanitized_name), + help: if help.is_empty() { None } else { Some(help) }, + r#type: Some(pb::MetricType::Gauge as i32), + metric: metrics, + unit: None, + }; + + metric_family.encode_length_delimited(&mut output).unwrap(); + } + + // Process distributions (histograms and summaries) + for (name, by_labels) in &snapshot.distributions { + let sanitized_name = sanitize_metric_name(name); + let help = + descriptions.get(name.as_str()).map(|(desc, _)| desc.to_string()).unwrap_or_default(); + + let 
mut metrics = Vec::new(); + for (labels, distribution) in by_labels { + let label_pairs = parse_labels(labels, global_labels); + + let metric = match distribution { + Distribution::Summary(summary, quantiles, sum) => { + use quanta::Instant; + let snapshot = summary.snapshot(Instant::now()); + let quantile_values: Vec = quantiles + .iter() + .map(|q| pb::Quantile { + quantile: Some(q.value()), + value: Some(snapshot.quantile(q.value()).unwrap_or(0.0)), + }) + .collect(); + + pb::Metric { + label: label_pairs, + summary: Some(pb::Summary { + sample_count: Some(summary.count() as u64), + sample_sum: Some(*sum), + quantile: quantile_values, + + created_timestamp: None, + }), + + ..Default::default() + } + } + Distribution::Histogram(histogram) => { + let mut buckets = Vec::new(); + for (le, count) in histogram.buckets() { + buckets.push(pb::Bucket { + cumulative_count: Some(count), + upper_bound: Some(le), + + ..Default::default() + }); + } + // Add +Inf bucket + buckets.push(pb::Bucket { + cumulative_count: Some(histogram.count()), + upper_bound: Some(f64::INFINITY), + + ..Default::default() + }); + + pb::Metric { + label: label_pairs, + histogram: Some(pb::Histogram { + sample_count: Some(histogram.count()), + sample_sum: Some(histogram.sum()), + bucket: buckets, + + ..Default::default() + }), + + ..Default::default() + } + } + }; + + metrics.push(metric); + } + + let metric_type = match by_labels.values().next() { + Some(Distribution::Summary(_, _, _)) => pb::MetricType::Summary, + Some(Distribution::Histogram(_)) => pb::MetricType::Histogram, + None => continue, // Skip empty metric families + }; + + let metric_family = pb::MetricFamily { + name: Some(sanitized_name), + help: if help.is_empty() { None } else { Some(help) }, + r#type: Some(metric_type as i32), + metric: metrics, + unit: None, + }; + + metric_family.encode_length_delimited(&mut output).unwrap(); + } + + output +} + +fn parse_labels(labels: &[String], global_labels: &IndexMap) -> Vec { + let mut label_pairs = Vec::new(); + + // Add global labels first + for (key, value) in global_labels { + label_pairs.push(pb::LabelPair { name: Some(key.clone()), value: Some(value.clone()) }); + } + + // Add metric-specific labels + for label_str in labels { + if let Some(eq_pos) = label_str.find('=') { + let key = &label_str[..eq_pos]; + let value = &label_str[eq_pos + 1..]; + let value = value.trim_matches('"'); + + // Skip if this label key already exists from global labels + if !global_labels.contains_key(key) { + label_pairs.push(pb::LabelPair { + name: Some(key.to_string()), + value: Some(value.to_string()), + }); + } + } + } + + label_pairs +} + +fn add_suffix_to_name(name: &str, suffix: Option<&'static str>) -> String { + match suffix { + Some(suffix) if !name.ends_with(suffix) => format!("{name}_{suffix}"), + _ => name.to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common::Snapshot; + use indexmap::IndexMap; + use metrics::SharedString; + use prost::Message; + use std::collections::HashMap; + + #[test] + fn test_render_protobuf_counters() { + let mut counters = HashMap::new(); + let mut counter_labels = HashMap::new(); + counter_labels.insert(vec!["method=\"GET\"".to_string()], 42u64); + counters.insert("http_requests".to_string(), counter_labels); + + let snapshot = Snapshot { counters, gauges: HashMap::new(), distributions: HashMap::new() }; + + let descriptions = HashMap::new(); + let global_labels = IndexMap::new(); + + let protobuf_data = + render_protobuf(&snapshot, &descriptions, 
&global_labels, Some("total")); + + assert!(!protobuf_data.is_empty(), "Protobuf data should not be empty"); + + // Parse the protobuf response to verify it's correct + let metric_family = pb::MetricFamily::decode_length_delimited(&protobuf_data[..]).unwrap(); + + assert_eq!(metric_family.name.as_ref().unwrap(), "http_requests_total"); + assert_eq!(metric_family.r#type.unwrap(), pb::MetricType::Counter as i32); + assert_eq!(metric_family.metric.len(), 1); + + let metric = &metric_family.metric[0]; + assert!(metric.counter.is_some()); + assert_eq!(metric.counter.as_ref().unwrap().value.unwrap(), 42.0); + } + + #[test] + fn test_render_protobuf_gauges() { + let mut gauges = HashMap::new(); + let mut gauge_labels = HashMap::new(); + gauge_labels.insert(vec!["instance=\"localhost\"".to_string()], 0.75f64); + gauges.insert("cpu_usage".to_string(), gauge_labels); + + let snapshot = Snapshot { counters: HashMap::new(), gauges, distributions: HashMap::new() }; + + let mut descriptions = HashMap::new(); + descriptions.insert( + "cpu_usage".to_string(), + (SharedString::const_str("CPU usage percentage"), None), + ); + let global_labels = IndexMap::new(); + + let protobuf_data = render_protobuf(&snapshot, &descriptions, &global_labels, None); + + assert!(!protobuf_data.is_empty(), "Protobuf data should not be empty"); + + // Parse the protobuf response to verify it's correct + let metric_family = pb::MetricFamily::decode_length_delimited(&protobuf_data[..]).unwrap(); + + assert_eq!(metric_family.name.as_ref().unwrap(), "cpu_usage"); + assert_eq!(metric_family.r#type.unwrap(), pb::MetricType::Gauge as i32); + assert_eq!(metric_family.help.as_ref().unwrap(), "CPU usage percentage"); + + let metric = &metric_family.metric[0]; + assert!(metric.gauge.is_some()); + assert_eq!(metric.gauge.as_ref().unwrap().value.unwrap(), 0.75); + } + + #[test] + fn test_add_suffix_to_name() { + assert_eq!(add_suffix_to_name("requests", Some("total")), "requests_total"); + assert_eq!(add_suffix_to_name("requests_total", Some("total")), "requests_total"); + assert_eq!(add_suffix_to_name("requests", None), "requests"); + } +} diff --git a/metrics-exporter-prometheus/src/recorder.rs b/metrics-exporter-prometheus/src/recorder.rs index 64afa645..04101558 100644 --- a/metrics-exporter-prometheus/src/recorder.rs +++ b/metrics-exporter-prometheus/src/recorder.rs @@ -325,6 +325,21 @@ impl PrometheusHandle { self.inner.render() } + /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to + /// the Prometheus protobuf format. + #[cfg(feature = "protobuf")] + pub fn render_protobuf(&self) -> Vec { + let snapshot = self.inner.get_recent_metrics(); + let descriptions = self.inner.descriptions.read().unwrap_or_else(PoisonError::into_inner); + + crate::protobuf::render_protobuf( + &snapshot, + &descriptions, + &self.inner.global_labels, + self.inner.counter_suffix, + ) + } + /// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not /// grow unboundedly. 
pub fn run_upkeep(&self) { diff --git a/metrics-exporter-prometheus/tests/http_listener_integration_test.rs b/metrics-exporter-prometheus/tests/http_listener_integration_test.rs index ffbd3efa..8ab5ec97 100644 --- a/metrics-exporter-prometheus/tests/http_listener_integration_test.rs +++ b/metrics-exporter-prometheus/tests/http_listener_integration_test.rs @@ -3,6 +3,7 @@ mod http_listener_test { use http_body_util::{BodyExt, Collected, Empty}; use hyper::{ body::{Buf, Bytes}, + header::{ACCEPT, CONTENT_TYPE}, Request, StatusCode, Uri, }; use hyper_util::client::legacy::{connect::HttpConnector, Client}; @@ -45,10 +46,78 @@ mod http_listener_test { .parse::() .unwrap_or_else(|e| panic!("Error parsing URI: {:?}", e)); - let (status, body) = read_from(uri).await; + let (status, body, _) = read_from(uri, None).await; assert_eq!(status, StatusCode::OK); - assert!(body.contains("basic_gauge{wutang=\"forever\"} -1.23")); + assert!(String::from_utf8(body) + .unwrap() + .contains("basic_gauge{wutang=\"forever\"} -1.23")); + }); + } + + #[cfg(feature = "protobuf")] + #[test] + fn test_http_listener_protobuf() { + use prost::Message; + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap_or_else(|e| panic!("Failed to create test runtime: {:?}", e)); + + runtime.block_on(async { + let local = [127, 0, 0, 1]; + let port = get_available_port(local).await; + let socket_address = SocketAddr::from((local, port)); + + let (recorder, exporter) = { + PrometheusBuilder::new().with_http_listener(socket_address).build().unwrap_or_else( + |e| panic!("failed to create Prometheus recorder and http listener: {:?}", e), + ) + }; + + let labels = vec![Label::new("test", "protobuf")]; + let key = Key::from_parts("test_gauge", labels); + let gauge = recorder.register_gauge(&key, &METADATA); + gauge.set(42.0); + + runtime.spawn(exporter); + tokio::time::sleep(Duration::from_millis(200)).await; + + let uri = format!("http://{socket_address}") + .parse::() + .unwrap_or_else(|e| panic!("Error parsing URI: {:?}", e)); + + // Test protobuf content negotiation + let (status, body, content_type) = + read_from(uri, Some("application/vnd.google.protobuf")).await; + + assert_eq!(status, StatusCode::OK); + assert!(content_type.contains("application/vnd.google.protobuf")); + assert!(!body.is_empty(), "Protobuf response should not be empty"); + + // Parse the protobuf response to verify it's correct + let mut cursor = std::io::Cursor::new(&body); + + // Include the generated protobuf types for testing + mod pb { + include!(concat!(env!("OUT_DIR"), "/io.prometheus.client.rs")); + } + + let metric_family = pb::MetricFamily::decode_length_delimited(&mut cursor) + .expect("Failed to decode protobuf response"); + + assert_eq!(metric_family.name.as_ref().unwrap(), "test_gauge"); + assert_eq!(metric_family.r#type.unwrap(), pb::MetricType::Gauge as i32); + assert_eq!(metric_family.metric.len(), 1); + + let metric = &metric_family.metric[0]; + assert!(metric.gauge.is_some()); + assert_eq!(metric.gauge.as_ref().unwrap().value.unwrap(), 42.0); + + assert_eq!(metric.label.len(), 1); + assert_eq!(metric.label[0].name.as_ref().unwrap(), "test"); + assert_eq!(metric.label[0].value.as_ref().unwrap(), "protobuf"); }); } @@ -64,12 +133,18 @@ mod http_listener_test { .port() } - async fn read_from(endpoint: Uri) -> (StatusCode, String) { + async fn read_from( + endpoint: Uri, + accept_header: Option<&str>, + ) -> (StatusCode, Vec, String) { let client = 
Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new()); - let req = Request::builder() - .uri(endpoint.to_string()) + let req = Request::builder().uri(endpoint.to_string()); + + let req = if let Some(accept) = accept_header { req.header(ACCEPT, accept) } else { req }; + + let req = req .body(Empty::::new()) .unwrap_or_else(|e| panic!("Failed building request: {:?}", e)); @@ -79,6 +154,13 @@ mod http_listener_test { .unwrap_or_else(|e| panic!("Failed requesting data from {endpoint}: {:?}", e)); let status = response.status(); + let content_type = response + .headers() + .get(CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + .to_string(); + let mut body = response .into_body() .collect() @@ -86,9 +168,8 @@ mod http_listener_test { .map(Collected::aggregate) .unwrap_or_else(|e| panic!("Error reading response: {:?}", e)); - let body_string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec()) - .unwrap_or_else(|e| panic!("Error decoding response body: {:?}", e)); + let body_bytes = body.copy_to_bytes(body.remaining()).to_vec(); - (status, body_string) + (status, body_bytes, content_type) } }
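
Usage sketch (not part of the patch): the new PrometheusHandle::render_protobuf() added above can also be driven directly, without the HTTP listener. The snippet below is illustrative only; it assumes the `protobuf` feature is enabled and that the `metrics` and `metrics-exporter-prometheus` crates are dependencies, and the counter name is made up for demonstration.

use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install a recorder and keep the handle around for rendering.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // Record something so the snapshot is non-empty.
    metrics::counter!("demo_requests").increment(1);

    // Length-delimited protobuf payload, matching the
    // "application/vnd.google.protobuf; ...; encoding=delimited" content type
    // that the HTTP listener serves when the client's Accept header asks for
    // protobuf.
    let payload: Vec<u8> = handle.render_protobuf();
    println!("rendered {} bytes of protobuf", payload.len());
}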