Skip to content

feature: Implement QueryMsgById Command for Message ID-Based Lookup#6750

Merged
mxsm merged 3 commits intomxsm:mainfrom
onenewcode:fe_6407
Mar 15, 2026
Merged

feature: Implement QueryMsgById Command for Message ID-Based Lookup#6750
mxsm merged 3 commits intomxsm:mainfrom
onenewcode:fe_6407

Conversation

@onenewcode
Copy link
Contributor

@onenewcode onenewcode commented Mar 15, 2026

Which Issue(s) This PR Fixes(Closes)

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Query messages by ID end-to-end: CLI subcommand, admin API wiring and new client call to fetch a message by ID.
  • Improvements

    • Stronger message-ID validation and safer decoding with explicit error results and consistent IllegalArgument mappings.
    • Richer CLI output: timestamps, hosts, metadata, properties, charset-aware payload display, hex dump, per-query timing and not-found handling.
  • Tests

    • Added unit tests for ID validation, timestamp/formatting, and message-printing.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@onenewcode 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-robot rocketmq-rust-robot added AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project. labels Mar 15, 2026
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 15, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Implements query-by-message-ID: adds message-id validation/decoding, a broker-targeted view_message API in the MQ client, completes admin-layer query_message plumbing, registers a CLI command, and adds a full queryMsgById subcommand with formatted output and tests.

Changes

Cohort / File(s) Summary
Message ID decoding & validation
rocketmq-common/src/common/message/message_decoder.rs
Made decode_message_id return Result<MessageId, String>; added validate_message_id; switched to hex-to-bytes decoding, construct SocketAddr, and return errors instead of panics.
MQ client: view message API
rocketmq-client/src/implementation/mq_client_api_impl.rs
Added pub async fn view_message(...) to send ViewMessageById, handle response codes, decode response body into MessageExt, and map errors.
Admin implementation: query_message
rocketmq-client/src/admin/default_mq_admin_ext_impl.rs, rocketmq-tools/rocketmq-admin/.../admin/default_mq_admin_ext.rs
Implemented query_message to get MQ client, validate/decode msg ID, derive broker address, build ViewMessageRequestHeader, and call view_message; admin façade delegates to the impl.
Producer error mapping
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs
Map message ID decode errors to RocketMQError::IllegalArgument(...) in transaction and offset paths instead of propagating raw decode errors.
CLI wiring & commands table
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs, .../commands/message.rs
Registered queryMsgById in the classification table and added QueryMsgById enum variant with dispatch handling.
New subcommand: queryMsgById
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs
Added Clap-based QueryMsgByIdSubCommand: args (message-ids, topic, print-body, charset, timeout), ID validation, admin init, per-ID queries, detailed formatted output (metadata, timestamps, hosts, properties, optional body/hex), and unit tests.
Decode command update
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/decode_message_id_sub_command.rs
Replaced manual hex/length checks with validate_message_id; use decode_message_id(msg_id) Result handling and improved error messages.
Commands registration
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs
Added new "queryMsgById" entry to the command classification table.

Sequence Diagram

sequenceDiagram
    participant User
    participant CLI as QueryMsgByIdSubCommand
    participant Decoder as MessageDecoder
    participant Admin as DefaultMQAdminExt
    participant Impl as DefaultMQAdminExtImpl
    participant API as MQClientAPIImpl
    participant Broker

    User->>CLI: invoke queryMsgById (message_id, topic?)
    CLI->>Decoder: validate_message_id(msg_id)
    Decoder-->>CLI: Ok / Err
    CLI->>Admin: query_message(cluster?, topic, msg_id)
    Admin->>Impl: query_message(cluster?, topic, msg_id)
    Impl->>Decoder: decode_message_id(msg_id)
    Decoder-->>Impl: Ok(MessageId{address,offset}) / Err
    Impl->>Impl: build ViewMessageRequestHeader(topic, offset)
    Impl->>API: view_message(broker_addr, header, timeout)
    API->>Broker: send ViewMessageById request
    Broker-->>API: response (code, body/remark)
    API-->>Impl: MessageExt or Error
    Impl-->>Admin: MessageExt or Error
    Admin-->>CLI: MessageExt or Error
    CLI->>User: pretty-print result or error
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 I nibbled hex and chased each byte,
I hopped to brokers through the night,
From CLI burrow to API glade,
The message found — no longer staid,
Hooray — a rabbit's byte-made light!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 51.32% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: implementing a QueryMsgById command for message ID-based message lookup, which is the primary objective of this PR.
Linked Issues check ✅ Passed The PR fully implements all coding requirements from issue #6407: query message by broker-assigned message ID, command parameters (--message-id, --topic, --print-body, --charset), message ID parsing and validation, direct broker lookup, comprehensive output formatting with metadata/properties/body, multiple charset support, error handling, and performance optimization.
Out of Scope Changes check ✅ Passed All changes are directly aligned with issue #6407 objectives: message decoder updates enable safe message ID decoding; MQClientAPIImpl and MQAdminExt changes support broker queries; default_mq_producer_impl error handling aligns with message ID validation; and new command/subcommand implementations deliver the full queryMsgById feature.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - All CI checks passed ✅

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rocketmq-common/src/common/message/message_decoder.rs (1)

519-537: ⚠️ Potential issue | 🔴 Critical

Make decode_message_id fully fallible.

This function still skips its own validation and silently truncates the 32-bit port with as u16. A 34/36/... hex string will panic inside get_i32(), and an out-of-range port will decode to the wrong broker instead of returning Err. DefaultMQProducerImpl::end_transaction now treats this as a fallible API, so the decoder itself needs to trim, validate, and range-check before reading.

Suggested fix
 pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
