Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod controller;
mod export;
mod ha;
mod lite;
mod message;
mod namesrv;
mod offset;
mod target;
Expand Down Expand Up @@ -122,6 +123,11 @@ pub enum Commands {
#[command(name = "lite")]
Lite(lite::LiteCommands),

#[command(subcommand)]
#[command(about = "Message commands")]
#[command(name = "message")]
Message(message::MessageCommands),

#[command(subcommand)]
#[command(about = "Name server commands")]
#[command(name = "nameserver")]
Expand Down Expand Up @@ -152,6 +158,7 @@ impl CommandExecute for Commands {
Commands::Export(value) => value.execute(rpc_hook).await,
Commands::HA(value) => value.execute(rpc_hook).await,
Commands::Lite(value) => value.execute(rpc_hook).await,
Commands::Message(value) => value.execute(rpc_hook).await,
Commands::NameServer(value) => value.execute(rpc_hook).await,
Commands::Offset(value) => value.execute(rpc_hook).await,
Commands::Topic(value) => value.execute(rpc_hook).await,
Expand Down Expand Up @@ -415,6 +422,11 @@ impl CommandExecute for ClassificationTablePrint {
command: "triggerLiteDispatch",
remark: "Trigger Lite Dispatch.",
},
Command {
category: "Message",
command: "checkMsgSendRT",
remark: "Check message send response time.",
},
Command {
category: "NameServer",
command: "addWritePerm",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod check_msg_send_rt_sub_command;

use std::sync::Arc;

use clap::Subcommand;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::runtime::RPCHook;

use crate::commands::message::check_msg_send_rt_sub_command::CheckMsgSendRTSubCommand;
use crate::commands::CommandExecute;

#[derive(Subcommand)]
pub enum MessageCommands {
#[command(
name = "checkMsgSendRT",
about = "Check message send response time.",
long_about = None,
)]
CheckMsgSendRT(CheckMsgSendRTSubCommand),
}

impl CommandExecute for MessageCommands {
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
match self {
MessageCommands::CheckMsgSendRT(value) => value.execute(rpc_hook).await,
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::Mutex;

use clap::Parser;
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::TimeUtils::current_millis;
use rocketmq_error::RocketMQError;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::runtime::RPCHook;

use crate::commands::CommandExecute;

fn get_string_by_size(size: usize) -> Vec<u8> {
vec![b'a'; size]
}

#[derive(Debug, Clone, Parser)]
pub struct CheckMsgSendRTSubCommand {
#[arg(short = 't', long = "topic", required = true, help = "topic name")]
topic: String,

#[arg(
short = 'a',
long = "amount",
required = false,
default_value = "100",
help = "message amount | default 100"
)]
amount: u64,

#[arg(
short = 's',
long = "size",
required = false,
default_value = "128",
help = "message size | default 128 Byte"
)]
size: usize,
}
Comment on lines +39 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Split rounds from per-round send count (amount vs count).

This implementation uses amount as total sends and does not expose a per-round count, so the required round-based test model cannot run.

Suggested structural fix
 pub struct CheckMsgSendRTSubCommand {
@@
     #[arg(
+        short = 'c',
+        long = "count",
+        required = false,
+        default_value = "50",
+        help = "messages per round | default 50"
+    )]
+    count: u64,
+
+    #[arg(
         short = 'a',
         long = "amount",
         required = false,
         default_value = "100",
-        help = "message amount | default 100"
+        help = "round amount | default 100"
     )]
     amount: u64,
@@
-            let amount = self.amount;
+            let rounds = self.amount;
+            let count = self.count;
@@
-            for i in 0..amount {
-                // send once
-            }
+            for round in 0..rounds {
+                for i in 0..count {
+                    // send once
+                }
+                // print per-round summary
+            }

