Skip to content

Commit c977c68

Browse files
committed
[#ISSUE 205] Support query consumer's stack from dashboard
1 parent 21dc2ac commit c977c68

File tree

13 files changed

+206
-6
lines changed

13 files changed

+206
-6
lines changed

src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,11 @@ public Object getConsumerRunningInfo(@RequestParam String consumerGroup, @Reques
117117
@RequestParam boolean jstack) {
118118
return consumerService.getConsumerRunningInfo(consumerGroup, clientId, jstack);
119119
}
120+
121+
@RequestMapping(value = "/consumerStack.query")
122+
@ResponseBody
123+
public Object getConsumerStack(@RequestParam String consumerGroup, @RequestParam String clientId,
124+
@RequestParam boolean jstack) {
125+
return consumerService.getConsumerStack(consumerGroup, clientId, jstack);
126+
}
120127
}

src/main/java/org/apache/rocketmq/dashboard/controller/OpsController.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public Object addNameSvrAddr(@RequestParam String newNamesrvAddr) {
5656
opsService.addNameSvrAddr(newNamesrvAddr);
5757
return true;
5858
}
59+
@RequestMapping(value = "/delNameSvrAddr.do", method = RequestMethod.POST)
60+
@ResponseBody
61+
public Object delNameSvrAddr(@RequestParam String namesrvAddr) {
62+
Preconditions.checkArgument(StringUtils.isNotEmpty(namesrvAddr),
63+
"namesrvAddr can not be blank");
64+
opsService.delNameSvrAddr(namesrvAddr);
65+
return true;
66+
}
5967

6068
@RequestMapping(value = "/updateIsVIPChannel.do", method = RequestMethod.POST)
6169
@ResponseBody
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.dashboard.model;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Builder;
22+
import lombok.Data;
23+
24+
import java.util.Map;
25+
26+
@Builder
27+
@Data
28+
@AllArgsConstructor
29+
public class StackResult {
30+
31+
private Map<String, String> stackMap;
32+
}

src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.rocketmq.dashboard.service;
1919

20+
import org.apache.rocketmq.dashboard.model.StackResult;
2021
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
2122
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
2223
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
@@ -55,4 +56,6 @@ public interface ConsumerService {
5556
ConsumerConnection getConsumerConnection(String consumerGroup);
5657

5758
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
59+
60+
StackResult getConsumerStack(String consumerGroup, String clientId, boolean jstack);
5861
}

src/main/java/org/apache/rocketmq/dashboard/service/OpsService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ public interface OpsService {
3333
boolean updateUseTLS(boolean useTLS);
3434

3535
void addNameSvrAddr(String namesrvAddr);
36+
void delNameSvrAddr(String namesrvAddr);
3637
}

src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
package org.apache.rocketmq.dashboard.service.impl;
1919

