[ISSUE #6389]🚀Implement ExportPopRecord Command in rocketmq-admin-core#6517
[ISSUE #6389]🚀Implement ExportPopRecord Command in rocketmq-admin-core#6517
Conversation
|
🔊@WaterWhisperer 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughAdds a new CLI subcommand Changes
Sequence Diagram(s)sequenceDiagram
actor User
participant CLI as CLI Parser
participant Cmd as ExportPopRecordSubCommand
participant Admin as DefaultMQAdminExt
participant Broker as Broker
User->>CLI: exportPopRecord --broker-addr <addr> [--dry-run]
CLI->>Cmd: parse args & execute()
Cmd->>Admin: init & start (with optional RPCHook)
Admin-->>Cmd: ready
rect rgba(100, 150, 200, 0.5)
Cmd->>Admin: get_broker_config(<addr>)
Admin->>Broker: query config
Broker-->>Admin: broker_name
Admin-->>Cmd: broker_name
alt dry_run = false
Cmd->>Admin: export_pop_records(<addr>, timeout)
Admin->>Broker: trigger export
Broker-->>Admin: success/failure
Admin-->>Cmd: result
Cmd->>User: print status
else
Cmd->>User: print dry-run preview
end
end
Cmd->>Admin: shutdown
Admin-->>Cmd: closed
sequenceDiagram
actor User
participant CLI as CLI Parser
participant Cmd as ExportPopRecordSubCommand
participant Admin as DefaultMQAdminExt
participant Broker1 as Broker 1
participant BrokerN as Broker N
User->>CLI: exportPopRecord --cluster-name <name> [--dry-run]
CLI->>Cmd: parse args & execute()
Cmd->>Admin: init & start
Admin-->>Cmd: ready
rect rgba(150, 100, 200, 0.5)
Cmd->>Admin: examine_broker_cluster_info()
Admin->>Broker1: request topology
Admin->>BrokerN: request topology
Broker1-->>Admin: info
BrokerN-->>Admin: info
Admin-->>Cmd: list of broker addrs
loop for each broker
alt dry_run = false
Cmd->>Admin: export_pop_records(broker_addr, timeout)
Admin->>Broker1: trigger export
Broker1-->>Admin: result
Admin-->>Cmd: result
Cmd->>User: print status
else
Cmd->>User: print dry-run preview
end
end
end
Cmd->>Admin: shutdown
Admin-->>Cmd: closed
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs (1)
148-151: Unreachableelsebranch is dead codeThe early validation at lines 95–99 guarantees that at least one of
broker_addrorcluster_nameisSomebefore reaching theif/else if/elsechain. The finalelseat line 148 can never be reached. It is harmless but adds noise. Consider replacing it withunreachable!()to make the intent explicit.- } else { - Err(RocketMQError::IllegalArgument( - "ExportPopRecordSubCommand: Either brokerAddr (-b) or clusterName (-c) must be provided".into(), - )) - }; + } else { + unreachable!("validation at entry guarantees broker_addr or cluster_name is Some") + };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs` around lines 148 - 151, The final else branch in ExportPopRecordSubCommand that returns Err(RocketMQError::IllegalArgument(...)) is unreachable because earlier validation ensures broker_addr or cluster_name is Some; replace that unreachable error branch with unreachable!() (or remove it) to make intent explicit and avoid dead code—locate the if/else if/else chain that checks broker_addr and cluster_name and change the final else arm to unreachable!() referencing ExportPopRecordSubCommand, broker_addr, cluster_name and the current RocketMQError::IllegalArgument usage so callers and tools see the unreachable state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs`:
- Around line 118-128: When get_broker_config on admin_ext fails you currently
swallow the error and set broker_name to empty which leads to confusing output;
update the match for admin_ext.get_broker_config(...) in
export_pop_record_sub_command.rs so that on Err(e) you emit a warning (to stderr
/ process logger) including the error and the broker_addr (e.g. via
e.to_string()) before falling back to CheetahString::from(""), then continue to
call Self::export(&admin_ext, broker_addr, broker_name.as_str(),
self.dry_run).await; reference get_broker_config, broker_name, broker_addr,
admin_ext and Self::export to locate and modify the match arm.
- Around line 129-154: The call to
admin_ext.examine_broker_cluster_info().await? in execute can early-return on
error and skip admin_ext.shutdown().await; replace the `?` usage by explicitly
matching the Result from admin_ext.examine_broker_cluster_info().await (e.g.,
let cluster_info = match admin_ext.examine_broker_cluster_info().await { Ok(ci)
=> ci, Err(e) => { admin_ext.shutdown().await; return Err(e); } }) so that on
error you first await admin_ext.shutdown() and then return the error; keep the
rest of the logic using cluster_info as before.
- Around line 65-90: The helper async function export currently swallows errors
and prints them to stdout; change export(admin_ext: &DefaultMQAdminExt,
broker_addr: &str, broker_name: &str, dry_run: bool) to return Result<(),
ExportError> (or anyhow::Result) so callers can detect failures, replace
println! for error cases with eprintln! when handling Err from
export_pop_records, propagate or map the error from
admin_ext.export_pop_records(...) into the function's Err, and then update all
call sites (e.g. the Self::export(...).await; Ok(()) call) to await and
propagate/collect the Result instead of always returning Ok(()). Ensure success
paths still return Ok(()).
---
Nitpick comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs`:
- Around line 148-151: The final else branch in ExportPopRecordSubCommand that
returns Err(RocketMQError::IllegalArgument(...)) is unreachable because earlier
validation ensures broker_addr or cluster_name is Some; replace that unreachable
error branch with unreachable!() (or remove it) to make intent explicit and
avoid dead code—locate the if/else if/else chain that checks broker_addr and
cluster_name and change the final else arm to unreachable!() referencing
ExportPopRecordSubCommand, broker_addr, cluster_name and the current
RocketMQError::IllegalArgument usage so callers and tools see the unreachable
state.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
...ools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
Show resolved
Hide resolved
...ools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
Show resolved
Hide resolved
...ools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
Show resolved
Hide resolved
22a2df2 to
28f7ac1
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs (1)
65-90:⚠️ Potential issue | 🟠 MajorPropagate export failures instead of always succeeding.
Line 65 defines
exportas(), so failures at Line 77 are only logged; then Line 127 and Lines 139-145 still lead toOk(()). This makes failed exports look successful.🐛 Proposed fix
- async fn export(admin_ext: &DefaultMQAdminExt, broker_addr: &str, broker_name: &str, dry_run: bool) { + async fn export( + admin_ext: &DefaultMQAdminExt, + broker_addr: &str, + broker_name: &str, + dry_run: bool, + ) -> RocketMQResult<()> { if !dry_run { match admin_ext .export_pop_records(CheetahString::from(broker_addr), EXPORT_POP_RECORD_TIMEOUT_MILLIS) .await { Ok(()) => { println!( "Export broker records, brokerName={}, brokerAddr={}, dryRun={}", broker_name, broker_addr, dry_run ); + Ok(()) } Err(e) => { eprintln!( "Export broker records error, brokerName={}, brokerAddr={}, dryRun={}\n{}", broker_name, broker_addr, dry_run, e ); + Err(e) } } } else { println!( "Export broker records, brokerName={}, brokerAddr={}, dryRun={}", broker_name, broker_addr, dry_run ); + Ok(()) } }- Self::export(&admin_ext, broker_addr, broker_name.as_str(), self.dry_run).await; - Ok(()) + Self::export(&admin_ext, broker_addr, broker_name.as_str(), self.dry_run).await- Self::export( + Self::export( &admin_ext, broker_addr.as_str(), broker_name.as_str(), self.dry_run, ) - .await; + .await?;Also applies to: 127-128, 139-146
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs` around lines 65 - 90, The export function currently swallows failures from admin_ext.export_pop_records and always returns success; change export (the async fn export) to return Result<(), E> (or boxed error) and propagate errors from admin_ext.export_pop_records by returning Err(e) when the call fails (keep the existing println for the Ok path and for dry_run return Ok(())), and update any callers that assumed export returned () (those invoking export at the sites referenced) to handle the Result (use ? or map_err to propagate). Ensure the exported timeout constant EXPORT_POP_RECORD_TIMEOUT_MILLIS and the method export_pop_records are left intact, only change the export function signature and its error handling/propagation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs`:
- Around line 133-152: The current block in the export_pop_record_sub_command
early-returns Ok(()) when cluster resolution yields no broker targets; change it
to return an explicit error instead. Inside the logic that inspects
cluster_info.cluster_addr_table and cluster_info.broker_addr_table for
CheetahString::from(cluster_name) and iterates broker_name_set/broker_addr,
detect the case where no broker addresses are found (e.g. broker_name_set is
missing/empty or broker_addr_table has no entries) and return a suitable Err
variant (propagate the command's error type) with a clear message like "no
brokers found for cluster {cluster_name}"; keep the existing successful path
that calls Self::export(...).await but ensure the function returns Err when
resolution fails rather than Ok(()).
---
Duplicate comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs`:
- Around line 65-90: The export function currently swallows failures from
admin_ext.export_pop_records and always returns success; change export (the
async fn export) to return Result<(), E> (or boxed error) and propagate errors
from admin_ext.export_pop_records by returning Err(e) when the call fails (keep
the existing println for the Ok path and for dry_run return Ok(())), and update
any callers that assumed export returned () (those invoking export at the sites
referenced) to handle the Result (use ? or map_err to propagate). Ensure the
exported timeout constant EXPORT_POP_RECORD_TIMEOUT_MILLIS and the method
export_pop_records are left intact, only change the export function signature
and its error handling/propagation.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
...ools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
Show resolved
Hide resolved
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6517 +/- ##
==========================================
- Coverage 42.10% 42.07% -0.03%
==========================================
Files 949 950 +1
Lines 132503 132590 +87
==========================================
- Hits 55795 55793 -2
- Misses 76708 76797 +89 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
28f7ac1 to
30a6960
Compare
There was a problem hiding this comment.
♻️ Duplicate comments (2)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs (2)
65-90:⚠️ Potential issue | 🟠 MajorPropagate export failures instead of always reporting success.
export()swallowsErrand returns(), so callers (e.g., Line 127 and Line 139-146) cannot fail the command when export fails.🐛 Proposed fix
- async fn export(admin_ext: &DefaultMQAdminExt, broker_addr: &str, broker_name: &str, dry_run: bool) { + async fn export( + admin_ext: &DefaultMQAdminExt, + broker_addr: &str, + broker_name: &str, + dry_run: bool, + ) -> RocketMQResult<()> { if !dry_run { match admin_ext .export_pop_records(CheetahString::from(broker_addr), EXPORT_POP_RECORD_TIMEOUT_MILLIS) .await { Ok(()) => { println!( "Export broker records, brokerName={}, brokerAddr={}, dryRun={}", broker_name, broker_addr, dry_run ); + Ok(()) } Err(e) => { eprintln!( "Export broker records error, brokerName={}, brokerAddr={}, dryRun={}\n{}", broker_name, broker_addr, dry_run, e ); + Err(e) } } } else { println!( - "Export broker records, brokerName={}, brokerAddr={}, dryRun={}", + "DRY RUN: Would export broker records, brokerName={}, brokerAddr={}, dryRun={}", broker_name, broker_addr, dry_run ); + Ok(()) } }- Self::export(&admin_ext, broker_addr, broker_name.as_str(), self.dry_run).await; - Ok(()) + Self::export(&admin_ext, broker_addr, broker_name.as_str(), self.dry_run).await- Self::export( + Self::export( &admin_ext, broker_addr.as_str(), broker_name.as_str(), self.dry_run, ) - .await; + .await?;Also applies to: 127-128, 139-146
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs` around lines 65 - 90, The export function currently swallows errors and returns () which prevents callers from detecting failures; change async fn export(...) to return a Result<(), E> (use the crate's common error type or Box<dyn Error>) and propagate the error from admin_ext.export_pop_records(...) by returning Err(e) when it fails, while keeping the success branch as Ok(()). Update the callers that invoke export (the call sites referenced around lines where export is invoked) to handle the Result (propagate with ? or map_err and fail the command) so command execution correctly fails when export_pop_records on DefaultMQAdminExt returns an error.
133-152:⚠️ Potential issue | 🟠 MajorReturn an explicit error when cluster resolution finds no brokers.
If
cluster_nameis missing in tables or resolves to no broker addresses, this branch currently falls through toOk(()), which is misleading for operators and scripts.🔍 Proposed fix
Ok(cluster_info) => { - if let Some(cluster_addr_table) = &cluster_info.cluster_addr_table { - if let Some(broker_name_set) = cluster_addr_table.get(&CheetahString::from(cluster_name)) { - if let Some(broker_addr_table) = &cluster_info.broker_addr_table { - for broker_name in broker_name_set { - if let Some(broker_data) = broker_addr_table.get(broker_name) { - for broker_addr in broker_data.broker_addrs().values() { - Self::export( - &admin_ext, - broker_addr.as_str(), - broker_name.as_str(), - self.dry_run, - ) - .await; - } - } - } - } - } - } + let cluster_addr_table = cluster_info + .cluster_addr_table + .as_ref() + .ok_or_else(|| RocketMQError::IllegalArgument("cluster address table is empty".into()))?; + let broker_name_set = cluster_addr_table + .get(&CheetahString::from(cluster_name)) + .ok_or_else(|| { + RocketMQError::IllegalArgument( + format!("no brokers found for cluster '{}'", cluster_name).into(), + ) + })?; + let broker_addr_table = cluster_info + .broker_addr_table + .as_ref() + .ok_or_else(|| RocketMQError::IllegalArgument("broker address table is empty".into()))?; + + let mut exported_any = false; + for broker_name in broker_name_set { + if let Some(broker_data) = broker_addr_table.get(broker_name) { + for broker_addr in broker_data.broker_addrs().values() { + exported_any = true; + Self::export( + &admin_ext, + broker_addr.as_str(), + broker_name.as_str(), + self.dry_run, + ) + .await?; + } + } + } + if !exported_any { + return Err(RocketMQError::IllegalArgument( + format!("no brokers found for cluster '{}'", cluster_name).into(), + )); + } Ok(()) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs` around lines 133 - 152, The code currently silently returns Ok(()) when cluster resolution yields no brokers; update the logic in the block that uses cluster_info.cluster_addr_table, cluster_addr_table.get(&CheetahString::from(cluster_name)), and broker_addr_table.get(broker_name) so that if the cluster_name is not found or the resolved broker_name_set is empty or no broker_addrs are found you return an explicit Err with a descriptive error (e.g., "cluster not found" or "no broker addresses for cluster") instead of falling through to Ok(()); keep the existing loop that calls Self::export(&admin_ext, broker_addr.as_str(), broker_name.as_str(), self.dry_run).await when brokers are present.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs`:
- Around line 65-90: The export function currently swallows errors and returns
() which prevents callers from detecting failures; change async fn export(...)
to return a Result<(), E> (use the crate's common error type or Box<dyn Error>)
and propagate the error from admin_ext.export_pop_records(...) by returning
Err(e) when it fails, while keeping the success branch as Ok(()). Update the
callers that invoke export (the call sites referenced around lines where export
is invoked) to handle the Result (propagate with ? or map_err and fail the
command) so command execution correctly fails when export_pop_records on
DefaultMQAdminExt returns an error.
- Around line 133-152: The code currently silently returns Ok(()) when cluster
resolution yields no brokers; update the logic in the block that uses
cluster_info.cluster_addr_table,
cluster_addr_table.get(&CheetahString::from(cluster_name)), and
broker_addr_table.get(broker_name) so that if the cluster_name is not found or
the resolved broker_name_set is empty or no broker_addrs are found you return an
explicit Err with a descriptive error (e.g., "cluster not found" or "no broker
addresses for cluster") instead of falling through to Ok(()); keep the existing
loop that calls Self::export(&admin_ext, broker_addr.as_str(),
broker_name.as_str(), self.dry_run).await when brokers are present.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/export/export_pop_record_sub_command.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit