Skip to content

Commit d58e13d

Browse files
1294566108yuanziwei
andauthored
Proxy Support And ConsumerGroup Enhancement (#207)
* Support dashboard v4-v5 switch And query for v5 topic * Modify tag name * Support proxy-module And Fix the problem of showing wrong consumerGroup-info --------- Co-authored-by: yuanziwei <[email protected]>
1 parent e7cb315 commit d58e13d

File tree

22 files changed

+516
-48
lines changed

22 files changed

+516
-48
lines changed

src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class RMQConfigure {
4343
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env
4444
private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
4545

46+
private volatile String proxyAddr;
47+
4648
private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
4749

4850

@@ -62,6 +64,8 @@ public class RMQConfigure {
6264

6365
private List<String> namesrvAddrs = new ArrayList<>();
6466

67+
private List<String> proxyAddrs = new ArrayList<>();
68+
6569
public String getAccessKey() {
6670
return accessKey;
6771
}
@@ -86,6 +90,25 @@ public List<String> getNamesrvAddrs() {
8690
return namesrvAddrs;
8791
}
8892

93+
public List<String> getProxyAddrs() {
94+
return this.proxyAddrs;
95+
}
96+
97+
public void setProxyAddrs(List<String> proxyAddrs) {
98+
this.proxyAddrs = proxyAddrs;
99+
if (CollectionUtils.isNotEmpty(proxyAddrs)) {
100+
this.setProxyAddr(proxyAddrs.get(0));
101+
}
102+
}
103+
104+
public String getProxyAddr() {
105+
return proxyAddr;
106+
}
107+
108+
public void setProxyAddr(String proxyAddr) {
109+
this.proxyAddr = proxyAddr;
110+
}
111+
89112
public void setNamesrvAddrs(List<String> namesrvAddrs) {
90113
this.namesrvAddrs = namesrvAddrs;
91114
if (CollectionUtils.isNotEmpty(namesrvAddrs)) {

src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ public class ConsumerController {
4747

4848
@RequestMapping(value = "/groupList.query")
4949
@ResponseBody
50-
public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) {
51-
return consumerService.queryGroupList(skipSysGroup);
50+
public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) {
51+
return consumerService.queryGroupList(skipSysGroup, address);
5252
}
5353

5454
@RequestMapping(value = "/group.query")
5555
@ResponseBody
56-
public Object groupQuery(@RequestParam String consumerGroup) {
57-
return consumerService.queryGroup(consumerGroup);
56+
public Object groupQuery(@RequestParam String consumerGroup, String address) {
57+
return consumerService.queryGroup(consumerGroup, address);
5858
}
5959

6060
@RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@ public Object fetchBrokerNameList(@RequestParam String consumerGroup) {
9999

100100
@RequestMapping(value = "/queryTopicByConsumer.query")
101101
@ResponseBody
102-
public Object queryConsumerByTopic(@RequestParam String consumerGroup) {
103-
return consumerService.queryConsumeStatsListByGroupName(consumerGroup);
102+
public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) {
103+
return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address);
104104
}
105105

106106
@RequestMapping(value = "/consumerConnection.query")
107107
@ResponseBody
108-
public Object consumerConnection(@RequestParam(required = false) String consumerGroup) {
109-
ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup);
108+
public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) {
109+
ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address);
110110
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
111111
return consumerConnection;
112112
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.dashboard.controller;
18+
19+
import org.apache.rocketmq.dashboard.permisssion.Permission;
20+
import org.apache.rocketmq.dashboard.service.ProxyService;
21+
import org.springframework.stereotype.Controller;
22+
import org.springframework.web.bind.annotation.RequestMapping;
23+
import org.springframework.web.bind.annotation.RequestMethod;
24+
import org.springframework.web.bind.annotation.RequestParam;
25+
import org.springframework.web.bind.annotation.ResponseBody;
26+
27+
import javax.annotation.Resource;
28+
29+
@Controller
30+
@RequestMapping("/proxy")
31+
@Permission
32+
public class ProxyController {
33+
@Resource
34+
private ProxyService proxyService;
35+
@RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
36+
@ResponseBody
37+
public Object homePage() {
38+
return proxyService.getProxyHomePage();
39+
}
40+
41+
@RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
42+
@ResponseBody
43+
public Object addProxyAddr(@RequestParam String newProxyAddr) {
44+
proxyService.addProxyAddrList(newProxyAddr);
45+
return true;
46+
}
47+
48+
@RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
49+
@ResponseBody
50+
public Object updateProxyAddr(@RequestParam String proxyAddr) {
51+
proxyService.updateProxyAddrList(proxyAddr);
52+
return true;
53+
}
54+
}

src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
2020
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
2121

22+
import java.util.List;
23+
2224
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
2325
private String group;
2426
private String version;
2527
private int count;
2628
private ConsumeType consumeType;
2729
private MessageModel messageModel;
30+
private List<String> address;
2831
private int consumeTps;
2932
private long diffTotal = -1;
3033
private String subGroupType = "NORMAL";
@@ -70,6 +73,22 @@ public void setDiffTotal(long diffTotal) {
7073
this.diffTotal = diffTotal;
7174
}
7275

76+
public List<String> getAddress() {
77+
return address;
78+
}
79+
80+
public void setAddress(List<String> address) {
81+
this.address = address;
82+
}
83+
84+
public String getSubGroupType() {
85+
return subGroupType;
86+
}
87+
88+
public void setSubGroupType(String subGroupType) {
89+
this.subGroupType = subGroupType;
90+
}
91+
7392
@Override
7493
public int compareTo(GroupConsumeInfo o) {
7594
if (this.count != o.count) {
@@ -93,12 +112,4 @@ public String getVersion() {
93112
public void setVersion(String version) {
94113
this.version = version;
95114
}
96-
97-
public String getSubGroupType() {
98-
return subGroupType;
99-
}
100-
101-
public void setSubGroupType(String subGroupType) {
102-
this.subGroupType = subGroupType;
103-
}
104115
}

src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
import java.util.Set;
3232

3333
public interface ConsumerService {
34-
List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup);
34+
List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
3535

36-
GroupConsumeInfo queryGroup(String consumerGroup);
36+
GroupConsumeInfo queryGroup(String consumerGroup, String address);
3737

3838

39-
List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
39+
List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address);
4040

4141
List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName);
4242

@@ -52,7 +52,7 @@ public interface ConsumerService {
5252

5353
Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
5454

55-
ConsumerConnection getConsumerConnection(String consumerGroup);
55+
ConsumerConnection getConsumerConnection(String consumerGroup, String address);
5656

5757
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
5858
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.dashboard.service;
18+
19+
import java.util.Map;
20+
21+
public interface ProxyService {
22+
23+
void addProxyAddrList(String proxyAddr);
24+
25+
void updateProxyAddrList(String proxyAddr);
26+
27+
Map<String, Object> getProxyHomePage();
28+
}

src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup,
627627
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
628628
RemotingConnectException, MQBrokerException {
629629
// TODO Auto-generated method stub
630-
throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'");
630+
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
631631
}
632632

633633
@Override
@@ -639,8 +639,7 @@ public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String consum
639639
@Override
640640
public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr)
641641
throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
642-
// TODO Auto-generated method stub
643-
throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'");
642+
return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr);
644643
}
645644

646645
@Override
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.dashboard.service.client;
18+
19+
import org.apache.rocketmq.client.exception.MQBrokerException;
20+
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
21+
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
22+
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
23+
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
24+
25+
public interface ProxyAdmin {
26+
27+
ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException;
28+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.dashboard.service.client;
18+
19+
import lombok.extern.slf4j.Slf4j;
20+
import org.apache.commons.pool2.impl.GenericObjectPool;
21+
import org.apache.rocketmq.client.exception.MQBrokerException;
22+
import org.apache.rocketmq.remoting.RemotingClient;
23+
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
24+
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
25+
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
26+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
27+
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
28+
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
29+
import org.apache.rocketmq.tools.admin.MQAdminExt;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.stereotype.Service;
32+
33+
import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
34+
35+
@Slf4j
36+
@Service
37+
public class ProxyAdminImpl implements ProxyAdmin {
38+
@Autowired
39+
private GenericObjectPool<MQAdminExt> mqAdminExtPool;
40+
41+
@Override
42+
public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
43+
try {
44+
MQAdminInstance.createMQAdmin(mqAdminExtPool);
45+
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
46+
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
47+
requestHeader.setConsumerGroup(consumerGroup);
48+
RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader);
49+
RemotingCommand response = remotingClient.invokeSync(addr, request, 3000);
50+
switch (response.getCode()) {
51+
case 0:
52+
return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
53+
default:
54+
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
55+
}
56+
} finally {
57+
MQAdminInstance.returnMQAdmin(mqAdminExtPool);
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)