Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use rocketmq_remoting::protocol::header::get_meta_data_response_header::GetMetaD
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
use rocketmq_remoting::protocol::header::query_topic_consume_by_who_request_header::QueryTopicConsumeByWhoRequestHeader;
use rocketmq_remoting::protocol::header::view_broker_stats_data_request_header::ViewBrokerStatsDataRequestHeader;
use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
Expand Down Expand Up @@ -1818,10 +1819,38 @@ impl MQAdminExt for DefaultMQAdminExtImpl {
async fn query_message(
&self,
_cluster_name: CheetahString,
_topic: CheetahString,
_msg_id: CheetahString,
topic: CheetahString,
msg_id: CheetahString,
) -> rocketmq_error::RocketMQResult<MessageExt> {
unimplemented!("query_message not implemented yet")
let client_instance = self
.client_instance
.as_ref()
.ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;

let msg_id_str = msg_id.as_str();

if let Err(e) = message_decoder::validate_message_id(msg_id_str) {
return Err(rocketmq_error::RocketMQError::IllegalArgument(format!(
"Invalid message ID: {}",
e
)));
}

let message_id = message_decoder::decode_message_id(msg_id_str).map_err(|e| {
rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
})?;
let broker_addr =
CheetahString::from_string(format!("{}:{}", message_id.address.ip(), message_id.address.port()));

let request_header = ViewMessageRequestHeader {
topic: Some(topic),
offset: message_id.offset,
};

client_instance
.get_mq_client_api_impl()
.view_message(&broker_addr, request_header, self.timeout_millis.as_millis() as u64)
.await
}

async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<HARuntimeInfo> {
Expand Down
36 changes: 36 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ use rocketmq_remoting::protocol::header::unlock_batch_mq_request_header::UnlockB
use rocketmq_remoting::protocol::header::unregister_client_request_header::UnregisterClientRequestHeader;
use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader;
use rocketmq_remoting::protocol::header::update_user_request_header::UpdateUserRequestHeader;
use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
use rocketmq_remoting::protocol::headers::client::GetConsumerConnectionListRequestHeader;
use rocketmq_remoting::protocol::headers::view::SearchOffsetRequestHeader;
use rocketmq_remoting::protocol::headers::view::SearchOffsetResponseHeader;
Expand Down Expand Up @@ -3299,6 +3300,41 @@ impl MQClientAPIImpl {
))
}

