Skip to content

[ISSUE #6533]🚀Add ConsumerStatsManager and related statistics functionality for improved consumer metrics tracking#6534

Merged
rocketmq-rust-bot merged 2 commits intomainfrom
feat-6533
Feb 27, 2026
Merged

[ISSUE #6533]🚀Add ConsumerStatsManager and related statistics functionality for improved consumer metrics tracking#6534
rocketmq-rust-bot merged 2 commits intomainfrom
feat-6533

Conversation

@mxsm
Copy link
Owner

@mxsm mxsm commented Feb 27, 2026

Which Issue(s) This PR Fixes(Closes)

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features
    • Added consumer statistics tracking module enabling monitoring of consumer performance metrics including pull and consumption throughput (TPS), response times, and success/failure rates
    • Extended statistics API with hourly data retrieval capability alongside existing minute-level metrics

mxsm added 2 commits February 27, 2026 18:15
…nality for improved consumer metrics tracking
…nality for improved consumer metrics tracking
@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 27, 2026

Walkthrough

This pull request introduces consumer statistics management capabilities to the RocketMQ client. A new ConsumerStatsManager module is added that tracks pull and consume metrics (throughput and response time) across topics and consumer groups. An hourly statistics retrieval method is also added to the existing StatsItemSet infrastructure.

Changes

Cohort / File(s) Summary
Consumer Stats Module
rocketmq-client/src/lib.rs, rocketmq-client/src/stat.rs, rocketmq-client/src/stat/consumer_stats_manager.rs
Introduces new ConsumerStatsManager struct encapsulating five StatsItemSet instances for tracking pull/consume throughput and response times. Provides methods to increment metrics by topic and group, and compute aggregate ConsumeStatus with per-minute and per-hour fallback logic. Includes unit tests for key formatting, increment operations, and status computation.
Stats Infrastructure Enhancement
rocketmq-common/src/common/stats/stats_item_set.rs
Adds get_stats_data_in_hour() method to StatsItemSet to retrieve hourly statistics snapshots, mirroring existing per-minute data retrieval capability.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 Hops of joy for metrics new,
Pull and consume stats ring true!
Topics grouped, their moments traced,
Response times and throughput paced,
Consumer wisdom, hour by hour,
Statistics give us super-power!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly describes the main change: adding ConsumerStatsManager and statistics functionality, matching the changeset which introduces this new component across multiple files.
Linked Issues check ✅ Passed The PR successfully implements ConsumerStatsManager with statistics tracking capabilities as indicated by issue #6533, though the issue provided no detailed requirements to validate comprehensively.
Out of Scope Changes check ✅ Passed All changes are directly aligned with adding ConsumerStatsManager: new module structure, public API exposure, stats manager implementation, and supporting stats method.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat-6533

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
rocketmq-client/src/stat/consumer_stats_manager.rs (1)

158-186: Consider adding tests that verify recorded values.

The current smoke tests verify methods don't panic, but don't assert that incremented values are reflected in subsequent consume_status calls. Adding at least one integration-style test would strengthen confidence in the end-to-end flow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/stat/consumer_stats_manager.rs` around lines 158 - 186,
Add assertions to the smoke tests to verify the manager actually records
increments: after calling inc_consume_ok_tps, inc_consume_failed_tps,
inc_consume_rt, inc_pull_rt, and inc_pull_tps on the same group/topic, call the
lookup method (consume_status or the manager method that returns stored metrics)
and assert the returned TPS/RT values reflect the increments (e.g., that
ok/failed TPS counters increased and RT values match or aggregate as expected).
Update one or more tests (e.g., smoke_inc_consume_ok_tps and/or a new
integration-style test) to create the manager via make_manager(), perform
increments, call consume_status (or the appropriate getter) and assert on its
fields to confirm end-to-end recording.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rocketmq-client/src/stat/consumer_stats_manager.rs`:
- Around line 62-89: The four inc_* methods (inc_pull_rt, inc_pull_tps,
inc_consume_rt, inc_consume_ok_tps, inc_consume_failed_tps) currently cast u64
-> i32 with `as i32`, which can silently wrap; change each to perform a safe,
saturating conversion (e.g., clamp the u64 to i32::MAX and cast: let val = if
msgs_or_rt > i32::MAX as u64 { i32::MAX } else { msgs_or_rt as i32 }) before
calling add_value/add_rt_value, and apply the same pattern for both rt and msgs
usages to avoid truncation when forwarding values to topic_and_group_* methods.

---

Nitpick comments:
In `@rocketmq-client/src/stat/consumer_stats_manager.rs`:
- Around line 158-186: Add assertions to the smoke tests to verify the manager
actually records increments: after calling inc_consume_ok_tps,
inc_consume_failed_tps, inc_consume_rt, inc_pull_rt, and inc_pull_tps on the
same group/topic, call the lookup method (consume_status or the manager method
that returns stored metrics) and assert the returned TPS/RT values reflect the
increments (e.g., that ok/failed TPS counters increased and RT values match or
aggregate as expected). Update one or more tests (e.g., smoke_inc_consume_ok_tps
and/or a new integration-style test) to create the manager via make_manager(),
perform increments, call consume_status (or the appropriate getter) and assert
on its fields to confirm end-to-end recording.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c426b64 and 74602e0.

📒 Files selected for processing (4)
  • rocketmq-client/src/lib.rs
  • rocketmq-client/src/stat.rs
  • rocketmq-client/src/stat/consumer_stats_manager.rs
  • rocketmq-common/src/common/stats/stats_item_set.rs

Comment on lines +62 to +89
pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
self.topic_and_group_pull_rt
.add_rt_value(&stats_key(topic, group), rt as i32, 1);
}

/// Records `msgs` messages successfully pulled in one batch.
pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_pull_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}

