Merged
82 changes: 72 additions & 10 deletions rocketmq-client/src/stat/consumer_stats_manager.rs
@@ -52,40 +52,101 @@ impl ConsumerStatsManager {
         }
     }

-    /// Starts the stats manager. Currently a no-op.
-    pub fn start(&self) {}
+    /// Starts background sampling tasks.
+    ///
+    /// Spawns three Tokio tasks that advance the sliding-window snapshots used
+    /// by [`consume_status`](Self::consume_status):
+    ///
+    /// - every 10 s → `cs_list_minute` (drives per-minute stats)
+    /// - every 10 min → `cs_list_hour` (drives per-hour stats)
+    /// - every 1 h → `cs_list_day` (drives per-day stats)
+    pub fn start(&self) {
+        // 10-second tick — drives cs_list_minute on each StatsItem.
+        let sets_sec = [
+            self.topic_and_group_consume_ok_tps.clone(),
+            self.topic_and_group_consume_rt.clone(),
+            self.topic_and_group_consume_failed_tps.clone(),
+            self.topic_and_group_pull_tps.clone(),
+            self.topic_and_group_pull_rt.clone(),
+        ];
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
+            loop {
+                interval.tick().await;
+                for set in &sets_sec {
+                    set.sampling_in_seconds();
+                }
+            }
+        });
+
+        // 10-minute tick — drives cs_list_hour on each StatsItem.
+        let sets_min = [
+            self.topic_and_group_consume_ok_tps.clone(),
+            self.topic_and_group_consume_rt.clone(),
+            self.topic_and_group_consume_failed_tps.clone(),
+            self.topic_and_group_pull_tps.clone(),
+            self.topic_and_group_pull_rt.clone(),
+        ];
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(10 * 60));
+            loop {
+                interval.tick().await;
+                for set in &sets_min {
+                    set.sampling_in_minutes();
+                }
+            }
+        });
+
+        // 1-hour tick — drives cs_list_day on each StatsItem.
+        let sets_hour = [
+            self.topic_and_group_consume_ok_tps.clone(),
+            self.topic_and_group_consume_rt.clone(),
+            self.topic_and_group_consume_failed_tps.clone(),
+            self.topic_and_group_pull_tps.clone(),
+            self.topic_and_group_pull_rt.clone(),
+        ];
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
+            loop {
+                interval.tick().await;
+                for set in &sets_hour {
+                    set.sampling_in_hours();
+                }
+            }
+        });
+    }

-    /// Shuts down the stats manager. Currently a no-op.
+    /// Shuts down the stats manager.
     pub fn shutdown(&self) {}
Comment on lines +63 to 120

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find the file and examine its structure
find . -name "consumer_stats_manager.rs" -type f

Repository: mxsm/rocketmq-rust

Length of output: 114


🏁 Script executed:

# Read the file to understand the struct and lifecycle methods
cat -n rocketmq-client/src/stat/consumer_stats_manager.rs | head -150

Repository: mxsm/rocketmq-rust

Length of output: 7316


🏁 Script executed:

# Search for where start() and shutdown() are called to understand caller patterns
rg -n '\.start\(\)|\.shutdown\(\)' --type rust -B2 -A2

Repository: mxsm/rocketmq-rust

Length of output: 50377


🏁 Script executed:

# Check if there are any existing guards or task handle storage in ConsumerStatsManager
rg -n 'struct ConsumerStatsManager|started|task|JoinHandle|CancellationToken' rocketmq-client/src/stat/consumer_stats_manager.rs -B2 -A2

Repository: mxsm/rocketmq-rust

Length of output: 764


🏁 Script executed:

# Verify the end of the file to check if there are any tests related to start/shutdown behavior
tail -100 rocketmq-client/src/stat/consumer_stats_manager.rs

Repository: mxsm/rocketmq-rust

Length of output: 2659


🏁 Script executed:

# Search for where ConsumerStatsManager is instantiated and used to understand caller patterns
rg -n 'ConsumerStatsManager::new|ConsumerStatsManager {' --type rust -B2 -A2

Repository: mxsm/rocketmq-rust

Length of output: 221


Store task handles and add idempotent start guard to prevent task duplication and enable proper cleanup.

Lines 63–117 spawn three infinite detached tasks without retaining their JoinHandles, and line 120's empty shutdown() cannot stop them. Repeated start() calls spawn additional tasks; previous tasks continue indefinitely. The struct has no fields to store task handles, cancellation tokens, or a started flag.

Add:

  • AtomicBool or similar to guard idempotent start() calls
  • Mutex<Vec<JoinHandle<()>>> or similar to store task handles
  • shutdown() implementation that calls abort() on each handle

Proposed fix:
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
+use tokio::task::JoinHandle;

 pub struct ConsumerStatsManager {
     topic_and_group_consume_ok_tps: StatsItemSet,
     topic_and_group_consume_rt: StatsItemSet,
     topic_and_group_consume_failed_tps: StatsItemSet,
     topic_and_group_pull_tps: StatsItemSet,
     topic_and_group_pull_rt: StatsItemSet,
+    started: AtomicBool,
+    sampler_tasks: Mutex<Vec<JoinHandle<()>>>,
 }

 pub fn start(&self) {
+    if self.started.swap(true, Ordering::AcqRel) {
+        return;
+    }
+    let mut handles = self.sampler_tasks.lock().expect("sampler_tasks poisoned");
-    tokio::spawn(async move { /* ... */ });
+    handles.push(tokio::spawn(async move { /* ... */ }));
 }

 pub fn shutdown(&self) {
+    for handle in self.sampler_tasks.lock().expect("sampler_tasks poisoned").drain(..) {
+        handle.abort();
+    }
+    self.started.store(false, Ordering::Release);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/stat/consumer_stats_manager.rs` around lines 63 - 120,
start() currently spawns three detached infinite tasks (using tokio::spawn) and
shutdown() is a no-op, so repeated start() calls duplicate tasks and there is no
way to stop them; make start() idempotent and retain handles so shutdown() can
abort them: add an AtomicBool started flag (e.g., AtomicBool in the struct) and
return immediately if already set, add a Mutex<Vec<tokio::task::JoinHandle<()>>>
(or RwLock) field to store the JoinHandle returned from each tokio::spawn when
creating the 10-second, 10-minute, and 1-hour tasks that call
sampling_in_seconds/minutes/hours on the sets (e.g.,
topic_and_group_consume_ok_tps, topic_and_group_consume_rt,
topic_and_group_consume_failed_tps, topic_and_group_pull_tps,
topic_and_group_pull_rt), and implement shutdown() to take the handles vector
and call abort() on each handle (and clear the vector and reset started if
desired) so tasks are cancelled and start() cannot spawn duplicates.


     /// Records a single pull response-time observation in milliseconds.
     pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
         self.topic_and_group_pull_rt
-            .add_rt_value(&stats_key(topic, group), rt as i32, 1);
+            .add_rt_value(&stats_key(topic, group), rt as i64, 1);
     }

     /// Records `msgs` messages successfully pulled in one batch.
     pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
         self.topic_and_group_pull_tps
-            .add_value(&stats_key(topic, group), msgs as i32, 1);
+            .add_value(&stats_key(topic, group), msgs as i64, 1);
     }

     /// Records a single consume response-time observation in milliseconds.
     pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
         self.topic_and_group_consume_rt
-            .add_rt_value(&stats_key(topic, group), rt as i32, 1);
+            .add_rt_value(&stats_key(topic, group), rt as i64, 1);
     }

     /// Records `msgs` messages consumed successfully in one batch.
     pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
         self.topic_and_group_consume_ok_tps
-            .add_value(&stats_key(topic, group), msgs as i32, 1);
+            .add_value(&stats_key(topic, group), msgs as i64, 1);
     }

     /// Records `msgs` messages that failed consumption in one batch.
     pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
         self.topic_and_group_consume_failed_tps
-            .add_value(&stats_key(topic, group), msgs as i32, 1);
+            .add_value(&stats_key(topic, group), msgs as i64, 1);
     }

     /// Returns a point-in-time [`ConsumeStatus`] snapshot for the given
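
The hunk above widens each cast from `as i32` to `as i64`. The PR does not state the motivation, but Rust's `as` conversion between integer types truncates or reinterprets bits silently, so a `u64` above `i32::MAX` becomes a wrong (possibly negative) `i32`. The sketch below shows that behavior with plain values; `to_i64_saturating` is a hypothetical helper, not part of the PR, included only to show that `u64 as i64` can still wrap for values above `i64::MAX`.

```rust
/// Hypothetical helper (not in the PR): converts a `u64` to `i64`,
/// clamping to `i64::MAX` instead of wrapping to a negative number.
pub fn to_i64_saturating(value: u64) -> i64 {
    i64::try_from(value).unwrap_or(i64::MAX)
}

/// Returns the results of four conversions for comparison:
/// (u64 -> i32 via `as`, u64 -> i64 via `as`,
///  u64::MAX -> i64 via `as`, u64::MAX -> i64 via the saturating helper).
pub fn cast_demo() -> (i32, i64, i64, i64) {
    let msgs: u64 = 3_000_000_000; // larger than i32::MAX (2_147_483_647)
    (
        msgs as i32,                 // keeps the low 32 bits: wraps negative
        msgs as i64,                 // fits, value preserved
        u64::MAX as i64,             // bit pattern reinterpreted: -1
        to_i64_saturating(u64::MAX), // clamped to i64::MAX
    )
}
```

For realistic message counts and millisecond latencies the `i64` cast is lossless; the saturating variant only matters if inputs could ever exceed `i64::MAX`.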
@@ -197,9 +258,10 @@ mod tests {
         assert_eq!(status.consume_failed_msgs, 0);
     }

-    #[test]
-    fn start_and_shutdown_are_no_ops() {
+    #[tokio::test]
+    async fn start_launches_background_tasks() {
         let mgr = make_manager();
+        // Verifies start() does not panic in a Tokio runtime context.
         mgr.start();
         mgr.shutdown();
     }