[ISSUE #6393]🚀Implement HAStatus Command for High Availability Runtime Monitoring#6481
[ISSUE #6393]🚀Implement HAStatus Command for High Availability Runtime Monitoring#6481
Conversation
…ntime Monitoring
|
🔊@WaterWhisperer 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis pull request implements the Changes
Sequence DiagramsequenceDiagram
participant Client as Admin Client
participant AdminAPI as DefaultMQAdminExt
participant ClientAPI as MQClientAPIImpl
participant Broker as AdminBrokerProcessor
participant Handler as GetBrokerHaStatusHandler
participant Store as MessageStore & HA Service
Client->>AdminAPI: get_broker_ha_status(broker_addr)
AdminAPI->>ClientAPI: get_broker_ha_status(broker_addr, timeout)
ClientAPI->>Broker: GetBrokerHaStatus request
Broker->>Handler: get_broker_ha_status(request)
Handler->>Store: Retrieve MessageStore
Handler->>Store: Get HA Service
Handler->>Store: Get max offset & HA runtime info
Handler->>Handler: Serialize HARuntimeInfo to JSON
Handler-->>Broker: Response with HA data
Broker-->>ClientAPI: RemotingCommand response
ClientAPI->>ClientAPI: Decode HARuntimeInfo from body
ClientAPI-->>AdminAPI: HARuntimeInfo
AdminAPI-->>Client: HARuntimeInfo
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/ha_commands/ha_status_sub_command.rs`:
- Around line 31-108: The HAStatusSubCommand currently allows both or neither of
-b/--brokerAddr and -c/--clusterName and silently prints an error instead of
failing; add an ArgGroup to the HAStatusSubCommand struct to require exactly one
of broker_addr and cluster_name (match patterns used in
BrokerStatusSubCommand/GetBrokerConfigSubCommand), remove the manual println
error in inner_exec, and return a RocketMQError (or any appropriate Err from
inner_exec) when neither is provided; update the struct annotation to include
#[group(...)] (or the clap ArgGroup equivalent) referencing "broker_addr" and
"cluster_name" and adjust inner_exec and execute error handling so the command
fails fast when arg validation fails.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
rocketmq-broker/src/processor/admin_broker_processor.rsrocketmq-broker/src/processor/admin_broker_processor/get_broker_ha_status_handler.rsrocketmq-client/src/admin/default_mq_admin_ext_impl.rsrocketmq-client/src/implementation/mq_client_api_impl.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/ha_commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/ha_commands/ha_status_sub_command.rs
| #[derive(Debug, Clone, Parser)] | ||
| pub struct HAStatusSubCommand { | ||
| #[arg( | ||
| short = 'b', | ||
| long = "brokerAddr", | ||
| value_name = "HOST:PORT", | ||
| required = false, | ||
| help = "which broker to fetch" | ||
| )] | ||
| broker_addr: Option<String>, | ||
|
|
||
| #[arg( | ||
| short = 'c', | ||
| long = "clusterName", | ||
| value_name = "CLUSTER", | ||
| required = false, | ||
| help = "which cluster" | ||
| )] | ||
| cluster_name: Option<String>, | ||
|
|
||
| #[arg( | ||
| short = 'i', | ||
| long = "interval", | ||
| value_name = "SECONDS", | ||
| required = false, | ||
| help = "the interval(second) of get info" | ||
| )] | ||
| interval: Option<u64>, | ||
| } | ||
|
|
||
| impl CommandExecute for HAStatusSubCommand { | ||
| async fn execute(&self, _rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> { | ||
| let mut default_mqadmin_ext = DefaultMQAdminExt::new(); | ||
| default_mqadmin_ext | ||
| .client_config_mut() | ||
| .set_instance_name(get_current_millis().to_string().into()); | ||
|
|
||
| let operation_result = async { | ||
| MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| { | ||
| RocketMQError::Internal(format!("HAStatusSubCommand: Failed to start MQAdminExt: {}", e)) | ||
| })?; | ||
|
|
||
| if let Some(interval) = self.interval { | ||
| let flush_second = if interval > 0 { interval } else { 3 }; | ||
| loop { | ||
| self.inner_exec(&default_mqadmin_ext).await?; | ||
| tokio::time::sleep(tokio::time::Duration::from_secs(flush_second)).await; | ||
| } | ||
| } else { | ||
| self.inner_exec(&default_mqadmin_ext).await?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
| .await; | ||
|
|
||
| MQAdminExt::shutdown(&mut default_mqadmin_ext).await; | ||
| operation_result | ||
| } | ||
| } | ||
|
|
||
| impl HAStatusSubCommand { | ||
| async fn inner_exec(&self, default_mqadmin_ext: &DefaultMQAdminExt) -> RocketMQResult<()> { | ||
| if let Some(ref broker_addr) = self.broker_addr { | ||
| Self::print_status(broker_addr.trim(), default_mqadmin_ext).await?; | ||
| } else if let Some(ref cluster_name) = self.cluster_name { | ||
| let cluster_info = default_mqadmin_ext.examine_broker_cluster_info().await.map_err(|e| { | ||
| RocketMQError::Internal(format!("HAStatusSubCommand: Failed to get cluster info: {}", e)) | ||
| })?; | ||
| let master_addrs = CommandUtil::fetch_master_addr_by_cluster_name(&cluster_info, cluster_name.trim())?; | ||
| for addr in master_addrs { | ||
| Self::print_status(&addr, default_mqadmin_ext).await?; | ||
| } | ||
| } else { | ||
| println!("Error: either -b (brokerAddr) or -c (clusterName) must be specified"); | ||
| } | ||
|
|
||
| Ok(()) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find Cargo.toml files and check clap version
fd -n "Cargo.toml" -x grep -l "clap" {} \; | head -5Repository: mxsm/rocketmq-rust
Length of output: 292
🏁 Script executed:
#!/bin/bash
# Read the main Cargo.toml to find clap version
find . -name "Cargo.toml" -type f | head -1 | xargs cat | grep -A 2 "clap"Repository: mxsm/rocketmq-rust
Length of output: 44
🏁 Script executed:
#!/bin/bash
# Get the specific file and read lines 31-108
cat -n "rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/ha_commands/ha_status_sub_command.rs" | sed -n '31,108p'Repository: mxsm/rocketmq-rust
Length of output: 3337
🏁 Script executed:
#!/bin/bash
# Check if there's a Cargo.lock and look for clap version there
if [ -f "Cargo.lock" ]; then
grep -A 2 "name = \"clap\"" Cargo.lock | head -10
fiRepository: mxsm/rocketmq-rust
Length of output: 159
🏁 Script executed:
#!/bin/bash
# Search for ArgGroup usage in the codebase
rg "ArgGroup" --type rustRepository: mxsm/rocketmq-rust
Length of output: 6687
🏁 Script executed:
#!/bin/bash
# Look for other command implementations to see patterns
fd "sub_command.rs" -x grep -l "impl CommandExecute" {} \; | head -3Repository: mxsm/rocketmq-rust
Length of output: 375
🏁 Script executed:
#!/bin/bash
# Check the main Cargo.toml for the clap dependency definition
cat rocketmq-tools/rocketmq-admin/rocketmq-admin-core/Cargo.toml | grep -A 3 "clap"Repository: mxsm/rocketmq-rust
Length of output: 259
🏁 Script executed:
#!/bin/bash
# Check an example ArgGroup implementation to see the exact pattern
cat rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/broker_status_sub_command.rs | head -100Repository: mxsm/rocketmq-rust
Length of output: 3847
🏁 Script executed:
#!/bin/bash
# Also check get_broker_config_sub_command.rs for comparison
cat rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/get_broker_config_sub_command.rs | head -100Repository: mxsm/rocketmq-rust
Length of output: 3433
🏁 Script executed:
#!/bin/bash
# Check the inner_exec error handling in broker_status_sub_command.rs
grep -A 20 "async fn inner_exec" rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/broker_status_sub_command.rsRepository: mxsm/rocketmq-rust
Length of output: 44
Fix argument validation: use ArgGroup to enforce one-of-two requirement for -b or -c.
The struct accepts both flags (silently preferring -b when both provided) and prints an error without failing when neither is set. This violates the CLI contract. Use an ArgGroup like other similar commands in the codebase (e.g., BrokerStatusSubCommand, GetBrokerConfigSubCommand) and return a proper error instead of printing and returning Ok(()).
✅ Suggested fix
+use clap::ArgGroup;
#[derive(Debug, Clone, Parser)]
+#[command(group(
+ clap::ArgGroup::new("target")
+ .required(true)
+ .args(&["broker_addr", "cluster_name"])
+))]
pub struct HAStatusSubCommand {
#[arg(
short = 'b',
long = "brokerAddr",
value_name = "HOST:PORT",
required = false,
help = "which broker to fetch"
)]
broker_addr: Option<String>,
#[arg(
short = 'c',
long = "clusterName",
value_name = "CLUSTER",
required = false,
help = "which cluster"
)]
cluster_name: Option<String>,
@@
} else {
- println!("Error: either -b (brokerAddr) or -c (clusterName) must be specified");
+ return Err(RocketMQError::IllegalArgument(
+ "Either -b (brokerAddr) or -c (clusterName) must be specified".into(),
+ ));
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[derive(Debug, Clone, Parser)] | |
| pub struct HAStatusSubCommand { | |
| #[arg( | |
| short = 'b', | |
| long = "brokerAddr", | |
| value_name = "HOST:PORT", | |
| required = false, | |
| help = "which broker to fetch" | |
| )] | |
| broker_addr: Option<String>, | |
| #[arg( | |
| short = 'c', | |
| long = "clusterName", | |
| value_name = "CLUSTER", | |
| required = false, | |
| help = "which cluster" | |
| )] | |
| cluster_name: Option<String>, | |
| #[arg( | |
| short = 'i', | |
| long = "interval", | |
| value_name = "SECONDS", | |
| required = false, | |
| help = "the interval(second) of get info" | |
| )] | |
| interval: Option<u64>, | |
| } | |
| impl CommandExecute for HAStatusSubCommand { | |
| async fn execute(&self, _rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> { | |
| let mut default_mqadmin_ext = DefaultMQAdminExt::new(); | |
| default_mqadmin_ext | |
| .client_config_mut() | |
| .set_instance_name(get_current_millis().to_string().into()); | |
| let operation_result = async { | |
| MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| { | |
| RocketMQError::Internal(format!("HAStatusSubCommand: Failed to start MQAdminExt: {}", e)) | |
| })?; | |
| if let Some(interval) = self.interval { | |
| let flush_second = if interval > 0 { interval } else { 3 }; | |
| loop { | |
| self.inner_exec(&default_mqadmin_ext).await?; | |
| tokio::time::sleep(tokio::time::Duration::from_secs(flush_second)).await; | |
| } | |
| } else { | |
| self.inner_exec(&default_mqadmin_ext).await?; | |
| } | |
| Ok(()) | |
| } | |
| .await; | |
| MQAdminExt::shutdown(&mut default_mqadmin_ext).await; | |
| operation_result | |
| } | |
| } | |
| impl HAStatusSubCommand { | |
| async fn inner_exec(&self, default_mqadmin_ext: &DefaultMQAdminExt) -> RocketMQResult<()> { | |
| if let Some(ref broker_addr) = self.broker_addr { | |
| Self::print_status(broker_addr.trim(), default_mqadmin_ext).await?; | |
| } else if let Some(ref cluster_name) = self.cluster_name { | |
| let cluster_info = default_mqadmin_ext.examine_broker_cluster_info().await.map_err(|e| { | |
| RocketMQError::Internal(format!("HAStatusSubCommand: Failed to get cluster info: {}", e)) | |
| })?; | |
| let master_addrs = CommandUtil::fetch_master_addr_by_cluster_name(&cluster_info, cluster_name.trim())?; | |
| for addr in master_addrs { | |
| Self::print_status(&addr, default_mqadmin_ext).await?; | |
| } | |
| } else { | |
| println!("Error: either -b (brokerAddr) or -c (clusterName) must be specified"); | |
| } | |
| Ok(()) | |
| use clap::ArgGroup; | |
| #[derive(Debug, Clone, Parser)] | |
| #[command(group( | |
| clap::ArgGroup::new("target") | |
| .required(true) | |
| .args(&["broker_addr", "cluster_name"]) | |
| ))] | |
| pub struct HAStatusSubCommand { | |
| #[arg( | |
| short = 'b', | |
| long = "brokerAddr", | |
| value_name = "HOST:PORT", | |
| required = false, | |
| help = "which broker to fetch" | |
| )] | |
| broker_addr: Option<String>, | |
| #[arg( | |
| short = 'c', | |
| long = "clusterName", | |
| value_name = "CLUSTER", | |
| required = false, | |
| help = "which cluster" | |
| )] | |
| cluster_name: Option<String>, | |
| #[arg( | |
| short = 'i', | |
| long = "interval", | |
| value_name = "SECONDS", | |
| required = false, | |
| help = "the interval(second) of get info" | |
| )] | |
| interval: Option<u64>, | |
| } | |
| impl CommandExecute for HAStatusSubCommand { | |
| async fn execute(&self, _rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> { | |
| let mut default_mqadmin_ext = DefaultMQAdminExt::new(); | |
| default_mqadmin_ext | |
| .client_config_mut() | |
| .set_instance_name(get_current_millis().to_string().into()); | |
| let operation_result = async { | |
| MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| { | |
| RocketMQError::Internal(format!("HAStatusSubCommand: Failed to start MQAdminExt: {}", e)) | |
| })?; | |
| if let Some(interval) = self.interval { | |
| let flush_second = if interval > 0 { interval } else { 3 }; | |
| loop { | |
| self.inner_exec(&default_mqadmin_ext).await?; | |
| tokio::time::sleep(tokio::time::Duration::from_secs(flush_second)).await; | |
| } | |
| } else { | |
| self.inner_exec(&default_mqadmin_ext).await?; | |
| } | |
| Ok(()) | |
| } | |
| .await; | |
| MQAdminExt::shutdown(&mut default_mqadmin_ext).await; | |
| operation_result | |
| } | |
| } | |
| impl HAStatusSubCommand { | |
| async fn inner_exec(&self, default_mqadmin_ext: &DefaultMQAdminExt) -> RocketMQResult<()> { | |
| if let Some(ref broker_addr) = self.broker_addr { | |
| Self::print_status(broker_addr.trim(), default_mqadmin_ext).await?; | |
| } else if let Some(ref cluster_name) = self.cluster_name { | |
| let cluster_info = default_mqadmin_ext.examine_broker_cluster_info().await.map_err(|e| { | |
| RocketMQError::Internal(format!("HAStatusSubCommand: Failed to get cluster info: {}", e)) | |
| })?; | |
| let master_addrs = CommandUtil::fetch_master_addr_by_cluster_name(&cluster_info, cluster_name.trim())?; | |
| for addr in master_addrs { | |
| Self::print_status(&addr, default_mqadmin_ext).await?; | |
| } | |
| } else { | |
| return Err(RocketMQError::IllegalArgument( | |
| "Either -b (brokerAddr) or -c (clusterName) must be specified".into(), | |
| )); | |
| } | |
| Ok(()) | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/ha_commands/ha_status_sub_command.rs`
around lines 31 - 108, The HAStatusSubCommand currently allows both or neither
of -b/--brokerAddr and -c/--clusterName and silently prints an error instead of
failing; add an ArgGroup to the HAStatusSubCommand struct to require exactly one
of broker_addr and cluster_name (match patterns used in
BrokerStatusSubCommand/GetBrokerConfigSubCommand), remove the manual println
error in inner_exec, and return a RocketMQError (or any appropriate Err from
inner_exec) when neither is provided; update the struct annotation to include
#[group(...)] (or the clap ArgGroup equivalent) referencing "broker_addr" and
"cluster_name" and adjust inner_exec and execute error handling so the command
fails fast when arg validation fails.
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6481 +/- ##
==========================================
- Coverage 42.24% 42.19% -0.05%
==========================================
Files 940 942 +2
Lines 131560 131708 +148
==========================================
- Hits 55579 55577 -2
- Misses 75981 76131 +150 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit