Skip to content

Commit fe0de90

Browse files
committed
fix: fix err and ai comments
1 parent af2b151 commit fe0de90

File tree

4 files changed

+485
-84
lines changed

4 files changed

+485
-84
lines changed

rocketmq-common/src/common/message/message_decoder.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,28 +495,33 @@ pub fn decode_message(buffer: &mut Bytes) -> Message {
495495
message
496496
}
497497

498+
const MSG_ID_IPV4_LEN: usize = 32;
499+
const MSG_ID_IPV6_LEN: usize = 56;
500+
498501
pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
499502
let msg_id = msg_id.trim();
500503

501504
if msg_id.is_empty() {
502505
return Err("Message ID cannot be empty".to_string());
503506
}
504507

505-
if msg_id.len() != 32 && msg_id.len() != 40 {
508+
let len = msg_id.len();
509+
if len != MSG_ID_IPV4_LEN && len != MSG_ID_IPV6_LEN {
506510
return Err(format!(
507-
"Invalid message ID length: {}. Expected 32 characters (IPv4) or 40 characters (IPv6)",
508-
msg_id.len()
511+
"Invalid message ID length: {len}. Expected {MSG_ID_IPV4_LEN} characters (IPv4) or {MSG_ID_IPV6_LEN} \
512+
characters (IPv6)"
509513
));
510514
}
511515

512-
if !msg_id.chars().all(|c| c.is_ascii_hexdigit()) {
516+
if !msg_id.bytes().all(|b| b.is_ascii_hexdigit()) {
513517
return Err("Message ID must be a valid hexadecimal string".to_string());
514518
}
515519

516520
Ok(())
517521
}
518522

519523
pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
524+
validate_message_id(msg_id)?;
520525
let bytes = util_all::string_to_bytes(msg_id)
521526
.ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
522527
let mut buffer = Bytes::from(bytes);
@@ -1070,4 +1075,45 @@ mod tests {
10701075
);
10711076
assert!(decode_properties(&mut bytes.freeze()).is_none());
10721077
}
1078+
1079+
#[test]
1080+
fn validate_message_id_ipv4_32_chars() {
1081+
let result = validate_message_id("AC11000100002A9F0000000000000001");
1082+
assert!(result.is_ok());
1083+
}
1084+
1085+
#[test]
1086+
fn validate_message_id_ipv6_56_chars() {
1087+
let result = validate_message_id("20010db800000000000000000000000100002A9F0000000000000001");
1088+
assert!(result.is_ok());
1089+
}
1090+
1091+
#[test]
1092+
fn validate_message_id_ipv6_40_chars_rejected() {
1093+
let result = validate_message_id("20010db800000000000000000000000100000001");
1094+
assert!(result.is_err());
1095+
if let Err(e) = result {
1096+
assert!(e.contains("Invalid message ID length"));
1097+
assert!(e.contains("56 characters (IPv6)"));
1098+
}
1099+
}
1100+
1101+
#[test]
1102+
fn decode_message_id_ipv6() {
1103+
let msg_id = "20010db800000000000000000000000100002A9F0000000000000001";
1104+
let message_id = decode_message_id(msg_id).unwrap();
1105+
assert_eq!(message_id.address, "[2001:db8::1]:10911".parse().unwrap());
1106+
assert_eq!(message_id.offset, 1);
1107+
}
1108+
1109+
#[test]
1110+
fn decode_message_id_ipv6_full_address() {
1111+
let msg_id = "20010db81234567800000000abcdef0100002A9F0000000000001234";
1112+
let message_id = decode_message_id(msg_id).unwrap();
1113+
assert_eq!(
1114+
message_id.address,
1115+
"[2001:db8:1234:5678::abcd:ef01]:10911".parse().unwrap()
1116+
);
1117+
assert_eq!(message_id.offset, 4660);
1118+
}
10731119
}

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,17 @@ impl DefaultMQAdminExt {
118118
pub fn with_timeout(timeout_millis: Duration) -> Self {
119119
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
120120
let client_config = ArcMut::new(ClientConfig::new());
121+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
122+
None,
123+
timeout_millis,
124+
client_config.clone(),
125+
admin_ext_group.clone(),
126+
));
127+
let inner = default_mqadmin_ext_impl.clone();
128+
default_mqadmin_ext_impl.set_inner(inner);
121129
Self {
122-
client_config: client_config.clone(),
123-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
124-
None,
125-
timeout_millis,
126-
client_config,
127-
admin_ext_group.clone(),
128-
)),
130+
client_config,
131+
default_mqadmin_ext_impl,
129132
admin_ext_group,
130133
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
131134
timeout_millis,
@@ -135,14 +138,17 @@ impl DefaultMQAdminExt {
135138
pub fn with_rpc_hook(rpc_hook: Arc<dyn RPCHook>) -> Self {
136139
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
137140
let client_config = ArcMut::new(ClientConfig::new());
141+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
142+
Some(rpc_hook),
143+
Duration::from_millis(5000),
144+
client_config.clone(),
145+
admin_ext_group.clone(),
146+
));
147+
let inner = default_mqadmin_ext_impl.clone();
148+
default_mqadmin_ext_impl.set_inner(inner);
138149
Self {
139-
client_config: client_config.clone(),
140-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
141-
Some(rpc_hook),
142-
Duration::from_millis(5000),
143-
client_config,
144-
admin_ext_group.clone(),
145-
)),
150+
client_config,
151+
default_mqadmin_ext_impl,
146152
admin_ext_group,
147153
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
148154
timeout_millis: Duration::from_millis(5000),
@@ -152,14 +158,17 @@ impl DefaultMQAdminExt {
152158
pub fn with_rpc_hook_and_timeout(rpc_hook: Arc<dyn RPCHook>, timeout_millis: Duration) -> Self {
153159
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
154160
let client_config = ArcMut::new(ClientConfig::new());
161+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
162+
Some(rpc_hook),
163+
timeout_millis,
164+
client_config.clone(),
165+
admin_ext_group.clone(),
166+
));
167+
let inner = default_mqadmin_ext_impl.clone();
168+
default_mqadmin_ext_impl.set_inner(inner);
155169
Self {
156-
client_config: client_config.clone(),
157-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
158-
Some(rpc_hook),
159-
timeout_millis,
160-
client_config,
161-
admin_ext_group.clone(),
162-
)),
170+
client_config,
171+
default_mqadmin_ext_impl,
163172
admin_ext_group,
164173
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
165174
timeout_millis,
@@ -169,14 +178,17 @@ impl DefaultMQAdminExt {
169178
pub fn with_admin_ext_group(admin_ext_group: impl Into<CheetahString>) -> Self {
170179
let admin_ext_group = admin_ext_group.into();
171180
let client_config = ArcMut::new(ClientConfig::new());
181+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
182+
None,
183+
Duration::from_millis(5000),
184+
client_config.clone(),
185+
admin_ext_group.clone(),
186+
));
187+
let inner = default_mqadmin_ext_impl.clone();
188+
default_mqadmin_ext_impl.set_inner(inner);
172189
Self {
173-
client_config: client_config.clone(),
174-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
175-
None,
176-
Duration::from_millis(5000),
177-
client_config,
178-
admin_ext_group.clone(),
179-
)),
190+
client_config,
191+
default_mqadmin_ext_impl,
180192
admin_ext_group,
181193
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
182194
timeout_millis: Duration::from_millis(5000),
@@ -189,14 +201,17 @@ impl DefaultMQAdminExt {
189201
) -> Self {
190202
let admin_ext_group = admin_ext_group.into();
191203
let client_config = ArcMut::new(ClientConfig::new());
204+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
205+
None,
206+
timeout_millis,
207+
client_config.clone(),
208+
admin_ext_group.clone(),
209+
));
210+
let inner = default_mqadmin_ext_impl.clone();
211+
default_mqadmin_ext_impl.set_inner(inner);
192212
Self {
193-
client_config: client_config.clone(),
194-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
195-
None,
196-
timeout_millis,
197-
client_config,
198-
admin_ext_group.clone(),
199-
)),
213+
client_config,
214+
default_mqadmin_ext_impl,
200215
admin_ext_group,
201216
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
202217
timeout_millis,

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl CommandExecute for MessageCommands {
103103
MessageCommands::DumpCompactionLog(value) => value.execute(rpc_hook).await,
104104
MessageCommands::PrintMessage(value) => value.execute(rpc_hook).await,
105105
MessageCommands::PrintMsgByQueue(value) => value.execute(rpc_hook).await,
106+
MessageCommands::QueryMsgById(value) => value.execute(rpc_hook).await,
106107
MessageCommands::QueryMsgByKey(value) => value.execute(rpc_hook).await,
107108
MessageCommands::SendMessage(value) => value.execute(rpc_hook).await,
108109
}

0 commit comments

Comments
 (0)