add burst monitoring for global task injector #85
Conversation
Signed-off-by: zyguan <zhongyangguan@gmail.com>
📝 Walkthrough
Adds burst monitoring: a MaxGauge metric/vector for max enqueue throughput, a BurstMonitor sampling on enqueues, Builder support to enable monitoring, QueueCore updated to accept an optional BurstMonitor, and tests exercising throughput metrics and reset behavior.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Task Submission
    participant QueueCore
    participant BurstMonitor
    participant Metrics as MaxGaugeVec
    participant Scraper as Prometheus Scraper
    Client->>QueueCore: push(task)
    QueueCore->>BurstMonitor: on_enqueue(active_workers) (if Some)
    rect rgba(100,200,100,0.5)
        Note over BurstMonitor: sample timing & counts, compute throughput
        BurstMonitor->>Metrics: observe(throughput_value)
    end
    Scraper->>Metrics: collect()
    rect rgba(100,100,200,0.5)
        Note over Metrics: emit max since last scrape and reset
        Metrics-->>Scraper: metric_data
    end
```
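The "compute throughput" step in the diagram can be illustrated with a minimal single-threaded sketch. This is not the crate's code: the real `BurstMonitor` works on atomic counters and nanosecond timestamps, and the function name here is illustrative.

```rust
use std::time::Instant;

// Minimal sketch of the sampling math the diagram describes: once a window
// of `count` enqueues completes, throughput is tasks per elapsed second.
fn window_throughput(count: u64, window_start: Instant) -> f64 {
    let elapsed = window_start.elapsed().as_secs_f64();
    // Guard against a degenerate zero-length window.
    count as f64 / elapsed.max(f64::EPSILON)
}

fn main() {
    let start = Instant::now();
    std::thread::sleep(std::time::Duration::from_millis(100));
    let tps = window_throughput(500, start);
    // ~500 tasks in ~0.1s, so on the order of a few thousand tasks/sec.
    println!("{:.0}", tps);
}
```

The `MaxGauge` then keeps only the largest such value between two Prometheus scrapes.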
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/metrics.rs`:
- Around line 120-129: MaxGauge::observe currently reads self.gauge.get() then
sets, causing a TOCTOU race that can regress the max; replace the read-then-set
with a lock-free compare-and-swap loop on an AtomicU64 storing f64 bits: load
the current bits, compute new_bits = f64::to_bits(v), if new_bits > current_bits
attempt compare_exchange (using appropriate Orderings) and retry on failure
until success or current is already >= new_bits; use f64::from_bits when needed
and operate on self.gauge's underlying AtomicU64 (or change gauge to hold one)
so concurrent observers cannot overwrite a larger value with a smaller one.
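The fix described above can be sketched as follows. `AtomicMax` is an illustrative stand-in, not the crate's actual `MaxGauge`; the point is the compare-and-swap loop that prevents a smaller value from overwriting a larger one.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Lock-free maximum over an AtomicU64 that stores f64 bits.
struct AtomicMax {
    bits: AtomicU64,
}

impl AtomicMax {
    fn new() -> Self {
        Self {
            bits: AtomicU64::new(0f64.to_bits()),
        }
    }

    // Record `v` only if it exceeds the current maximum; retry on races.
    fn observe(&self, v: f64) {
        let mut current = self.bits.load(Ordering::Relaxed);
        while f64::from_bits(current) < v {
            match self.bits.compare_exchange_weak(
                current,
                v.to_bits(),
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                // Another thread won the race; re-check against its value.
                Err(seen) => current = seen,
            }
        }
    }

    fn get(&self) -> f64 {
        f64::from_bits(self.bits.load(Ordering::Relaxed))
    }
}

fn main() {
    let m = AtomicMax::new();
    m.observe(3.5);
    m.observe(1.0); // a smaller value cannot regress the max
    println!("{}", m.get()); // 3.5
}
```

Note that comparing through `f64::from_bits` (rather than comparing the raw bits) keeps the loop correct for non-integer values; bit-pattern comparison only matches numeric order for non-negative floats.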
In `@src/pool/tests.rs`:
- Around line 299-332: The timing-sensitive assertions in test_burst_monitoring
are too strict and can flake under CI; update the test (test_burst_monitoring)
to make the throughput checks more robust by either increasing the sleep
durations around spawn_n to reduce relative jitter (e.g., double the sleep
times) or widening the assertion tolerance for QUEUE_CORE_BURST_THROUGHPUT (the
gauge checks after the spawn_n sequences) to a safer margin (e.g., ±50 or use a
percentage-based threshold) so the asserts no longer fail from small OS
scheduling delays during the 200ms/400ms windows.
🧹 Nitpick comments (3)
src/metrics.rs (1)
131-149: Reset-on-read in both `Collector::collect` and `Metric::metric` — clarify intended usage.

Both `collect()` (lines 137-138) and `metric()` (lines 145-146) reset the gauge to 0.0 after reading. When `MaxGaugeVec::collect()` is called, the inner `MetricVec` calls `metric()` on each child `MaxGauge`, which triggers the reset. This is the correct path.

However, the standalone `Collector::collect()` on `MaxGauge` is also callable. If someone inadvertently registers a single `MaxGauge` as a `Collector` and it's also part of a `MaxGaugeVec`, both paths would race to reset. Consider adding a doc comment clarifying that `MaxGauge` instances obtained from `MaxGaugeVec` should only be reset via the vec's `collect()`.

src/pool/spawn.rs (1)
61-97: Relaxed ordering on `sample_target` and `start_ns` can cause stale reads across threads.

When Thread A (the one with `new_count == 1`) stores `sample_target` and `start_ns` with `Relaxed` ordering (lines 69-70), another Thread B that subsequently reaches the target check (lines 73-76) or reads `start_ns` (line 78) is not guaranteed to observe those stores — `Relaxed` provides no cross-thread visibility ordering.

In practice, this is likely benign because:
- If `target` is read as 0 (stale), the thread returns early — no harm.
- A stale `start_ns` would only slightly skew the throughput measurement.

However, if correctness of the throughput measurement matters more than the comment suggests, consider using `Release` on the stores in the `new_count == 1` branch and `Acquire` on the corresponding loads. The `count` fetch_add already acts as a synchronization point, but `Relaxed` on the ancillary fields doesn't piggyback on it. Given the documented "good enough" stance, this is likely acceptable as-is.
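The suggested Release/Acquire pairing can be demonstrated in isolation. Field names are taken from the review comment; the rest of the monitor is elided. Note that for the happens-before edge to cover `start_ns`, its store must be sequenced before the Release publish of `sample_target`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let sample_target = Arc::new(AtomicU64::new(0));
    let start_ns = Arc::new(AtomicU64::new(0));

    let (t, s) = (Arc::clone(&sample_target), Arc::clone(&start_ns));
    let writer = thread::spawn(move || {
        s.store(42, Ordering::Relaxed); // write the ancillary data first...
        t.store(100, Ordering::Release); // ...then publish the target
    });

    let (t2, s2) = (Arc::clone(&sample_target), Arc::clone(&start_ns));
    let reader = thread::spawn(move || {
        // Spin until the Acquire load observes the published target.
        while t2.load(Ordering::Acquire) != 100 {
            std::hint::spin_loop();
        }
        // The Acquire/Release pair guarantees the earlier store is visible.
        assert_eq!(s2.load(Ordering::Relaxed), 42);
    });

    writer.join().unwrap();
    reader.join().unwrap();
    println!("ok");
}
```

With `Relaxed` throughout, the reader's final assertion would be allowed to fail on weakly-ordered hardware; the Release store of `sample_target` is what makes the earlier `start_ns` write visible.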
src/pool/builder.rs (1)
223-238: Silently ignoring `per_worker_multiplier == 0` is overly restrictive.

The guard `per_worker_multiplier > 0 && min_sample_size > 0` rejects `per_worker_multiplier = 0`, but that value would still produce a valid configuration: the sample target would always be `max(min_sample_size, 0 * workers) = min_sample_size`. Only `min_sample_size == 0` is truly problematic (it would cause the monitor to fire on every enqueue).

Consider either relaxing the check to only require `min_sample_size > 0`, or documenting that both must be positive.
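The point about the effective target can be checked directly. Parameter names follow the review comment; the real builder code may compute this differently.

```rust
// The effective sample target never drops below min_sample_size, so only
// min_sample_size == 0 is degenerate (the monitor would fire on every enqueue).
fn sample_target(min_sample_size: u64, per_worker_multiplier: u64, workers: u64) -> u64 {
    min_sample_size.max(per_worker_multiplier * workers)
}

fn main() {
    // per_worker_multiplier == 0 still yields a usable configuration:
    println!("{}", sample_target(64, 0, 8)); // 64
    // a nonzero multiplier raises the target once enough workers exist:
    println!("{}", sample_target(64, 16, 8)); // 128
}
```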
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/metrics.rs`:
- Around line 194-199: Clippy warns that MaxGaugeVecBuilder should implement
Default; add an impl Default for MaxGaugeVecBuilder that returns Self::new()
(i.e., implement fn default() -> Self { Self::new() }) so the type satisfies
Default and the linter error is resolved while keeping the existing pub fn new()
constructor.
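A minimal sketch of that fix follows; the builder's field is a placeholder, not the crate's actual layout.

```rust
pub struct MaxGaugeVecBuilder {
    name: Option<String>,
}

impl MaxGaugeVecBuilder {
    pub fn new() -> Self {
        Self { name: None }
    }
}

// Satisfies clippy::new_without_default while keeping the public new().
impl Default for MaxGaugeVecBuilder {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    let b = MaxGaugeVecBuilder::default();
    println!("{}", b.name.is_none()); // true
}
```

Delegating `default()` to `new()` keeps a single construction path, so future field changes only need to touch one place.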
🧹 Nitpick comments (1)
src/pool/spawn.rs (1)
61-97: Relaxed ordering between `sample_target`/`start_ns` stores and loads across threads is technically unsound but practically benign.

When the thread that sets `new_count == 1` stores `sample_target` (line 69) then `start_ns` (line 70) with `Relaxed` ordering, a later thread that observes `sample_target != 0` is not formally guaranteed to also see the updated `start_ns`. On weakly-ordered architectures (e.g., ARM), it's theoretically possible to read a stale `start_ns` of 0.

The impact is minimal: `elapsed_ns` would be inflated (time since process start), yielding an artificially low throughput that `MaxGauge` would ignore if a higher value was already recorded. This self-corrects on the next sampling window.

If you ever want to tighten this, changing the `sample_target` store to `Release` and its load (line 73) to `Acquire` would establish a happens-before edge that covers `start_ns` (note this also requires the `start_ns` store to be sequenced before the `Release` publish of `sample_target`).
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Pull request overview
Adds optional “burst” monitoring for the global task injector to report peak enqueue throughput (tasks/sec) since the last Prometheus scrape, and wires it through the pool builder/QueueCore.
Changes:
- Introduce `MaxGauge`/`MaxGaugeVec` to track a maximum value and reset on collection.
- Add `BurstMonitor` and emit `QUEUE_CORE_BURST_THROUGHPUT` from `QueueCore::push`.
- Expose burst monitoring configuration via `Builder` and add tests.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Summary per file:

| File | Description |
|---|---|
| src/metrics.rs | Adds `MaxGauge`/`MaxGaugeVec` and the `yatp_queue_core_burst_throughput` metric. |
| src/pool/spawn.rs | Implements `BurstMonitor`, extends `QueueCore` to optionally track enqueue throughput. |
| src/pool/builder.rs | Adds builder configuration and wires `BurstMonitor` construction into `QueueCore::new`. |
| src/pool/tests.rs | Adds a unit test for burst monitoring behavior. |
| src/queue/priority.rs | Updates tests to pass the new `QueueCore::new(..., burst_monitor)` parameter. |
| src/queue.rs | Simplifies `QueueType` default via `#[derive(Default)]`. |
| src/task/future.rs | Adjusts `thread_local!` initialization style. |
```rust
let value = metric.metric().get_gauge().get_value();
assert!(value > 100.0); // the above loop should be executed within 1s
```
This assertion is timing-based (value > 100.0 assuming the enqueue loop completes within 1s) and is likely to be flaky on slower/contended CI runners. Consider asserting only that a positive, finite value is reported and that it resets after a scrape/collect (e.g. call metric.metric()/collect() twice and assert the second is 0), or make the threshold much more forgiving and/or control time in the monitor for the test.
Suggested change:

```diff
-let value = metric.metric().get_gauge().get_value();
-assert!(value > 100.0); // the above loop should be executed within 1s
+let value1 = metric.metric().get_gauge().get_value();
+assert!(value1.is_finite());
+assert!(value1 > 0.0);
+let value2 = metric.metric().get_gauge().get_value();
+assert_eq!(value2, 0.0);
```
```rust
/// Wraps a `Gauge` to create a `MaxGauge`. The `Gauge` should not be used directly after being wrapped, otherwise
/// the maximum tracking will be broken.
pub fn wrap(gauge: Gauge) -> Self {
    let val = gauge.get().to_bits();
    Self {
        gauge,
        max_val: Arc::new(AtomicU64::new(val)),
    }
}
```
MaxGauge::wrap seeds max_val from the wrapped Gauge's current value (typically 0). That makes the max tracking incorrect for metrics that can legitimately be negative (e.g. observing -1.0 will never update the max if the seed is 0). Consider initializing the internal max to a sentinel like -inf/"no value yet" instead of the gauge's current value, and only exporting 0 when there have been no observations since the last scrape.
```rust
let val = self.max_val.swap(0f64.to_bits(), Ordering::Relaxed);
f64::from_bits(val)
```
MaxGauge::take resets the tracked max to 0. This breaks correctness for gauges whose values can be negative: after a scrape/reset, any negative observations will be ignored because 0 remains the maximum. Use a sentinel (e.g. f64::NEG_INFINITY) or a separate "has_value" flag for the reset state, and translate the sentinel to 0 only at collection time if you want the exported default to be 0.
Suggested change:

```diff
-let val = self.max_val.swap(0f64.to_bits(), Ordering::Relaxed);
-f64::from_bits(val)
+let prev = self
+    .max_val
+    .swap(f64::NEG_INFINITY.to_bits(), Ordering::Relaxed);
+let val = f64::from_bits(prev);
+if val == f64::NEG_INFINITY {
+    0.0
+} else {
+    val
+}
```
src/metrics.rs (outdated)
```rust
match self.max_val.compare_exchange_weak(
    current,
    v.to_bits(),
    Ordering::Relaxed,
    Ordering::Relaxed,
) {
    Ok(_) => {
        self.gauge.set(v);
        break;
```
MaxGauge::observe updates the underlying Gauge directly, while collect()/metric() also mutate the same Gauge during a scrape/reset. Without synchronization, a concurrent observe() can race with collect() and cause a scrape to include values from after the reset window. To preserve "max since last scrape" semantics, consider not mutating the underlying gauge in observe() (only update the atomic max), and have collect()/metric() render from the atomic value, or add a lightweight lock around the reset+export path.
```rust
self.sample_target.store(target, Ordering::Relaxed);
self.start_ns.store(now_ns(), Ordering::Relaxed);
}
// ...
let target = self.sample_target.load(Ordering::Relaxed);
if target == 0 || new_count < target {
    return;
}
// ...
let start_ns = self.start_ns.load(Ordering::Relaxed);
```
BurstMonitor::on_enqueue uses only Relaxed orderings when publishing sample_target/start_ns on the first enqueue and reading them on subsequent enqueues. Under contention, other threads are allowed to observe stale values (e.g. start_ns == 0), which can skew throughput calculations significantly. Consider using Release on the stores and Acquire on the loads (or setting start_ns via CAS) so the window start/target become visible consistently once initialized.
Suggested change:

```diff
-self.sample_target.store(target, Ordering::Relaxed);
-self.start_ns.store(now_ns(), Ordering::Relaxed);
+self.sample_target.store(target, Ordering::Release);
+self.start_ns.store(now_ns(), Ordering::Release);
 }
-let target = self.sample_target.load(Ordering::Relaxed);
+let target = self.sample_target.load(Ordering::Acquire);
 if target == 0 || new_count < target {
     return;
 }
-let start_ns = self.start_ns.load(Ordering::Relaxed);
+let start_ns = self.start_ns.load(Ordering::Acquire);
```
Signed-off-by: zyguan <zhongyangguan@gmail.com>
This PR adds burst monitoring for the global task injector to track maximum enqueue throughput (tasks/sec) since the last scrape.
- `MaxGauge` and `MaxGaugeVec` types that track maximum values and reset on scrape/collection.
- `BurstMonitor` that calculates throughput based on task sampling and records it in `QUEUE_CORE_BURST_THROUGHPUT`.
- Wire the `BurstMonitor` into the `QueueCore` and expose configuration through the `Builder`.

Summary by CodeRabbit
New Features
Tests