Also applies to: 73-74, 88-89

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`
around lines 39 - 56, The CLI currently only exposes amount and size but needs a
separate per-round send count; add a new arg field named count (e.g., short =
'c', long = "count", type u64, default_value = "1", help = "per-round message
count | default 1") in the same struct (the CheckMsgSendRtSubCommand struct),
change the existing amount help to indicate it represents rounds (e.g., "rounds
| default 100"), and update all code locations that consume amount to treat
amount as rounds and use count for per-round sends (compute total sends as
rounds * count or use both where the send loop is implemented); ensure help
texts and any validation or calculations that previously used amount are
adjusted to use count where appropriate and keep size unchanged.


impl CommandExecute for CheckMsgSendRTSubCommand {
async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
let mut builder = DefaultMQProducer::builder().producer_group(current_millis().to_string());
if let Some(rpc_hook) = rpc_hook {
builder = builder.rpc_hook(rpc_hook);
}
let mut producer = builder.build();

let operation_result = async {
producer
.start()
.await
.map_err(|e| RocketMQError::Internal(format!("CheckMsgSendRTSubCommand command failed: {}", e)))?;

let topic = self.topic.trim();
let amount = self.amount;
let msg_size = self.size;

let msg = Message::builder()
.topic(topic)
.body_slice(&get_string_by_size(msg_size))
.build_unchecked();

let broker_name_holder: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
let queue_id_holder: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));

println!("{:<32} {:<4} {:<20} #RT", "#Broker Name", "#QID", "#Send Result");

let mut time_elapsed: u64 = 0;

for i in 0..amount {
let start = current_millis();
let send_success;
let end;

let bn = broker_name_holder.clone();
let qi = queue_id_holder.clone();
let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> {
let queue_index = (*arg as usize) % mqs.len();
let queue = &mqs[queue_index];
*bn.lock().unwrap() = queue.broker_name().to_string();
*qi.lock().unwrap() = queue.queue_id();
Some(queue.clone())
Comment on lines +95 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get the file structure first
fd -e rs check_msg_send_rt_sub_command.rs

# Read the file to understand context
if [ -f "rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs" ]; then
  echo "=== File found, reading around lines 90-110 ==="
  sed -n '85,115p' rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs
fi

Repository: mxsm/rocketmq-rust

Length of output: 1465


🏁 Script executed:

#!/bin/bash
# Search for send_with_selector usage to understand how mqs is populated
rg -n 'send_with_selector' --type rust -B5 -A5

Repository: mxsm/rocketmq-rust

Length of output: 40971


🏁 Script executed:

#!/bin/bash
# Search for MessageQueue and queue validation patterns
rg -n 'MessageQueue' rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs -B2 -A2 | head -100

Repository: mxsm/rocketmq-rust

Length of output: 742


🏁 Script executed:

#!/bin/bash
# Find the send_with_selector implementation to see queue validation
rg -n 'pub async fn send_with_selector' --type rust -A 20 | head -60

Repository: mxsm/rocketmq-rust

Length of output: 4707


🏁 Script executed:

#!/bin/bash
# Look for select_one_message_queue or queue validation patterns
rg -n 'select_one_message_queue|\.queues\(\)|topic_route' --type rust rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs -A3 -B3 | head -100

Repository: mxsm/rocketmq-rust

Length of output: 2965


🏁 Script executed:

#!/bin/bash
# Find send_select_impl to see how queues are passed to selector
rg -n 'async fn send_select_impl' --type rust -A 30

Repository: mxsm/rocketmq-rust

Length of output: 3806


🏁 Script executed:

#!/bin/bash
# Continue reading send_select_impl to see selector invocation
sed -n '632,730p' rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs

Repository: mxsm/rocketmq-rust

Length of output: 4194


Guard the selector against empty queue lists to avoid a hard panic.

The send_select_impl implementation passes message_queue_list directly to the selector closure without validating it is non-empty. Since (*arg as usize) % mqs.len() panics when mqs is empty, this aborts the command instead of counting a failed send and continuing.

Safe selector guard
 let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> {
+    if mqs.is_empty() {
+        return None;
+    }
     let queue_index = (*arg as usize) % mqs.len();
     let queue = &mqs[queue_index];
     *bn.lock().unwrap() = queue.broker_name().to_string();
     *qi.lock().unwrap() = queue.queue_id();
     Some(queue.clone())
 };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> {
let queue_index = (*arg as usize) % mqs.len();
let queue = &mqs[queue_index];
*bn.lock().unwrap() = queue.broker_name().to_string();
*qi.lock().unwrap() = queue.queue_id();
Some(queue.clone())
let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> {
if mqs.is_empty() {
return None;
}
let queue_index = (*arg as usize) % mqs.len();
let queue = &mqs[queue_index];
*bn.lock().unwrap() = queue.broker_name().to_string();
*qi.lock().unwrap() = queue.queue_id();
Some(queue.clone())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`
around lines 95 - 100, The selector closure passed into send_select_impl can
panic when mqs is empty due to (*arg as usize) % mqs.len(); update the closure
(the selector used for send_select_impl that writes to bn and qi) to first check
if mqs.is_empty() and return None (or otherwise signal no selection) instead of
performing the modulo; this prevents a divide-by-zero panic and allows the
caller to treat the send as a failed send and continue counting failures rather
than aborting.

};