/// Records a single consume response-time observation in milliseconds.
pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
self.topic_and_group_consume_rt
.add_rt_value(&stats_key(topic, group), rt as i32, 1);
}

/// Records `msgs` messages consumed successfully in one batch.
pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_consume_ok_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}

/// Records `msgs` messages that failed consumption in one batch.
pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_consume_failed_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential silent truncation when casting u64 to i32.

All inc_* methods cast u64 parameters to i32 using as i32. For values exceeding i32::MAX (2,147,483,647), this silently truncates/wraps the value, potentially causing incorrect statistics in high-throughput scenarios.

Consider using saturating conversion or validating bounds:

🛡️ Proposed fix using saturating conversion
     pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
+        let rt_clamped = rt.min(i32::MAX as u64) as i32;
         self.topic_and_group_pull_rt
-            .add_rt_value(&stats_key(topic, group), rt as i32, 1);
+            .add_rt_value(&stats_key(topic, group), rt_clamped, 1);
     }

Apply similar pattern to other inc_* methods.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
self.topic_and_group_pull_rt
.add_rt_value(&stats_key(topic, group), rt as i32, 1);
}
/// Records `msgs` messages successfully pulled in one batch.
pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_pull_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
/// Records a single consume response-time observation in milliseconds.
pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
self.topic_and_group_consume_rt
.add_rt_value(&stats_key(topic, group), rt as i32, 1);
}
/// Records `msgs` messages consumed successfully in one batch.
pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_consume_ok_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
/// Records `msgs` messages that failed consumption in one batch.
pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_consume_failed_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
let rt_clamped = rt.min(i32::MAX as u64) as i32;
self.topic_and_group_pull_rt
.add_rt_value(&stats_key(topic, group), rt_clamped, 1);
}
/// Records `msgs` messages successfully pulled in one batch.
pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
let msgs_clamped = msgs.min(i32::MAX as u64) as i32;
self.topic_and_group_pull_tps
.add_value(&stats_key(topic, group), msgs_clamped, 1);
}
/// Records a single consume response-time observation in milliseconds.
pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
let rt_clamped = rt.min(i32::MAX as u64) as i32;
self.topic_and_group_consume_rt
.add_rt_value(&stats_key(topic, group), rt_clamped, 1);
}
/// Records `msgs` messages consumed successfully in one batch.
pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
let msgs_clamped = msgs.min(i32::MAX as u64) as i32;
self.topic_and_group_consume_ok_tps
.add_value(&stats_key(topic, group), msgs_clamped, 1);
}
/// Records `msgs` messages that failed consumption in one batch.
pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
let msgs_clamped = msgs.min(i32::MAX as u64) as i32;
self.topic_and_group_consume_failed_tps
.add_value(&stats_key(topic, group), msgs_clamped, 1);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/stat/consumer_stats_manager.rs` around lines 62 - 89, The
four inc_* methods (inc_pull_rt, inc_pull_tps, inc_consume_rt,
inc_consume_ok_tps, inc_consume_failed_tps) currently cast u64 -> i32 with `as
i32`, which can silently wrap; change each to perform a safe, saturating
conversion (e.g., clamp the u64 to i32::MAX and cast: let val = if msgs_or_rt >
i32::MAX as u64 { i32::MAX } else { msgs_or_rt as i32 }) before calling
add_value/add_rt_value, and apply the same pattern for both rt and msgs usages
to avoid truncation when forwarding values to topic_and_group_* methods.

@codecov
Copy link

codecov bot commented Feb 27, 2026

Codecov Report

❌ Patch coverage is 99.15254% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 42.10%. Comparing base (c426b64) to head (74602e0).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rocketmq-client/src/stat/consumer_stats_manager.rs 99.10% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6534      +/-   ##
==========================================
+ Coverage   42.04%   42.10%   +0.05%     
==========================================
  Files         948      949       +1     
  Lines      132385   132503     +118     
==========================================
+ Hits        55664    55793     +129     
+ Misses      76721    76710      -11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - All CI checks passed ✅

@rocketmq-rust-bot rocketmq-rust-bot merged commit c5e507e into main Feb 27, 2026
20 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Feb 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Add ConsumerStatsManager and related statistics functionality for improved consumer metrics tracking

3 participants