Skip to content

Commit 86bdb06

Browse files
zhangjidi2016zhangjidi
andauthored
[ISSUE #123]Optimize groupList.query (#124)
Co-authored-by: zhangjidi <[email protected]>
1 parent 7a54427 commit 86bdb06

File tree

2 files changed

+58
-4
lines changed

2 files changed

+58
-4
lines changed

src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Set;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.RejectedExecutionHandler;
36+
import java.util.concurrent.ThreadFactory;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicLong;
3240
import java.util.stream.Collectors;
3341
import javax.annotation.Resource;
3442
import org.apache.commons.collections.CollectionUtils;
@@ -48,6 +56,7 @@
4856
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
4957
import org.apache.rocketmq.common.protocol.route.BrokerData;
5058
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
59+
import org.apache.rocketmq.common.utils.ThreadUtils;
5160
import org.apache.rocketmq.dashboard.config.RMQConfigure;
5261
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
5362
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
@@ -60,19 +69,46 @@
6069
import org.apache.rocketmq.dashboard.service.ConsumerService;
6170
import org.slf4j.Logger;
6271
import org.slf4j.LoggerFactory;
72+
import org.springframework.beans.factory.DisposableBean;
73+
import org.springframework.beans.factory.InitializingBean;
6374
import org.springframework.stereotype.Service;
6475

6576
import static com.google.common.base.Throwables.propagate;
6677

6778
@Service
68-
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
79+
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean {
6980
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
7081

7182
@Resource
7283
private RMQConfigure configure;
7384

7485
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
7586

87+
private ExecutorService executorService;
88+
89+
@Override
90+
public void afterPropertiesSet() {
91+
Runtime runtime = Runtime.getRuntime();
92+
int corePoolSize = Math.max(10, runtime.availableProcessors() * 2);
93+
int maximumPoolSize = Math.max(20, runtime.availableProcessors() * 2);
94+
ThreadFactory threadFactory = new ThreadFactory() {
95+
private final AtomicLong threadIndex = new AtomicLong(0);
96+
97+
@Override
98+
public Thread newThread(Runnable r) {
99+
return new Thread(r, "QueryGroup_" + this.threadIndex.incrementAndGet());
100+
}
101+
};
102+
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
103+
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
104+
new LinkedBlockingQueue<>(5000), threadFactory, handler);
105+
}
106+
107+
@Override
108+
public void destroy() {
109+
ThreadUtils.shutdownGracefully(executorService, 10L, TimeUnit.SECONDS);
110+
}
111+
76112
static {
77113
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
78114
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
@@ -97,10 +133,26 @@ public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) {
97133
catch (Exception err) {
98134
throw Throwables.propagate(err);
99135
}
100-
List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList();
136+
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
137+
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size());
101138
for (String consumerGroup : consumerGroupSet) {
102-
groupConsumeInfoList.add(queryGroup(consumerGroup));
139+
executorService.submit(() -> {
140+
try {
141+
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
142+
groupConsumeInfoList.add(consumeInfo);
143+
} catch (Exception e) {
144+
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
145+
} finally {
146+
countDownLatch.countDown();
147+
}
148+
});
103149
}
150+
try {
151+
countDownLatch.await(30, TimeUnit.SECONDS);
152+
} catch (InterruptedException e) {
153+
logger.error("query consumerGroup countDownLatch await Exception", e);
154+
}
155+
104156
if (!skipSysGroup) {
105157
groupConsumeInfoList.stream().map(group -> {
106158
if (SYSTEM_GROUP_SET.contains(group.getGroup())) {

src/test/java/org/apache/rocketmq/dashboard/controller/ConsumerControllerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class ConsumerControllerTest extends BaseControllerTest {
6767

6868
@Before
6969
public void init() throws Exception {
70+
consumerService.afterPropertiesSet();
7071
super.mockRmqConfigure();
7172
ClusterInfo clusterInfo = MockObjectUtil.createClusterInfo();
7273
when(mqAdminExt.examineBrokerClusterInfo()).thenReturn(clusterInfo);
@@ -93,9 +94,10 @@ public void testList() throws Exception {
9394
perform = mockMvc.perform(requestBuilder);
9495
perform.andExpect(status().isOk())
9596
.andExpect(jsonPath("$.data", hasSize(2)))
96-
.andExpect(jsonPath("$.data[0].group").value("group_test"))
9797
.andExpect(jsonPath("$.data[0].consumeType").value(ConsumeType.CONSUME_ACTIVELY.name()))
9898
.andExpect(jsonPath("$.data[0].messageModel").value(MessageModel.CLUSTERING.name()));
99+
// executorService shutdown
100+
consumerService.destroy();
99101
}
100102

101103
@Test

0 commit comments

Comments
 (0)