Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ categories = ["development-tools::debugging"]
keywords = ["metrics", "telemetry", "prometheus"]

[features]
default = ["http-listener", "push-gateway"]
default = ["http-listener", "push-gateway","remote-write"]
async-runtime = ["tokio", "hyper-util/tokio"]
http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"]
uds-listener = ["http-listener"]
push-gateway = ["async-runtime", "tracing", "_hyper-client"]
remote-write = ["_hyper-client","async-runtime","dep:prost","dep:snap","dep:prometheus-parse"]
_hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"]
_hyper-client = [
"http-body-util",
Expand All @@ -48,7 +49,10 @@ metrics-util = { version = "^0.18", path = "../metrics-util", default-features =
"registry",
"summary",
] }
prometheus-parse = {version = "0.2.5", optional = true}
prost = {workspace = true, optional = true}
quanta = { workspace = true }
snap = { version = "1.1.1", optional = true}
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
Expand Down
73 changes: 73 additions & 0 deletions metrics-exporter-prometheus/examples/prometheus_remote_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/// Make sure to run this example with `--features remote-write` to properly enable remote write support.
#[allow(unused_imports)]
use std::thread;
use std::time::Duration;

#[allow(unused_imports)]
use metrics::{counter, gauge, histogram};
use metrics::{describe_counter, describe_histogram};
#[allow(unused_imports)]
use metrics_exporter_prometheus::PrometheusBuilder;
#[allow(unused_imports)]
use metrics_util::MetricKindMask;

use quanta::Clock;
use rand::{thread_rng, Rng};

fn main() {
tracing_subscriber::fmt::init();

PrometheusBuilder::new()
.with_remote_write(
"http://127.0.0.1:9091/metrics/job/example",
Duration::from_secs(10),
"test-agent",
)
.expect("remote write endpoint should be valid")
.idle_timeout(
MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM,
Some(Duration::from_secs(10)),
)
.install()
.expect("failed to install Prometheus recorder");

// We register these metrics, which gives us a chance to specify a description for them. The
// Prometheus exporter records this description and adds it as HELP text when the endpoint is
// scraped.
//
// Registering metrics ahead of using them is not required, but is the only way to specify the
// description of a metric.
describe_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far.");
describe_histogram!(
"tcp_server_loop_delta_secs",
"The time taken for iterations of the TCP server event loop."
);

let clock = Clock::new();
let mut last = None;

counter!("idle_metric").increment(1);
gauge!("testing").set(42.0);

// Loop over and over, pretending to do some work.
loop {
counter!("tcp_server_loops", "system" => "foo").increment(1);

if let Some(t) = last {
let delta: Duration = clock.now() - t;
histogram!("tcp_server_loop_delta_secs", "system" => "foo").record(delta);
}

let increment_gauge = thread_rng().gen_bool(0.75);
let gauge = gauge!("lucky_iterations");
if increment_gauge {
gauge.increment(1.0);
} else {
gauge.decrement(1.0);
}

last = Some(clock.now());

thread::sleep(Duration::from_millis(750));
}
}
4 changes: 3 additions & 1 deletion metrics-exporter-prometheus/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub enum BuildError {
/// The given push gateway endpoint is not a valid URI.
#[error("push gateway endpoint is not valid: {0}")]
InvalidPushGatewayEndpoint(String),

/// The given push gateway endpoint is not a valid URI.
#[error("remote write endpoint is not valid: {0}")]
InvalidRemoteWriteEndpoint(String),
/// No exporter configuration was present.
///
/// This generally only occurs when HTTP listener support is disabled, but no push gateway
Expand Down
35 changes: 35 additions & 0 deletions metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,37 @@ impl PrometheusBuilder {
Ok(self)
}

/// Configures the exporter to push periodic requests to endpoint by [remote write protocol](https://prometheus.io/docs/specs/remote_write_spec/).
///
/// Running in remote write mode is mutually exclusive with the HTTP listener/push gateway i.e. enabling the remote write will
/// disable the HTTP listener/push gateway, and vise versa.
///
/// Defaults to disabled.
///
/// ## Errors
///
/// If the given endpoint cannot be parsed into a valid URI, an error variant will be returned describing the error.
///
#[cfg(feature = "remote-write")]
#[cfg_attr(docsrs, doc(cfg(feature = "remote-write")))]
pub fn with_remote_write<T>(
mut self,
endpoint: T,
interval: Duration,
user_agent: &str,
) -> Result<Self, BuildError>
where
T: AsRef<str>,
{
self.exporter_config = ExporterConfig::RemoteWrite {
endpoint: Uri::try_from(endpoint.as_ref())
.map_err(|e| BuildError::InvalidRemoteWriteEndpoint(e.to_string()))?,
interval,
user_agent: user_agent.to_string(),
};
Ok(self)
}

/// Configures the exporter to expose an HTTP listener that functions as a [scrape endpoint], listening on a Unix
/// Domain socket at the given path
///
Expand Down Expand Up @@ -486,6 +517,10 @@ impl PrometheusBuilder {
endpoint, interval, username, password, handle,
)
}
#[cfg(feature = "remote-write")]
ExporterConfig::RemoteWrite { endpoint, interval, user_agent } => {
super::remote_write::new_remote_write(endpoint, interval, handle, &user_agent)
}
},
))
}
Expand Down
12 changes: 12 additions & 0 deletions metrics-exporter-prometheus/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum ExporterConfig {
password: Option<String>,
},