+    let msg_id = msg_id.trim();
+    validate_message_id(msg_id)?;
+
     let bytes = util_all::string_to_bytes(msg_id)
         .ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
     let mut buffer = Bytes::from(bytes);
     let address = if msg_id.len() == 32 {
         let mut ip = [0u8; 4];
         buffer.copy_to_slice(&mut ip);
         let port = buffer.get_i32();
-        SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
+        let port = u16::try_from(port).map_err(|_| format!("Invalid port in message ID: {}", port))?;
+        SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port)
     } else {
         let mut ip = [0u8; 16];
         buffer.copy_to_slice(&mut ip);
         let port = buffer.get_i32();
-        SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
+        let port = u16::try_from(port).map_err(|_| format!("Invalid port in message ID: {}", port))?;
+        SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port)
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-common/src/common/message/message_decoder.rs` around lines 519 -
537, The decoder must be fully fallible: validate the decoded byte length and
ranges before reading so we never panic or silently truncate the port. In
decode_message_id, after util_all::string_to_bytes(msg_id) returns bytes, check
bytes.len() exactly equals the expected number of bytes for IPv4 (16 bytes = 4
ip + 4 port + 8 offset) or IPv6 (28 bytes = 16 ip + 4 port + 8 offset) and
return Err otherwise; ensure buffer.remaining() is sufficient before each read;
when reading the port with buffer.get_i32() treat it as a signed i32, verify it
lies in 0..=65535 and return Err if out of range (do not cast with as u16
blindly), then construct the SocketAddr using the validated port and finally
read the 8-byte offset only after confirming remaining >= 8; update error
messages to include context (e.g., "invalid hex length", "insufficient bytes for
port/offset", "port out of range") and return Err on any validation failure.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rocketmq-client/src/admin/default_mq_admin_ext_impl.rs`:
- Around line 1776-1777: The broker address must be built from the SocketAddr
instead of concatenating ip and port (which breaks IPv6); replace
CheetahString::from_string(format!("{}:{}", message_id.address.ip(),
message_id.address.port())) with
CheetahString::from_string(message_id.address.to_string() or format!("{}",
message_id.address)) so IPv6 addresses are properly bracketed, and make the same
change where the same "{ip}:{port}" pattern is used in the
query_msg_by_id_sub_command.rs code path that constructs broker addresses for
view_message.

In `@rocketmq-client/src/implementation/mq_client_api_impl.rs`:
- Around line 3280-3294: The view_message path currently calls
MessageDecoder::decode directly on response.get_body() under
ResponseCode::Success which can panic on truncated bodies (unchecked
get_i32/copy_to_slice); change this to a fallible decode or add explicit frame
validation before calling decode: either use a provided safe/fallible API (e.g.,
a MessageDecoder::decode_checked or similar) and propagate its Result, or
validate response.get_body() length and required header/frame fields (compare to
expected minimum header size and payload lengths) and return Err(...) when too
short so the ok_or_else runs instead of letting decode panic; update the
view_message handling to call the safe variant or perform checks around
MessageDecoder::decode and map failures to mq_client_err.

In `@rocketmq-common/src/common/message/message_decoder.rs`:
- Around line 505-510: The length check for message IDs in the msg_id validation
rejects valid IPv6 broker IDs; update the condition in message_decoder (the
branch that currently checks msg_id.len() != 32 && msg_id.len() != 40) to also
accept 56 (i.e., allow msg_id.len() == 56) so the IPv6 decode path (used by
DecodeMessageIdSubCommand) can parse 16-byte IP + 4-byte port + 8-byte offset;
after changing the check, add a regression test that supplies a 56-character hex
IPv6 broker message ID and verifies DecodeMessageIdSubCommand successfully
decodes it and that queryMsgById proceeds as expected.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`:
- Around line 203-217: The current branch only treats non-UTF-8 charsets as
UTF-8 then falls back to hex; change the logic around the charset variable
(charset_upper) so non-UTF-8 values actually decode the body: if charset_upper
is UTF-8/UTF8, keep the existing from_utf8 path; otherwise attempt to resolve
the requested encoding (e.g., via encoding_rs or the encoding crate using the
charset string), decode `body` with that encoder and print the decoded string on
success, and call `Self::print_hex_dump(&body)` or return an error when the
charset is unknown or decoding fails; reference the variables/functions
charset_upper, body, and Self::print_hex_dump to locate where to implement the
real decoder or the upfront rejection of unsupported charsets.
- Around line 273-289: The branch that constructs default_mqadmin_ext using
DefaultMQAdminExt::with_rpc_hook(rpc_hook) leaves inner uninitialized and will
panic in MQAdminExt::start; change the construction to ensure inner is
initialized consistently—e.g., always create the admin with
DefaultMQAdminExt::new() (which calls set_inner(...)) and then attach the RPC
hook via the setter or call set_inner(...) after using with_rpc_hook—so that
default_mqadmin_ext has its inner field initialized before calling
MQAdminExt::start; refer to DefaultMQAdminExt::with_rpc_hook,
DefaultMQAdminExt::new, set_inner, MQAdminExt::start and default_mqadmin_ext to
locate the code to update.

---

Outside diff comments:
In `@rocketmq-common/src/common/message/message_decoder.rs`:
- Around line 519-537: The decoder must be fully fallible: validate the decoded
byte length and ranges before reading so we never panic or silently truncate the
port. In decode_message_id, after util_all::string_to_bytes(msg_id) returns
bytes, check bytes.len() exactly equals the expected number of bytes for IPv4
(16 bytes = 4 ip + 4 port + 8 offset) or IPv6 (28 bytes = 16 ip + 4 port + 8
offset) and return Err otherwise; ensure buffer.remaining() is sufficient before
each read; when reading the port with buffer.get_i32() treat it as a signed i32,
verify it lies in 0..=65535 and return Err if out of range (do not cast with as
u16 blindly), then construct the SocketAddr using the validated port and finally
read the 8-byte offset only after confirming remaining >= 8; update error
messages to include context (e.g., "invalid hex length", "insufficient bytes for
port/offset", "port out of range") and return Err on any validation failure.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: b47dce14-9ec6-4f4e-a4b0-6f491afedc48

📥 Commits

Reviewing files that changed from the base of the PR and between 4ad5ab7 and 75308e2.

📒 Files selected for processing (9)
  • rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
  • rocketmq-client/src/implementation/mq_client_api_impl.rs
  • rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs
  • rocketmq-common/src/common/message/message_decoder.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/decode_message_id_sub_command.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs

Comment on lines +3280 to +3294
match ResponseCode::from(response.code()) {
ResponseCode::Success => {
if let Some(body) = response.get_body() {
let mut bytes = body.clone();
let body_len = bytes.len();
MessageDecoder::decode(&mut bytes, true, true, false, false, false).ok_or_else(|| {
mq_client_err!(format!(
"Failed to decode message from view_message response body: body_len={}, possible causes: \
CRC check failed or malformed message data",
body_len
))
})
} else {
Err(mq_client_err!("view_message response body is empty".to_string()))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid the panicking decoder on broker responses.

MessageDecoder::decode still performs unchecked get_i32 / copy_to_slice reads. If the broker returns Success with an empty or truncated body, this new path panics before the ok_or_else runs. Please route view_message through a fallible decoder or add explicit frame validation so malformed responses return Err instead of crashing the client.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/implementation/mq_client_api_impl.rs` around lines 3280 -
3294, The view_message path currently calls MessageDecoder::decode directly on
response.get_body() under ResponseCode::Success which can panic on truncated
bodies (unchecked get_i32/copy_to_slice); change this to a fallible decode or
add explicit frame validation before calling decode: either use a provided
safe/fallible API (e.g., a MessageDecoder::decode_checked or similar) and
propagate its Result, or validate response.get_body() length and required
header/frame fields (compare to expected minimum header size and payload
lengths) and return Err(...) when too short so the ok_or_else runs instead of
letting decode panic; update the view_message handling to call the safe variant
or perform checks around MessageDecoder::decode and map failures to
mq_client_err.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs (1)

