Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rocketmq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ mod hook;
pub mod implementation;
mod latency;
pub mod producer;
pub mod stat;
mod trace;
mod types;
pub mod utils;
Expand Down
15 changes: 15 additions & 0 deletions rocketmq-client/src/stat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod consumer_stats_manager;
213 changes: 213 additions & 0 deletions rocketmq-client/src/stat/consumer_stats_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use rocketmq_common::common::stats::stats_item_set::StatsItemSet;
use rocketmq_remoting::protocol::body::consume_status::ConsumeStatus;

// Names handed to `StatsItemSet::new` in `ConsumerStatsManager::new`, one per
// tracked metric set.
const TOPIC_AND_GROUP_CONSUME_OK_TPS: &str = "CONSUME_OK_TPS";
const TOPIC_AND_GROUP_CONSUME_FAILED_TPS: &str = "CONSUME_FAILED_TPS";
const TOPIC_AND_GROUP_CONSUME_RT: &str = "CONSUME_RT";
const TOPIC_AND_GROUP_PULL_TPS: &str = "PULL_TPS";
const TOPIC_AND_GROUP_PULL_RT: &str = "PULL_RT";

/// Tracks consumer-side statistics for each topic/group pair.
///
/// Holds five [`StatsItemSet`]s: consume OK TPS, consume failed TPS, consume
/// RT, pull TPS, and pull RT. Entries are keyed by the `"topic@group"` string
/// built by `stats_key`, and are read back as per-minute / per-hour snapshots
/// by `consume_status`.
///
/// NOTE(review): the `inc_*` methods take `&self` and are intended for the hot
/// consumption path; their thread-safety depends on `StatsItemSet` being
/// internally synchronised, which is not visible in this file — confirm
/// against its implementation.
pub struct ConsumerStatsManager {
/// Successfully consumed message counts (set name `CONSUME_OK_TPS`).
topic_and_group_consume_ok_tps: StatsItemSet,
/// Consume response-time observations (set name `CONSUME_RT`).
topic_and_group_consume_rt: StatsItemSet,
/// Failed-consumption message counts (set name `CONSUME_FAILED_TPS`).
topic_and_group_consume_failed_tps: StatsItemSet,
/// Pulled message counts (set name `PULL_TPS`).
topic_and_group_pull_tps: StatsItemSet,
/// Pull response-time observations (set name `PULL_RT`).
topic_and_group_pull_rt: StatsItemSet,
}

impl Default for ConsumerStatsManager {
fn default() -> Self {
Self::new()
}
}

impl ConsumerStatsManager {
/// Builds a `ConsumerStatsManager`, creating one freshly constructed
/// [`StatsItemSet`] per metric, each labelled with its metric-name constant.
pub fn new() -> Self {
    // Single place where a metric name is turned into its stats set.
    let named_set = |name: &str| StatsItemSet::new(name.to_string());

    let topic_and_group_consume_ok_tps = named_set(TOPIC_AND_GROUP_CONSUME_OK_TPS);
    let topic_and_group_consume_rt = named_set(TOPIC_AND_GROUP_CONSUME_RT);
    let topic_and_group_consume_failed_tps = named_set(TOPIC_AND_GROUP_CONSUME_FAILED_TPS);
    let topic_and_group_pull_tps = named_set(TOPIC_AND_GROUP_PULL_TPS);
    let topic_and_group_pull_rt = named_set(TOPIC_AND_GROUP_PULL_RT);

    Self {
        topic_and_group_consume_ok_tps,
        topic_and_group_consume_rt,
        topic_and_group_consume_failed_tps,
        topic_and_group_pull_tps,
        topic_and_group_pull_rt,
    }
}

/// Starts the stats manager.
///
/// Currently a no-op: the body is empty, so calling it has no effect.
pub fn start(&self) {}

/// Shuts down the stats manager.
///
/// Currently a no-op: the body is empty, so calling it has no effect.
pub fn shutdown(&self) {}

/// Records a single pull response-time observation in milliseconds.
///
/// The observation is stored under the `"topic@group"` key. `rt` is forwarded
/// to the underlying stats set as an `i32`; values above `i32::MAX` are
/// clamped to `i32::MAX` instead of wrapping, so an outlier can never be
/// recorded as a negative or unrelated number.
pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
    // Saturate before narrowing: a bare `rt as i32` keeps only the low 32 bits.
    let rt = rt.min(i32::MAX as u64) as i32;
    self.topic_and_group_pull_rt
        .add_rt_value(&stats_key(topic, group), rt, 1);
}

