Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use rocketmq_remoting::protocol::header::get_meta_data_response_header::GetMetaD
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
use rocketmq_remoting::protocol::header::query_topic_consume_by_who_request_header::QueryTopicConsumeByWhoRequestHeader;
use rocketmq_remoting::protocol::header::view_broker_stats_data_request_header::ViewBrokerStatsDataRequestHeader;
use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
Expand Down Expand Up @@ -1818,10 +1819,38 @@ impl MQAdminExt for DefaultMQAdminExtImpl {
async fn query_message(
&self,
_cluster_name: CheetahString,
_topic: CheetahString,
_msg_id: CheetahString,
topic: CheetahString,
msg_id: CheetahString,
) -> rocketmq_error::RocketMQResult<MessageExt> {
unimplemented!("query_message not implemented yet")
let client_instance = self
.client_instance
.as_ref()
.ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;

let msg_id_str = msg_id.as_str();

if let Err(e) = message_decoder::validate_message_id(msg_id_str) {
return Err(rocketmq_error::RocketMQError::IllegalArgument(format!(
"Invalid message ID: {}",
e
)));
}

let message_id = message_decoder::decode_message_id(msg_id_str).map_err(|e| {
rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
})?;
let broker_addr =
CheetahString::from_string(format!("{}:{}", message_id.address.ip(), message_id.address.port()));

let request_header = ViewMessageRequestHeader {
topic: Some(topic),
offset: message_id.offset,
};

client_instance
.get_mq_client_api_impl()
.view_message(&broker_addr, request_header, self.timeout_millis.as_millis() as u64)
.await
}

async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<HARuntimeInfo> {
Expand Down
36 changes: 36 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ use rocketmq_remoting::protocol::header::unlock_batch_mq_request_header::UnlockB
use rocketmq_remoting::protocol::header::unregister_client_request_header::UnregisterClientRequestHeader;
use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader;
use rocketmq_remoting::protocol::header::update_user_request_header::UpdateUserRequestHeader;
use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
use rocketmq_remoting::protocol::headers::client::GetConsumerConnectionListRequestHeader;
use rocketmq_remoting::protocol::headers::view::SearchOffsetRequestHeader;
use rocketmq_remoting::protocol::headers::view::SearchOffsetResponseHeader;
Expand Down Expand Up @@ -3299,6 +3300,41 @@ impl MQClientAPIImpl {
))
}

