diff --git a/rocketmq-common/src/common/message.rs b/rocketmq-common/src/common/message.rs index c99d4ae93..aaf1434bc 100644 --- a/rocketmq-common/src/common/message.rs +++ b/rocketmq-common/src/common/message.rs @@ -52,46 +52,28 @@ pub mod message_envelope; pub mod routing_context; pub mod storage_metadata; -/// This module defines the `MessageTrait` trait, which provides a flexible interface for working +/// Defines the interface for RocketMQ message operations. /// -/// with message objects in RocketMQ. It includes methods for managing message properties, keys, -/// tags, body, and other metadata related to the message. +/// Provides methods for managing message properties, keys, tags, body, and metadata. pub trait MessageTrait: Any + Display + Debug { /// Sets the keys for the message. - /// - /// # Arguments - /// - /// * `keys` - The keys to set, converted into a `String`. #[inline] fn set_keys(&mut self, keys: CheetahString) { self.put_property(CheetahString::from_static_str(MessageConst::PROPERTY_KEYS), keys); } /// Adds a property to the message. - /// - /// # Arguments - /// - /// * `key` - The property key, converted into a `String`. - /// * `value` - The property value, converted into a `String`. fn put_property(&mut self, key: CheetahString, value: CheetahString); - /// Clears a specific property from the message. - /// - /// # Arguments - /// - /// * `name` - The name of the property to clear. + /// Removes the specified property from the message. fn clear_property(&mut self, name: &str); /// Adds a user-defined property to the message. /// - /// # Arguments - /// - /// * `name` - The name of the user property, converted into a `String`. - /// * `value` - The value of the user property, converted into a `String`. - /// /// # Errors /// - /// Returns an error if the property name is reserved by the system or if name/value is empty. + /// Returns an error if the property name is reserved by the system or if the name or value is + /// empty. fn put_user_property(&mut self, name: CheetahString, value: CheetahString) -> RocketMQResult<()> { if name.is_empty() || value.is_empty() { return Err(RocketMQError::InvalidProperty( @@ -107,132 +89,58 @@ pub trait MessageTrait: Any + Display + Debug { Ok(()) } - /// Retrieves a user-defined property from the message. - /// - /// # Arguments - /// - /// * `name` - The name of the user property to retrieve. - /// - /// # Returns - /// - /// An `Option` containing the property value if it exists, otherwise `None`. + /// Retrieves a user-defined property value. fn user_property(&self, name: &CheetahString) -> Option { self.property(name) } - /// Retrieves a reference to a user-defined property from the message. - /// - /// # Arguments - /// - /// * `name` - A reference to a `CheetahString` representing the name of the user property to - /// retrieve. - /// - /// # Returns - /// - /// An `Option<&CheetahString>` containing a reference to the property value if it exists, - /// otherwise `None`. + /// Retrieves a reference to a user-defined property value. fn user_property_ref(&self, name: &CheetahString) -> Option<&CheetahString> { self.property_ref(name) } - /// Retrieves a property from the message. - /// - /// # Arguments - /// - /// * `name` - The name of the property to retrieve. - /// - /// # Returns - /// - /// An `Option` containing the property value if it exists, otherwise `None`. + /// Retrieves a property value. fn property(&self, name: &CheetahString) -> Option; - /// Retrieves a reference to a property value from the message. - /// - /// # Arguments - /// - /// * `name` - A reference to a `CheetahString` representing the name of the property to - /// retrieve. - /// - /// # Returns - /// - /// An `Option<&CheetahString>` containing a reference to the property value if it exists, - /// otherwise `None`. + /// Retrieves a reference to a property value. fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString>; - /// Retrieves the topic of the message. - /// - /// # Returns - /// - /// A reference to the topic as a `&str`. + /// Returns the topic of the message. fn topic(&self) -> &CheetahString; /// Sets the topic for the message. - /// - /// # Arguments - /// - /// * `topic` - The topic to set, converted into a `String`. fn set_topic(&mut self, topic: CheetahString); - /// Retrieves the tags associated with the message. - /// - /// # Returns - /// - /// An `Option` containing the tags if they exist, otherwise `None`. + /// Returns the tags associated with the message. fn tags(&self) -> Option { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS)) } - /// Retrieves the reference of the tag associated with the message - /// - /// # Returns - /// - /// An `Option<&CheetahString>` containing a reference to the tags if they exist, otherwise - /// `None`. + /// Returns a reference to the tags associated with the message. fn get_tags_ref(&self) -> Option<&CheetahString> { self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_TAGS)) } /// Sets the tags for the message. - /// - /// # Arguments - /// - /// * `tags` - The tags to set, converted into a `String`. fn set_tags(&mut self, tags: CheetahString) { self.put_property(CheetahString::from_static_str(MessageConst::PROPERTY_TAGS), tags); } - /// Retrieves the keys associated with the message. - /// - /// # Returns - /// - /// An `Option` containing the keys if they exist, otherwise `None`. + /// Returns the keys associated with the message. fn get_keys(&self) -> Option { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS)) } - /// Retrieves the keys associated with the message. - /// - /// # Returns - /// - /// An `Option<&CheetahString>` containing a reference to the keys if they exist, otherwise - /// `None`. + /// Returns a reference to the keys associated with the message. fn get_keys_ref(&self) -> Option<&CheetahString> { self.property_ref(&CheetahString::from_static_str(MessageConst::PROPERTY_KEYS)) } - /// Sets multiple keys from a collection for the message. - /// - /// # Arguments - /// - /// * `key_collection` - A vector of keys to set. + /// Sets the message keys from a collection, joining them with spaces. fn set_keys_from_collection(&mut self, key_collection: Vec) { let keys = key_collection.join(MessageConst::KEY_SEPARATOR); self.set_keys(CheetahString::from_string(keys)); } - /// Retrieves the delay time level of the message. - /// - /// # Returns - /// - /// An `i32` representing the delay time level, defaults to 0 if not set or invalid. + /// Returns the delay time level of the message, or 0 if not set. fn get_delay_time_level(&self) -> i32 { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL)) .and_then(|v| v.parse().ok()) @@ -240,10 +148,6 @@ pub trait MessageTrait: Any + Display + Debug { } /// Sets the delay time level for the message. - /// - /// # Arguments - /// - /// * `level` - The delay time level to set. fn set_delay_time_level(&mut self, level: i32) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_DELAY_TIME_LEVEL), @@ -251,11 +155,8 @@ pub trait MessageTrait: Any + Display + Debug { ); } - /// Checks if the message should wait for store acknowledgment. - /// - /// # Returns + /// Returns whether the message should wait for store acknowledgment. /// - /// `true` if the message should wait for store acknowledgment; `false` otherwise. /// Defaults to `true` if not set. fn is_wait_store_msg_ok(&self) -> bool { self.property(&CheetahString::from_static_str( @@ -266,10 +167,6 @@ pub trait MessageTrait: Any + Display + Debug { } /// Sets whether the message should wait for store acknowledgment. - /// - /// # Arguments - /// - /// * `wait_store_msg_ok` - A boolean indicating whether to wait for store acknowledgment. fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_WAIT_STORE_MSG_OK), @@ -278,10 +175,6 @@ pub trait MessageTrait: Any + Display + Debug { } /// Sets the instance ID for the message. - /// - /// # Arguments - /// - /// * `instance_id` - The instance ID to set. fn set_instance_id(&mut self, instance_id: CheetahString) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_INSTANCE_ID), @@ -289,62 +182,30 @@ pub trait MessageTrait: Any + Display + Debug { ); } - /// Retrieves the flag associated with the message. - /// - /// # Returns - /// - /// An `i32` representing the flag. + /// Returns the flag associated with the message. fn get_flag(&self) -> i32; /// Sets the flag for the message. - /// - /// # Arguments - /// - /// * `flag` - The flag to set. fn set_flag(&mut self, flag: i32); - /// Retrieves the body of the message. - /// - /// # Returns - /// - /// A byte slice (`&[u8]`) representing the body of the message. + /// Returns the body of the message. fn get_body(&self) -> Option<&Bytes>; /// Sets the body of the message. - /// - /// # Arguments - /// - /// * `body` - The byte slice (`&[u8]`) to set as the body. fn set_body(&mut self, body: Bytes); - /// Retrieves all properties associated with the message. - /// - /// # Returns - /// - /// A reference to a `HashMap` containing the properties. + /// Returns all properties associated with the message. fn get_properties(&self) -> &HashMap; /// Sets multiple properties for the message. - /// - /// # Arguments - /// - /// * `properties` - A `HashMap` containing the properties to set. fn set_properties(&mut self, properties: HashMap); - /// Retrieves the buyer ID associated with the message. - /// - /// # Returns - /// - /// An `Option` containing the buyer ID if it exists, otherwise `None`. + /// Returns the buyer ID associated with the message. fn buyer_id(&self) -> Option { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID)) } /// Sets the buyer ID for the message. - /// - /// # Arguments - /// - /// * `buyer_id` - The buyer ID to set. fn set_buyer_id(&mut self, buyer_id: CheetahString) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_BUYER_ID), @@ -352,25 +213,13 @@ pub trait MessageTrait: Any + Display + Debug { ); } - /// Retrieves the transaction ID associated with the message. - /// - /// # Returns - /// - /// A reference to the transaction ID as a `&str`. + /// Returns the transaction ID associated with the message. fn get_transaction_id(&self) -> Option<&CheetahString>; /// Sets the transaction ID for the message. - /// - /// # Arguments - /// - /// * `transaction_id` - The transaction ID to set. fn set_transaction_id(&mut self, transaction_id: CheetahString); /// Sets the delay time for the message in seconds. - /// - /// # Arguments - /// - /// * `sec` - The delay time in seconds. fn set_delay_time_sec(&mut self, sec: u64) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC), @@ -378,11 +227,7 @@ pub trait MessageTrait: Any + Display + Debug { ); } - /// Retrieves the delay time for the message in seconds. - /// - /// # Returns - /// - /// The delay time in seconds, defaults to 0 if not set or invalid. + /// Returns the delay time for the message in seconds, or 0 if not set. fn get_delay_time_sec(&self) -> u64 { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_SEC)) .and_then(|v| v.parse().ok()) @@ -390,10 +235,6 @@ pub trait MessageTrait: Any + Display + Debug { } /// Sets the delay time for the message in milliseconds. - /// - /// # Arguments - /// - /// * `time_ms` - The delay time in milliseconds. fn set_delay_time_ms(&mut self, time_ms: u64) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS), @@ -401,11 +242,7 @@ pub trait MessageTrait: Any + Display + Debug { ); } - /// Retrieves the delay time for the message in milliseconds. - /// - /// # Returns - /// - /// The delay time in milliseconds, defaults to 0 if not set or invalid. + /// Returns the delay time for the message in milliseconds, or 0 if not set. fn get_delay_time_ms(&self) -> u64 { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELAY_MS)) .and_then(|v| v.parse().ok()) @@ -413,10 +250,6 @@ pub trait MessageTrait: Any + Display + Debug { } /// Sets the delivery time for the message in milliseconds. - /// - /// # Arguments - /// - /// * `time_ms` - The delivery time in milliseconds. fn set_deliver_time_ms(&mut self, time_ms: u64) { self.put_property( CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS), @@ -424,59 +257,39 @@ pub trait MessageTrait: Any + Display + Debug { ); } - /// Retrieves the delivery time for the message in milliseconds. - /// - /// # Returns - /// - /// The delivery time in milliseconds, defaults to 0 if not set or invalid. + /// Returns the delivery time for the message in milliseconds, or 0 if not set. fn get_deliver_time_ms(&self) -> u64 { self.property(&CheetahString::from_static_str(MessageConst::PROPERTY_TIMER_DELIVER_MS)) .and_then(|v| v.parse().ok()) .unwrap_or(0) } - /// Retrieves a mutable reference to the compressed body of the message. - /// - /// # Returns - /// An `Option<&mut Bytes>` containing the compressed body, if it exists. + /// Returns a mutable reference to the compressed body of the message. fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes>; - /// Retrieves an immutable reference to the compressed body of the message. - /// - /// # Returns - /// An `Option<&Bytes>` containing the compressed body, if it exists. + /// Returns a reference to the compressed body of the message. fn get_compressed_body(&self) -> Option<&Bytes>; /// Sets the compressed body of the message. - /// - /// # Arguments - /// * `compressed_body` - A `Bytes` object representing the compressed body to set. fn set_compressed_body_mut(&mut self, compressed_body: Bytes); /// Takes ownership of the message body, leaving it empty. - /// - /// # Returns - /// An `Option` containing the message body if it exists, otherwise `None`. fn take_body(&mut self) -> Option; - /// Converts the message into a dynamic `Any` type. - /// - /// # Returns - /// - /// A reference to the message as `&dyn Any`. + /// Returns a reference to the message as a trait object. fn as_any(&self) -> &dyn Any; - /// Converts the message into a mutable dynamic `Any` type. - /// - /// # Returns - /// - /// A mutable reference to the message as `&mut dyn Any`. + /// Returns a mutable reference to the message as a trait object. fn as_any_mut(&mut self) -> &mut dyn Any; } +/// Magic code for message format version 1. pub const MESSAGE_MAGIC_CODE_V1: i32 = -626843481; + +/// Magic code for message format version 2. pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477; +/// Represents the message format version. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Default)] pub enum MessageVersion { #[default] @@ -494,6 +307,11 @@ impl fmt::Display for MessageVersion { } impl MessageVersion { + /// Returns the message version corresponding to the given magic code. + /// + /// # Errors + /// + /// Returns an error if the magic code is not recognized. pub fn value_of_magic_code(magic_code: i32) -> Result { match magic_code { MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1), @@ -502,6 +320,7 @@ impl MessageVersion { } } + /// Returns the magic code for this message version. pub fn get_magic_code(&self) -> i32 { match self { MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1, @@ -509,6 +328,7 @@ impl MessageVersion { } } + /// Returns the number of bytes used to encode the topic length. pub fn get_topic_length_size(&self) -> usize { match self { MessageVersion::V1 => 1, @@ -516,6 +336,7 @@ impl MessageVersion { } } + /// Reads and returns the topic length from the buffer, advancing the buffer position. pub fn get_topic_length(&self, buffer: &mut Bytes) -> usize { match self { MessageVersion::V1 => buffer.get_u8() as usize, @@ -523,6 +344,8 @@ impl MessageVersion { } } + /// Returns the topic length from the buffer at the specified index without advancing the + /// position. pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize { match self { MessageVersion::V1 => buffer[index] as usize, @@ -530,6 +353,7 @@ impl MessageVersion { } } + /// Writes the topic length to the buffer according to the message version. pub fn put_topic_length(&self, buffer: &mut Vec, topic_length: usize) { match self { MessageVersion::V1 => buffer.push(topic_length as u8), @@ -540,6 +364,7 @@ impl MessageVersion { } } + /// Returns `true` if this is message format version 1. pub fn is_v1(&self) -> bool { match self { MessageVersion::V1 => true, @@ -547,6 +372,7 @@ impl MessageVersion { } } + /// Returns `true` if this is message format version 2. pub fn is_v2(&self) -> bool { match self { MessageVersion::V1 => false, @@ -555,14 +381,18 @@ impl MessageVersion { } } +/// Defines constants for message property names and configuration values. pub struct MessageConst; impl MessageConst { pub const DUP_INFO: &'static str = "DUP_INFO"; pub const KEY_SEPARATOR: &'static str = " "; + /// Host address where the message was born. pub const PROPERTY_BORN_HOST: &'static str = "__BORNHOST"; + /// Timestamp when the message was born. pub const PROPERTY_BORN_TIMESTAMP: &'static str = "BORN_TIMESTAMP"; pub const PROPERTY_BUYER_ID: &'static str = "BUYER_ID"; + /// Time in seconds during which transaction checks are suppressed. pub const PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: &'static str = "CHECK_IMMUNITY_TIME_IN_SECONDS"; pub const PROPERTY_CLUSTER: &'static str = "CLUSTER"; pub const PROPERTY_CONSUME_START_TIMESTAMP: &'static str = "CONSUME_START_TIME"; @@ -572,9 +402,7 @@ impl MessageConst { pub const PROPERTY_DELAY_TIME_LEVEL: &'static str = "DELAY"; pub const PROPERTY_DLQ_ORIGIN_MESSAGE_ID: &'static str = "DLQ_ORIGIN_MESSAGE_ID"; pub const PROPERTY_STARTDE_LIVER_TIME: &'static str = "__STARTDELIVERTIME"; - /** - * properties for DLQ - */ + /// Original topic name for messages in the dead-letter queue. pub const PROPERTY_DLQ_ORIGIN_TOPIC: &'static str = "DLQ_ORIGIN_TOPIC"; pub const PROPERTY_EXTEND_UNIQ_INFO: &'static str = "EXTEND_UNIQ_INFO"; pub const PROPERTY_FIRST_POP_TIME: &'static str = "1ST_POP_TIME"; @@ -622,33 +450,31 @@ impl MessageConst { pub const PROPERTY_TRANSACTION_PREPARED: &'static str = "TRAN_MSG"; pub const PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET: &'static str = "TRAN_PREPARED_QUEUE_OFFSET"; pub const PROPERTY_TRANSFER_FLAG: &'static str = "TRANSFER_FLAG"; - /** - * the transient property key of groupSysFlag (set by the client when pulling messages) - */ + /// Transient property for group system flags set by the client when pulling messages. pub const PROPERTY_TRANSIENT_GROUP_CONFIG: &'static str = "__RMQ.TRANSIENT.GROUP_SYS_FLAG"; - /** - * property which name starts with "__RMQ.TRANSIENT." is called transient one that will not - * be stored in broker disks. - */ + /// Prefix for transient properties that are not persisted to broker disk. pub const PROPERTY_TRANSIENT_PREFIX: &'static str = "__RMQ.TRANSIENT."; - /** - * the transient property key of topicSysFlag (set by the client when pulling messages) - */ + /// Transient property for topic system flags set by the client when pulling messages. pub const PROPERTY_TRANSIENT_TOPIC_CONFIG: &'static str = "__RMQ.TRANSIENT.TOPIC_SYS_FLAG"; pub const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX: &'static str = "UNIQ_KEY"; pub const PROPERTY_WAIT_STORE_MSG_OK: &'static str = "WAIT"; - // Timer engine type constants + /// Timer engine type identifier for RocksDB timeline implementation. pub const TIMER_ENGINE_ROCKSDB_TIMELINE: &'static str = "R"; + /// Timer engine type identifier for file-based time wheel implementation. pub const TIMER_ENGINE_FILE_TIME_WHEEL: &'static str = "F"; + /// Property name for timer engine type. pub const TIMER_ENGINE_TYPE: &'static str = "timerEngineType"; - // Index type constants + /// Index type identifier for message key indexing. pub const INDEX_KEY_TYPE: &'static str = "K"; + /// Index type identifier for unique indexing. pub const INDEX_UNIQUE_TYPE: &'static str = "U"; + /// Index type identifier for tag indexing. pub const INDEX_TAG_TYPE: &'static str = "T"; } +/// Set of system-reserved property names that cannot be used as user-defined properties. pub static STRING_HASH_SET: LazyLock> = LazyLock::new(|| { let mut set = HashSet::with_capacity(64); set.insert(MessageConst::PROPERTY_TRACE_SWITCH);