/// Records `msgs` messages successfully pulled in one batch.
///
/// The count is stored under the `"topic@group"` key. `msgs` is forwarded to
/// the underlying stats set as an `i32`; values above `i32::MAX` are clamped
/// to `i32::MAX` instead of wrapping.
pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
    // Saturate before narrowing: a bare `msgs as i32` keeps only the low 32 bits.
    let msgs = msgs.min(i32::MAX as u64) as i32;
    self.topic_and_group_pull_tps
        .add_value(&stats_key(topic, group), msgs, 1);
}

/// Records a single consume response-time observation in milliseconds.
///
/// The observation is stored under the `"topic@group"` key. `rt` is forwarded
/// to the underlying stats set as an `i32`; values above `i32::MAX` are
/// clamped to `i32::MAX` instead of wrapping.
pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
    // Saturate before narrowing: a bare `rt as i32` keeps only the low 32 bits.
    let rt = rt.min(i32::MAX as u64) as i32;
    self.topic_and_group_consume_rt
        .add_rt_value(&stats_key(topic, group), rt, 1);
}

/// Records `msgs` messages consumed successfully in one batch.
///
/// The count is stored under the `"topic@group"` key. `msgs` is forwarded to
/// the underlying stats set as an `i32`; values above `i32::MAX` are clamped
/// to `i32::MAX` instead of wrapping.
pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
    // Saturate before narrowing: a bare `msgs as i32` keeps only the low 32 bits.
    let msgs = msgs.min(i32::MAX as u64) as i32;
    self.topic_and_group_consume_ok_tps
        .add_value(&stats_key(topic, group), msgs, 1);
}

/// Records `msgs` messages that failed consumption in one batch.
///
/// The count is stored under the `"topic@group"` key. `msgs` is forwarded to
/// the underlying stats set as an `i32`; values above `i32::MAX` are clamped
/// to `i32::MAX` instead of wrapping.
pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
    // Saturate before narrowing: a bare `msgs as i32` keeps only the low 32 bits.
    let msgs = msgs.min(i32::MAX as u64) as i32;
    self.topic_and_group_consume_failed_tps
        .add_value(&stats_key(topic, group), msgs, 1);
}
Comment on lines +62 to +89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential silent truncation when casting u64 to i32.

All inc_* methods cast u64 parameters to i32 using as i32. For values exceeding i32::MAX (2,147,483,647), this silently truncates/wraps the value, potentially causing incorrect statistics in high-throughput scenarios.

Consider using saturating conversion or validating bounds:

🛡️ Proposed fix using saturating conversion
     pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
+        let rt_clamped = rt.min(i32::MAX as u64) as i32;
         self.topic_and_group_pull_rt
-            .add_rt_value(&stats_key(topic, group), rt as i32, 1);
+            .add_rt_value(&stats_key(topic, group), rt_clamped, 1);
     }

