-
Notifications
You must be signed in to change notification settings - Fork 226
[ISSUE #6276]🚀Implement ResetMasterFlushOffset command in rocketmq-admin-core #6299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
🔊@WaterWhisperer 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughImplements ResetMasterFlushOffset end-to-end: adds MQClientAPI RPC, plugs it into DefaultMQAdminExt implementation, exposes a new CLI subcommand and registers it in broker commands to send reset requests to a broker. Changes
Sequence DiagramsequenceDiagram
participant User
participant CLI as "CLI Command"
participant Cmd as "ResetMasterFlushOffsetSubCommand"
participant AdminExt as "DefaultMQAdminExt"
participant Impl as "DefaultMQAdminExtImpl"
participant ClientAPI as "MQClientAPIImpl"
participant Broker
User->>CLI: invoke resetMasterFlushOffset
CLI->>Cmd: execute(rpc_hook)
Cmd->>AdminExt: create & start
Cmd->>AdminExt: reset_master_flush_offset(brokerAddr, offset)
AdminExt->>Impl: reset_master_flush_offset(brokerAddr, offset)
Impl->>ClientAPI: reset_master_flush_offset(brokerAddr, offset)
ClientAPI->>Broker: send ResetMasterFlushOffset request (header with offset)
Broker-->>ClientAPI: response (success / error)
ClientAPI-->>Impl: RocketMQResult
Impl-->>AdminExt: RocketMQResult
AdminExt-->>Cmd: RocketMQResult
Cmd->>AdminExt: shutdown
Cmd-->>User: print result
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-client/src/admin/default_mq_admin_ext_impl.rs (1)
924-936:⚠️ Potential issue | 🟠 MajorGuard the u64 → i64 cast to avoid wraparound.
master_flush_offsetisu64but the client API takesi64. If a value exceedsi64::MAX, the cast wraps and sends a negative offset.🛡️ Proposed fix
async fn reset_master_flush_offset( &self, broker_addr: CheetahString, master_flush_offset: u64, ) -> rocketmq_error::RocketMQResult<()> { + let master_flush_offset = i64::try_from(master_flush_offset).map_err(|_| { + rocketmq_error::RocketMQError::IllegalArgument( + "master_flush_offset exceeds i64::MAX".into(), + ) + })?; if let Some(ref mq_client_instance) = self.client_instance { mq_client_instance .get_mq_client_api_impl() - .reset_master_flush_offset(&broker_addr, master_flush_offset as i64) + .reset_master_flush_offset(&broker_addr, master_flush_offset) .await } else { Err(rocketmq_error::RocketMQError::ClientNotStarted) } }
🤖 Fix all issues with AI agents
In `@rocketmq-client/src/implementation/mq_client_api_impl.rs`:
- Around line 583-606: In reset_master_flush_offset, validate
master_flush_offset before creating the RemotingCommand: if master_flush_offset
is negative, return an Err immediately (use the existing mq_client_err! macro
with a descriptive message such as "invalid negative master_flush_offset")
instead of issuing the RPC to the broker; place this check at the top of the
function (inside reset_master_flush_offset, before constructing request_header
or calling remoting_client.invoke_request) and keep existing behavior for
non-negative values.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/reset_master_flush_offset_sub_command.rs`:
- Around line 27-76: Add a confirmation/dry‑run flow and before/after reporting
around the destructive reset: add a new --dry-run flag to
ResetMasterFlushOffsetSubCommand, check for a global --yes override when
prompting, and before calling DefaultMQAdminExt::reset_master_flush_offset(...)
fetch and print the current master flush offset using the admin query method
(use the existing DefaultMQAdminExt query method available in your codebase,
e.g., a query_master_flush_offset or similar), then if --dry-run is set print
what would be changed and skip calling reset_master_flush_offset; otherwise
prompt the user to confirm (respecting global --yes to auto-confirm), call
reset_master_flush_offset only after confirmation, re-query and print the offset
after the operation, and ensure MQAdminExt::shutdown(...) still runs in all
paths.
- Around line 29-63: The execute method of ResetMasterFlushOffsetSubCommand
currently defaults broker_addr to empty and offset to 0 and then casts negative
offsets to u64; update execute to require a non-empty broker address (return a
RocketMQError if self.broker_addr is None or trimmed string is empty) and
validate self.offset: reject None or negative values (return a RocketMQError
with a clear message), only then assign master_flush_offset as a u64 and call
default_mqadmin_ext.reset_master_flush_offset(broker_addr.into(),
master_flush_offset). Ensure the error messages reference
ResetMasterFlushOffsetSubCommand, broker_addr and offset for clarity.
🧹 Nitpick comments (1)
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
583-606: Make the timeout configurable instead of hard‑coding 3000ms.This method ignores the admin-configured timeout and can fail in slow networks. Consider accepting a
timeout_millisparameter (or using a configured default) to keep behavior consistent with other admin calls.
...in/rocketmq-admin-core/src/commands/broker_commands/reset_master_flush_offset_sub_command.rs
Show resolved
Hide resolved
...in/rocketmq-admin-core/src/commands/broker_commands/reset_master_flush_offset_sub_command.rs
Show resolved
Hide resolved
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6299 +/- ##
==========================================
- Coverage 42.28% 42.26% -0.03%
==========================================
Files 908 909 +1
Lines 126807 126876 +69
==========================================
- Hits 53620 53618 -2
- Misses 73187 73258 +71 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - All CI checks passed ✅
daa4bb3 to
08f8956
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@rocketmq-client/src/implementation/mq_client_api_impl.rs`:
- Around line 583-607: The method reset_master_flush_offset currently hardcodes
a 3000ms timeout which breaks API consistency; update the function signature
reset_master_flush_offset to accept a timeout_millis: i64 (or u64 as used
elsewhere), replace the literal 3000 in the call to
self.remoting_client.invoke_request(Some(broker_addr), request, 3000).await?
with the new timeout_millis parameter, and ensure any call sites of
reset_master_flush_offset are updated to pass through a timeout value; keep the
existing behavior when callers supply the previous default by passing the
original 3000 where needed.
mxsm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@WaterWhisperer Pls fix ci error and conflicts
08f8956 to
6c74778
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/reset_master_flush_offset_sub_command.rs`:
- Around line 47-62: Validation for broker_addr and offset must occur before
starting the MQAdminExt to avoid leaking the started client; move the
broker_addr and offset extraction/validation (the calls using self.broker_addr
and self.offset) above the MQAdminExt::start(&mut default_mqadmin_ext).await
call, or alternatively introduce a scope/guard that will always call
MQAdminExt::shutdown(&mut default_mqadmin_ext).await on early returns; ensure
references to default_mqadmin_ext, MQAdminExt::start and MQAdminExt::shutdown
are updated accordingly so shutdown is guaranteed if start succeeds.
🧹 Nitpick comments (1)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/reset_master_flush_offset_sub_command.rs (1)
29-33: CLI args arerequired = falsebut both are mandatory at runtime — considerrequired = true.Both
broker_addrandoffsetare validated as required insideexecute(lines 54–62), but the clap attributes sayrequired = false. Settingrequired = truein the clap attribute gives the user an immediate CLI error with usage help rather than a less descriptive runtime error.♻️ Proposed fix
- #[arg(short = 'b', long = "brokerAddr", required = false, help = "which broker to reset")] - broker_addr: Option<String>, + #[arg(short = 'b', long = "brokerAddr", required = true, help = "which broker to reset")] + broker_addr: Option<String>, - #[arg(short = 'o', long = "offset", required = false, help = "the offset to reset at")] - offset: Option<i64>, + #[arg(short = 'o', long = "offset", required = true, help = "the offset to reset at")] + offset: Option<i64>,Or, even simpler, remove
Optionandrequiredentirely and use non-optional types, letting clap enforce presence:- #[arg(short = 'b', long = "brokerAddr", required = false, help = "which broker to reset")] - broker_addr: Option<String>, - - #[arg(short = 'o', long = "offset", required = false, help = "the offset to reset at")] - offset: Option<i64>, + #[arg(short = 'b', long = "brokerAddr", help = "which broker to reset")] + broker_addr: String, + + #[arg(short = 'o', long = "offset", help = "the offset to reset at")] + offset: i64,
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes