[ISSUE #6535]✨Enhance ConsumerStatsManager with background sampling tasks and improve StatsItem structure for better performance and clarity#6536
Conversation
…asks and improve StatsItem structure for better performance and clarity
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughImplements background sampling tasks in ConsumerStatsManager, refactors StatsItem to remove Arc indirection and add last-update timestamps, changes internal metrics to use 64-bit values, and extends StatsItemSet with sampling and idle-item eviction logic. Changes
Sequence Diagram(s)sequenceDiagram
participant App as Application
participant CSM as ConsumerStatsManager
participant Tokio as Tokio Runtime
participant SIS as StatsItemSet
participant SI as StatsItem
App->>CSM: start()
CSM->>Tokio: spawn task (10s loop)
CSM->>Tokio: spawn task (10m loop)
CSM->>Tokio: spawn task (1h loop)
loop Every 10s
Tokio->>SIS: sampling_in_seconds()
SIS->>SI: sampling_in_seconds() per item
SI->>SI: update snapshots & last_update_timestamp
end
loop Every 10m
Tokio->>SIS: sampling_in_minutes()
SIS->>SI: sampling_in_minutes_level() per item
SI->>SI: advance minute snapshot
end
loop Every 1h
Tokio->>SIS: sampling_in_hours()
SIS->>SI: sampling_in_hour_level() per item
SI->>SI: advance hour snapshot
end
App->>CSM: shutdown()
CSM->>Tokio: abort/stop tasks
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-common/src/common/stats/stats_item.rs (1)
107-112:⚠️ Potential issue | 🟡 MinorGuard TPS calculation against zero elapsed time.
Line [111] divides by
(last_timestamp - first_timestamp)with no zero check. Equal timestamps yieldinf/NaNTPS and can pollute downstream metrics.✅ Proposed fix
if !cs_list.is_empty() { let first = cs_list.front().unwrap(); let last = cs_list.back().unwrap(); let sum = last.get_value() - first.get_value(); - let tps = (sum as f64 * 1000.0) / (last.get_timestamp() - first.get_timestamp()) as f64; + let elapsed = last.get_timestamp().saturating_sub(first.get_timestamp()); + let tps = if elapsed > 0 { + (sum as f64 * 1000.0) / elapsed as f64 + } else { + 0.0 + };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-common/src/common/stats/stats_item.rs` around lines 107 - 112, The TPS calculation divides by (last.get_timestamp() - first.get_timestamp()) with no zero check; update the code around cs_list.front()/cs_list.back() (variables first, last) to compute an elapsed = last.get_timestamp() - first.get_timestamp() and guard against zero (or <= 0) before computing tps, e.g., set tps = 0.0 (or skip computation) when elapsed is zero to avoid inf/NaN, otherwise compute tps = (sum as f64 * 1000.0) / (elapsed as f64).
🧹 Nitpick comments (1)
rocketmq-client/src/stat/consumer_stats_manager.rs (1)
261-266: Rename or strengthen this test to match asserted behavior.Line [262] says “launches background tasks”, but the body only verifies no panic. Either rename to reflect that scope or assert observable sampling side effects.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/stat/consumer_stats_manager.rs` around lines 261 - 266, The test named start_launches_background_tasks only calls make_manager(), mgr.start(), and mgr.shutdown() and therefore only verifies that start() doesn't panic; either rename the test to something like start_does_not_panic to reflect the actual assertion, or strengthen it by asserting observable side-effects of the background tasks (e.g., call start() on the manager returned by make_manager(), then assert an internal running flag, a metrics/sampling counter, or that a sample/update method produced expected state changes before calling shutdown()); update the test function start_launches_background_tasks accordingly to use make_manager(), call mgr.start(), then perform the chosen assertion(s) and finally call mgr.shutdown().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rocketmq-client/src/stat/consumer_stats_manager.rs`:
- Around line 63-120: start() currently spawns three detached infinite tasks
(using tokio::spawn) and shutdown() is a no-op, so repeated start() calls
duplicate tasks and there is no way to stop them; make start() idempotent and
retain handles so shutdown() can abort them: add an AtomicBool started flag
(e.g., AtomicBool in the struct) and return immediately if already set, add a
Mutex<Vec<tokio::task::JoinHandle<()>>> (or RwLock) field to store the
JoinHandle returned from each tokio::spawn when creating the 10-second,
10-minute, and 1-hour tasks that call sampling_in_seconds/minutes/hours on the
sets (e.g., topic_and_group_consume_ok_tps, topic_and_group_consume_rt,
topic_and_group_consume_failed_tps, topic_and_group_pull_tps,
topic_and_group_pull_rt), and implement shutdown() to take the handles vector
and call abort() on each handle (and clear the vector and reset started if
desired) so tasks are cancelled and start() cannot spawn duplicates.
In `@rocketmq-common/src/common/stats/stats_item.rs`:
- Around line 95-99: The doc for get_last_update_timestamp disagrees with how
last_update_timestamp is initialized: either make the value actually default to
0 on creation or update the documentation to reflect the real behavior. Locate
the StatsItem constructor/Default impl that sets last_update_timestamp and the
get_last_update_timestamp method, then either (A) change the initialization to 0
so the doc ("0 means never written") is accurate, or (B) update the doc comment
on get_last_update_timestamp to state it returns the milliseconds of the last
add/increment (or the creation timestamp if never updated) and remove the claim
that 0 indicates "never written." Ensure the chosen fix references
last_update_timestamp and get_last_update_timestamp consistently.
---
Outside diff comments:
In `@rocketmq-common/src/common/stats/stats_item.rs`:
- Around line 107-112: The TPS calculation divides by (last.get_timestamp() -
first.get_timestamp()) with no zero check; update the code around
cs_list.front()/cs_list.back() (variables first, last) to compute an elapsed =
last.get_timestamp() - first.get_timestamp() and guard against zero (or <= 0)
before computing tps, e.g., set tps = 0.0 (or skip computation) when elapsed is
zero to avoid inf/NaN, otherwise compute tps = (sum as f64 * 1000.0) / (elapsed
as f64).
---
Nitpick comments:
In `@rocketmq-client/src/stat/consumer_stats_manager.rs`:
- Around line 261-266: The test named start_launches_background_tasks only calls
make_manager(), mgr.start(), and mgr.shutdown() and therefore only verifies that
start() doesn't panic; either rename the test to something like
start_does_not_panic to reflect the actual assertion, or strengthen it by
asserting observable side-effects of the background tasks (e.g., call start() on
the manager returned by make_manager(), then assert an internal running flag, a
metrics/sampling counter, or that a sample/update method produced expected state
changes before calling shutdown()); update the test function
start_launches_background_tasks accordingly to use make_manager(), call
mgr.start(), then perform the chosen assertion(s) and finally call
mgr.shutdown().
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
rocketmq-example/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
rocketmq-client/src/stat/consumer_stats_manager.rsrocketmq-common/src/common/stats/stats_item.rsrocketmq-common/src/common/stats/stats_item_set.rs
| pub fn start(&self) { | ||
| // 10-second tick — drives cs_list_minute on each StatsItem. | ||
| let sets_sec = [ | ||
| self.topic_and_group_consume_ok_tps.clone(), | ||
| self.topic_and_group_consume_rt.clone(), | ||
| self.topic_and_group_consume_failed_tps.clone(), | ||
| self.topic_and_group_pull_tps.clone(), | ||
| self.topic_and_group_pull_rt.clone(), | ||
| ]; | ||
| tokio::spawn(async move { | ||
| let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); | ||
| loop { | ||
| interval.tick().await; | ||
| for set in &sets_sec { | ||
| set.sampling_in_seconds(); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| // 10-minute tick — drives cs_list_hour on each StatsItem. | ||
| let sets_min = [ | ||
| self.topic_and_group_consume_ok_tps.clone(), | ||
| self.topic_and_group_consume_rt.clone(), | ||
| self.topic_and_group_consume_failed_tps.clone(), | ||
| self.topic_and_group_pull_tps.clone(), | ||
| self.topic_and_group_pull_rt.clone(), | ||
| ]; | ||
| tokio::spawn(async move { | ||
| let mut interval = tokio::time::interval(std::time::Duration::from_secs(10 * 60)); | ||
| loop { | ||
| interval.tick().await; | ||
| for set in &sets_min { | ||
| set.sampling_in_minutes(); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| // 1-hour tick — drives cs_list_day on each StatsItem. | ||
| let sets_hour = [ | ||
| self.topic_and_group_consume_ok_tps.clone(), | ||
| self.topic_and_group_consume_rt.clone(), | ||
| self.topic_and_group_consume_failed_tps.clone(), | ||
| self.topic_and_group_pull_tps.clone(), | ||
| self.topic_and_group_pull_rt.clone(), | ||
| ]; | ||
| tokio::spawn(async move { | ||
| let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); | ||
| loop { | ||
| interval.tick().await; | ||
| for set in &sets_hour { | ||
| set.sampling_in_hours(); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /// Shuts down the stats manager. Currently a no-op. | ||
| /// Shuts down the stats manager. | ||
| pub fn shutdown(&self) {} |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, find the file and examine its structure
find . -name "consumer_stats_manager.rs" -type fRepository: mxsm/rocketmq-rust
Length of output: 114
🏁 Script executed:
# Read the file to understand the struct and lifecycle methods
cat -n rocketmq-client/src/stat/consumer_stats_manager.rs | head -150Repository: mxsm/rocketmq-rust
Length of output: 7316
🏁 Script executed:
# Search for where start() and shutdown() are called to understand caller patterns
rg -n '\.start\(\)|\.shutdown\(\)' --type rust -B2 -A2Repository: mxsm/rocketmq-rust
Length of output: 50377
🏁 Script executed:
# Check if there are any existing guards or task handle storage in ConsumerStatsManager
rg -n 'struct ConsumerStatsManager|started|task|JoinHandle|CancellationToken' rocketmq-client/src/stat/consumer_stats_manager.rs -B2 -A2Repository: mxsm/rocketmq-rust
Length of output: 764
🏁 Script executed:
# Verify the end of the file to check if there are any tests related to start/shutdown behavior
tail -100 rocketmq-client/src/stat/consumer_stats_manager.rsRepository: mxsm/rocketmq-rust
Length of output: 2659
🏁 Script executed:
# Search for where ConsumerStatsManager is instantiated and used to understand caller patterns
rg -n 'ConsumerStatsManager::new|ConsumerStatsManager {' --type rust -B2 -A2Repository: mxsm/rocketmq-rust
Length of output: 221
Store task handles and add idempotent start guard to prevent task duplication and enable proper cleanup.
Lines 63–117 spawn three infinite detached tasks without retaining their JoinHandles, and line 120's empty shutdown() cannot stop them. Repeated start() calls spawn additional tasks; previous tasks continue indefinitely. The struct has no fields to store task handles, cancellation tokens, or a started flag.
Add:
AtomicBoolor similar to guard idempotentstart()callsMutex<Vec<JoinHandle<()>>>or similar to store task handlesshutdown()implementation that callsabort()on each handle
Proposed fix
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
+use tokio::task::JoinHandle;
pub struct ConsumerStatsManager {
topic_and_group_consume_ok_tps: StatsItemSet,
topic_and_group_consume_rt: StatsItemSet,
topic_and_group_consume_failed_tps: StatsItemSet,
topic_and_group_pull_tps: StatsItemSet,
topic_and_group_pull_rt: StatsItemSet,
+ started: AtomicBool,
+ sampler_tasks: Mutex<Vec<JoinHandle<()>>>,
}
pub fn start(&self) {
+ if self.started.swap(true, Ordering::AcqRel) {
+ return;
+ }
+ let mut handles = self.sampler_tasks.lock().expect("sampler_tasks poisoned");
- tokio::spawn(async move { /* ... */ });
+ handles.push(tokio::spawn(async move { /* ... */ }));
}
pub fn shutdown(&self) {
+ for handle in self.sampler_tasks.lock().expect("sampler_tasks poisoned").drain(..) {
+ handle.abort();
+ }
+ self.started.store(false, Ordering::Release);
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rocketmq-client/src/stat/consumer_stats_manager.rs` around lines 63 - 120,
start() currently spawns three detached infinite tasks (using tokio::spawn) and
shutdown() is a no-op, so repeated start() calls duplicate tasks and there is no
way to stop them; make start() idempotent and retain handles so shutdown() can
abort them: add an AtomicBool started flag (e.g., AtomicBool in the struct) and
return immediately if already set, add a Mutex<Vec<tokio::task::JoinHandle<()>>>
(or RwLock) field to store the JoinHandle returned from each tokio::spawn when
creating the 10-second, 10-minute, and 1-hour tasks that call
sampling_in_seconds/minutes/hours on the sets (e.g.,
topic_and_group_consume_ok_tps, topic_and_group_consume_rt,
topic_and_group_consume_failed_tps, topic_and_group_pull_tps,
topic_and_group_pull_rt), and implement shutdown() to take the handles vector
and call abort() on each handle (and clear the vector and reset started if
desired) so tasks are cancelled and start() cannot spawn duplicates.
| /// Returns the wall-clock timestamp (milliseconds since UNIX epoch) of the last | ||
| /// [`add`](Self::add) or [`increment`](Self::increment) call. | ||
| /// | ||
| /// A value of `0` means the item has never been written to. | ||
| #[inline] |
There was a problem hiding this comment.
Fix get_last_update_timestamp doc contract mismatch.
Line [98] says 0 means “never written,” but Line [50] initializes last_update_timestamp at creation time, so that state never occurs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rocketmq-common/src/common/stats/stats_item.rs` around lines 95 - 99, The doc
for get_last_update_timestamp disagrees with how last_update_timestamp is
initialized: either make the value actually default to 0 on creation or update
the documentation to reflect the real behavior. Locate the StatsItem
constructor/Default impl that sets last_update_timestamp and the
get_last_update_timestamp method, then either (A) change the initialization to 0
so the doc ("0 means never written") is accurate, or (B) update the doc comment
on get_last_update_timestamp to state it returns the milliseconds of the last
add/increment (or the creation timestamp if never updated) and remove the claim
that 0 indicates "never written." Ensure the chosen fix references
last_update_timestamp and get_last_update_timestamp consistently.
…asks and improve StatsItem structure for better performance and clarity
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rocketmq-common/src/common/stats/stats_item_set.rs`:
- Around line 53-55: The add_value signature currently accepts impl Into<i64>
and then casts to u64 which can silently wrap negative inputs; change add_value
to take unsigned types (e.g., inc_value: impl Into<u64>, inc_times: impl
Into<u64>) and remove the i64->u64 casts so callers pass/convert unsigned
values, updating callers to provide u64/u32 as needed; ensure the call to
item.add(…) uses the new unsigned types and keep get_or_create_stats_item
unchanged.
| pub fn add_value(&self, stats_key: &str, inc_value: impl Into<i64>, inc_times: impl Into<i64>) { | ||
| let item = self.get_or_create_stats_item(stats_key); | ||
| for _ in 0..inc_times { | ||
| item.increment(inc_value as u64); | ||
| } | ||
| item.add(inc_value.into() as u64, inc_times.into() as u64); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's locate the file and understand its context
find . -name "stats_item_set.rs" -type fRepository: mxsm/rocketmq-rust
Length of output: 114
🏁 Script executed:
# Find all callers of add_value and add_rt_value
echo "=== Callers of add_value ==="
rg -n '\.add_value\s*\(' --type=rust -A 2 -B 2
echo ""
echo "=== Callers of add_rt_value ==="
rg -n '\.add_rt_value\s*\(' --type=rust -A 2 -B 2Repository: mxsm/rocketmq-rust
Length of output: 18314
🏁 Script executed:
# Check the StatsItem implementation to understand the data types
find . -name "stats_item.rs" -type f | head -5Repository: mxsm/rocketmq-rust
Length of output: 110
🏁 Script executed:
# Look at the file under review to see full context
cat -n rocketmq-common/src/common/stats/stats_item_set.rs | sed -n '45,70p'Repository: mxsm/rocketmq-rust
Length of output: 1333
🏁 Script executed:
# Check the StatsItem implementation
cat -n rocketmq-common/src/common/stats/stats_item.rs | head -100Repository: mxsm/rocketmq-rust
Length of output: 4308
The signature allows negative values but all callers pass positive values only; however, the type permissiveness creates a risk of silent data corruption.
The methods accept impl Into<i64> but cast directly to u64. While all current callers pass positive values (from i32 increments and u64 casts), if a negative value were passed, it would wrap to a very large positive number (e.g., -1i64 as u64 = 18446744073709551615), corrupting the stats.
Consider either:
- Adding runtime assertions to enforce non-negative values, or
- Changing the signature to accept unsigned types (
u64oru32) if negative values are never valid.
🛡️ Proposed fix to add validation
pub fn add_value(&self, stats_key: &str, inc_value: impl Into<i64>, inc_times: impl Into<i64>) {
+ let inc_value = inc_value.into();
+ let inc_times = inc_times.into();
+ debug_assert!(inc_value >= 0, "inc_value must be non-negative");
+ debug_assert!(inc_times >= 0, "inc_times must be non-negative");
let item = self.get_or_create_stats_item(stats_key);
- item.add(inc_value.into() as u64, inc_times.into() as u64);
+ item.add(inc_value as u64, inc_times as u64);
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rocketmq-common/src/common/stats/stats_item_set.rs` around lines 53 - 55, The
add_value signature currently accepts impl Into<i64> and then casts to u64 which
can silently wrap negative inputs; change add_value to take unsigned types
(e.g., inc_value: impl Into<u64>, inc_times: impl Into<u64>) and remove the
i64->u64 casts so callers pass/convert unsigned values, updating callers to
provide u64/u32 as needed; ensure the call to item.add(…) uses the new unsigned
types and keep get_or_create_stats_item unchanged.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6536 +/- ##
==========================================
+ Coverage 42.10% 42.12% +0.01%
==========================================
Files 949 949
Lines 132503 132576 +73
==========================================
+ Hits 55795 55842 +47
- Misses 76708 76734 +26 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Bug Fixes
New Features
Performance
Tests