Skip to content

Commit 9f8e36c

Browse files
authored
feat(exporter): option to export process metrics at an interval (#52)
2 parents 7f2f7ae + adbef65 commit 9f8e36c

File tree

3 files changed

+42
-12
lines changed

3 files changed

+42
-12
lines changed

prometric-derive/tests/macro.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ fn quantiles_with_batching_work() {
272272
encoder.encode(&metric_families, &mut buffer).unwrap();
273273
let output = String::from_utf8(buffer).unwrap();
274274

275-
println!("{}", output);
275+
println!("{output}");
276276

277277
assert!(output.contains("test_summary"));
278278
}

prometric/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ prometheus = { workspace = true }
1616
# Exporter
1717
hyper = { version = "1.7.0", optional = true, features = ["http1", "server"] }
1818
hyper-util = { version = "0.1.17", optional = true, features = ["tokio"] }
19-
tokio = { version = "1.40.0", optional = true, features = ["net", "rt"] }
19+
tokio = { version = "1.40.0", optional = true, features = ["net", "rt", "macros"] }
2020

2121
# Process
2222
sysinfo = { version = "0.37.2", optional = true }
2323

2424
# Summary
25-
arc-cell = {version = "0.3.3", optional = true }
25+
arc-cell = { version = "0.3.3", optional = true }
2626
metrics-util = { version = "0.20.0", optional = true }
2727
metrics-exporter-prometheus = { version = "0.17.2", optional = true }
2828
orx-concurrent-vec = { version = "3.10.0", optional = true }

prometric/src/exporter.rs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{net::SocketAddr, thread};
1+
use std::{net::SocketAddr, thread, time::Duration};
22

33
use hyper::{
44
Request, Response, body::Incoming, header::CONTENT_TYPE, server::conn::http1,
@@ -13,6 +13,7 @@ pub struct ExporterBuilder {
1313
address: String,
1414
path: String,
1515
global_prefix: Option<String>,
16+
process_metrics_poll_interval: Option<Duration>,
1617
}
1718

1819
impl Default for ExporterBuilder {
@@ -22,6 +23,7 @@ impl Default for ExporterBuilder {
2223
address: "0.0.0.0:9090".to_owned(),
2324
path: "/metrics".to_owned(),
2425
global_prefix: None,
26+
process_metrics_poll_interval: None,
2527
}
2628
}
2729
}
@@ -66,6 +68,15 @@ impl ExporterBuilder {
6668
self
6769
}
6870

71+
/// Also collect process metrics, polling at the given interval in the background.
72+
///
73+
/// A 10 second interval is a good default for most applications.
74+
#[cfg(feature = "process")]
75+
pub fn with_process_metrics(mut self, poll_interval: Duration) -> Self {
76+
self.process_metrics_poll_interval = Some(poll_interval);
77+
self
78+
}
79+
6980
fn path(&self) -> Result<String, ExporterError> {
7081
if self.path.is_empty() {
7182
return Err(ExporterError::InvalidPath(self.path.clone()));
@@ -101,18 +112,20 @@ impl ExporterBuilder {
101112
let address = self.address()?;
102113
let registry = self.registry.unwrap_or_else(|| prometheus::default_registry().clone());
103114

104-
// Build the serve function
115+
// Build the serve and process collection futures.
105116
let serve = serve(address, registry, path, self.global_prefix);
117+
let collect = collect_process_metrics(self.process_metrics_poll_interval);
118+
let fut = async { tokio::try_join!(serve, collect) };
106119

107120
// If a Tokio runtime is available, use it to spawn the listener. Otherwise,
108121
// create a new single-threaded runtime and spawn the listener there.
109122
if let Ok(runtime) = tokio::runtime::Handle::try_current() {
110-
runtime.spawn(serve);
123+
runtime.spawn(fut);
111124
} else {
112125
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
113126

114127
thread::spawn(move || {
115-
runtime.block_on(serve).unwrap_or_else(|e| panic!("server error: {:?}", e));
128+
runtime.block_on(fut).unwrap_or_else(|e| panic!("server error: {e:?}"));
116129
});
117130
}
118131

@@ -176,6 +189,23 @@ async fn serve_req(
176189
Ok(response)
177190
}
178191

192+
/// If the "process" feature is enabled AND the poll interval is provided, collect
193+
/// process metrics at the given interval. Otherwise, no-op.
194+
///
195+
/// NOTE: the return type is Result to use [`tokio::try_join!`] with [`serve`].
196+
async fn collect_process_metrics(_poll_interval: Option<Duration>) -> Result<(), ExporterError> {
197+
#[cfg(feature = "process")]
198+
if let Some(interval) = _poll_interval {
199+
let mut collector = crate::process::ProcessCollector::default();
200+
loop {
201+
collector.collect();
202+
tokio::time::sleep(interval).await;
203+
}
204+
}
205+
206+
Ok(())
207+
}
208+
179209
/// An error that can occur when building or installing the Prometheus HTTP exporter.
180210
pub enum ExporterError {
181211
BindError(std::io::Error),
@@ -189,10 +219,10 @@ impl std::error::Error for ExporterError {}
189219
impl std::fmt::Display for ExporterError {
190220
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191221
match self {
192-
Self::BindError(e) => write!(f, "Failed to bind to address: {:?}", e),
193-
Self::ServeError(e) => write!(f, "HTTP server failed: {:?}", e),
194-
Self::InvalidPath(path) => write!(f, "Invalid path: {}", path),
195-
Self::InvalidAddress(address, e) => write!(f, "Invalid address: {}: {:?}", address, e),
222+
Self::BindError(e) => write!(f, "Failed to bind to address: {e:?}"),
223+
Self::ServeError(e) => write!(f, "HTTP server failed: {e:?}"),
224+
Self::InvalidPath(path) => write!(f, "Invalid path: {path}"),
225+
Self::InvalidAddress(address, e) => write!(f, "Invalid address: {address}: {e:?}"),
196226
}
197227
}
198228
}
@@ -205,6 +235,6 @@ impl From<std::io::Error> for ExporterError {
205235

206236
impl std::fmt::Debug for ExporterError {
207237
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208-
write!(f, "{}", self)
238+
write!(f, "{self}")
209239
}
210240
}

0 commit comments

Comments
 (0)