Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,13 +1375,10 @@ impl<MS: MessageStore> StateGetter for ConsumerStateGetter<MS> {
.topic_config_table()
.contains_key(topic)
{
let topic_full_name = CheetahString::from_string(NamespaceUtil::wrap_namespace(instance_id, topic));
let topic_full_name = NamespaceUtil::wrap_namespace(instance_id, topic);
self.broker_runtime_inner
.consumer_manager
.find_subscription_data(
CheetahString::from_string(NamespaceUtil::wrap_namespace(instance_id, group)).as_ref(),
topic_full_name.as_ref(),
)
.find_subscription_data(&NamespaceUtil::wrap_namespace(instance_id, group), &topic_full_name)
.is_some()
} else {
self.broker_runtime_inner
Expand Down
20 changes: 16 additions & 4 deletions rocketmq-client/src/base/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,28 @@ impl ClientConfig {

impl ClientConfig {
#[inline]
pub fn with_namespace(&mut self, resource: &str) -> CheetahString {
NamespaceUtil::wrap_namespace(self.get_namespace().unwrap_or_default().as_str(), resource).into()
pub fn with_namespace(&mut self, resource: impl Into<CheetahString>) -> CheetahString {
let resource = resource.into();
let namespace = self.get_namespace().unwrap_or_default();

// Fast path: no namespace needed, return resource directly
if namespace.is_empty() {
return resource;
}

// Fast path: resource already has namespace, return directly
if NamespaceUtil::is_already_with_namespace(resource.as_str(), namespace.as_str()) {
return resource;
}

NamespaceUtil::wrap_namespace(namespace, resource)
}

#[inline]
pub fn queue_with_namespace(&mut self, mut queue: MessageQueue) -> MessageQueue {
if let Some(namespace) = self.get_namespace() {
if !namespace.is_empty() {
let topic =
CheetahString::from_string(NamespaceUtil::wrap_namespace(namespace.as_str(), queue.get_topic()));
let topic = NamespaceUtil::wrap_namespace(namespace.as_str(), queue.get_topic());
queue.set_topic(topic);
return queue;
}
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-client/src/producer/default_mq_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ impl DefaultMQProducer {

impl DefaultMQProducer {
#[inline]
pub fn with_namespace(&mut self, resource: &str) -> CheetahString {
pub fn with_namespace(&mut self, resource: impl Into<CheetahString>) -> CheetahString {
self.client_config.with_namespace(resource)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,7 @@ impl DefaultMQProducerImpl {
/// Prepare message for retry (reset topic with namespace)
fn prepare_message_for_retry<T: MessageTrait>(&self, msg: &mut T, topic: &CheetahString) {
let namespace = self.client_config.namespace.as_ref().map(|s| s.as_str()).unwrap_or("");
msg.set_topic(CheetahString::from_string(NamespaceUtil::wrap_namespace(
namespace,
topic.as_str(),
)));
msg.set_topic(NamespaceUtil::wrap_namespace(namespace, topic.as_str()));
}

/// Handle send error - update fault item and log
Expand Down Expand Up @@ -2421,10 +2418,8 @@ impl DefaultMQProducerImpl {

async fn init_topic_route(&mut self) {
for topic in self.producer_config.topics() {
let new_topic = CheetahString::from_string(NamespaceUtil::wrap_namespace(
self.client_config.get_namespace().unwrap_or_default().as_str(),
topic,
));
let new_topic =
NamespaceUtil::wrap_namespace(self.client_config.get_namespace().unwrap_or_default().as_str(), topic);
let topic_publish_info = self.try_to_find_topic_publish_info(&new_topic).await;
if topic_publish_info.is_none() || !topic_publish_info.unwrap().ok() {
warn!(
Expand Down
44 changes: 32 additions & 12 deletions rocketmq-remoting/src/protocol/namespace_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use cheetah_string::CheetahString;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::topic::TopicValidator;

Expand Down Expand Up @@ -60,28 +61,39 @@ impl NamespaceUtil {
resource_with_namespace.to_string()
}

pub fn wrap_namespace(namespace: &str, resource_without_namespace: &str) -> String {
pub fn wrap_namespace(
namespace: impl Into<CheetahString>,
resource_without_namespace: impl Into<CheetahString>,
) -> CheetahString {
let namespace = namespace.into();
let resource_without_namespace = resource_without_namespace.into();

if namespace.is_empty() || resource_without_namespace.is_empty() {
return resource_without_namespace.to_string();
return resource_without_namespace;
}

if NamespaceUtil::is_system_resource(resource_without_namespace)
|| NamespaceUtil::is_already_with_namespace(resource_without_namespace, namespace)
if NamespaceUtil::is_system_resource(resource_without_namespace.as_str())
|| NamespaceUtil::is_already_with_namespace(resource_without_namespace.as_str(), namespace.as_str())
{
return resource_without_namespace.to_string();
return resource_without_namespace;
}

let mut string_builder = String::new();

if NamespaceUtil::is_retry_topic(resource_without_namespace) {
if NamespaceUtil::is_retry_topic(resource_without_namespace.as_str()) {
string_builder.push_str(mix_all::RETRY_GROUP_TOPIC_PREFIX);
}

if NamespaceUtil::is_dlq_topic(resource_without_namespace) {
if NamespaceUtil::is_dlq_topic(resource_without_namespace.as_str()) {
string_builder.push_str(mix_all::DLQ_GROUP_TOPIC_PREFIX);
}
let resource_without_retry_and_dlq = NamespaceUtil::without_retry_and_dlq(resource_without_namespace);
string_builder + namespace + &NamespaceUtil::NAMESPACE_SEPARATOR.to_string() + resource_without_retry_and_dlq
let resource_without_retry_and_dlq = NamespaceUtil::without_retry_and_dlq(resource_without_namespace.as_str());
CheetahString::from_string(
string_builder
+ namespace.as_str()
+ &NamespaceUtil::NAMESPACE_SEPARATOR.to_string()
+ resource_without_retry_and_dlq,
)
}

pub fn is_already_with_namespace(resource: &str, namespace: &str) -> bool {
Expand All @@ -94,12 +106,20 @@ impl NamespaceUtil {
resource_without_retry_and_dlq.starts_with(&format!("{}{}", namespace, NamespaceUtil::NAMESPACE_SEPARATOR))
}

pub fn wrap_namespace_and_retry(namespace: &str, consumer_group: &str) -> Option<String> {
pub fn wrap_namespace_and_retry(
namespace: impl Into<CheetahString>,
consumer_group: impl Into<CheetahString>,
) -> Option<CheetahString> {
let consumer_group = consumer_group.into();
if consumer_group.is_empty() {
return None;
}

Some(mix_all::RETRY_GROUP_TOPIC_PREFIX.to_string() + &NamespaceUtil::wrap_namespace(namespace, consumer_group))
let namespace = namespace.into();
let wrapped = NamespaceUtil::wrap_namespace(namespace, consumer_group);
Some(CheetahString::from_string(
mix_all::RETRY_GROUP_TOPIC_PREFIX.to_string() + wrapped.as_str(),
))
}

pub fn get_namespace_from_resource(resource: &str) -> String {
Expand Down Expand Up @@ -216,7 +236,7 @@ mod tests {
#[test]
fn wrap_namespace_and_retry_adds_namespace_and_retry() {
assert_eq!(
NamespaceUtil::wrap_namespace_and_retry("my_namespace", "my_group"),
NamespaceUtil::wrap_namespace_and_retry("my_namespace", "my_group").map(|s| s.as_str().to_string()),
Some("%RETRY%my_namespace%my_group".to_string())
);
}
Expand Down
Loading