2020
import com.google.common.base.Predicate;
21+
import com.google.common.base.Splitter;
2122
import com.google.common.base.Throwables;
2223
import com.google.common.collect.Iterables;
2324
import com.google.common.collect.Lists;
2425
import com.google.common.collect.Maps;
2526
import com.google.common.collect.Sets;
27+
28+
import java.util.ArrayList;
2629
import java.util.Arrays;
2730
import java.util.Collections;
31+
import java.util.HashMap;
2832
import java.util.HashSet;
2933
import java.util.List;
3034
import java.util.Map;
@@ -40,10 +44,16 @@
4044
import java.util.stream.Collectors;
4145
import javax.annotation.Resource;
4246
import org.apache.commons.collections.CollectionUtils;
47+
import org.apache.commons.collections.MapUtils;
4348
import org.apache.commons.lang3.StringUtils;
4449
import org.apache.rocketmq.client.exception.MQClientException;
4550
import org.apache.rocketmq.common.MQVersion;
4651
import org.apache.rocketmq.common.MixAll;
52+
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
53+
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
54+
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
55+
import org.apache.rocketmq.dashboard.model.StackResult;
56+
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
4757
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
4858
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
4959
import org.apache.rocketmq.common.message.MessageQueue;
@@ -58,10 +68,6 @@
5868
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
5969
import org.apache.rocketmq.common.utils.ThreadUtils;
6070
import org.apache.rocketmq.dashboard.config.RMQConfigure;
61-
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
62-
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
63-
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
64-
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
6571
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
6672
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
6773
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
@@ -440,4 +446,59 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c
440446
throw new RuntimeException(e);
441447
}
442448
}
449+
@Override
450+
public StackResult getConsumerStack(String consumerGroup, String clientId, boolean jstack) {
451+
ConsumerRunningInfo consumerRunningInfo = getConsumerRunningInfo(consumerGroup, clientId, jstack);
452+
Map<String, String> stackMap = new HashMap<>();
453+
Map<String, List<String>> map = formatThreadStack(consumerRunningInfo.getJstack());
454+
if (MapUtils.isNotEmpty(map)) {
455+
Set<String> threads = map.keySet();
456+
for (String thread : threads) {
457+
StringBuilder result = new StringBuilder();
458+
map.get(thread).forEach(s -> result.append(s).append("\n"));
459+
stackMap.put(thread, result.toString());
460+
}
461+
}
462+
return new StackResult(stackMap);
463+
}
464+
465+
private Map<String, List<String>> formatThreadStack(String stack) {
466+
Map<String, List<String>> threadStackMap = new HashMap<>();
467+
List<String> stackList = Splitter.on("\n\n").splitToList(stack);
468+
for (String threadStack : stackList) {
469+
List<String> stacks = Splitter.on("\n").splitToList(threadStack);
470+
if (CollectionUtils.isNotEmpty(stacks)) {
471+
List<String> elements = new ArrayList<>();
472+
String threadName = null;
473+
for (String s : stacks) {
474+
List<String> stackItem = Splitter.on(" ")
475+
.omitEmptyStrings()
476+
.trimResults()
477+
.splitToList(s);
478+
if (stackItem.size() == 1) {
479+
String stackStr = stackItem.get(0);
480+
if (threadName == null) {
481+
int index = stackStr.indexOf("TID");
482+
if (index != -1) {
483+
threadName = stackStr.substring(0, index);
484+
}
485+
} else {
486+
elements.add(stackStr.substring(threadName.length(), stackStr.length()));
487+
}
488+
}
489+
if (stackItem.size() == 2) {
490+
if (threadName == null) {
491+
threadName = stackItem.get(0);
492+
}
493+
elements.add(stackItem.get(stackItem.size() - 1));
494+
}
495+
}
496+
if (threadName != null) {
497+
threadStackMap.put(threadName, elements);
498+
}
499+
}
500+
}
501+
return threadStackMap;
502+
}
503+
443504
}

src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,12 @@ public void addNameSvrAddr(String namesrvAddr) {
9595
}
9696
configure.setNamesrvAddrs(namesrvAddrs);
9797
}
98+
@Override
99+
public void delNameSvrAddr(String namesrvAddr) {
100+
List<String> namesrvAddrs = configure.getNamesrvAddrs();
101+
if (namesrvAddrs != null && namesrvAddrs.contains(namesrvAddr)) {
102+
namesrvAddrs.remove(namesrvAddr);
103+
}
104+
configure.setNamesrvAddrs(namesrvAddrs);
105+
}
98106
}

src/main/resources/static/src/consumer.js

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
255255
console.log(resp);
256256
ngDialog.open({
257257
template: 'clientInfoDialog',
258-
// controller: 'addTopicDialogController',
258+
controller: 'consumerStackDialogController',
259259
data: {data: resp.data, consumerGroupName: consumerGroupName}
260260
});
261261
} else {
@@ -403,4 +403,33 @@ module.controller('consumerTopicViewDialogController', ['$scope', 'ngDialog', '$
403403
});
404404
};
405405
}]
406+
);
407+
408+
module.controller('consumerStackDialogController', ['$scope', 'ngDialog', '$http', 'Notification', function ($scope, ngDialog, $http, Notification) {
409+
$scope.consumerStack = function (consumerGroup, clientId, jstack) {
410+
$http({
411+
method: "GET",
412+
url: "consumer/consumerStack.query",
413+
params: {
414+
consumerGroup: consumerGroup
415+
,
416+
clientId: clientId,
417+
jstack: jstack
418+
}
419+
}).success(function (resp) {
420+
if (resp.status == 0) {
421+
console.log(resp);
422+
ngDialog.open({
423+
template: 'consumerStackDialog',
424+
data: {
425+
consumerStack: resp.data,
426+
clientId: clientId
427+
}
428+
});
429+
} else {
430+
Notification.error({message: resp.errMsg, delay: 2000});
431+
}
432+
});
433+
};
434+
}]
406435
);

src/main/resources/static/src/i18n/en.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,5 @@ var en = {
133133
"MESSAGE_TYPE_FIFO": "FIFO",
134134
"MESSAGE_TYPE_DELAY": "DELAY",
135135
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
136+
"STACK": "STACK"
136137
}

src/main/resources/static/src/i18n/zh.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,5 @@ var zh = {
134134
"MESSAGE_TYPE_FIFO": "顺序消息",
135135
"MESSAGE_TYPE_DELAY": "定时/延时消息",
136136
"MESSAGE_TYPE_TRANSACTION": "事务消息",
137+
"STACK": "堆栈"
137138
}

0 commit comments

Comments
 (0)