[Issue #6416] Feature🚀 Implement ResetOffsetByTime Command for Time-Based Offset#6505
[Issue #6416] Feature🚀 Implement ResetOffsetByTime Command for Time-Based Offset#6505
Conversation
…me-Based Offset
|
🔊@MasterOogway1466 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughIntroduces a new offset command group for RocketMQ admin core. Adds a ResetOffsetByTime subcommand that resets consumer group offsets to a specified timestamp, with fallback support for legacy API when the new method is unavailable. Implements command dispatch routing, timestamp parsing, and broker-level offset reset operations. Changes
Sequence DiagramsequenceDiagram
participant User as User/CLI
participant Dispatch as Command Dispatcher
participant OffsetCmd as OffsetCommands
participant ResetCmd as ResetOffsetByTimeSubCommand
participant Admin as DefaultMQAdminExt
participant Broker as Broker API
User->>Dispatch: Execute offset command
Dispatch->>OffsetCmd: Route to ResetOffsetByTime
OffsetCmd->>ResetCmd: Dispatch via CommandExecute
ResetCmd->>ResetCmd: Parse timestamp & validate inputs
ResetCmd->>Admin: Start admin client
ResetCmd->>Admin: Call resetOffsetByTimestamp (new API)
Admin->>Broker: Reset offset by timestamp
alt API Supported
Broker-->>Admin: Success with per-queue results
Admin-->>ResetCmd: Return offset mappings
ResetCmd->>User: Print detailed per-queue summary
else API Unsupported/Fallback
Broker-->>Admin: Failure or offline consumers
ResetCmd->>Admin: Call resetOffsetByTimestampOld (legacy)
Admin->>Broker: Reset offset (legacy, requires restart)
Broker-->>Admin: Success with offset table
Admin-->>ResetCmd: Return legacy offset data
ResetCmd->>User: Print legacy fallback table
end
ResetCmd->>Admin: Shutdown admin client
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~22 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs`:
- Around line 219-253: Before calling Self::reset_topic_level in
ResetOffsetByTimeSubCommand::execute, add explicit existence checks for the
consumer group and topic using the admin client (DefaultMQAdminExt) after
admin.start() succeeds; call the appropriate admin query methods to verify the
group exists (returning RocketMQError::IllegalArgument with a clear message if
missing) and to verify the topic exists (similarly returning an error if
missing), and only then proceed to call reset_topic_level; ensure any admin call
failures translate to RocketMQError::Internal with contextual text referencing
ResetOffsetByTimeSubCommand.
- Around line 127-163: The current Err(e) arm always falls back to
Self::reset_topic_level_old which can hide real errors; change the Err handling
so you inspect the error returned by admin.reset_offset_by_timestamp (e.g., via
error kind, status code, or pattern match on the error type) and only call
Self::reset_topic_level_old(admin, group, topic, timestamp).await for
broker-offline/unsupported/legacy-capability errors, while returning Err(e) (or
propagating the original error) for other cases like invalid timestamp, auth
failures, or topic/group not found; keep references to the existing symbols
reset_offset_by_timestamp and reset_topic_level_old and use the error’s
discriminants (e.g., is_offline(), is_unsupported(), specific variant names) to
decide whether to fallback.
- Around line 83-105: The ResetOffsetByTimeSubCommand struct only defines group,
topic and timestamp but must also expose the additional CLI flags and modes
required by the change: add boolean/flag fields for force (e.g., force: bool),
broker (broker: Option<String>), queue (queue: Option<u32>), offset (offset:
Option<i64>), and LMQ mode flags cluster and cplus (e.g., cluster: bool, cplus:
bool), plus an enum or flags to select queue-level vs exact-offset reset modes;
update ResetOffsetByTimeSubCommand to include these new fields with appropriate
#[arg(...)] annotations and required/optional settings, then propagate those new
members into the command handling code that performs the reset (where
ResetOffsetByTimeSubCommand is consumed) to implement broker-scoped,
queue-scoped, exact-offset, force, and LMQ cluster/cplus flows. Ensure help text
matches the new options and that parsing and downstream logic use the new fields
to drive the different reset flows.
- Around line 31-65: Update TIMESTAMP_FORMAT to use "%.3f" (matching
format_timestamp) and adjust the documented example accordingly; in
parse_timestamp (function parse_timestamp) change the NaiveDateTime parsing
format constant to the new TIMESTAMP_FORMAT and after computing millis compare
against current time (get_current_millis()) to reject future timestamps by
returning a RocketMQError::IllegalArgument when millis > now; also update the
final error message text to reflect the corrected documented format
"yyyy-MM-dd#HH:mm:ss.SSS" so it matches the parser/formatter.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset.rsrocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs
| /// Timestamp format used by the Java reference implementation. | ||
| const TIMESTAMP_FORMAT: &str = "%Y-%m-%d#%H:%M:%S:%3f"; | ||
|
|
||
| /// Parse a timestamp string in one of the three supported forms: | ||
| /// - `"now"` → current system time in milliseconds | ||
| /// - a plain decimal integer → milliseconds since epoch | ||
| /// - `"yyyy-MM-dd#HH:mm:ss:SSS"` formatted string | ||
| fn parse_timestamp(s: &str) -> RocketMQResult<u64> { | ||
| let s = s.trim(); | ||
| if s.eq_ignore_ascii_case("now") { | ||
| return Ok(get_current_millis()); | ||
| } | ||
| // Try parsing as plain milliseconds integer first. | ||
| if let Ok(ms) = s.parse::<u64>() { | ||
| return Ok(ms); | ||
| } | ||
| // Try parsing as formatted datetime string. | ||
| if let Ok(ndt) = NaiveDateTime::parse_from_str(s, TIMESTAMP_FORMAT) { | ||
| let millis = Local | ||
| .from_local_datetime(&ndt) | ||
| .single() | ||
| .ok_or_else(|| RocketMQError::IllegalArgument(format!("Ambiguous local datetime: {s}")))? | ||
| .timestamp_millis(); | ||
| if millis < 0 { | ||
| return Err(RocketMQError::IllegalArgument(format!( | ||
| "Parsed timestamp is negative (before epoch): {millis}" | ||
| ))); | ||
| } | ||
| return Ok(millis as u64); | ||
| } | ||
| Err(RocketMQError::IllegalArgument(format!( | ||
| "Cannot parse timestamp '{}'. Supported formats: 'now', milliseconds (integer), 'yyyy-MM-dd#HH:mm:ss:SSS'", | ||
| s | ||
| ))) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
chrono Rust format specifier milliseconds %3f %.3f difference yyyy-MM-dd HH:mm:ss SSS
💡 Result:
In Rust chrono’s strftime syntax, the two millisecond-ish specifiers differ only in whether they include the decimal point:
%.3f→ fractional seconds with a leading dot, fixed 3 digits (e.g..026) (docs.rs)%3f→ fractional seconds without the dot, fixed 3 digits (e.g.026) (docs.rs)
So the Java-style pattern:
yyyy-MM-dd HH:mm:ss SSS
is equivalent in chrono to either of these:
"%Y-%m-%d %H:%M:%S%.3f"(the dot comes from%.3f) (docs.rs)"%Y-%m-%d %H:%M:%S.%3f"(you type the dot, then%3f) (docs.rs)
Notes:
- In
chrono,%fis 9 digits (nanoseconds);%3f/%6f/%9fcontrol width, and%.3f/%.6f/%.9finclude the dot. (docs.rs)
Citations:
- 1: https://docs.rs/chrono/latest/chrono/format/strftime/index.html?utm_source=openai
- 2: https://docs.rs/chrono/latest/chrono/format/strftime/index.html?utm_source=openai
- 3: https://docs.rs/chrono/latest/chrono/format/strftime/index.html?utm_source=openai
- 4: https://docs.rs/chrono/latest/chrono/format/strftime/index.html?utm_source=openai
- 5: https://docs.rs/chrono/latest/chrono/format/strftime/index.html?utm_source=openai
🏁 Script executed:
find . -name "reset_offset_by_time_sub_command.rs" -type f | head -5Repository: mxsm/rocketmq-rust
Length of output: 169
🏁 Script executed:
cat -n "rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs" | head -100Repository: mxsm/rocketmq-rust
Length of output: 4535
🏁 Script executed:
cat -n "rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs" | sed -n '100,200p'Repository: mxsm/rocketmq-rust
Length of output: 4534
🏁 Script executed:
cat -n "rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs" | sed -n '200,250p'Repository: mxsm/rocketmq-rust
Length of output: 2177
🏁 Script executed:
rg -i "6416\|future.*timestamp\|reject.*future" --max-count=10Repository: mxsm/rocketmq-rust
Length of output: 44
Add future timestamp validation and fix millisecond format inconsistency.
The parse_timestamp function lacks validation to reject future timestamps. Additionally, there's a format mismatch: the parse function uses %3f (expecting input like 14:30:45:123 with no dot), while format_timestamp outputs %.3f (producing 14:30:45.123 with a dot). These should be consistent. Use %.3f in both and update the documented format string and example accordingly.
✅ Proposed fix
/// Timestamp format used by the Java reference implementation.
-const TIMESTAMP_FORMAT: &str = "%Y-%m-%d#%H:%M:%S:%3f";
+const TIMESTAMP_FORMAT: &str = "%Y-%m-%d#%H:%M:%S%.3f";
/// Parse a timestamp string in one of the three supported forms:
/// - `"now"` → current system time in milliseconds
/// - a plain decimal integer → milliseconds since epoch
-/// - `"yyyy-MM-dd#HH:mm:ss:SSS"` formatted string
+/// - `"yyyy-MM-dd#HH:mm:ss.SSS"` formatted string (e.g. `2024-02-19#10:00:00.000`)
fn parse_timestamp(s: &str) -> RocketMQResult<u64> {
let s = s.trim();
- if s.eq_ignore_ascii_case("now") {
- return Ok(get_current_millis());
- }
- // Try parsing as plain milliseconds integer first.
- if let Ok(ms) = s.parse::<u64>() {
- return Ok(ms);
- }
- // Try parsing as formatted datetime string.
- if let Ok(ndt) = NaiveDateTime::parse_from_str(s, TIMESTAMP_FORMAT) {
- let millis = Local
+ let ms = if s.eq_ignore_ascii_case("now") {
+ get_current_millis()
+ } else if let Ok(ms) = s.parse::<u64>() {
+ ms
+ } else if let Ok(ndt) = NaiveDateTime::parse_from_str(s, TIMESTAMP_FORMAT) {
+ let millis = Local
.from_local_datetime(&ndt)
.single()
.ok_or_else(|| RocketMQError::IllegalArgument(format!("Ambiguous local datetime: {s}")))?
.timestamp_millis();
if millis < 0 {
return Err(RocketMQError::IllegalArgument(format!(
"Parsed timestamp is negative (before epoch): {millis}"
)));
}
- return Ok(millis as u64);
- }
- Err(RocketMQError::IllegalArgument(format!(
- "Cannot parse timestamp '{}'. Supported formats: 'now', milliseconds (integer), 'yyyy-MM-dd#HH:mm:ss:SSS'",
- s
- )))
+ millis as u64
+ } else {
+ return Err(RocketMQError::IllegalArgument(format!(
+ "Cannot parse timestamp '{}'. Supported formats: 'now', milliseconds (integer), 'yyyy-MM-dd#HH:mm:ss.SSS'",
+ s
+ )));
+ };
+
+ let now = get_current_millis();
+ if ms > now {
+ return Err(RocketMQError::IllegalArgument(format!(
+ "Timestamp {ms} is in the future (current time: {now})"
+ )));
+ }
+ Ok(ms)
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs`
around lines 31 - 65, Update TIMESTAMP_FORMAT to use "%.3f" (matching
format_timestamp) and adjust the documented example accordingly; in
parse_timestamp (function parse_timestamp) change the NaiveDateTime parsing
format constant to the new TIMESTAMP_FORMAT and after computing millis compare
against current time (get_current_millis()) to reject future timestamps by
returning a RocketMQError::IllegalArgument when millis > now; also update the
final error message text to reflect the corrected documented format
"yyyy-MM-dd#HH:mm:ss.SSS" so it matches the parser/formatter.
| #[derive(Debug, Clone, Parser)] | ||
| pub struct ResetOffsetByTimeSubCommand { | ||
| #[command(flatten)] | ||
| common_args: CommonArgs, | ||
|
|
||
| /// Consumer group name. | ||
| #[arg(short = 'g', long = "group", required = true, help = "consumer group name")] | ||
| group: String, | ||
|
|
||
| /// Topic name. | ||
| #[arg(short = 't', long = "topic", required = true, help = "topic name")] | ||
| topic: String, | ||
|
|
||
| /// Target timestamp. Accepts: `now`, milliseconds since epoch, or | ||
| /// `yyyy-MM-dd#HH:mm:ss:SSS` (e.g. `2024-02-19#10:00:00:000`). | ||
| #[arg( | ||
| short = 's', | ||
| long = "timestamp", | ||
| required = true, | ||
| help = "target timestamp: 'now', milliseconds, or 'yyyy-MM-dd#HH:mm:ss:SSS'" | ||
| )] | ||
| timestamp: String, | ||
| } |
There was a problem hiding this comment.
CLI surface is missing required flags and reset modes from #6416.
The subcommand only exposes --group, --topic, and --timestamp, but the issue requires --force, --broker, --queue, --offset, and LMQ --cluster/--cplus, plus queue-level and exact-offset modes. As implemented, these flows are not possible.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs`
around lines 83 - 105, The ResetOffsetByTimeSubCommand struct only defines
group, topic and timestamp but must also expose the additional CLI flags and
modes required by the change: add boolean/flag fields for force (e.g., force:
bool), broker (broker: Option<String>), queue (queue: Option<u32>), offset
(offset: Option<i64>), and LMQ mode flags cluster and cplus (e.g., cluster:
bool, cplus: bool), plus an enum or flags to select queue-level vs exact-offset
reset modes; update ResetOffsetByTimeSubCommand to include these new fields with
appropriate #[arg(...)] annotations and required/optional settings, then
propagate those new members into the command handling code that performs the
reset (where ResetOffsetByTimeSubCommand is consumed) to implement
broker-scoped, queue-scoped, exact-offset, force, and LMQ cluster/cplus flows.
Ensure help text matches the new options and that parsing and downstream logic
use the new fields to drive the different reset flows.
| let result = admin | ||
| .reset_offset_by_timestamp(None, topic.into(), group.into(), timestamp, false) | ||
| .await; | ||
|
|
||
| match result { | ||
| Ok(offset_map) => { | ||
| let mut entries: Vec<_> = offset_map.iter().collect(); | ||
| entries.sort_by(|(a, _), (b, _)| { | ||
| a.broker_name() | ||
| .cmp(b.broker_name()) | ||
| .then_with(|| a.queue_id().cmp(&b.queue_id())) | ||
| }); | ||
|
|
||
| for (mq, new_offset) in &entries { | ||
| println!( | ||
| "Broker: {:<35} Queue: {:<6} New Offset: {}", | ||
| mq.broker_name(), | ||
| mq.queue_id(), | ||
| new_offset | ||
| ); | ||
| } | ||
|
|
||
| println!(); | ||
| println!("{}", "-".repeat(50)); | ||
| println!("Reset Summary:"); | ||
| println!(" Total Queues: {}", entries.len()); | ||
| println!(" All offsets reset to timestamp: {}", format_timestamp(timestamp)); | ||
| println!(); | ||
| println!("Note: Consumers will automatically resume from new offsets (no restart required)"); | ||
| Ok(()) | ||
| } | ||
| Err(e) => { | ||
| // Attempt the legacy fallback path. | ||
| eprintln!("New reset method failed ({e}). Trying legacy method…"); | ||
| Self::reset_topic_level_old(admin, group, topic, timestamp).await | ||
| } | ||
| } |
There was a problem hiding this comment.
Fallback should be limited to offline/unsupported errors.
The new method fallback triggers on any error, which can mask real failures (invalid timestamp, auth, topic/group not found) and still runs the legacy method. Limit fallback to the specific broker/offline/unsupported cases and surface other errors directly.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs`
around lines 127 - 163, The current Err(e) arm always falls back to
Self::reset_topic_level_old which can hide real errors; change the Err handling
so you inspect the error returned by admin.reset_offset_by_timestamp (e.g., via
error kind, status code, or pattern match on the error type) and only call
Self::reset_topic_level_old(admin, group, topic, timestamp).await for
broker-offline/unsupported/legacy-capability errors, while returning Err(e) (or
propagating the original error) for other cases like invalid timestamp, auth
failures, or topic/group not found; keep references to the existing symbols
reset_offset_by_timestamp and reset_topic_level_old and use the error’s
discriminants (e.g., is_offline(), is_unsupported(), specific variant names) to
decide whether to fallback.
| impl CommandExecute for ResetOffsetByTimeSubCommand { | ||
| async fn execute(&self, _rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> { | ||
| // ── Validate arguments ─────────────────────────────────────────────── | ||
| let group = self.group.trim(); | ||
| if group.is_empty() { | ||
| return Err(RocketMQError::IllegalArgument( | ||
| "Consumer group name (--group / -g) cannot be empty".into(), | ||
| )); | ||
| } | ||
|
|
||
| let topic = self.topic.trim(); | ||
| if topic.is_empty() { | ||
| return Err(RocketMQError::IllegalArgument( | ||
| "Topic name (--topic / -t) cannot be empty".into(), | ||
| )); | ||
| } | ||
|
|
||
| let timestamp = parse_timestamp(&self.timestamp)?; | ||
|
|
||
| // ── Initialise admin client ─────────────────────────────────────────── | ||
| let mut admin = DefaultMQAdminExt::new(); | ||
| admin | ||
| .client_config_mut() | ||
| .set_instance_name(get_current_millis().to_string().into()); | ||
|
|
||
| if let Some(addr) = &self.common_args.namesrv_addr { | ||
| admin.set_namesrv_addr(addr.trim()); | ||
| } | ||
|
|
||
| admin.start().await.map_err(|e| { | ||
| RocketMQError::Internal(format!("ResetOffsetByTimeSubCommand: Failed to start MQAdminExt: {e}")) | ||
| })?; | ||
|
|
||
| let result = Self::reset_topic_level(&mut admin, group, topic, timestamp).await; | ||
|
|
There was a problem hiding this comment.
Validate consumer group and topic existence before resetting.
The issue requires validating that the consumer group and topic exist. The current flow proceeds without those checks, which can lead to confusing broker errors or unintended legacy fallback. Consider validating via admin calls before the reset.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/offset/reset_offset_by_time_sub_command.rs`
around lines 219 - 253, Before calling Self::reset_topic_level in
ResetOffsetByTimeSubCommand::execute, add explicit existence checks for the
consumer group and topic using the admin client (DefaultMQAdminExt) after
admin.start() succeeds; call the appropriate admin query methods to verify the
group exists (returning RocketMQError::IllegalArgument with a clear message if
missing) and to verify the topic exists (similarly returning an error if
missing), and only then proceed to call reset_topic_level; ensure any admin call
failures translate to RocketMQError::Internal with contextual text referencing
ResetOffsetByTimeSubCommand.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6505 +/- ##
==========================================
- Coverage 42.18% 42.17% -0.01%
==========================================
Files 942 946 +4
Lines 131785 132075 +290
==========================================
+ Hits 55595 55706 +111
- Misses 76190 76369 +179 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Which Issue(s) This PR Fixes(Closes)
Brief Description
Implements the resetOffsetByTime command in the rocketmq-admin-core crate, inspired from the Java reference implementation.
How Did You Test This Change?
Code is building with no warnings from clippy and I can see the command from rocketmq-admin-cli-rust
Summary by CodeRabbit
Release Notes