Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 193 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
//! Metrics of the thread pool.

use lazy_static::lazy_static;
use prometheus::core::{Collector, Desc, Metric, MetricVec, MetricVecBuilder};
use prometheus::*;
use std::sync::Mutex;

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

lazy_static! {
/// Elapsed time of each level in the multilevel task queue.
Expand Down Expand Up @@ -71,6 +74,16 @@ lazy_static! {
)
.unwrap();

/// Peak enqueue throughput of the global task injector (QueueCore),
/// tracked as a maximum that is reset each time the metric is scraped.
///
/// Partitioned by the `name` label.
pub static ref QUEUE_CORE_BURST_THROUGHPUT: MaxGaugeVec = MaxGaugeVec::new(
    new_opts(
        "yatp_queue_core_burst_throughput",
        "max enqueue throughput (tasks/sec) of the global task injector since last scrape"
    ),
    &["name"]
)
.unwrap();

static ref NAMESPACE: Mutex<Option<String>> = Mutex::new(None);
}

Expand Down Expand Up @@ -98,3 +111,182 @@ fn new_histogram_opts(name: &str, help: &str, buckets: Vec<f64>) -> HistogramOpt

opts
}

/// A gauge that reports the largest value observed since the previous scrape.
///
/// Clones share the same tracked maximum because the tracker lives behind an
/// `Arc`.
#[derive(Debug, Clone)]
pub struct MaxGauge {
    /// The wrapped gauge whose value is what actually gets exported.
    gauge: Gauge,
    /// Bit pattern (`f64::to_bits`) of the current maximum.
    max_val: Arc<AtomicU64>,
}

impl MaxGauge {
/// Builds a `MaxGauge` around an existing `Gauge`.
///
/// The tracked maximum starts at whatever value the gauge currently holds.
/// Once wrapped, the `Gauge` must not be written to directly, or the
/// maximum tracking will no longer be accurate.
pub fn wrap(gauge: Gauge) -> Self {
    // The maximum is stored as the raw bit pattern of the f64.
    let initial_bits = gauge.get().to_bits();
    let max_val = Arc::new(AtomicU64::new(initial_bits));
    Self { gauge, max_val }
}
Comment on lines +124 to +132
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxGauge::wrap seeds max_val from the wrapped Gauge's current value (typically 0). That makes the max tracking incorrect for metrics that can legitimately be negative (e.g. observing -1.0 will never update the max if the seed is 0). Consider initializing the internal max to a sentinel like -inf/"no value yet" instead of the gauge's current value, and only exporting 0 when there have been no observations since the last scrape.

Copilot uses AI. Check for mistakes.

/// Observe a value, keeping the maximum since the last scrape.
pub fn observe(&self, v: f64) {
if !v.is_finite() {
return;
}
let mut current = self.max_val.load(Ordering::Relaxed);
loop {
if v <= f64::from_bits(current) {
break;
}
match self.max_val.compare_exchange_weak(
current,
v.to_bits(),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
self.gauge.set(v);
break;
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxGauge::observe updates the underlying Gauge directly, while collect()/metric() also mutate the same Gauge during a scrape/reset. Without synchronization, a concurrent observe() can race with collect() and cause a scrape to include values from after the reset window. To preserve "max since last scrape" semantics, consider not mutating the underlying gauge in observe() (only update the atomic max), and have collect()/metric() render from the atomic value, or add a lightweight lock around the reset+export path.

Copilot uses AI. Check for mistakes.
}
Err(actual) => current = actual,
}
}
}

/// Returns the maximum tracked so far, leaving it in place (no reset).
pub fn get(&self) -> f64 {
    // The atomic holds the f64's bit pattern; decode it on the way out.
    f64::from_bits(self.max_val.load(Ordering::Relaxed))
}

fn take(&self) -> f64 {
let val = self.max_val.swap(0f64.to_bits(), Ordering::Relaxed);
f64::from_bits(val)
Comment on lines +163 to +164
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxGauge::take resets the tracked max to 0. This breaks correctness for gauges whose values can be negative: after a scrape/reset, any negative observations will be ignored because 0 remains the maximum. Use a sentinel (e.g. f64::NEG_INFINITY) or a separate "has_value" flag for the reset state, and translate the sentinel to 0 only at collection time if you want the exported default to be 0.

Suggested change
let val = self.max_val.swap(0f64.to_bits(), Ordering::Relaxed);
f64::from_bits(val)
let prev = self
.max_val
.swap(f64::NEG_INFINITY.to_bits(), Ordering::Relaxed);
let val = f64::from_bits(prev);
if val == f64::NEG_INFINITY {
0.0
} else {
val
}

Copilot uses AI. Check for mistakes.
}
}

impl Collector for MaxGauge {
    /// Descriptors are exactly those of the wrapped gauge.
    fn desc(&self) -> Vec<&Desc> {
        self.gauge.desc()
    }

    /// Publishes the tracked maximum through the wrapped gauge and resets the
    /// tracker, so each collection starts a new observation window.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        // `take` hands back the current maximum and resets the tracker.
        let window_max = self.take();
        self.gauge.set(window_max);
        self.gauge.collect()
    }
}

impl Metric for MaxGauge {
    /// Writes the tracked maximum into the wrapped gauge, resets the tracker,
    /// and returns the gauge's protobuf representation.
    fn metric(&self) -> proto::Metric {
        let window_max = self.take();
        self.gauge.set(window_max);
        self.gauge.metric()
    }
}

/// Builder for `MaxGaugeVec`.
///
/// This is a stateless unit type; `MaxGaugeVec::new` hands an instance to
/// `MetricVec::create`. `Default` is derived so the type can be constructed
/// generically as well as through [`MaxGaugeVecBuilder::new`].
#[derive(Clone, Debug, Default)]
pub struct MaxGaugeVecBuilder;

impl MaxGaugeVecBuilder {
    /// Create a new `MaxGaugeVecBuilder`.
    ///
    /// Equivalent to `MaxGaugeVecBuilder::default()`.
    pub fn new() -> Self {
        Self
    }
}

impl MetricVecBuilder for MaxGaugeVecBuilder {
    type M = MaxGauge;
    type P = Opts;

    /// Builds the `MaxGauge` child for one combination of label values.
    ///
    /// Each variable label name is paired with its value from `vals` and
    /// stored as a constant label on a private copy of `opts`. The copy is
    /// left with no variable labels and is used to create a plain `Gauge`,
    /// which is then wrapped.
    fn build(&self, opts: &Opts, vals: &[&str]) -> Result<Self::M> {
        let mut child_opts = opts.clone();
        // Move the names out; this also leaves `variable_labels` empty.
        let label_names = std::mem::take(&mut child_opts.variable_labels);
        for (label, value) in label_names.into_iter().zip(vals) {
            child_opts.const_labels.insert(label, (*value).to_owned());
        }

        Gauge::with_opts(child_opts).map(MaxGauge::wrap)
    }
}

/// A family of `MaxGauge` metrics, one child per combination of label values.
#[derive(Debug, Clone)]
pub struct MaxGaugeVec {
    /// The underlying `MetricVec` that builds children via `MaxGaugeVecBuilder`.
    inner: MetricVec<MaxGaugeVecBuilder>,
}

impl MaxGaugeVec {
    /// Creates a `MaxGaugeVec` from `opts`, partitioned by `label_names`.
    ///
    /// Returns whatever error `MetricVec::create` reports.
    pub fn new(opts: Opts, label_names: &[&str]) -> Result<Self> {
        let owned_names = label_names.iter().map(|&name| name.to_owned()).collect();
        let builder = MaxGaugeVecBuilder::new();

        MetricVec::create(
            proto::MetricType::GAUGE,
            builder,
            opts.variable_labels(owned_names),
        )
        .map(|inner| Self { inner })
    }
}

/// Exposes the inner `MetricVec` API (for example `with_label_values`)
/// directly on `MaxGaugeVec`.
impl std::ops::Deref for MaxGaugeVec {
    type Target = MetricVec<MaxGaugeVecBuilder>;

    fn deref(&self) -> &MetricVec<MaxGaugeVecBuilder> {
        &self.inner
    }
}

impl Collector for MaxGaugeVec {
    /// Forwards to the inner `MetricVec`'s descriptors.
    fn desc(&self) -> Vec<&Desc> {
        let inner = &self.inner;
        inner.desc()
    }