135-150: ⚠️ Potential issue | 🔴 Critical

with_rpc_hook does not initialize inner, causing panics.

Unlike new() (lines 98-116) which calls set_inner(inner), with_rpc_hook leaves default_mqadmin_ext_impl.inner as None. When QueryMsgByIdSubCommand uses this constructor path, MQAdminExt::start will panic. Apply the same set_inner pattern here.

🔧 Proposed fix
     pub fn with_rpc_hook(rpc_hook: Arc<dyn RPCHook>) -> Self {
         let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
         let client_config = ArcMut::new(ClientConfig::new());
+        let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
+            Some(rpc_hook),
+            Duration::from_millis(5000),
+            client_config.clone(),
+            admin_ext_group.clone(),
+        ));
+        let inner = default_mqadmin_ext_impl.clone();
+        default_mqadmin_ext_impl.set_inner(inner);
         Self {
-            client_config: client_config.clone(),
-            default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
-                Some(rpc_hook),
-                Duration::from_millis(5000),
-                client_config,
-                admin_ext_group.clone(),
-            )),
+            client_config,
+            default_mqadmin_ext_impl,
             admin_ext_group,
             create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
             timeout_millis: Duration::from_millis(5000),
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs`
around lines 135 - 150, with_rpc_hook currently constructs
default_mqadmin_ext_impl via DefaultMQAdminExtImpl::new but does not call
set_inner(...) so default_mqadmin_ext_impl.inner remains None and later
MQAdminExt::start (e.g. when invoked by QueryMsgByIdSubCommand) panics; fix by
mirroring new()’s pattern: after creating the ArcMut<DefaultMQAdminExtImpl>
assign it to a local variable, create the inner value the same way new() does,
call set_inner(inner) on that ArcMut (default_mqadmin_ext_impl) before returning
the Self from with_rpc_hook so inner is initialized.
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs (1)

100-108: ⚠️ Potential issue | 🔴 Critical

Missing match arm for QueryMsgById causes build failure.

The pipeline error indicates non-exhaustive patterns: &commands::message::MessageCommands::QueryMsgById(_) not covered. Add the missing dispatch arm.

🔧 Proposed fix
 impl CommandExecute for MessageCommands {
     async fn execute(&self, rpc_hook: Option<Arc<dyn RPCHook>>) -> RocketMQResult<()> {
         match self {
             MessageCommands::CheckMsgSendRT(value) => value.execute(rpc_hook).await,
             MessageCommands::DecodeMessageId(value) => value.execute(rpc_hook).await,
             MessageCommands::DumpCompactionLog(value) => value.execute(rpc_hook).await,
             MessageCommands::PrintMessage(value) => value.execute(rpc_hook).await,
             MessageCommands::PrintMsgByQueue(value) => value.execute(rpc_hook).await,
+            MessageCommands::QueryMsgById(value) => value.execute(rpc_hook).await,
             MessageCommands::QueryMsgByKey(value) => value.execute(rpc_hook).await,
             MessageCommands::SendMessage(value) => value.execute(rpc_hook).await,
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs`
around lines 100 - 108, The match over the MessageCommands enum is missing the
QueryMsgById variant, causing a non-exhaustive pattern error; update the match
(where MessageCommands is being matched and each variant calls
.execute(rpc_hook).await) to include the missing arm for
MessageCommands::QueryMsgById(value) and dispatch it the same way as the other
variants (i.e., call value.execute(rpc_hook).await) so the match becomes
exhaustive.
♻️ Duplicate comments (6)
rocketmq-common/src/common/message/message_decoder.rs (2)

914-919: ⚠️ Potential issue | 🟡 Minor

Add an IPv6 decode regression test.

Line 916 updates IPv4 usage to Result, but there’s still no IPv6 test to lock the 56-char path.

🧪 Suggested test addition
 #[test]
 fn decode_message_id_ipv4() {
     let msg_id = "7F0000010007D8260BF075769D36C348";
     let message_id = decode_message_id(msg_id).unwrap();
     assert_eq!(message_id.address, "127.0.0.1:55334".parse().unwrap());
     assert_eq!(message_id.offset, 860316681131967304);
 }

+#[test]
+fn decode_message_id_ipv6() {
+    let msg_id = "0000000000000000000000000000000100002A9F0000000000000001";
+    let message_id = decode_message_id(msg_id).unwrap();
+    assert_eq!(message_id.address, "[::1]:10911".parse().unwrap());
+    assert_eq!(message_id.offset, 1);
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-common/src/common/message/message_decoder.rs` around lines 914 -
919, Add a new regression test mirroring decode_message_id_ipv4 that verifies
IPv6 decoding for the 56-character path: create a test function (e.g.,
decode_message_id_ipv6) that calls decode_message_id with a 56-char IPv6
message-id string, unwraps the Result, and assert_eq! the decoded
message_id.address against the expected IPv6 SocketAddr (use
".parse().unwrap()") and assert_eq! the decoded message_id.offset to the
expected offset value; follow the existing test style in message_decoder.rs and
use the Result/unwrap pattern updated on line 916.

505-537: ⚠️ Potential issue | 🔴 Critical

Fix IPv6 message-id length handling (32/56), and validate before decode.

Line 505 currently accepts 40 chars for IPv6, but IPv6 offset message IDs are 56 hex chars. This rejects valid IDs and leaves decode behavior inconsistent. Please gate decode with validate_message_id and switch IPv6 length to 56.

🔧 Proposed fix
 pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
     let msg_id = msg_id.trim();
@@
-    if msg_id.len() != 32 && msg_id.len() != 40 {
+    if msg_id.len() != 32 && msg_id.len() != 56 {
         return Err(format!(
-            "Invalid message ID length: {}. Expected 32 characters (IPv4) or 40 characters (IPv6)",
+            "Invalid message ID length: {}. Expected 32 characters (IPv4) or 56 characters (IPv6)",
             msg_id.len()
         ));
     }
@@
 pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
-    let bytes = util_all::string_to_bytes(msg_id)
+    let msg_id = msg_id.trim();
+    validate_message_id(msg_id)?;
+    let bytes = util_all::string_to_bytes(msg_id)
         .ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-common/src/common/message/message_decoder.rs` around lines 505 -
537, Update message ID validation and decoding so they agree: change the IPv6
expected length from 40 to 56 in validate_message_id (the earlier length check
that currently compares msg_id.len() != 32 && msg_id.len() != 40) and then
ensure decode_message_id first calls validate_message_id(msg_id) (propagating
any error) before attempting to hex-decode; inside decode_message_id keep the
current byte parsing logic but branch on msg_id.len() == 32 (IPv4) vs
msg_id.len() == 56 (IPv6) so the 56-char IPv6 IDs are accepted and parsed
consistently (references: validate_message_id, decode_message_id, MessageId,
util_all::string_to_bytes, Bytes).
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)

3316-3326: ⚠️ Potential issue | 🔴 Critical

Guard view_message decode against malformed frames/panic.

Line 3320 still uses the panicking decode path directly. A truncated or malformed broker body can crash here before your ok_or_else runs.

🛡️ Proposed hardening
             ResponseCode::Success => {
                 if let Some(body) = response.get_body() {
+                    if body.len() < 4 {
+                        return Err(mq_client_err!("view_message response body is too short".to_string()));
+                    }
                     let mut bytes = body.clone();
                     let body_len = bytes.len();
-                    MessageDecoder::decode(&mut bytes, true, true, false, false, false).ok_or_else(|| {
-                        mq_client_err!(format!(
-                            "Failed to decode message from view_message response body: body_len={}, possible causes: \
-                             CRC check failed or malformed message data",
-                            body_len
-                        ))
-                    })
+                    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+                        MessageDecoder::decode(&mut bytes, true, true, false, false, false)
+                    })) {
+                        Ok(Some(msg)) => Ok(msg),
+                        Ok(None) => Err(mq_client_err!(format!(
+                            "Failed to decode message from view_message response body: body_len={}, malformed message data",
+                            body_len
+                        ))),
+                        Err(_) => Err(mq_client_err!(format!(
+                            "view_message response body is malformed (decoder panic), body_len={}",
+                            body_len
+                        ))),
+                    }
                 } else {
                     Err(mq_client_err!("view_message response body is empty".to_string()))
                 }
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/implementation/mq_client_api_impl.rs` around lines 3316 -
3326, The MessageDecoder::decode is being called directly in the view_message
response path and can panic on malformed/truncated broker frames, so wrap the
decode call in std::panic::catch_unwind (with std::panic::AssertUnwindSafe) and
map any panic or Err into the existing mq_client_err result; specifically, in
the block handling ResponseCode::Success when response.get_body() yields bytes,
run catch_unwind(|| MessageDecoder::decode(...)) and on Err (panic) or on decode
returning Err produce the mq_client_err with the body_len and the existing
diagnostic message instead of allowing a panic to unwind the thread.
rocketmq-client/src/admin/default_mq_admin_ext_impl.rs (1)

1842-1843: ⚠️ Potential issue | 🟠 Major

Handle IPv6 when rebuilding broker_addr.

The format "{ip}:{port}" produces an invalid broker address for IPv6 (e.g., 2001:db8::1:10911 instead of [2001:db8::1]:10911). Use message_id.address.to_string() directly, which properly brackets IPv6 addresses.

🔧 Proposed fix
-        let broker_addr =
-            CheetahString::from_string(format!("{}:{}", message_id.address.ip(), message_id.address.port()));
+        let broker_addr = CheetahString::from_string(message_id.address.to_string());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/admin/default_mq_admin_ext_impl.rs` around lines 1842 -
1843, The broker_addr construction currently builds a string with
format!("{}:{}", message_id.address.ip(), message_id.address.port()) which
breaks IPv6 formatting; replace that formatting and construct broker_addr from
the address' canonical string (use message_id.address.to_string()) and pass that
into CheetahString::from_string so IPv6 addresses are properly bracketed; update
the assignment to the broker_addr variable (the line creating broker_addr)
accordingly.
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs (2)

203-217: ⚠️ Potential issue | 🟠 Major

--charset is currently cosmetic.

The non-UTF-8 branch still decodes as UTF-8 and falls back to hex dump, ignoring the specified charset. Either implement real charset decoding (e.g., using encoding_rs) or reject unsupported charsets upfront.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`
around lines 203 - 217, The charset handling currently ignores the requested
encoding by always attempting UTF-8; update the branch using charset_upper to
actually decode with the requested charset (or reject unsupported ones). Replace
the second branch that re-calls std::str::from_utf8 with real decoding via a
library like encoding_rs: map the charset label (e.g., via Encoding::for_label)
and decode the body bytes to a string, printing the decoded text if successful
or calling Self::print_hex_dump(&body) on unrecoverable errors; alternatively,
if you prefer not to add a dependency, detect unsupported charset names
(charset_upper) up front and return an error message rather than silently
falling back to UTF-8.

273-277: ⚠️ Potential issue | 🔴 Critical

with_rpc_hook path leaves inner uninitialized and will panic.

The DefaultMQAdminExt::with_rpc_hook(rpc_hook) constructor does not call set_inner(...), unlike DefaultMQAdminExt::new(). This causes MQAdminExt::start to panic when unwrapping self.inner. Use DefaultMQAdminExt::new() and then attach the RPC hook, or ensure with_rpc_hook initializes inner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`
around lines 273 - 277, The code currently calls
DefaultMQAdminExt::with_rpc_hook(rpc_hook) which does not initialize the
internal inner field and causes MQAdminExt::start to panic; replace this by
creating the admin ext with DefaultMQAdminExt::new() and then attach the RPC
hook on the resulting instance (e.g., create mut default_mqadmin_ext =
DefaultMQAdminExt::new() and then call the appropriate setter method to register
rpc_hook on default_mqadmin_ext) so that inner is initialized before start is
called.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`:
- Line 305: The broker address is built incorrectly for IPv6 by using
format!("{}:{}", msg.store_host().ip(), msg.store_host().port()); replace that
construction with the store_host's string representation (use
msg.store_host().to_string()) when setting broker_addr so IPv6 addresses are
preserved correctly (update the assignment that defines broker_addr using
msg.store_host()).
- Line 1: Update the file header comment in query_msg_by_id_sub_command.rs to
use "Copyright 2023" instead of "Copyright 2026"; locate the top-of-file
module/file header comment and change the year accordingly so it matches the
repository's enforced copyright year.

---

Outside diff comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs`:
- Around line 135-150: with_rpc_hook currently constructs
default_mqadmin_ext_impl via DefaultMQAdminExtImpl::new but does not call
set_inner(...) so default_mqadmin_ext_impl.inner remains None and later
MQAdminExt::start (e.g. when invoked by QueryMsgByIdSubCommand) panics; fix by
mirroring new()’s pattern: after creating the ArcMut<DefaultMQAdminExtImpl>
assign it to a local variable, create the inner value the same way new() does,
call set_inner(inner) on that ArcMut (default_mqadmin_ext_impl) before returning
the Self from with_rpc_hook so inner is initialized.

In `@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs`:
- Around line 100-108: The match over the MessageCommands enum is missing the
QueryMsgById variant, causing a non-exhaustive pattern error; update the match
(where MessageCommands is being matched and each variant calls
.execute(rpc_hook).await) to include the missing arm for
MessageCommands::QueryMsgById(value) and dispatch it the same way as the other
variants (i.e., call value.execute(rpc_hook).await) so the match becomes
exhaustive.

---

Duplicate comments:
In `@rocketmq-client/src/admin/default_mq_admin_ext_impl.rs`:
- Around line 1842-1843: The broker_addr construction currently builds a string
with format!("{}:{}", message_id.address.ip(), message_id.address.port()) which
breaks IPv6 formatting; replace that formatting and construct broker_addr from
the address' canonical string (use message_id.address.to_string()) and pass that
into CheetahString::from_string so IPv6 addresses are properly bracketed; update
the assignment to the broker_addr variable (the line creating broker_addr)
accordingly.

In `@rocketmq-client/src/implementation/mq_client_api_impl.rs`:
- Around line 3316-3326: The MessageDecoder::decode is being called directly in
the view_message response path and can panic on malformed/truncated broker
frames, so wrap the decode call in std::panic::catch_unwind (with
std::panic::AssertUnwindSafe) and map any panic or Err into the existing
mq_client_err result; specifically, in the block handling ResponseCode::Success
when response.get_body() yields bytes, run catch_unwind(||
MessageDecoder::decode(...)) and on Err (panic) or on decode returning Err
produce the mq_client_err with the body_len and the existing diagnostic message
instead of allowing a panic to unwind the thread.

