Skip to content

Commit bbabd1c

Browse files
authored
[ISSUES #281 #274 #285 #287] Speeds up topic and consumer queries, adds caching, and fixes delay/dead-letter topic mix-up (#286)
* fix: Resolved issue of query failure under a large number of topics and consumers #281 * fix: Expand the message ID query time range to avoid query failure * fix: Remove duplicates from topic queries, increase system topic recognition #287
1 parent e761854 commit bbabd1c

File tree

16 files changed

+444
-140
lines changed

16 files changed

+444
-140
lines changed

src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ public Object list(@RequestParam(value = "skipSysGroup", required = false) boole
5151
return consumerService.queryGroupList(skipSysGroup, address);
5252
}
5353

54+
@RequestMapping(value = "/group.refresh")
55+
@ResponseBody
56+
public Object refresh(String address,
57+
String consumerGroup) {
58+
return consumerService.refreshGroup(address, consumerGroup);
59+
}
60+
5461
@RequestMapping(value = "/group.query")
5562
@ResponseBody
5663
public Object groupQuery(@RequestParam String consumerGroup, String address) {

src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public Object list(@RequestParam(value = "skipSysProcess", required = false) boo
5656
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
5757
}
5858

59+
@RequestMapping(value = "/refresh", method = {RequestMethod.POST})
60+
@ResponseBody
61+
public Object refresh() {
62+
return topicService.refreshTopicList();
63+
}
64+
5965
@RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
6066
@ResponseBody
6167
public Object listTopicType() {

src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
2020
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
2121

22+
import java.util.Date;
2223
import java.util.List;
2324

2425
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
@@ -31,6 +32,7 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
3132
private int consumeTps;
3233
private long diffTotal = -1;
3334
private String subGroupType = "NORMAL";
35+
private Date updateTime;
3436

3537

3638
public String getGroup() {
@@ -112,4 +114,12 @@ public String getVersion() {
112114
public void setVersion(String version) {
113115
this.version = version;
114116
}
117+
118+
public Date getUpdateTime() {
119+
return updateTime;
120+
}
121+
122+
public void setUpdateTime(Date updateTime) {
123+
this.updateTime = updateTime;
124+
}
115125
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.dashboard.service;
18+
19+
import lombok.extern.slf4j.Slf4j;
20+
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
21+
import org.apache.rocketmq.tools.admin.MQAdminExt;
22+
import org.springframework.beans.factory.annotation.Value;
23+
import org.springframework.stereotype.Service;
24+
25+
import javax.annotation.PostConstruct;
26+
import javax.annotation.Resource;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ScheduledExecutorService;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
32+
@Slf4j
33+
@Service
34+
public class ClusterInfoService {
35+
36+
@Resource
37+
private MQAdminExt mqAdminExt;
38+
39+
@Value("${rocketmq.cluster.cache.expire:60000}")
40+
private long cacheExpireMs;
41+
42+
43+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
44+
private final AtomicReference<ClusterInfo> cachedRef = new AtomicReference<>();
45+
46+
47+
@PostConstruct
48+
public void init() {
49+
scheduler.scheduleAtFixedRate(this::refresh,
50+
0, cacheExpireMs / 2, TimeUnit.MILLISECONDS);
51+
}
52+
53+
public ClusterInfo get() {
54+
ClusterInfo info = cachedRef.get();
55+
return info != null ? info : refresh();
56+
}
57+
58+
public synchronized ClusterInfo refresh() {
59+
try {
60+
ClusterInfo fresh = mqAdminExt.examineBrokerClusterInfo();
61+
cachedRef.set(fresh);
62+
return fresh;
63+
} catch (Exception e) {
64+
log.warn("Refresh cluster info failed", e);
65+
ClusterInfo old = cachedRef.get();
66+
if (old != null) {
67+
return old;
68+
}
69+
throw new IllegalStateException("No cluster info available", e);
70+
}
71+
}
72+
}

src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
package org.apache.rocketmq.dashboard.service;
1919

20+
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
21+
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
22+
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
2023
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
2124
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
2225
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
2326
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
2427
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
25-
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
26-
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
27-
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
2828

2929
import java.util.List;
3030
import java.util.Map;
@@ -55,4 +55,6 @@ public interface ConsumerService {
5555
ConsumerConnection getConsumerConnection(String consumerGroup, String address);
5656

5757
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
58+
59+
Object refreshGroup(String address, String consumerGroup);
5860
}

src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,5 @@ public interface TopicService {
5454

5555
SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest);
5656

57+
boolean refreshTopicList();
5758
}

src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package org.apache.rocketmq.dashboard.service.client;
1818

1919
import com.google.common.base.Throwables;
20+
2021
import java.io.UnsupportedEncodingException;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Properties;
2425
import java.util.Set;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ConcurrentMap;
28+
2529
import org.apache.rocketmq.client.QueryResult;
2630
import org.apache.rocketmq.client.exception.MQBrokerException;
2731
import org.apache.rocketmq.client.exception.MQClientException;
@@ -87,9 +91,15 @@
8791
public class MQAdminExtImpl implements MQAdminExt {
8892
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
8993

90-
public MQAdminExtImpl() {
94+
private static final ConcurrentMap<String, TopicConfigSerializeWrapper> TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>();
95+
96+
public MQAdminExtImpl() {}
97+
98+
public static void clearTopicConfigCache() {
99+
TOPIC_CONFIG_CACHE.clear();
91100
}
92101

102+
93103
@Override
94104
public void updateBrokerConfig(String brokerAddr, Properties properties)
95105
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
@@ -145,7 +155,7 @@ public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, Strin
145155
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
146156
RemotingCommand response = null;
147157
try {
148-
response = remotingClient.invokeSync(addr, request, 3000);
158+
response = remotingClient.invokeSync(addr, request, 8000);
149159
}
150160
catch (Exception err) {
151161
Throwables.throwIfUnchecked(err);
@@ -164,19 +174,27 @@ public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, Strin
164174

165175
@Override
166176
public TopicConfig examineTopicConfig(String addr, String topic) throws MQBrokerException {
177+
TopicConfigSerializeWrapper cachedWrapper = TOPIC_CONFIG_CACHE.get(addr);
178+
179+
if (cachedWrapper != null && cachedWrapper.getTopicConfigTable().containsKey(topic)) {
180+
return cachedWrapper.getTopicConfigTable().get(topic);
181+
}
182+
167183
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
168184
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
169185
RemotingCommand response = null;
170186
try {
171187
response = remotingClient.invokeSync(addr, request, 3000);
172-
}
173-
catch (Exception err) {
188+
} catch (Exception err) {
174189
Throwables.throwIfUnchecked(err);
175190
throw new RuntimeException(err);
176191
}
177192
switch (response.getCode()) {
178193
case ResponseCode.SUCCESS: {
179-
TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
194+
TopicConfigSerializeWrapper topicConfigSerializeWrapper =
195+
decode(response.getBody(), TopicConfigSerializeWrapper.class);
196+
197+
TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper);
180198
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
181199
}
182200
default:
@@ -468,14 +486,14 @@ public MessageExt viewMessage(String topic,
468486
Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
469487
if (clusterList == null || clusterList.isEmpty()) {
470488
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32,
471-
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get();
489+
0L, Long.MAX_VALUE, true).get();
472490
if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) {
473491
return qr.getMessageList().get(0);
474492
}
475493
} else {
476494
for (String name : clusterList) {
477495
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32,
478-
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get();
496+
0L, Long.MAX_VALUE, true).get();
479497
if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) {
480498
return qr.getMessageList().get(0);
481499
}

0 commit comments

Comments
 (0)