pub async fn view_message(
&mut self,
addr: &CheetahString,
request_header: ViewMessageRequestHeader,
timeout_millis: u64,
) -> RocketMQResult<MessageExt> {
let request = RemotingCommand::create_request_command(RequestCode::ViewMessageById, request_header);
let response = self
.remoting_client
.invoke_request(Some(addr), request, timeout_millis)
.await?;

match ResponseCode::from(response.code()) {
ResponseCode::Success => {
if let Some(body) = response.get_body() {
let mut bytes = body.clone();
let body_len = bytes.len();
MessageDecoder::decode(&mut bytes, true, true, false, false, false).ok_or_else(|| {
mq_client_err!(format!(
"Failed to decode message from view_message response body: body_len={}, possible causes: \
CRC check failed or malformed message data",
body_len
))
})
} else {
Err(mq_client_err!("view_message response body is empty".to_string()))
}
Comment on lines +3315 to +3329
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid the panicking decoder on broker responses.

MessageDecoder::decode still performs unchecked get_i32 / copy_to_slice reads. If the broker returns Success with an empty or truncated body, this new path panics before the ok_or_else runs. Please route view_message through a fallible decoder or add explicit frame validation so malformed responses return Err instead of crashing the client.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/implementation/mq_client_api_impl.rs` around lines 3280 -
3294, The view_message path currently calls MessageDecoder::decode directly on
response.get_body() under ResponseCode::Success which can panic on truncated
bodies (unchecked get_i32/copy_to_slice); change this to a fallible decode or
add explicit frame validation before calling decode: either use a provided
safe/fallible API (e.g., a MessageDecoder::decode_checked or similar) and
propagate its Result, or validate response.get_body() length and required
header/frame fields (compare to expected minimum header size and payload
lengths) and return Err(...) when too short so the ok_or_else runs instead of
letting decode panic; update the view_message handling to call the safe variant
or perform checks around MessageDecoder::decode and map failures to
mq_client_err.

}
_ => Err(mq_client_err!(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string())
)),
}
}

fn build_queue_offset_sorted_map(
topic: &str,
msg_found_list: &[MessageExt],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2109,9 +2109,13 @@ impl DefaultMQProducerImpl {
local_transaction_state: LocalTransactionState,
) -> rocketmq_error::RocketMQResult<()> {
let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id {
MessageDecoder::decode_message_id(offset_msg_id)
MessageDecoder::decode_message_id(offset_msg_id).map_err(|e| {
rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
})?
} else {
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap())
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap()).map_err(|e| {
rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
})?
};
let transaction_id = send_result.transaction_id.clone();
let queue = self
Expand Down
36 changes: 29 additions & 7 deletions rocketmq-common/src/common/message/message_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,32 @@ pub fn decode_message(buffer: &mut Bytes) -> Message {
message
}

pub fn decode_message_id(msg_id: &str) -> MessageId {
let bytes = util_all::string_to_bytes(msg_id).unwrap();
pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
let msg_id = msg_id.trim();

if msg_id.is_empty() {
return Err("Message ID cannot be empty".to_string());
}

if msg_id.len() != 32 && msg_id.len() != 40 {
return Err(format!(
"Invalid message ID length: {}. Expected 32 characters (IPv4) or 40 characters (IPv6)",
msg_id.len()
));
}

if !msg_id.chars().all(|c| c.is_ascii_hexdigit()) {
return Err("Message ID must be a valid hexadecimal string".to_string());
}

Ok(())
}

pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
let bytes = util_all::string_to_bytes(msg_id)
.ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
let mut buffer = Bytes::from(bytes);
let len = if msg_id.len() == 32 {
let address = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
Expand All @@ -509,10 +531,10 @@ pub fn decode_message_id(msg_id: &str) -> MessageId {
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
address: len,
Ok(MessageId {
address,
offset: buffer.get_i64(),
}
})
}

pub fn encode(message_ext: &MessageExt, need_compress: bool) -> rocketmq_error::RocketMQResult<Bytes> {
Expand Down Expand Up @@ -891,7 +913,7 @@ mod tests {
#[test]
fn decode_message_id_ipv4() {
let msg_id = "7F0000010007D8260BF075769D36C348";
let message_id = decode_message_id(msg_id);
let message_id = decode_message_id(msg_id).unwrap();
assert_eq!(message_id.address, "127.0.0.1:55334".parse().unwrap());
assert_eq!(message_id.offset, 860316681131967304);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1331,11 +1331,13 @@ impl MQAdminExt for DefaultMQAdminExt {

async fn query_message(
&self,
_cluster_name: CheetahString,
_topic: CheetahString,
_msg_id: CheetahString,
cluster_name: CheetahString,
topic: CheetahString,
msg_id: CheetahString,
) -> rocketmq_error::RocketMQResult<MessageExt> {
unimplemented!("query_message not implemented yet")
self.default_mqadmin_ext_impl
.query_message(cluster_name, topic, msg_id)
.await
}

async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<HARuntimeInfo> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,11 @@ impl CommandExecute for ClassificationTablePrint {
command: "printMsgByQueue",
remark: "Print Message Detail by queueId.",
},
Command {
category: "Message",
command: "queryMsgById",
remark: "Query message by message ID.",
},
Command {
category: "Message",
command: "queryMsgByKey",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod decode_message_id_sub_command;
pub mod dump_compaction_log_sub_command;
pub mod print_message_sub_command;
pub mod print_msg_by_queue_sub_command;
pub mod query_msg_by_id_sub_command;
pub mod query_msg_by_key_sub_command;
pub mod send_message_sub_command;

Expand All @@ -31,6 +32,7 @@ use crate::commands::message::decode_message_id_sub_command::DecodeMessageIdSubC
use crate::commands::message::dump_compaction_log_sub_command::DumpCompactionLogSubCommand;
use crate::commands::message::print_message_sub_command::PrintMessageSubCommand;
use crate::commands::message::print_msg_by_queue_sub_command::PrintMsgByQueueSubCommand;
use crate::commands::message::query_msg_by_id_sub_command::QueryMsgByIdSubCommand;
use crate::commands::message::query_msg_by_key_sub_command::QueryMsgByKeySubCommand;
use crate::commands::message::send_message_sub_command::SendMessageSubCommand;
use crate::commands::CommandExecute;
Expand Down Expand Up @@ -72,6 +74,12 @@ pub enum MessageCommands {
)]
PrintMsgByQueue(PrintMsgByQueueSubCommand),

#[command(
name = "queryMsgById",
about = "Query message by message ID.",
long_about = None,
)]
QueryMsgById(QueryMsgByIdSubCommand),
#[command(
name = "queryMsgByKey",
about = "Query Message by Key.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use clap::Parser;
use rocketmq_common::MessageDecoder::decode_message_id;
use rocketmq_common::MessageDecoder::validate_message_id;
use rocketmq_error::RocketMQError;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::runtime::RPCHook;
Expand Down Expand Up @@ -43,23 +44,12 @@ impl CommandExecute for DecodeMessageIdSubCommand {
continue;
}

if msg_id.len() != 32 && msg_id.len() != 40 {
eprintln!(
"Invalid message ID: {}. Expected 32 characters (IPv4) or 40 characters (IPv6) hexadecimal string.",
msg_id
);
if let Err(e) = validate_message_id(msg_id) {
eprintln!("Invalid message ID: {}. {}", msg_id, e);
continue;
}

if !msg_id.chars().all(|c| c.is_ascii_hexdigit()) {
eprintln!(
"Invalid message ID: {}. Message ID must be a valid hexadecimal string.",
msg_id
);
continue;
}

match std::panic::catch_unwind(|| decode_message_id(msg_id)) {
match decode_message_id(msg_id) {
Ok(message_id) => {
let ip = message_id.address.ip();
let port = message_id.address.port();
Expand All @@ -74,10 +64,10 @@ impl CommandExecute for DecodeMessageIdSubCommand {
println!(" Offset Hex: {:#018X}", offset);
println!();
}
Err(_) => {
Err(e) => {
return Err(RocketMQError::Internal(format!(
"DecodeMessageIdSubCommand command failed: failed to decode message ID: {}",
msg_id
"DecodeMessageIdSubCommand command failed: failed to decode message ID '{}': {}",
msg_id, e
)));
}
}
Expand Down
Loading
Loading