In `@rocketmq-common/src/common/message/message_decoder.rs`:
- Around line 914-919: Add a new regression test mirroring
decode_message_id_ipv4 that verifies IPv6 decoding for the 56-character path:
create a test function (e.g., decode_message_id_ipv6) that calls
decode_message_id with a 56-char IPv6 message-id string, unwraps the Result, and
assert_eq! the decoded message_id.address against the expected IPv6 SocketAddr
(use ".parse().unwrap()") and assert_eq! the decoded message_id.offset to the
expected offset value; follow the existing test style in message_decoder.rs and
use the Result/unwrap pattern updated on line 916.
- Around line 505-537: Update message ID validation and decoding so they agree:
change the IPv6 expected length from 40 to 56 in validate_message_id (the
earlier length check that currently compares msg_id.len() != 32 && msg_id.len()
!= 40) and then ensure decode_message_id first calls validate_message_id(msg_id)
(propagating any error) before attempting to hex-decode; inside
decode_message_id keep the current byte parsing logic but branch on msg_id.len()
== 32 (IPv4) vs msg_id.len() == 56 (IPv6) so the 56-char IPv6 IDs are accepted
and parsed consistently (references: validate_message_id, decode_message_id,
MessageId, util_all::string_to_bytes, Bytes).

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`:
- Around line 203-217: The charset handling currently ignores the requested
encoding by always attempting UTF-8; update the branch using charset_upper to
actually decode with the requested charset (or reject unsupported ones). Replace
the second branch that re-calls std::str::from_utf8 with real decoding via a
library like encoding_rs: map the charset label (e.g., via Encoding::for_label)
and decode the body bytes to a string, printing the decoded text if successful
or calling Self::print_hex_dump(&body) on unrecoverable errors; alternatively,
if you prefer not to add a dependency, detect unsupported charset names
(charset_upper) up front and return an error message rather than silently
falling back to UTF-8.
- Around line 273-277: The code currently calls
DefaultMQAdminExt::with_rpc_hook(rpc_hook) which does not initialize the
internal inner field and causes MQAdminExt::start to panic; replace this by
creating the admin ext with DefaultMQAdminExt::new() and then attach the RPC
hook on the resulting instance (e.g., create mut default_mqadmin_ext =
DefaultMQAdminExt::new() and then call the appropriate setter method to register
rpc_hook on default_mqadmin_ext) so that inner is initialized before start is
called.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 14eb9913-ed6f-4d89-946c-23c6c839f330

📥 Commits

Reviewing files that changed from the base of the PR and between 75308e2 and af2b151.

📒 Files selected for processing (9)
  • rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
  • rocketmq-client/src/implementation/mq_client_api_impl.rs
  • rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs
  • rocketmq-common/src/common/message/message_decoder.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/decode_message_id_sub_command.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (4)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs (4)

1-1: ⚠️ Potential issue | 🟡 Minor

Use the repository’s standard copyright year.

This repo expects new Rust files to use Copyright 2023, not the current calendar year. Based on learnings, the rocketmq-rust repository enforces using "Copyright 2023" as the year in Rust source files.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`
at line 1, Update the top-of-file copyright header to use the repository
standard year "Copyright 2023" instead of the current calendar year; modify the
file-level comment at the beginning of query_msg_by_id_sub_command.rs so the
copyright line reads "Copyright 2023 The RocketMQ Rust Authors".

365-365: ⚠️ Potential issue | 🟠 Major

Broker address formatting still breaks on IPv6.

format!("{}:{}", msg.store_host().ip(), msg.store_host().port()) drops the required brackets for IPv6, so successful IPv6 lookups print an invalid broker address. Use the socket address’s Display implementation instead.

Suggested fix
-                    let broker_addr = format!("{}:{}", msg.store_host().ip(), msg.store_host().port());
+                    let broker_addr = msg.store_host().to_string();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`
at line 365, The broker address construction currently uses format!("{}:{}",
msg.store_host().ip(), msg.store_host().port()) which produces invalid addresses
for IPv6; update the code that sets broker_addr (the line constructing
broker_addr from msg.store_host()) to use the socket-address's Display
implementation instead (e.g., format!("{}", msg.store_host()) or
msg.store_host().to_string()) so IPv6 addresses are correctly bracketed and
printed.

331-335: ⚠️ Potential issue | 🔴 Critical

The RPC-hook constructor path can still panic in start().

This branch still uses DefaultMQAdminExt::with_rpc_hook(rpc_hook), but that constructor does not initialize inner the same way DefaultMQAdminExt::new() does. MQAdminExt::start later unwraps inner, so any invocation with an RPC hook can crash before the query runs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`
around lines 331 - 335, The RPC-hook branch uses
DefaultMQAdminExt::with_rpc_hook(rpc_hook) which leaves inner uninitialized and
causes MQAdminExt::start to unwrap None; instead construct with
DefaultMQAdminExt::new() and then attach the rpc_hook (e.g., via a setter or by
assigning the rpc_hook field) so inner is initialized the same way as the new()
path before calling start; update the code around
DefaultMQAdminExt::with_rpc_hook, DefaultMQAdminExt::new, the rpc_hook variable
and ensure inner is set prior to MQAdminExt::start to avoid the panic.

63-68: ⚠️ Potential issue | 🟠 Major

--charset still accepts values the decoder cannot handle.

The help text advertises encodings like GBK, but anything outside UTF-8/ASCII/ISO-8859-1 falls through to from_utf8(). That means an unsupported or mistyped charset can silently “work” on UTF-8 payloads instead of being rejected up front.