pub async fn view_message(
&mut self,
addr: &CheetahString,
request_header: ViewMessageRequestHeader,
timeout_millis: u64,
) -> RocketMQResult<MessageExt> {
let request = RemotingCommand::create_request_command(RequestCode::ViewMessageById, request_header);
let response = self
.remoting_client
.invoke_request(Some(addr), request, timeout_millis)
.await?;

match ResponseCode::from(response.code()) {
ResponseCode::Success => {
if let Some(body) = response.get_body() {
let mut bytes = body.clone();
let body_len = bytes.len();
MessageDecoder::decode(&mut bytes, true, true, false, false, false).ok_or_else(|| {
mq_client_err!(format!(
"Failed to decode message from view_message response body: body_len={}, possible causes: \
CRC check failed or malformed message data",
body_len
))
})
} else {
Err(mq_client_err!("view_message response body is empty".to_string()))
}
Comment on lines +3315 to +3329
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid the panicking decoder on broker responses.

MessageDecoder::decode still performs unchecked get_i32 / copy_to_slice reads. If the broker returns Success with an empty or truncated body, this new path panics before the ok_or_else runs. Please route view_message through a fallible decoder or add explicit frame validation so malformed responses return Err instead of crashing the client.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rocketmq-client/src/implementation/mq_client_api_impl.rs` around lines 3280 -
3294, The view_message path currently calls MessageDecoder::decode directly on
response.get_body() under ResponseCode::Success which can panic on truncated
bodies (unchecked get_i32/copy_to_slice); change this to a fallible decode or
add explicit frame validation before calling decode: either use a provided
safe/fallible API (e.g., a MessageDecoder::decode_checked or similar) and
propagate its Result, or validate response.get_body() length and required
header/frame fields (compare to expected minimum header size and payload
lengths) and return Err(...) when too short so the ok_or_else runs instead of
letting decode panic; update the view_message handling to call the safe variant
or perform checks around MessageDecoder::decode and map failures to
mq_client_err.

}
_ => Err(mq_client_err!(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string())
)),
}
}

fn build_queue_offset_sorted_map(
topic: &str,
msg_found_list: &[MessageExt],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2109,9 +2109,13 @@ impl DefaultMQProducerImpl {
local_transaction_state: LocalTransactionState,
) -> rocketmq_error::RocketMQResult<()> {
let id = if let Some(ref offset_msg_id) = send_result.offset_msg_id {
MessageDecoder::decode_message_id(offset_msg_id)
MessageDecoder::decode_message_id(offset_msg_id).map_err(|e| {
rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
})?
} else {
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap())
MessageDecoder::decode_message_id(send_result.msg_id.as_ref().unwrap()).map_err(|e| {
rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
})?
};
let transaction_id = send_result.transaction_id.clone();
let queue = self
Expand Down
82 changes: 75 additions & 7 deletions rocketmq-common/src/common/message/message_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,37 @@ pub fn decode_message(buffer: &mut Bytes) -> Message {
message
}

pub fn decode_message_id(msg_id: &str) -> MessageId {
let bytes = util_all::string_to_bytes(msg_id).unwrap();
const MSG_ID_IPV4_LEN: usize = 32;
const MSG_ID_IPV6_LEN: usize = 56;

pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
let msg_id = msg_id.trim();

if msg_id.is_empty() {
return Err("Message ID cannot be empty".to_string());
}

let len = msg_id.len();
if len != MSG_ID_IPV4_LEN && len != MSG_ID_IPV6_LEN {
return Err(format!(
"Invalid message ID length: {len}. Expected {MSG_ID_IPV4_LEN} characters (IPv4) or {MSG_ID_IPV6_LEN} \
characters (IPv6)"
));
}

if !msg_id.bytes().all(|b| b.is_ascii_hexdigit()) {
return Err("Message ID must be a valid hexadecimal string".to_string());
}

Ok(())
}

pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
validate_message_id(msg_id)?;
let bytes = util_all::string_to_bytes(msg_id)
.ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
let mut buffer = Bytes::from(bytes);
let len = if msg_id.len() == 32 {
let address = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
Expand All @@ -509,10 +536,10 @@ pub fn decode_message_id(msg_id: &str) -> MessageId {
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
address: len,
Ok(MessageId {
address,
offset: buffer.get_i64(),
}
})
}

pub fn encode(message_ext: &MessageExt, need_compress: bool) -> rocketmq_error::RocketMQResult<Bytes> {
Expand Down Expand Up @@ -891,7 +918,7 @@ mod tests {
#[test]
fn decode_message_id_ipv4() {
let msg_id = "7F0000010007D8260BF075769D36C348";
let message_id = decode_message_id(msg_id);
let message_id = decode_message_id(msg_id).unwrap();
assert_eq!(message_id.address, "127.0.0.1:55334".parse().unwrap());
assert_eq!(message_id.offset, 860316681131967304);
}
Expand Down Expand Up @@ -1048,4 +1075,45 @@ mod tests {
);
assert!(decode_properties(&mut bytes.freeze()).is_none());
}

#[test]
fn validate_message_id_ipv4_32_chars() {
let result = validate_message_id("AC11000100002A9F0000000000000001");
assert!(result.is_ok());
}

#[test]
fn validate_message_id_ipv6_56_chars() {
let result = validate_message_id("20010db800000000000000000000000100002A9F0000000000000001");
assert!(result.is_ok());
}

#[test]
fn validate_message_id_ipv6_40_chars_rejected() {
let result = validate_message_id("20010db800000000000000000000000100000001");
assert!(result.is_err());
if let Err(e) = result {
assert!(e.contains("Invalid message ID length"));
assert!(e.contains("56 characters (IPv6)"));
}
}

#[test]
fn decode_message_id_ipv6() {
let msg_id = "20010db800000000000000000000000100002A9F0000000000000001";
let message_id = decode_message_id(msg_id).unwrap();
assert_eq!(message_id.address, "[2001:db8::1]:10911".parse().unwrap());
assert_eq!(message_id.offset, 1);
}

#[test]
fn decode_message_id_ipv6_full_address() {
let msg_id = "20010db81234567800000000abcdef0100002A9F0000000000001234";
let message_id = decode_message_id(msg_id).unwrap();
assert_eq!(
message_id.address,
"[2001:db8:1234:5678::abcd:ef01]:10911".parse().unwrap()
);
assert_eq!(message_id.offset, 4660);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@ impl DefaultMQAdminExt {
pub fn with_timeout(timeout_millis: Duration) -> Self {
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
let client_config = ArcMut::new(ClientConfig::new());
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
None,
timeout_millis,
client_config.clone(),
admin_ext_group.clone(),
));
let inner = default_mqadmin_ext_impl.clone();
default_mqadmin_ext_impl.set_inner(inner);
Self {
client_config: client_config.clone(),
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
None,
timeout_millis,
client_config,
admin_ext_group.clone(),
)),
client_config,
default_mqadmin_ext_impl,
admin_ext_group,
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
timeout_millis,
Expand All @@ -135,14 +138,17 @@ impl DefaultMQAdminExt {
pub fn with_rpc_hook(rpc_hook: Arc<dyn RPCHook>) -> Self {
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
let client_config = ArcMut::new(ClientConfig::new());
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
Some(rpc_hook),
Duration::from_millis(5000),
client_config.clone(),
admin_ext_group.clone(),
));
let inner = default_mqadmin_ext_impl.clone();
default_mqadmin_ext_impl.set_inner(inner);
Self {
client_config: client_config.clone(),
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
Some(rpc_hook),
Duration::from_millis(5000),
client_config,
admin_ext_group.clone(),
)),
client_config,
default_mqadmin_ext_impl,
admin_ext_group,
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
timeout_millis: Duration::from_millis(5000),
Expand All @@ -152,14 +158,17 @@ impl DefaultMQAdminExt {
pub fn with_rpc_hook_and_timeout(rpc_hook: Arc<dyn RPCHook>, timeout_millis: Duration) -> Self {
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
let client_config = ArcMut::new(ClientConfig::new());
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
Some(rpc_hook),
timeout_millis,
client_config.clone(),
admin_ext_group.clone(),
));
let inner = default_mqadmin_ext_impl.clone();
default_mqadmin_ext_impl.set_inner(inner);
Self {
client_config: client_config.clone(),
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
Some(rpc_hook),
timeout_millis,
client_config,
admin_ext_group.clone(),
)),
client_config,
default_mqadmin_ext_impl,
admin_ext_group,
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
timeout_millis,
Expand All @@ -169,14 +178,17 @@ impl DefaultMQAdminExt {
pub fn with_admin_ext_group(admin_ext_group: impl Into<CheetahString>) -> Self {
let admin_ext_group = admin_ext_group.into();
let client_config = ArcMut::new(ClientConfig::new());
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
None,
Duration::from_millis(5000),
client_config.clone(),
admin_ext_group.clone(),
));
let inner = default_mqadmin_ext_impl.clone();
default_mqadmin_ext_impl.set_inner(inner);
Self {
client_config: client_config.clone(),
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
None,
Duration::from_millis(5000),
client_config,
admin_ext_group.clone(),
)),
client_config,
default_mqadmin_ext_impl,
admin_ext_group,
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
timeout_millis: Duration::from_millis(5000),
Expand All @@ -189,14 +201,17 @@ impl DefaultMQAdminExt {
) -> Self {
let admin_ext_group = admin_ext_group.into();
let client_config = ArcMut::new(ClientConfig::new());
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
None,
timeout_millis,
client_config.clone(),
admin_ext_group.clone(),
));
let inner = default_mqadmin_ext_impl.clone();
default_mqadmin_ext_impl.set_inner(inner);
Self {
client_config: client_config.clone(),
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
None,
timeout_millis,
client_config,
admin_ext_group.clone(),
)),
client_config,
default_mqadmin_ext_impl,
admin_ext_group,
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
timeout_millis,
Expand Down Expand Up @@ -1331,11 +1346,13 @@ impl MQAdminExt for DefaultMQAdminExt {

async fn query_message(
&self,
_cluster_name: CheetahString,
_topic: CheetahString,
_msg_id: CheetahString,
cluster_name: CheetahString,
topic: CheetahString,
msg_id: CheetahString,
) -> rocketmq_error::RocketMQResult<MessageExt> {
unimplemented!("query_message not implemented yet")
self.default_mqadmin_ext_impl
.query_message(cluster_name, topic, msg_id)
.await
}

async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<HARuntimeInfo> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,11 @@ impl CommandExecute for ClassificationTablePrint {
command: "printMsgByQueue",
remark: "Print Message Detail by queueId.",
},
Command {
category: "Message",
command: "queryMsgById",
remark: "Query message by message ID.",
},
Command {
category: "Message",
command: "queryMsgByKey",
Expand Down
Loading
Loading