Skip to content

Commit eb51da6

Browse files
Crazylycheeicenfly
andauthored
Merge branch 'refactor' of github.com:apache/rocketmq-dashboard into refactor (#307)
* pref: optimize the response speed of the query api * pref: optimize the response speed of the query api (#273) * Fixing and Adding Unit Tests (#266) (#278) * fix: align top navigation bar styles #279 * fix code style --------- Co-authored-by: icenfly <[email protected]>
1 parent c85aa2e commit eb51da6

File tree

11 files changed

+1242
-79
lines changed

11 files changed

+1242
-79
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@
55
.project
66
.factorypath
77
.settings/
8-
.vscode
8+
.vscode
9+
htmlReport/

src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.rocketmq.common.message.MessageClientIDSetter;
3838
import org.apache.rocketmq.common.message.MessageExt;
3939
import org.apache.rocketmq.common.message.MessageQueue;
40+
import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
4041
import org.apache.rocketmq.remoting.protocol.body.Connection;
4142
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
4243
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
@@ -127,11 +128,11 @@ public List<MessageView> queryMessageByTopic(String topic, final long begin, fin
127128
if (isEnableAcl) {
128129
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
129130
}
130-
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
131+
AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
132+
DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
131133
List<MessageView> messageViewList = Lists.newArrayList();
132134
try {
133135
String subExpression = "*";
134-
consumer.start();
135136
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
136137
for (MessageQueue mq : mqs) {
137138
long minOffset = consumer.searchOffset(mq, begin);
@@ -188,8 +189,6 @@ public int compare(MessageView o1, MessageView o2) {
188189
} catch (Exception e) {
189190
Throwables.throwIfUnchecked(e);
190191
throw new RuntimeException(e);
191-
} finally {
192-
consumer.shutdown();
193192
}
194193
}
195194

@@ -263,15 +262,15 @@ private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
263262
if (isEnableAcl) {
264263
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
265264
}
266-
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
265+
AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
266+
DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
267267

268268
long total = 0;
269269
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
270270

271271
List<MessageView> messageViews = new ArrayList<>();
272272

273273
try {
274-
consumer.start();
275274
Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
276275
int idx = 0;
277276
for (MessageQueue messageQueue : messageQueues) {
@@ -394,8 +393,6 @@ private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
394393
} catch (Exception e) {
395394
Throwables.throwIfUnchecked(e);
396395
throw new RuntimeException(e);
397-
} finally {
398-
consumer.shutdown();
399396
}
400397
}
401398

@@ -405,14 +402,14 @@ private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<
405402
if (isEnableAcl) {
406403
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
407404
}
408-
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
405+
AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
406+
DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
409407
List<MessageView> messageViews = new ArrayList<>();
410408

411409
long offset = query.getPageNum() * query.getPageSize();
412410

413411
long total = 0;
414412
try {
415-
consumer.start();
416413
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
417414
long start = queueOffsetInfo.getStart();
418415
long end = queueOffsetInfo.getEnd();
@@ -462,8 +459,6 @@ private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<
462459
} catch (Exception e) {
463460
Throwables.throwIfUnchecked(e);
464461
throw new RuntimeException(e);
465-
} finally {
466-
consumer.shutdown();
467462
}
468463
}
469464

src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@
8080
@Service
8181
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
8282

83+
private transient DefaultMQProducer systemTopicProducer;
84+
85+
private final Object producerLock = new Object();
86+
8387
private Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
8488

8589
@Autowired
@@ -355,18 +359,40 @@ private TopicList getSystemTopicList() {
355359
if (isEnableAcl) {
356360
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
357361
}
358-
DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
359-
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
360-
producer.setNamesrvAddr(configure.getNamesrvAddr());
362+
363+
// ensures thread safety
364+
if (systemTopicProducer == null) {
365+
synchronized (producerLock) {
366+
if (systemTopicProducer == null) {
367+
systemTopicProducer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
368+
systemTopicProducer.setInstanceName("SystemTopicProducer-" + System.currentTimeMillis());
369+
systemTopicProducer.setNamesrvAddr(configure.getNamesrvAddr());
370+
try {
371+
systemTopicProducer.start();
372+
} catch (Exception e) {
373+
systemTopicProducer = null;
374+
Throwables.throwIfUnchecked(e);
375+
throw new RuntimeException(e);
376+
}
377+
}
378+
}
379+
}
361380

362381
try {
363-
producer.start();
364-
return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
382+
return systemTopicProducer.getDefaultMQProducerImpl()
383+
.getmQClientFactory()
384+
.getMQClientAPIImpl()
385+
.getSystemTopicList(20000L);
365386
} catch (Exception e) {
387+
// If the call fails, close and clean up the producer, and it will be re-created next time.
388+
synchronized (producerLock) {
389+
if (systemTopicProducer != null) {
390+
systemTopicProducer.shutdown();
391+
systemTopicProducer = null;
392+
}
393+
}
366394
Throwables.throwIfUnchecked(e);
367395
throw new RuntimeException(e);
368-
} finally {
369-
producer.shutdown();
370396
}
371397
}
372398

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.dashboard.support;
19+
20+
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
21+
import org.apache.rocketmq.client.exception.MQClientException;
22+
import org.apache.rocketmq.common.MixAll;
23+
import org.apache.rocketmq.remoting.RPCHook;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import java.time.Duration;
27+
import java.time.Instant;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
34+
public class AutoCloseConsumerWrapper {
35+
36+
private final Logger logger = LoggerFactory.getLogger(GlobalRestfulResponseBodyAdvice.class);
37+
38+
private static final AtomicReference<DefaultMQPullConsumer> CONSUMER_REF = new AtomicReference<>();
39+
private final AtomicBoolean isTaskScheduled = new AtomicBoolean(false);
40+
private final AtomicBoolean isClosing = new AtomicBoolean(false);
41+
private static volatile Instant lastUsedTime = Instant.now();
42+
43+
44+
private static final ScheduledExecutorService SCHEDULER =
45+
Executors.newSingleThreadScheduledExecutor();
46+
47+
public AutoCloseConsumerWrapper() {
48+
startIdleCheckTask();
49+
}
50+
51+
52+
public DefaultMQPullConsumer getConsumer(RPCHook rpcHook,Boolean useTLS) {
53+
lastUsedTime = Instant.now();
54+
55+
DefaultMQPullConsumer consumer = CONSUMER_REF.get();
56+
if (consumer == null) {
57+
synchronized (this) {
58+
consumer = CONSUMER_REF.get();
59+
if (consumer == null) {
60+
consumer = createNewConsumer(rpcHook,useTLS);
61+
CONSUMER_REF.set(consumer);
62+
}
63+
try {
64+
consumer.start();
65+
} catch (MQClientException e) {
66+
consumer.shutdown();
67+
CONSUMER_REF.set(null);
68+
throw new RuntimeException("Failed to start consumer", e);
69+
70+
}
71+
}
72+
}
73+
return consumer;
74+
}
75+
76+
77+
protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
78+
return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook) {
79+
{ setUseTLS(useTLS); } };
80+
}
81+
82+
private void startIdleCheckTask() {
83+
if (!isTaskScheduled.get()) {
84+
synchronized (this) {
85+
if (!isTaskScheduled.get()) {
86+
SCHEDULER.scheduleWithFixedDelay(() -> {
87+
try {
88+
checkAndCloseIdleConsumer();
89+
} catch (Exception e) {
90+
logger.error("Idle check failed", e);
91+
}
92+
}, 1, 1, TimeUnit.MINUTES);
93+
94+
isTaskScheduled.set(true);
95+
}
96+
}
97+
}
98+
}
99+
100+
public void checkAndCloseIdleConsumer() {
101+
if (shouldClose()) {
102+
synchronized (this) {
103+
if (shouldClose()) {
104+
close();
105+
}
106+
}
107+
}
108+
}
109+
110+
private boolean shouldClose() {
111+
long idleTimeoutMs = 60_000;
112+
return CONSUMER_REF.get() != null &&
113+
Duration.between(lastUsedTime, Instant.now()).toMillis() > idleTimeoutMs;
114+
}
115+
116+
117+
public void close() {
118+
if (isClosing.compareAndSet(false, true)) {
119+
try {
120+
DefaultMQPullConsumer consumer = CONSUMER_REF.getAndSet(null);
121+
if (consumer != null) {
122+
consumer.shutdown();
123+
}
124+
isTaskScheduled.set(false);
125+
} finally {
126+
isClosing.set(false);
127+
}
128+
}
129+
}
130+
131+
}

src/test/java/org/apache/rocketmq/dashboard/admin/MQAdminExtImplTest.java

Lines changed: 45 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@
8787
import static org.mockito.Mockito.doNothing;
8888
import static org.mockito.Mockito.mock;
8989
import static org.mockito.Mockito.when;
90+
import java.util.concurrent.ConcurrentHashMap;
91+
import java.util.concurrent.ConcurrentMap;
92+
import static org.mockito.ArgumentMatchers.eq;
9093

9194
@RunWith(MockitoJUnitRunner.Silent.class)
9295
public class MQAdminExtImplTest {
@@ -195,62 +198,55 @@ public void testCreateAndUpdateSubscriptionGroupConfig() throws Exception {
195198
@Test
196199
public void testExamineSubscriptionGroupConfig() throws Exception {
197200
assertNotNull(mqAdminExtImpl);
198-
{
199-
RemotingCommand response1 = RemotingCommand.createResponseCommand(null);
200-
RemotingCommand response2 = RemotingCommand.createResponseCommand(null);
201-
response2.setCode(ResponseCode.SUCCESS);
202-
response2.setBody(RemotingSerializable.encode(MockObjectUtil.createSubscriptionGroupWrapper()));
203-
when(remotingClient.invokeSync(anyString(), any(), anyLong()))
204-
.thenThrow(new RuntimeException("invokeSync exception"))
205-
.thenReturn(response1).thenReturn(response2);
206-
}
207-
// invokeSync exception
208-
try {
209-
mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "topic_test");
210-
} catch (Exception e) {
211-
Assert.assertEquals(e.getMessage(), "invokeSync exception");
212-
}
213-
214-
// responseCode is not success
215-
try {
216-
mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test");
217-
} catch (Exception e) {
218-
assertThat(e.getCause()).isInstanceOf(MQBrokerException.class);
219-
assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1);
220-
}
221-
// GET_ALL_SUBSCRIPTIONGROUP_CONFIG success
201+
202+
// Create valid SubscriptionGroupWrapper with group_test entry
203+
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
204+
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
205+
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
206+
config.setGroupName("group_test");
207+
subscriptionGroupTable.put("group_test", config);
208+
wrapper.setSubscriptionGroupTable(subscriptionGroupTable);
209+
210+
// Create successful response
211+
RemotingCommand successResponse = RemotingCommand.createResponseCommand(null);
212+
successResponse.setCode(ResponseCode.SUCCESS);
213+
successResponse.setBody(RemotingSerializable.encode(wrapper));
214+
215+
// Mock the remote invocation
216+
when(remotingClient.invokeSync(eq(brokerAddr), any(RemotingCommand.class), anyLong()))
217+
.thenReturn(successResponse);
218+
219+
// Test successful case
222220
SubscriptionGroupConfig subscriptionGroupConfig = mqAdminExtImpl.examineSubscriptionGroupConfig(brokerAddr, "group_test");
223-
Assert.assertEquals(subscriptionGroupConfig.getGroupName(), "group_test");
221+
Assert.assertNotNull(subscriptionGroupConfig);
222+
Assert.assertEquals("group_test", subscriptionGroupConfig.getGroupName());
224223
}
225224

226225
@Test
227226
public void testExamineTopicConfig() throws Exception {
228227
assertNotNull(mqAdminExtImpl);
229-
{
230-
RemotingCommand response1 = RemotingCommand.createResponseCommand(null);
231-
RemotingCommand response2 = RemotingCommand.createResponseCommand(null);
232-
response2.setCode(ResponseCode.SUCCESS);
233-
response2.setBody(RemotingSerializable.encode(MockObjectUtil.createTopicConfigWrapper()));
234-
when(remotingClient.invokeSync(anyString(), any(), anyLong()))
235-
.thenThrow(new RuntimeException("invokeSync exception"))
236-
.thenReturn(response1).thenReturn(response2);
237-
}
238-
// invokeSync exception
239-
try {
240-
mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
241-
} catch (Exception e) {
242-
Assert.assertEquals(e.getMessage(), "invokeSync exception");
243-
}
244-
// responseCode is not success
245-
try {
246-
mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
247-
} catch (Exception e) {
248-
assertThat(e.getCause()).isInstanceOf(MQBrokerException.class);
249-
assertThat(((MQBrokerException) e.getCause()).getResponseCode()).isEqualTo(1);
250-
}
251-
// GET_ALL_TOPIC_CONFIG success
228+
229+
// Create valid TopicConfigSerializeWrapper with topic_test entry
230+
TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();
231+
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
232+
TopicConfig config = new TopicConfig();
233+
config.setTopicName("topic_test");
234+
topicConfigTable.put("topic_test", config);
235+
wrapper.setTopicConfigTable(topicConfigTable);
236+
237+
// Create successful response
238+
RemotingCommand successResponse = RemotingCommand.createResponseCommand(null);
239+
successResponse.setCode(ResponseCode.SUCCESS);
240+
successResponse.setBody(RemotingSerializable.encode(wrapper));
241+
242+
// Mock the remote invocation
243+
when(remotingClient.invokeSync(eq(brokerAddr), any(RemotingCommand.class), anyLong()))
244+
.thenReturn(successResponse);
245+
246+
// Test successful case
252247
TopicConfig topicConfig = mqAdminExtImpl.examineTopicConfig(brokerAddr, "topic_test");
253-
Assert.assertEquals(topicConfig.getTopicName(), "topic_test");
248+
Assert.assertNotNull(topicConfig);
249+
Assert.assertEquals("topic_test", topicConfig.getTopicName());
254250
}
255251

256252
@Test

0 commit comments

Comments
 (0)