Also applies to: 207-236

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`
around lines 63 - 68, The CLI currently accepts any --charset string (the
charset field on QueryMsgByIdSubCommand) but decoding falls back to utf8 via
from_utf8() which silently accepts unsupported names; change the flow to
validate and use a real decoder: replace the naïve from_utf8() usage in the
message body decoding logic (e.g., the function/method that reads/decodes the
message body where charset is used) with a lookup against a supported encodings
map (or use a library like encoding_rs/encoding) that recognizes common names
(UTF-8, GBK, ISO-8859-1, etc.), attempt to decode with the chosen decoder, and
return an error (or exit with a clear message) when the provided --charset is
not supported or decoding fails instead of silently falling back to UTF-8;
update any other decoding sites mentioned (the other block around the code
referenced) to use the same validated/explicit decoder path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`:
- Around line 71-77: The timeout flag (timeout: u64) is parsed but never applied
— thread self.timeout into the async call path: either pass it into the admin
client configuration when constructing the client (e.g., when calling
AdminClient::new / AdminClient::with_timeout) or wrap the query future in
tokio::time::timeout(Duration::from_millis(self.timeout), ...) before awaiting.
Update the places where queries are performed (e.g.,
QueryMsgByIdSubCommand::execute / the method that performs the broker RPC and
the analogous code at the other occurrence around lines 344-359) to use
tokio::time::timeout and return a clear timeout error when the duration elapses.
- Around line 40-46: The CLI currently only accepts a single message_id string;
change the struct field message_id: String to message_id: Vec<String> and update
the #[arg(...)] attribute to accept multiple values (e.g. num_args = 1..) so
repeated -i flags are parsed; then iterate over message_id in the command
handler and call query_message for each id (update the code paths that use
message_id, including the other occurrences referenced around lines 357-359) so
the command supports batch lookups like `-i id1 -i id2`.
- Around line 245-283: The message body printing is unbounded; update the
decoded-body path (where Self::decode_body_with_charset is used) and the
print_hex_dump function to truncate large payloads: choose a constant limit
(e.g., MAX_BODY_CHARS for decoded text and MAX_HEX_LINES / MAX_HEX_BYTES for
binary) and when output exceeds the limit print only the first chunk and append
a clear "(truncated: X bytes remaining)" or similar indicator; in
print_hex_dump, limit the number of hex lines printed and show total bytes and a
truncation line instead of streaming everything. Ensure you reference and modify
the decode branch that currently prints body_str and the fn print_hex_dump(data:
&[u8]) to enforce these limits and add the truncation indicator.

---

