[ISSUE #6403]🚀Implement CheckMsgSendRT Command for Message Send Performance Testing#6607
[ISSUE #6403]🚀Implement CheckMsgSendRT Command for Message Send Performance Testing#6607
Conversation
…erformance Testing
|
🔊@WaterWhisperer 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis PR implements a new Changes
Sequence DiagramsequenceDiagram
actor User
participant CLI as CLI Parser
participant Cmd as MessageCommands
participant SubCmd as CheckMsgSendRTSubCommand
participant Producer as DefaultMQProducer
participant Broker
User->>CLI: Invoke checkMsgSendRT command
CLI->>Cmd: Parse command args
Cmd->>SubCmd: Delegate execute()
SubCmd->>Producer: Create & configure
SubCmd->>Producer: Start producer
loop For each message (amount times)
SubCmd->>SubCmd: Generate message body
SubCmd->>SubCmd: Record start time
SubCmd->>Producer: Send message
Producer->>Broker: Send request
Broker->>Producer: Send response
Producer->>SubCmd: Return send result
SubCmd->>SubCmd: Calculate duration
SubCmd->>SubCmd: Print result (broker, queue, success, duration)
end
SubCmd->>SubCmd: Calculate average RT
SubCmd->>SubCmd: Print summary stats
SubCmd->>Producer: Shutdown
SubCmd->>User: Display final results
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`:
- Around line 39-56: The CLI currently only exposes amount and size but needs a
separate per-round send count; add a new arg field named count (e.g., short =
'c', long = "count", type u64, default_value = "1", help = "per-round message
count | default 1") in the same struct (the CheckMsgSendRtSubCommand struct),
change the existing amount help to indicate it represents rounds (e.g., "rounds
| default 100"), and update all code locations that consume amount to treat
amount as rounds and use count for per-round sends (compute total sends as
rounds * count or use both where the send loop is implemented); ensure help
texts and any validation or calculations that previously used amount are
adjusted to use count where appropriate and keep size unchanged.
- Around line 86-132: The loop currently only sums RT into time_elapsed and
divides by (amount - 1), which is unsafe and missing metrics; change
check_msg_send_rt_sub_command.rs to collect per-send round-trip times into a
Vec<u64> (e.g., rts), count successful sends (success_count) and total_attempts,
and accumulate total_time_ms; compute average RT as sum(rts)/rts.len() with a
guard for rts.is_empty(), compute percentiles (p50/p95/p99) by sorting rts, and
compute throughput as success_count / (total_time_ms as f64 / 1000.0); use
current_millis() to produce each per-send RT (end - start) and push to rts
instead of only adding to time_elapsed, and replace the unsafe rt calculation
(rt = time_elapsed as f64 / (amount as i64 - 1) as f64) with the safe
average/percentile/throughput outputs; keep usage of
producer.send_with_selector, broker_name_holder, queue_id_holder, amount
unchanged.
- Around line 95-100: The selector closure passed into send_select_impl can
panic when mqs is empty due to (*arg as usize) % mqs.len(); update the closure
(the selector used for send_select_impl that writes to bn and qi) to first check
if mqs.is_empty() and return None (or otherwise signal no selection) instead of
performing the modulo; this prevents a divide-by-zero panic and allows the
caller to treat the send as a failed send and continue counting failures rather
than aborting.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs
| #[arg( | ||
| short = 'a', | ||
| long = "amount", | ||
| required = false, | ||
| default_value = "100", | ||
| help = "message amount | default 100" | ||
| )] | ||
| amount: u64, | ||
|
|
||
| #[arg( | ||
| short = 's', | ||
| long = "size", | ||
| required = false, | ||
| default_value = "128", | ||
| help = "message size | default 128 Byte" | ||
| )] | ||
| size: usize, | ||
| } |
There was a problem hiding this comment.
Split rounds from per-round send count (amount vs count).
This implementation uses amount as total sends and does not expose a per-round count, so the required round-based test model cannot run.
Suggested structural fix
pub struct CheckMsgSendRTSubCommand {
@@
#[arg(
+ short = 'c',
+ long = "count",
+ required = false,
+ default_value = "50",
+ help = "messages per round | default 50"
+ )]
+ count: u64,
+
+ #[arg(
short = 'a',
long = "amount",
required = false,
default_value = "100",
- help = "message amount | default 100"
+ help = "round amount | default 100"
)]
amount: u64,
@@
- let amount = self.amount;
+ let rounds = self.amount;
+ let count = self.count;
@@
- for i in 0..amount {
- // send once
- }
+ for round in 0..rounds {
+ for i in 0..count {
+ // send once
+ }
+ // print per-round summary
+ }Also applies to: 73-74, 88-89
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`
around lines 39 - 56, The CLI currently only exposes amount and size but needs a
separate per-round send count; add a new arg field named count (e.g., short =
'c', long = "count", type u64, default_value = "1", help = "per-round message
count | default 1") in the same struct (the CheckMsgSendRtSubCommand struct),
change the existing amount help to indicate it represents rounds (e.g., "rounds
| default 100"), and update all code locations that consume amount to treat
amount as rounds and use count for per-round sends (compute total sends as
rounds * count or use both where the send loop is implemented); ensure help
texts and any validation or calculations that previously used amount are
adjusted to use count where appropriate and keep size unchanged.
| let mut time_elapsed: u64 = 0; | ||
|
|
||
| for i in 0..amount { | ||
| let start = current_millis(); | ||
| let send_success; | ||
| let end; | ||
|
|
||
| let bn = broker_name_holder.clone(); | ||
| let qi = queue_id_holder.clone(); | ||
| let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> { | ||
| let queue_index = (*arg as usize) % mqs.len(); | ||
| let queue = &mqs[queue_index]; | ||
| *bn.lock().unwrap() = queue.broker_name().to_string(); | ||
| *qi.lock().unwrap() = queue.queue_id(); | ||
| Some(queue.clone()) | ||
| }; | ||
|
|
||
| match producer.send_with_selector(msg.clone(), selector, i).await { | ||
| Ok(_) => { | ||
| send_success = true; | ||
| end = current_millis(); | ||
| } | ||
| Err(_) => { | ||
| send_success = false; | ||
| end = current_millis(); | ||
| } | ||
| } | ||
|
|
||
| let broker_name = broker_name_holder.lock().unwrap().clone(); | ||
| let queue_id = *queue_id_holder.lock().unwrap(); | ||
|
|
||
| if i != 0 { | ||
| time_elapsed += end - start; | ||
| } | ||
|
|
||
| println!( | ||
| "{:<32} {:<4} {:<20} {}", | ||
| broker_name, | ||
| queue_id, | ||
| send_success, | ||
| end - start | ||
| ); | ||
| } | ||
|
|
||
| let rt = time_elapsed as f64 / (amount as i64 - 1) as f64; | ||
| println!("Avg RT: {:.2}", rt); | ||
|
|
There was a problem hiding this comment.
RT aggregation is incomplete and average calculation is unsafe for low sample counts.
The command currently emits per-send rows plus one average, but does not compute required totals/percentiles/throughput, and amount - 1 produces invalid math for amount <= 1.
Suggested metrics shape
- let mut time_elapsed: u64 = 0;
+ let mut samples_ms: Vec<u64> = Vec::new();
+ let mut success: u64 = 0;
+ let mut failed: u64 = 0;
+ let test_begin = current_millis();
@@
- if i != 0 {
- time_elapsed += end - start;
- }
+ let cost = end - start;
+ samples_ms.push(cost);
+ if send_success { success += 1; } else { failed += 1; }
@@
- let rt = time_elapsed as f64 / (amount as i64 - 1) as f64;
- println!("Avg RT: {:.2}", rt);
+ if samples_ms.is_empty() {
+ println!("No samples collected.");
+ return Ok(());
+ }
+ samples_ms.sort_unstable();
+ let sum: u64 = samples_ms.iter().sum();
+ let avg = sum as f64 / samples_ms.len() as f64;
+ let p50 = samples_ms[(samples_ms.len() as f64 * 0.50) as usize];
+ let p95 = samples_ms[(samples_ms.len() as f64 * 0.95) as usize.min(samples_ms.len()-1)];
+ let p99 = samples_ms[(samples_ms.len() as f64 * 0.99) as usize.min(samples_ms.len()-1)];
+ let duration_ms = current_millis() - test_begin;
+ let tps = if duration_ms == 0 { 0.0 } else { (success + failed) as f64 * 1000.0 / duration_ms as f64 };
+ println!("total={}, success={}, failed={}, min={}, max={}, avg={:.2}, p50={}, p95={}, p99={}, durationMs={}, tps={:.2}",
+ success + failed, success, failed, samples_ms[0], samples_ms[samples_ms.len()-1], avg, p50, p95, p99, duration_ms, tps);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`
around lines 86 - 132, The loop currently only sums RT into time_elapsed and
divides by (amount - 1), which is unsafe and missing metrics; change
check_msg_send_rt_sub_command.rs to collect per-send round-trip times into a
Vec<u64> (e.g., rts), count successful sends (success_count) and total_attempts,
and accumulate total_time_ms; compute average RT as sum(rts)/rts.len() with a
guard for rts.is_empty(), compute percentiles (p50/p95/p99) by sorting rts, and
compute throughput as success_count / (total_time_ms as f64 / 1000.0); use
current_millis() to produce each per-send RT (end - start) and push to rts
instead of only adding to time_elapsed, and replace the unsafe rt calculation
(rt = time_elapsed as f64 / (amount as i64 - 1) as f64) with the safe
average/percentile/throughput outputs; keep usage of
producer.send_with_selector, broker_name_holder, queue_id_holder, amount
unchanged.
| let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> { | ||
| let queue_index = (*arg as usize) % mqs.len(); | ||
| let queue = &mqs[queue_index]; | ||
| *bn.lock().unwrap() = queue.broker_name().to_string(); | ||
| *qi.lock().unwrap() = queue.queue_id(); | ||
| Some(queue.clone()) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get the file structure first
fd -e rs check_msg_send_rt_sub_command.rs
# Read the file to understand context
if [ -f "rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs" ]; then
echo "=== File found, reading around lines 90-110 ==="
sed -n '85,115p' rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs
fiRepository: mxsm/rocketmq-rust
Length of output: 1465
🏁 Script executed:
#!/bin/bash
# Search for send_with_selector usage to understand how mqs is populated
rg -n 'send_with_selector' --type rust -B5 -A5Repository: mxsm/rocketmq-rust
Length of output: 40971
🏁 Script executed:
#!/bin/bash
# Search for MessageQueue and queue validation patterns
rg -n 'MessageQueue' rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs -B2 -A2 | head -100Repository: mxsm/rocketmq-rust
Length of output: 742
🏁 Script executed:
#!/bin/bash
# Find the send_with_selector implementation to see queue validation
rg -n 'pub async fn send_with_selector' --type rust -A 20 | head -60Repository: mxsm/rocketmq-rust
Length of output: 4707
🏁 Script executed:
#!/bin/bash
# Look for select_one_message_queue or queue validation patterns
rg -n 'select_one_message_queue|\.queues\(\)|topic_route' --type rust rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs -A3 -B3 | head -100Repository: mxsm/rocketmq-rust
Length of output: 2965
🏁 Script executed:
#!/bin/bash
# Find send_select_impl to see how queues are passed to selector
rg -n 'async fn send_select_impl' --type rust -A 30Repository: mxsm/rocketmq-rust
Length of output: 3806
🏁 Script executed:
#!/bin/bash
# Continue reading send_select_impl to see selector invocation
sed -n '632,730p' rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rsRepository: mxsm/rocketmq-rust
Length of output: 4194
Guard the selector against empty queue lists to avoid a hard panic.
The send_select_impl implementation passes message_queue_list directly to the selector closure without validating it is non-empty. Since (*arg as usize) % mqs.len() panics when mqs is empty, this aborts the command instead of counting a failed send and continuing.
Safe selector guard
let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> {
+ if mqs.is_empty() {
+ return None;
+ }
let queue_index = (*arg as usize) % mqs.len();
let queue = &mqs[queue_index];
*bn.lock().unwrap() = queue.broker_name().to_string();
*qi.lock().unwrap() = queue.queue_id();
Some(queue.clone())
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> { | |
| let queue_index = (*arg as usize) % mqs.len(); | |
| let queue = &mqs[queue_index]; | |
| *bn.lock().unwrap() = queue.broker_name().to_string(); | |
| *qi.lock().unwrap() = queue.queue_id(); | |
| Some(queue.clone()) | |
| let selector = move |mqs: &[MessageQueue], _msg: &Message, arg: &u64| -> Option<MessageQueue> { | |
| if mqs.is_empty() { | |
| return None; | |
| } | |
| let queue_index = (*arg as usize) % mqs.len(); | |
| let queue = &mqs[queue_index]; | |
| *bn.lock().unwrap() = queue.broker_name().to_string(); | |
| *qi.lock().unwrap() = queue.queue_id(); | |
| Some(queue.clone()) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/check_msg_send_rt_sub_command.rs`
around lines 95 - 100, The selector closure passed into send_select_impl can
panic when mqs is empty due to (*arg as usize) % mqs.len(); update the closure
(the selector used for send_select_impl that writes to bn and qi) to first check
if mqs.is_empty() and return None (or otherwise signal no selection) instead of
performing the modulo; this prevents a divide-by-zero panic and allows the
caller to treat the send as a failed send and continue counting failures rather
than aborting.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6607 +/- ##
==========================================
- Coverage 41.90% 41.54% -0.36%
==========================================
Files 955 961 +6
Lines 133409 134560 +1151
==========================================
- Hits 55910 55909 -1
- Misses 77499 78651 +1152 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features