// Run a remote write task sending to the given `endpoint` after `interval` time has elapsed,
// infinitely.
#[cfg(feature = "remote-write")]
RemoteWrite { endpoint: Uri, interval: Duration, user_agent: String },

#[allow(dead_code)]
Unconfigured,
}
Expand All @@ -60,6 +65,8 @@ impl ExporterConfig {
Self::HttpListener { .. } => "http-listener",
#[cfg(feature = "push-gateway")]
Self::PushGateway { .. } => "push-gateway",
#[cfg(feature = "remote-write")]
Self::RemoteWrite { .. } => "remote-write",
Self::Unconfigured => "unconfigured,",
}
}
Expand All @@ -71,4 +78,9 @@ mod http_listener;
#[cfg(feature = "push-gateway")]
mod push_gateway;

#[cfg(feature = "remote-write")]
mod remote_write;
#[cfg(feature = "remote-write")]
mod remote_write_proto;

pub(crate) mod builder;
75 changes: 75 additions & 0 deletions metrics-exporter-prometheus/src/exporter/remote_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::time::Duration;

use http_body_util::{BodyExt, Collected, Full};
use hyper::{body::Bytes, Uri};
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use tracing::error;

use crate::PrometheusHandle;

use super::{remote_write_proto::WriteRequest, ExporterFuture};

// Creates an ExporterFuture implementing a remote write.
pub(super) fn new_remote_write(
endpoint: Uri,
interval: Duration,
handle: PrometheusHandle,
user_agent: &str,
) -> ExporterFuture {
let user_agent = user_agent.to_string();
Box::pin(async move {
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("no native root CA certificates found")
.https_or_http()
.enable_http1()
.build();
let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(30))
.build(https);

loop {
// Sleep for `interval` amount of time, and then do a push.
tokio::time::sleep(interval).await;

let output = handle.render();
let binary = match WriteRequest::from_text_format(output) {
Ok(req) => req,
Err(err) => {
error!("failed to build output to remote write request: {}", err);
continue;
}
};

let req = match binary.build_http_request(&endpoint, &user_agent) {
Ok(req) => req,
Err(err) => {
error!("failed to build http remote write request {}", err);
continue;
}
};
match client.request(req).await {
Ok(response) => {
if !response.status().is_success() {
let status = response.status();
let status = status.canonical_reason().unwrap_or_else(|| status.as_str());
let body = response
.into_body()
.collect()
.await
.map(Collected::to_bytes)
.map_err(|_| ())
.and_then(|b| String::from_utf8(b[..].to_vec()).map_err(|_| ()))
.unwrap_or_else(|()| String::from("<failed to read response body>"));
error!(
message = "unexpected status after pushing metrics to remote write",
status,
%body,
);
}
}
Err(e) => error!("error sending request to remote write {}: {:?}", endpoint, e),
}
}
})
}
Loading