Duplicate comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs`:
- Line 1: Update the top-of-file copyright header to use the repository standard
year "Copyright 2023" instead of the current calendar year; modify the
file-level comment at the beginning of query_msg_by_id_sub_command.rs so the
copyright line reads "Copyright 2023 The RocketMQ Rust Authors".
- Line 365: The broker address construction currently uses format!("{}:{}",
msg.store_host().ip(), msg.store_host().port()) which produces invalid addresses
for IPv6; update the code that sets broker_addr (the line constructing
broker_addr from msg.store_host()) to use the socket-address's Display
implementation instead (e.g., format!("{}", msg.store_host()) or
msg.store_host().to_string()) so IPv6 addresses are correctly bracketed and
printed.
- Around line 331-335: The RPC-hook branch uses
DefaultMQAdminExt::with_rpc_hook(rpc_hook) which leaves inner uninitialized and
causes MQAdminExt::start to unwrap None; instead construct with
DefaultMQAdminExt::new() and then attach the rpc_hook (e.g., via a setter or by
assigning the rpc_hook field) so inner is initialized the same way as the new()
path before calling start; update the code around
DefaultMQAdminExt::with_rpc_hook, DefaultMQAdminExt::new, the rpc_hook variable
and ensure inner is set prior to MQAdminExt::start to avoid the panic.
- Around line 63-68: The CLI currently accepts any --charset string (the charset
field on QueryMsgByIdSubCommand) but decoding falls back to utf8 via from_utf8()
which silently accepts unsupported names; change the flow to validate and use a
real decoder: replace the naïve from_utf8() usage in the message body decoding
logic (e.g., the function/method that reads/decodes the message body where
charset is used) with a lookup against a supported encodings map (or use a
library like encoding_rs/encoding) that recognizes common names (UTF-8, GBK,
ISO-8859-1, etc.), attempt to decode with the chosen decoder, and return an
error (or exit with a clear message) when the provided --charset is not
supported or decoding fails instead of silently falling back to UTF-8; update
any other decoding sites mentioned (the other block around the code referenced)
to use the same validated/explicit decoder path.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 0b8c04ed-6a35-4b5e-8c76-d274f0bc970f

📥 Commits

Reviewing files that changed from the base of the PR and between af2b151 and 9dca5bf.

📒 Files selected for processing (2)
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs (1)

118-219: Consider extracting common constructor logic to reduce duplication.

All five with_* constructors repeat the same pattern: create DefaultMQAdminExtImpl, clone it, call set_inner(), and build Self. A private helper could reduce duplication:

♻️ Suggested helper method
impl DefaultMQAdminExt {
    fn build(
        rpc_hook: Option<Arc<dyn RPCHook>>,
        timeout_millis: Duration,
        admin_ext_group: CheetahString,
    ) -> Self {
        let client_config = ArcMut::new(ClientConfig::new());
        let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
            rpc_hook,
            timeout_millis,
            client_config.clone(),
            admin_ext_group.clone(),
        ));
        let inner = default_mqadmin_ext_impl.clone();
        default_mqadmin_ext_impl.set_inner(inner);
        Self {
            client_config,
            default_mqadmin_ext_impl,
            admin_ext_group,
            create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
            timeout_millis,
        }
    }

    pub fn with_timeout(timeout_millis: Duration) -> Self {
        Self::build(None, timeout_millis, CheetahString::from_static_str(ADMIN_EXT_GROUP))
    }

    pub fn with_rpc_hook(rpc_hook: Arc<dyn RPCHook>) -> Self {
        Self::build(Some(rpc_hook), Duration::from_millis(5000), CheetahString::from_static_str(ADMIN_EXT_GROUP))
    }
    // ... and so on for other constructors
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs`
around lines 118 - 219, The five constructors (with_timeout, with_rpc_hook,
with_rpc_hook_and_timeout, with_admin_ext_group,
with_admin_ext_group_and_timeout) duplicate the same setup logic; extract that
into a private helper (e.g., fn build(rpc_hook: Option<Arc<dyn RPCHook>>,
timeout_millis: Duration, admin_ext_group: CheetahString) -> Self) that creates
client_config, constructs DefaultMQAdminExtImpl, clones it, calls set_inner, and
returns Self, then refactor each public constructor to call this helper with the
appropriate arguments (use CheetahString::from_static_str(ADMIN_EXT_GROUP) and
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC where needed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In
`@rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs`:
- Around line 118-219: The five constructors (with_timeout, with_rpc_hook,
with_rpc_hook_and_timeout, with_admin_ext_group,
with_admin_ext_group_and_timeout) duplicate the same setup logic; extract that
into a private helper (e.g., fn build(rpc_hook: Option<Arc<dyn RPCHook>>,
timeout_millis: Duration, admin_ext_group: CheetahString) -> Self) that creates
client_config, constructs DefaultMQAdminExtImpl, clones it, calls set_inner, and
returns Self, then refactor each public constructor to call this helper with the
appropriate arguments (use CheetahString::from_static_str(ADMIN_EXT_GROUP) and
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC where needed).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: f017b290-e4c2-401e-8c1f-4896d1b1ee90

📥 Commits

Reviewing files that changed from the base of the PR and between 9dca5bf and 71de569.

📒 Files selected for processing (2)
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rocketmq-common/src/common/message/message_decoder.rs (1)

519-538: ⚠️ Potential issue | 🔴 Critical

decode_message_id can panic on invalid input despite returning Result.

The function returns Result<MessageId, String> but can panic on malformed input. When msg_id.len() is neither 32 nor 56, the code treats it as IPv6 and attempts fixed-size reads (copy_to_slice, get_i32, get_i64) that panic if buffer has insufficient data. validate_message_id already exists and enforces the required lengths—call it at the start to make this function panic-safe by contract.

Suggested fix
 pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
+    validate_message_id(msg_id)?;
+
     let bytes = util_all::string_to_bytes(msg_id)
         .ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
     let mut buffer = Bytes::from(bytes);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-common/src/common/message/message_decoder.rs` around lines 519 -
538, The decode_message_id function can panic on malformed input; call the
existing validate_message_id(msg_id) at the start of decode_message_id and
return its Err early if validation fails so the function never proceeds to
unsafe fixed-size reads; this ensures decode_message_id only runs
buffer.copy_to_slice / buffer.get_i32 / buffer.get_i64 when validate_message_id
has guaranteed the msg_id length and prevents panics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@rocketmq-common/src/common/message/message_decoder.rs`:
- Around line 519-538: The decode_message_id function can panic on malformed
input; call the existing validate_message_id(msg_id) at the start of
decode_message_id and return its Err early if validation fails so the function
never proceeds to unsafe fixed-size reads; this ensures decode_message_id only
runs buffer.copy_to_slice / buffer.get_i32 / buffer.get_i64 when
validate_message_id has guaranteed the msg_id length and prevents panics.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 6abeab1e-0cfe-41dc-a3c9-6e9224c0abe6

📥 Commits

Reviewing files that changed from the base of the PR and between 71de569 and 7749457.

📒 Files selected for processing (4)
  • rocketmq-common/src/common/message/message_decoder.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs
  • rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message/query_msg_by_id_sub_command.rs

@codecov
Copy link

codecov bot commented Mar 15, 2026

Codecov Report

❌ Patch coverage is 60.33163% with 311 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.87%. Comparing base (4ad5ab7) to head (fe0de90).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...rc/commands/message/query_msg_by_id_sub_command.rs 68.68% 191 Missing ⚠️
...ketmq-admin-core/src/admin/default_mq_admin_ext.rs 0.00% 50 Missing ⚠️
...mq-client/src/implementation/mq_client_api_impl.rs 0.00% 26 Missing ⚠️
...etmq-client/src/admin/default_mq_admin_ext_impl.rs 0.00% 25 Missing ⚠️
...producer/producer_impl/default_mq_producer_impl.rs 0.00% 6 Missing ⚠️
.../commands/message/decode_message_id_sub_command.rs 0.00% 6 Missing ⚠️
...rocketmq-admin/rocketmq-admin-core/src/commands.rs 0.00% 5 Missing ⚠️
...ketmq-common/src/common/message/message_decoder.rs 98.18% 1 Missing ⚠️
...-admin/rocketmq-admin-core/src/commands/message.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6750      +/-   ##
==========================================
- Coverage   41.90%   41.87%   -0.03%     
==========================================
  Files         998      999       +1     
  Lines      140372   141659    +1287     
==========================================
+ Hits        58820    59325     +505     
- Misses      81552    82334     +782     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@mxsm mxsm merged commit b3a120c into mxsm:main Mar 15, 2026
8 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement QueryMsgById Command for Message ID-Based Lookup

4 participants