Skip to content

Commit 8ccf839

Browse files
committed
[#ISSUE 205] Support query consumer's stack from dashboard
1 parent d58e13d commit 8ccf839

File tree

13 files changed

+205
-6
lines changed

13 files changed

+205
-6
lines changed

src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,11 @@ public Object getConsumerRunningInfo(@RequestParam String consumerGroup, @Reques
117117
@RequestParam boolean jstack) {
118118
return consumerService.getConsumerRunningInfo(consumerGroup, clientId, jstack);
119119
}
120+
121+
@RequestMapping(value = "/consumerStack.query")
122+
@ResponseBody
123+
public Object getConsumerStack(@RequestParam String consumerGroup, @RequestParam String clientId,
124+
@RequestParam boolean jstack) {
125+
return consumerService.getConsumerStack(consumerGroup, clientId, jstack);
126+
}
120127
}

src/main/java/org/apache/rocketmq/dashboard/controller/OpsController.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public Object addNameSvrAddr(@RequestParam String newNamesrvAddr) {
5656
opsService.addNameSvrAddr(newNamesrvAddr);
5757
return true;
5858
}
59+
@RequestMapping(value = "/delNameSvrAddr.do", method = RequestMethod.POST)
60+
@ResponseBody
61+
public Object delNameSvrAddr(@RequestParam String namesrvAddr) {
62+
Preconditions.checkArgument(StringUtils.isNotEmpty(namesrvAddr),
63+
"namesrvAddr can not be blank");
64+
opsService.delNameSvrAddr(namesrvAddr);
65+
return true;
66+
}
5967

6068
@RequestMapping(value = "/updateIsVIPChannel.do", method = RequestMethod.POST)
6169
@ResponseBody
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.dashboard.model;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Builder;
22+
import lombok.Data;
23+
24+
import java.util.Map;
25+
26+
@Builder
27+
@Data
28+
@AllArgsConstructor
29+
public class StackResult {
30+
31+
private Map<String, String> stackMap;
32+
}

src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.rocketmq.dashboard.service;
1919

20+
import org.apache.rocketmq.dashboard.model.StackResult;
2021
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
2122
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
2223
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
@@ -55,4 +56,6 @@ public interface ConsumerService {
5556
ConsumerConnection getConsumerConnection(String consumerGroup, String address);
5657

5758
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack);
59+
60+
StackResult getConsumerStack(String consumerGroup, String clientId, boolean jstack);
5861
}

src/main/java/org/apache/rocketmq/dashboard/service/OpsService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ public interface OpsService {
3333
boolean updateUseTLS(boolean useTLS);
3434

3535
void addNameSvrAddr(String namesrvAddr);
36+
void delNameSvrAddr(String namesrvAddr);
3637
}

src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package org.apache.rocketmq.dashboard.service.impl;
1919