match producer.send_with_selector(msg.clone(), selector, i).await {
Ok(_) => {
send_success = true;
end = current_millis();
}
Err(_) => {
send_success = false;
end = current_millis();
}
}

let broker_name = broker_name_holder.lock().unwrap().clone();
let queue_id = *queue_id_holder.lock().unwrap();

if i != 0 {
time_elapsed += end - start;
}

println!(
"{:<32} {:<4} {:<20} {}",
broker_name,
queue_id,
send_success,
end - start
);
}

let rt = time_elapsed as f64 / (amount as i64 - 1) as f64;
println!("Avg RT: {:.2}", rt);

Comment on lines +86 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

RT aggregation is incomplete and average calculation is unsafe for low sample counts.

The command currently emits per-send rows plus one average, but does not compute required totals/percentiles/throughput, and amount - 1 produces invalid math for amount <= 1.

Suggested metrics shape
-            let mut time_elapsed: u64 = 0;
+            let mut samples_ms: Vec<u64> = Vec::new();
+            let mut success: u64 = 0;
+            let mut failed: u64 = 0;
+            let test_begin = current_millis();
@@
-                if i != 0 {
-                    time_elapsed += end - start;
-                }
+                let cost = end - start;
+                samples_ms.push(cost);
+                if send_success { success += 1; } else { failed += 1; }
@@
-            let rt = time_elapsed as f64 / (amount as i64 - 1) as f64;
-            println!("Avg RT: {:.2}", rt);
+            if samples_ms.is_empty() {
+                println!("No samples collected.");
+                return Ok(());
+            }
+            samples_ms.sort_unstable();
+            let sum: u64 = samples_ms.iter().sum();
+            let avg = sum as f64 / samples_ms.len() as f64;
+            let p50 = samples_ms[(samples_ms.len() as f64 * 0.50) as usize];
+            let p95 = samples_ms[(samples_ms.len() as f64 * 0.95) as usize.min(samples_ms.len()-1)];
+            let p99 = samples_ms[(samples_ms.len() as f64 * 0.99) as usize.min(samples_ms.len()-1)];
+            let duration_ms = current_millis() - test_begin;
+            let tps = if duration_ms == 0 { 0.0 } else { (success + failed) as f64 * 1000.0 / duration_ms as f64 };
+            println!("total={}, success={}, failed={}, min={}, max={}, avg={:.2}, p50={}, p95={}, p99={}, durationMs={}, tps={:.2}",
+                success + failed, success, failed, samples_ms[0], samples_ms[samples_ms.len()-1], avg, p50, p95, p99, duration_ms, tps);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`
around lines 86 - 132, The loop currently only sums RT into time_elapsed and
divides by (amount - 1), which is unsafe and missing metrics; change
check_msg_send_rt_sub_command.rs to collect per-send round-trip times into a
Vec<u64> (e.g., rts), count successful sends (success_count) and total_attempts,
and accumulate total_time_ms; compute average RT as sum(rts)/rts.len() with a
guard for rts.is_empty(), compute percentiles (p50/p95/p99) by sorting rts, and
compute throughput as success_count / (total_time_ms as f64 / 1000.0); use
current_millis() to produce each per-send RT (end - start) and push to rts
instead of only adding to time_elapsed, and replace the unsafe rt calculation
(rt = time_elapsed as f64 / (amount as i64 - 1) as f64) with the safe
average/percentile/throughput outputs; keep usage of
producer.send_with_selector, broker_name_holder, queue_id_holder, amount
unchanged.

Ok(())
}
.await;

producer.shutdown().await;
operation_result
}
}
Loading