Apply similar pattern to other inc_* methods.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
self.topic_and_group_pull_rt
.add_rt_value(&stats_key(topic, group), rt as i32, 1);
}
/// Records `msgs` messages successfully pulled in one batch.
pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_pull_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
/// Records a single consume response-time observation in milliseconds.
pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
self.topic_and_group_consume_rt
.add_rt_value(&stats_key(topic, group), rt as i32, 1);
}
/// Records `msgs` messages consumed successfully in one batch.
pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_consume_ok_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
/// Records `msgs` messages that failed consumption in one batch.
pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
self.topic_and_group_consume_failed_tps
.add_value(&stats_key(topic, group), msgs as i32, 1);
}
pub fn inc_pull_rt(&self, group: &str, topic: &str, rt: u64) {
let rt_clamped = rt.min(i32::MAX as u64) as i32;
self.topic_and_group_pull_rt
.add_rt_value(&stats_key(topic, group), rt_clamped, 1);
}
/// Records `msgs` messages successfully pulled in one batch.
pub fn inc_pull_tps(&self, group: &str, topic: &str, msgs: u64) {
let msgs_clamped = msgs.min(i32::MAX as u64) as i32;
self.topic_and_group_pull_tps
.add_value(&stats_key(topic, group), msgs_clamped, 1);
}
/// Records a single consume response-time observation in milliseconds.
pub fn inc_consume_rt(&self, group: &str, topic: &str, rt: u64) {
let rt_clamped = rt.min(i32::MAX as u64) as i32;
self.topic_and_group_consume_rt
.add_rt_value(&stats_key(topic, group), rt_clamped, 1);
}
/// Records `msgs` messages consumed successfully in one batch.
pub fn inc_consume_ok_tps(&self, group: &str, topic: &str, msgs: u64) {
let msgs_clamped = msgs.min(i32::MAX as u64) as i32;
self.topic_and_group_consume_ok_tps
.add_value(&stats_key(topic, group), msgs_clamped, 1);
}
/// Records `msgs` messages that failed consumption in one batch.
pub fn inc_consume_failed_tps(&self, group: &str, topic: &str, msgs: u64) {
let msgs_clamped = msgs.min(i32::MAX as u64) as i32;
self.topic_and_group_consume_failed_tps
.add_value(&stats_key(topic, group), msgs_clamped, 1);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/stat/consumer_stats_manager.rs` around lines 62 - 89, The
five inc_* methods (inc_pull_rt, inc_pull_tps, inc_consume_rt,
inc_consume_ok_tps, inc_consume_failed_tps) currently cast u64 -> i32 with `as
i32`, which can silently wrap; change each to perform a safe, saturating
conversion (e.g., clamp the u64 to i32::MAX and cast: let val = if msgs_or_rt >
i32::MAX as u64 { i32::MAX } else { msgs_or_rt as i32 }) before calling
add_value/add_rt_value, and apply the same pattern for both rt and msgs usages
to avoid truncation when forwarding values to topic_and_group_* methods.


/// Assembles a [`ConsumeStatus`] snapshot for one consumer group / topic pair.
///
/// Where each field comes from:
/// - `pull_rt`: per-minute average of the pull RT set.
/// - `pull_tps`, `consume_ok_tps`, `consume_failed_tps`: per-minute TPS of the matching set.
/// - `consume_rt`: per-minute average of the consume RT set; when that window's sum is zero the
///   per-hour average is used instead.
/// - `consume_failed_msgs`: per-hour sum of the failed-consumption set, cast to `i64`.
pub fn consume_status(&self, group: &str, topic: &str) -> ConsumeStatus {
    let lookup_key = stats_key(topic, group);

    let pull_rt_avg = self
        .topic_and_group_pull_rt
        .get_stats_data_in_minute(&lookup_key)
        .get_avgpt();

    let pull_rate = self
        .topic_and_group_pull_tps
        .get_stats_data_in_minute(&lookup_key)
        .get_tps();

    // Prefer the minute window; an all-zero minute falls back to the hour window.
    let last_minute_rt = self.topic_and_group_consume_rt.get_stats_data_in_minute(&lookup_key);
    let consume_rt_avg = match last_minute_rt.get_sum() {
        0 => self
            .topic_and_group_consume_rt
            .get_stats_data_in_hour(&lookup_key)
            .get_avgpt(),
        _ => last_minute_rt.get_avgpt(),
    };

    let ok_rate = self
        .topic_and_group_consume_ok_tps
        .get_stats_data_in_minute(&lookup_key)
        .get_tps();

    let failed_rate = self
        .topic_and_group_consume_failed_tps
        .get_stats_data_in_minute(&lookup_key)
        .get_tps();

    let failed_last_hour = self
        .topic_and_group_consume_failed_tps
        .get_stats_data_in_hour(&lookup_key)
        .get_sum() as i64;

    ConsumeStatus {
        pull_rt: pull_rt_avg,
        pull_tps: pull_rate,
        consume_rt: consume_rt_avg,
        consume_ok_tps: ok_rate,
        consume_failed_tps: failed_rate,
        consume_failed_msgs: failed_last_hour,
    }
}
}

/// Joins `topic` and `group` with `@` to form the `"topic@group"` key under
/// which every metric set stores its entries.
#[inline]
fn stats_key(topic: &str, group: &str) -> String {
    // One allocation sized for both parts plus the separator.
    let mut key = String::with_capacity(topic.len() + group.len() + 1);
    key.push_str(topic);
    key.push('@');
    key.push_str(group);
    key
}

#[cfg(test)]
mod tests {
    use super::*;

    // Group / topic pair shared by the tests below.
    const GROUP: &str = "GroupA";
    const TOPIC: &str = "TopicA";

    #[test]
    fn stats_key_format() {
        assert_eq!(stats_key(TOPIC, GROUP), "TopicA@GroupA");
    }

    // The `smoke_*` tests only check that recording a value does not panic.

    #[test]
    fn smoke_inc_consume_ok_tps() {
        ConsumerStatsManager::new().inc_consume_ok_tps(GROUP, TOPIC, 5);
    }

    #[test]
    fn smoke_inc_consume_failed_tps() {
        ConsumerStatsManager::new().inc_consume_failed_tps(GROUP, TOPIC, 3);
    }

    #[test]
    fn smoke_inc_consume_rt() {
        ConsumerStatsManager::new().inc_consume_rt(GROUP, TOPIC, 42);
    }

    #[test]
    fn smoke_inc_pull_rt() {
        ConsumerStatsManager::new().inc_pull_rt(GROUP, TOPIC, 10);
    }

    #[test]
    fn smoke_inc_pull_tps() {
        ConsumerStatsManager::new().inc_pull_tps(GROUP, TOPIC, 100);
    }

    #[test]
    fn consume_status_returns_zero_for_empty_stats() {
        let snapshot = ConsumerStatsManager::new().consume_status(GROUP, TOPIC);
        assert_eq!(snapshot.pull_rt, 0.0);
        assert_eq!(snapshot.pull_tps, 0.0);
        assert_eq!(snapshot.consume_rt, 0.0);
        assert_eq!(snapshot.consume_ok_tps, 0.0);
        assert_eq!(snapshot.consume_failed_tps, 0.0);
        assert_eq!(snapshot.consume_failed_msgs, 0);
    }

    #[test]
    fn start_and_shutdown_are_no_ops() {
        let manager = ConsumerStatsManager::new();
        manager.start();
        manager.shutdown();
    }

    #[test]
    fn default_creates_valid_manager() {
        let manager = ConsumerStatsManager::default();
        // Should not panic when querying uninitialised key.
        let _ = manager.consume_status("G", "T");
    }
}
8 changes: 8 additions & 0 deletions rocketmq-common/src/common/stats/stats_item_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ impl StatsItemSet {
.unwrap_or_default()
}

/// Get statistics snapshot for the last hour
pub fn get_stats_data_in_hour(&self, stats_key: &str) -> StatsSnapshot {
self.items
.get(stats_key)
.map(|item| item.get_stats_data_in_hour())
.unwrap_or_default()
}

/// Get a StatsItem by key
pub fn get_stats_item(&self, stats_key: &str) -> Option<Arc<StatsItem>> {
self.items.get(stats_key).map(|entry| entry.clone())
Expand Down
Loading