From b0b26e31f6ca0d73587e25219da1337c28528547 Mon Sep 17 00:00:00 2001 From: ccwss <1782935682@qq.com> Date: Fri, 28 Nov 2025 09:53:42 +0800 Subject: [PATCH 1/3] add configuration to enable/disable namespace --- .../rocketmq/proxy/config/ProxyConfig.java | 9 +++++++ .../proxy/grpc/v2/common/GrpcConverter.java | 27 ++++++++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index bc1919c07a1..aa1a3a910c8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -271,6 +271,7 @@ public class ProxyConfig implements ConfigFile { private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000; private boolean enableBatchAck = false; + private boolean namespaceEnable = false; @Override public void initData() { @@ -1545,4 +1546,12 @@ public int getReturnHandleGroupThreadPoolNums() { public void setReturnHandleGroupThreadPoolNums(int returnHandleGroupThreadPoolNums) { this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums; } + + public boolean isNamespaceEnable() { + return namespaceEnable; + } + + public void setNamespaceEnable(boolean namespaceEnable) { + this.namespaceEnable = namespaceEnable; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java index 33a4e1312f8..5608492dec3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java @@ -45,8 +45,11 @@ import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.remoting.protocol.NamespaceUtil; +import static org.apache.rocketmq.remoting.protocol.NamespaceUtil.STRING_BLANK; + public class GrpcConverter { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -72,11 +75,21 @@ public MessageQueue buildMessageQueue(MessageExt messageExt, String brokerName) .setId(0) .build(); } + if (ConfigurationManager.getProxyConfig().isNamespaceEnable()) { + return MessageQueue.newBuilder() + .setId(messageExt.getQueueId()) + .setTopic(Resource.newBuilder() + .setName(NamespaceUtil.withoutNamespace(messageExt.getTopic())) + .setResourceNamespace(NamespaceUtil.getNamespaceFromResource(messageExt.getTopic())) + .build()) + .setBroker(broker) + .build(); + } return MessageQueue.newBuilder() .setId(messageExt.getQueueId()) .setTopic(Resource.newBuilder() - .setName(NamespaceUtil.withoutNamespace(messageExt.getTopic())) - .setResourceNamespace(NamespaceUtil.getNamespaceFromResource(messageExt.getTopic())) + .setName(messageExt.getTopic()) + .setResourceNamespace(STRING_BLANK) .build()) .setBroker(broker) .build(); @@ -252,9 +265,15 @@ protected SystemProperties buildSystemProperties(MessageExt messageExt) { } public Resource buildResource(String resourceNameWithNamespace) { + if (ConfigurationManager.getProxyConfig().isNamespaceEnable()) { + return Resource.newBuilder() + .setResourceNamespace(NamespaceUtil.getNamespaceFromResource(resourceNameWithNamespace)) + .setName(NamespaceUtil.withoutNamespace(resourceNameWithNamespace)) + .build(); + } return Resource.newBuilder() - .setResourceNamespace(NamespaceUtil.getNamespaceFromResource(resourceNameWithNamespace)) - .setName(NamespaceUtil.withoutNamespace(resourceNameWithNamespace)) + .setResourceNamespace(STRING_BLANK) + .setName(resourceNameWithNamespace) .build(); } } From 9a58066de00bb87368452e2f89d10079afffaac8 Mon Sep 17 00:00:00 2001 From: ccwss <1782935682@qq.com> Date: Mon, 1 Dec 2025 15:06:03 +0800 Subject: [PATCH 2/3] fix test --- .../rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java | 4 +++- .../service/mqclient/ProxyClientRemotingProcessorTest.java | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java index bc9b8a60b40..7006d2a8ab9 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java @@ -19,13 +19,15 @@ import apache.rocketmq.v2.MessageQueue; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class GrpcConverterTest { @Test - public void testBuildMessageQueue() { + public void testBuildMessageQueue() throws Exception{ + ConfigurationManager.intConfig(); String topic = "topic"; String brokerName = "brokerName"; int queueId = 1; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java index 2cdd92ba5be..aeeb0df6c1a 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -87,6 +88,7 @@ public void testTransactionCheck() throws Exception { ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "clientId"); when(producerManager.getAvailableChannel(anyString())) .thenReturn(grpcClientChannel); + ConfigurationManager.intConfig(); ProxyClientRemotingProcessor processor = new ProxyClientRemotingProcessor(producerManager); CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); From 45fabe3a814a784b515b4a99761364e623ed86b2 Mon Sep 17 00:00:00 2001 From: ccwss <1782935682@qq.com> Date: Mon, 1 Dec 2025 15:46:35 +0800 Subject: [PATCH 3/3] fix test --- .../apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java index 7006d2a8ab9..13295133889 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverterTest.java @@ -26,7 +26,7 @@ public class GrpcConverterTest { @Test - public void testBuildMessageQueue() throws Exception{ + public void testBuildMessageQueue() throws Exception { ConfigurationManager.intConfig(); String topic = "topic"; String brokerName = "brokerName";