    /// Forwards to the inner `MetricVec`'s collection.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        let inner = &self.inner;
        inner.collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Calling `metric()` exports the tracked maximum through the wrapped
    /// gauge and resets the tracker to zero.
    #[test]
    fn test_max_gauge_resets_on_metric() {
        let opts = Opts::new("test_max_gauge", "test max gauge");
        let tracker = MaxGauge::wrap(Gauge::with_opts(opts).unwrap());

        // Only the largest of the three observations should be retained.
        tracker.observe(1.0);
        tracker.observe(3.0);
        tracker.observe(2.0);
        assert_eq!(tracker.get(), 3.0);

        let _ = tracker.metric();
        assert_eq!(tracker.get(), 0.0);
        assert_eq!(tracker.gauge.get(), 3.0);
    }

    /// Collecting the vec exports each child's maximum with its label and
    /// resets the child's tracker.
    #[test]
    fn test_max_gauge_vec_resets_on_collect() {
        let opts = Opts::new("test_max_gauge_vec", "test max gauge vec");
        let family = MaxGaugeVec::new(opts, &["l1"]).unwrap();
        let child = family.with_label_values(&["v1"]);
        child.observe(5.0);

        let collected = family.collect();
        assert_eq!(collected.len(), 1);

        let exported = collected[0].get_metric().first().unwrap();
        let labels = exported.get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "l1");
        assert_eq!(labels[0].get_value(), "v1");
        assert_eq!(exported.get_gauge().get_value(), 5.0);

        assert_eq!(child.get(), 0.0);
    }
}
34 changes: 33 additions & 1 deletion src/pool/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ pub struct SchedConfig {
pub alloc_slot_backoff: Duration,
}

/// Settings for monitoring enqueue burst throughput of the global injector.
///
/// The two fields feed the sample-size formula documented on
/// `Builder::enable_burst_monitoring`:
/// `max(min_sample_size, per_worker_multiplier * active_workers)`.
#[derive(Clone, Copy)]
pub(crate) struct BurstMonitorConfig {
    /// Factor applied to the number of active workers.
    pub(crate) per_worker_multiplier: usize,
    /// Lower bound on the sample size.
    pub(crate) min_sample_size: usize,
}

impl Default for SchedConfig {
fn default() -> SchedConfig {
SchedConfig {
Expand Down Expand Up @@ -129,6 +135,7 @@ pub struct Builder {
name_prefix: String,
stack_size: Option<usize>,
sched_config: SchedConfig,
burst_monitoring: Option<BurstMonitorConfig>,
}

impl Builder {
Expand All @@ -138,6 +145,7 @@ impl Builder {
name_prefix: name_prefix.into(),
stack_size: None,
sched_config: SchedConfig::default(),
burst_monitoring: None,
}
}

Expand Down Expand Up @@ -212,6 +220,23 @@ impl Builder {
self
}

/// Enables monitoring of global injector enqueue burst throughput.
///
/// The sample size is computed as `max(min_sample_size, per_worker_multiplier * active_workers)`.
///
/// If either argument is zero the call is a no-op and any previously stored
/// burst-monitoring configuration is left as it was.
pub fn enable_burst_monitoring(
    &mut self,
    per_worker_multiplier: usize,
    min_sample_size: usize,
) -> &mut Self {
    // Zero values are ignored rather than stored.
    if per_worker_multiplier == 0 || min_sample_size == 0 {
        return self;
    }

    let config = BurstMonitorConfig {
        per_worker_multiplier,
        min_sample_size,
    };
    self.burst_monitoring = Some(config);
    self
}

/// Freezes the configurations and returns the task scheduler and
/// a builder to for lazy spawning threads.
///
Expand Down Expand Up @@ -252,7 +277,14 @@ impl Builder {
.store(self.sched_config.min_thread_count, Ordering::SeqCst);
}
let (injector, local_queues) = queue::build(queue_type, self.sched_config.max_thread_count);
let core = Arc::new(QueueCore::new(injector, self.sched_config.clone()));
let burst_monitor = self
.burst_monitoring
.map(|cfg| super::spawn::BurstMonitor::new(&self.name_prefix, cfg));
let core = Arc::new(QueueCore::new(
injector,
self.sched_config.clone(),
burst_monitor,
));

(
Remote::new(core.clone()),
Expand Down
Loading
Loading