Skip to content

Commit 821f95b

Browse files
committed
fix: fix err and ai comments
1 parent af2b151 commit 821f95b

File tree

4 files changed

+478
-81
lines changed

4 files changed

+478
-81
lines changed

rocketmq-common/src/common/message/message_decoder.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,9 @@ pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
502502
return Err("Message ID cannot be empty".to_string());
503503
}
504504

505-
if msg_id.len() != 32 && msg_id.len() != 40 {
505+
if msg_id.len() != 32 && msg_id.len() != 56 {
506506
return Err(format!(
507-
"Invalid message ID length: {}. Expected 32 characters (IPv4) or 40 characters (IPv6)",
507+
"Invalid message ID length: {}. Expected 32 characters (IPv4) or 56 characters (IPv6)",
508508
msg_id.len()
509509
));
510510
}
@@ -517,6 +517,7 @@ pub fn validate_message_id(msg_id: &str) -> Result<(), String> {
517517
}
518518

519519
pub fn decode_message_id(msg_id: &str) -> Result<MessageId, String> {
520+
validate_message_id(msg_id)?;
520521
let bytes = util_all::string_to_bytes(msg_id)
521522
.ok_or_else(|| "Failed to decode message ID: invalid hex string".to_string())?;
522523
let mut buffer = Bytes::from(bytes);
@@ -1070,4 +1071,45 @@ mod tests {
10701071
);
10711072
assert!(decode_properties(&mut bytes.freeze()).is_none());
10721073
}
1074+
1075+
#[test]
1076+
fn validate_message_id_ipv4_32_chars() {
1077+
let result = validate_message_id("AC11000100002A9F0000000000000001");
1078+
assert!(result.is_ok());
1079+
}
1080+
1081+
#[test]
1082+
fn validate_message_id_ipv6_56_chars() {
1083+
let result = validate_message_id("20010db800000000000000000000000100002A9F0000000000000001");
1084+
assert!(result.is_ok());
1085+
}
1086+
1087+
#[test]
1088+
fn validate_message_id_ipv6_40_chars_rejected() {
1089+
let result = validate_message_id("20010db800000000000000000000000100000001");
1090+
assert!(result.is_err());
1091+
if let Err(e) = result {
1092+
assert!(e.contains("Invalid message ID length"));
1093+
assert!(e.contains("56 characters (IPv6)"));
1094+
}
1095+
}
1096+
1097+
#[test]
1098+
fn decode_message_id_ipv6() {
1099+
let msg_id = "20010db800000000000000000000000100002A9F0000000000000001";
1100+
let message_id = decode_message_id(msg_id).unwrap();
1101+
assert_eq!(message_id.address, "[2001:db8::1]:10911".parse().unwrap());
1102+
assert_eq!(message_id.offset, 1);
1103+
}
1104+
1105+
#[test]
1106+
fn decode_message_id_ipv6_full_address() {
1107+
let msg_id = "20010db81234567800000000abcdef0100002A9F0000000000001234";
1108+
let message_id = decode_message_id(msg_id).unwrap();
1109+
assert_eq!(
1110+
message_id.address,
1111+
"[2001:db8:1234:5678::abcd:ef01]:10911".parse().unwrap()
1112+
);
1113+
assert_eq!(message_id.offset, 4660);
1114+
}
10731115
}

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,17 @@ impl DefaultMQAdminExt {
118118
pub fn with_timeout(timeout_millis: Duration) -> Self {
119119
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
120120
let client_config = ArcMut::new(ClientConfig::new());
121+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
122+
None,
123+
timeout_millis,
124+
client_config.clone(),
125+
admin_ext_group.clone(),
126+
));
127+
let inner = default_mqadmin_ext_impl.clone();
128+
default_mqadmin_ext_impl.set_inner(inner);
121129
Self {
122-
client_config: client_config.clone(),
123-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
124-
None,
125-
timeout_millis,
126-
client_config,
127-
admin_ext_group.clone(),
128-
)),
130+
client_config,
131+
default_mqadmin_ext_impl,
129132
admin_ext_group,
130133
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
131134
timeout_millis,
@@ -135,14 +138,17 @@ impl DefaultMQAdminExt {
135138
pub fn with_rpc_hook(rpc_hook: Arc<dyn RPCHook>) -> Self {
136139
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
137140
let client_config = ArcMut::new(ClientConfig::new());
141+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
142+
Some(rpc_hook),
143+
Duration::from_millis(5000),
144+
client_config.clone(),
145+
admin_ext_group.clone(),
146+
));
147+
let inner = default_mqadmin_ext_impl.clone();
148+
default_mqadmin_ext_impl.set_inner(inner);
138149
Self {
139-
client_config: client_config.clone(),
140-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
141-
Some(rpc_hook),
142-
Duration::from_millis(5000),
143-
client_config,
144-
admin_ext_group.clone(),
145-
)),
150+
client_config,
151+
default_mqadmin_ext_impl,
146152
admin_ext_group,
147153
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
148154
timeout_millis: Duration::from_millis(5000),
@@ -152,14 +158,17 @@ impl DefaultMQAdminExt {
152158
pub fn with_rpc_hook_and_timeout(rpc_hook: Arc<dyn RPCHook>, timeout_millis: Duration) -> Self {
153159
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
154160
let client_config = ArcMut::new(ClientConfig::new());
161+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
162+
Some(rpc_hook),
163+
timeout_millis,
164+
client_config.clone(),
165+
admin_ext_group.clone(),
166+
));
167+
let inner = default_mqadmin_ext_impl.clone();
168+
default_mqadmin_ext_impl.set_inner(inner);
155169
Self {
156-
client_config: client_config.clone(),
157-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
158-
Some(rpc_hook),
159-
timeout_millis,
160-
client_config,
161-
admin_ext_group.clone(),
162-
)),
170+
client_config,
171+
default_mqadmin_ext_impl,
163172
admin_ext_group,
164173
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
165174
timeout_millis,
@@ -169,14 +178,17 @@ impl DefaultMQAdminExt {
169178
pub fn with_admin_ext_group(admin_ext_group: impl Into<CheetahString>) -> Self {
170179
let admin_ext_group = admin_ext_group.into();
171180
let client_config = ArcMut::new(ClientConfig::new());
181+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
182+
None,
183+
Duration::from_millis(5000),
184+
client_config.clone(),
185+
admin_ext_group.clone(),
186+
));
187+
let inner = default_mqadmin_ext_impl.clone();
188+
default_mqadmin_ext_impl.set_inner(inner);
172189
Self {
173-
client_config: client_config.clone(),
174-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
175-
None,
176-
Duration::from_millis(5000),
177-
client_config,
178-
admin_ext_group.clone(),
179-
)),
190+
client_config,
191+
default_mqadmin_ext_impl,
180192
admin_ext_group,
181193
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
182194
timeout_millis: Duration::from_millis(5000),
@@ -189,14 +201,17 @@ impl DefaultMQAdminExt {
189201
) -> Self {
190202
let admin_ext_group = admin_ext_group.into();
191203
let client_config = ArcMut::new(ClientConfig::new());
204+
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
205+
None,
206+
timeout_millis,
207+
client_config.clone(),
208+
admin_ext_group.clone(),
209+
));
210+
let inner = default_mqadmin_ext_impl.clone();
211+
default_mqadmin_ext_impl.set_inner(inner);
192212
Self {
193-
client_config: client_config.clone(),
194-
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
195-
None,
196-
timeout_millis,
197-
client_config,
198-
admin_ext_group.clone(),
199-
)),
213+
client_config,
214+
default_mqadmin_ext_impl,
200215
admin_ext_group,
201216
create_topic_key: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
202217
timeout_millis,

rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl CommandExecute for MessageCommands {
103103
MessageCommands::DumpCompactionLog(value) => value.execute(rpc_hook).await,
104104
MessageCommands::PrintMessage(value) => value.execute(rpc_hook).await,
105105
MessageCommands::PrintMsgByQueue(value) => value.execute(rpc_hook).await,
106+
MessageCommands::QueryMsgById(value) => value.execute(rpc_hook).await,
106107
MessageCommands::QueryMsgByKey(value) => value.execute(rpc_hook).await,
107108
MessageCommands::SendMessage(value) => value.execute(rpc_hook).await,
108109
}

0 commit comments

Comments
 (0)