2020
import com.google.common.base.Predicate;
21+
import com.google.common.base.Splitter;
2122
import com.google.common.base.Throwables;
2223
import com.google.common.collect.Iterables;
2324
import com.google.common.collect.Lists;
2425
import com.google.common.collect.Maps;
2526
import com.google.common.collect.Sets;
27+
import java.util.ArrayList;
28+
2629
import java.util.ArrayList;
2730
import java.util.Arrays;
2831
import java.util.Collections;
@@ -42,10 +45,16 @@
4245
import java.util.stream.Collectors;
4346
import javax.annotation.Resource;
4447
import org.apache.commons.collections.CollectionUtils;
48+
import org.apache.commons.collections.MapUtils;
4549
import org.apache.commons.lang3.StringUtils;
4650
import org.apache.rocketmq.client.exception.MQClientException;
4751
import org.apache.rocketmq.common.MQVersion;
4852
import org.apache.rocketmq.common.MixAll;
53+
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
54+
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
55+
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
56+
import org.apache.rocketmq.dashboard.model.StackResult;
57+
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
4958
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
5059
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
5160
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
@@ -61,10 +70,6 @@
6170
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
6271
import org.apache.rocketmq.common.utils.ThreadUtils;
6372
import org.apache.rocketmq.dashboard.config.RMQConfigure;
64-
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
65-
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
66-
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
67-
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
6873
import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
6974
import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
7075
import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
@@ -482,4 +487,59 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c
482487
throw new RuntimeException(e);
483488
}
484489
}
490+
@Override
491+
public StackResult getConsumerStack(String consumerGroup, String clientId, boolean jstack) {
492+
ConsumerRunningInfo consumerRunningInfo = getConsumerRunningInfo(consumerGroup, clientId, jstack);
493+
Map<String, String> stackMap = new HashMap<>();
494+
Map<String, List<String>> map = formatThreadStack(consumerRunningInfo.getJstack());
495+
if (MapUtils.isNotEmpty(map)) {
496+
Set<String> threads = map.keySet();
497+
for (String thread : threads) {
498+
StringBuilder result = new StringBuilder();
499+
map.get(thread).forEach(s -> result.append(s).append("\n"));
500+
stackMap.put(thread, result.toString());
501+
}
502+
}
503+
return new StackResult(stackMap);
504+
}
505+
506+
private Map<String, List<String>> formatThreadStack(String stack) {
507+
Map<String, List<String>> threadStackMap = new HashMap<>();
508+
List<String> stackList = Splitter.on("\n\n").splitToList(stack);
509+
for (String threadStack : stackList) {
510+
List<String> stacks = Splitter.on("\n").splitToList(threadStack);
511+
if (CollectionUtils.isNotEmpty(stacks)) {
512+
List<String> elements = new ArrayList<>();
513+
String threadName = null;
514+
for (String s : stacks) {
515+
List<String> stackItem = Splitter.on(" ")
516+
.omitEmptyStrings()
517+
.trimResults()
518+
.splitToList(s);
519+
if (stackItem.size() == 1) {
520+
String stackStr = stackItem.get(0);
521+
if (threadName == null) {
522+
int index = stackStr.indexOf("TID");
523+
if (index != -1) {
524+
threadName = stackStr.substring(0, index);
525+
}
526+
} else {
527+
elements.add(stackStr.substring(threadName.length(), stackStr.length()));
528+
}
529+
}
530+
if (stackItem.size() == 2) {
531+
if (threadName == null) {
532+
threadName = stackItem.get(0);
533+
}
534+
elements.add(stackItem.get(stackItem.size() - 1));
535+
}
536+
}
537+
if (threadName != null) {
538+
threadStackMap.put(threadName, elements);
539+
}
540+
}
541+
}
542+
return threadStackMap;
543+
}
544+
485545
}

src/main/java/org/apache/rocketmq/dashboard/service/impl/OpsServiceImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,12 @@ public void addNameSvrAddr(String namesrvAddr) {
9595
}
9696
configure.setNamesrvAddrs(namesrvAddrs);
9797
}
98+
@Override
99+
public void delNameSvrAddr(String namesrvAddr) {
100+
List<String> namesrvAddrs = configure.getNamesrvAddrs();
101+
if (namesrvAddrs != null && namesrvAddrs.contains(namesrvAddr)) {
102+
namesrvAddrs.remove(namesrvAddr);
103+
}
104+
configure.setNamesrvAddrs(namesrvAddrs);
105+
}
98106
}

src/main/resources/static/src/consumer.js

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific
273273
console.log(resp);
274274
ngDialog.open({
275275
template: 'clientInfoDialog',
276-
// controller: 'addTopicDialogController',
276+
controller: 'consumerStackDialogController',
277277
data: {data: resp.data, consumerGroupName: consumerGroupName}
278278
});
279279
} else {
@@ -421,4 +421,33 @@ module.controller('consumerTopicViewDialogController', ['$scope', 'ngDialog', '$
421421
});
422422
};
423423
}]
424+
);
425+
426+
module.controller('consumerStackDialogController', ['$scope', 'ngDialog', '$http', 'Notification', function ($scope, ngDialog, $http, Notification) {
427+
$scope.consumerStack = function (consumerGroup, clientId, jstack) {
428+
$http({
429+
method: "GET",
430+
url: "consumer/consumerStack.query",
431+
params: {
432+
consumerGroup: consumerGroup
433+
,
434+
clientId: clientId,
435+
jstack: jstack
436+
}
437+
}).success(function (resp) {
438+
if (resp.status == 0) {
439+
console.log(resp);
440+
ngDialog.open({
441+
template: 'consumerStackDialog',
442+
data: {
443+
consumerStack: resp.data,
444+
clientId: clientId
445+
}
446+
});
447+
} else {
448+
Notification.error({message: resp.errMsg, delay: 2000});
449+
}
450+
});
451+
};
452+
}]
424453
);

src/main/resources/static/src/i18n/en.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,5 @@ var en = {
134134
"MESSAGE_TYPE_FIFO": "FIFO",
135135
"MESSAGE_TYPE_DELAY": "DELAY",
136136
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
137+
"STACK": "STACK"
137138
}

src/main/resources/static/src/i18n/zh.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,5 @@ var zh = {
135135
"MESSAGE_TYPE_FIFO": "顺序消息",
136136
"MESSAGE_TYPE_DELAY": "定时/延时消息",
137137
"MESSAGE_TYPE_TRANSACTION": "事务消息",
138+
"STACK": "堆栈"
138139
}

0 commit comments

Comments
 (0)