diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java index f2a42c0aed9..225c58a17d0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java @@ -61,7 +61,12 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
String topicName) throws Exception { MessageQueueView messageQueueView = getAllMessageQueueView(ctx, topicName); TopicRouteData topicRouteData = messageQueueView.getTopicRouteData(); - return new ProxyTopicRouteData(topicRouteData, grpcPort); + + if (requestHostAndPortList != null && !requestHostAndPortList.isEmpty()) { + return new ProxyTopicRouteData(topicRouteData, requestHostAndPortList); + } else { + return new ProxyTopicRouteData(topicRouteData, grpcPort); + } } @Override diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java index 1ad39a1db64..ea42e576515 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java @@ -19,6 +19,7 @@ import com.google.common.net.HostAndPort; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; @@ -102,4 +103,22 @@ public void testGetTopicRouteForProxy() throws Throwable { ConfigurationManager.getProxyConfig().getGrpcServerPort()))), proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID)); } + + @Test + public void testGetTopicRouteForProxyWithRemotingClient() throws Throwable { + ProxyContext ctx = ProxyContext.create(); + + List remotingAddressList = Lists.newArrayList( + new Address(Address.AddressScheme.IPv4, HostAndPort.fromParts( + HostAndPort.fromString(BROKER_ADDR).getHost(), + ConfigurationManager.getProxyConfig().getRemotingListenPort())) + ); + + ProxyTopicRouteData proxyTopicRouteData = this.topicRouteService.getTopicRouteForProxy(ctx, remotingAddressList, TOPIC); + + assertEquals(1, proxyTopicRouteData.getBrokerDatas().size()); + assertEquals( + remotingAddressList, + proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID)); + } }