Commit be60ae8

Merge pull request #560 from didi/dev

Merge the development branch

2 parents 1aa759e + 7a3d155

File tree

15 files changed: +178 −86 lines

docs/dev_guide/解决连接JMX失败.md

Lines changed: 43 additions & 18 deletions

````diff
@@ -1,19 +1,14 @@
 
-![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
 
+![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
 
 ## JMX: troubleshooting connection failures
 
-- [JMX: troubleshooting connection failures](#jmx-连接失败问题解决)
-  - [1. Problem & description](#1问题说明)
-  - [2. Solution](#2解决方法)
-  - [3. Solution: authenticated JMX](#3解决方法--认证的jmx)
-
-Once a cluster is connected to Logi-KafkaManager you can see its Broker list. If you then cannot view real-time Topic traffic or real-time Broker traffic, the cause is most likely a JMX connection problem.
+Once a cluster is connected to `KnowStreaming` you can see its Broker list. If you then cannot view real-time Topic traffic or real-time Broker traffic, the cause is most likely a `JMX` connection problem.
 
 Below we walk through the checks step by step.
 
-### 1. Problem & description
+### 1. Problem description
 
 **Case 1: JMX configuration not enabled**
 
@@ -43,6 +38,26 @@ java.rmi.ConnectException: Connection refused to host: 192.168.0.1; nested excep
 java.rmi.ConnectException: Connection refused to host: 127.0.0.1;; nested exception is:
 ```
 
+**Case 3: JMX bound to a specific IP**
+
+A Broker may sit on both an internal and an external network, and JMX may be bound to either the internal or the external IP. `KnowStreaming` then has to connect to the IP on the right network to reach it.
+
+For example:
+
+The Broker's registration in ZK looks like the JSON below. We want to connect to the address tagged `INTERNAL` in `endpoints`, but `KnowStreaming` connects to the `EXTERNAL` address instead. See `4. Solution: JMX on a specific network` for the fix.
+
+```json
+{
+    "listener_security_protocol_map": {"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"},
+    "endpoints": ["EXTERNAL://192.168.0.1:7092","INTERNAL://192.168.0.2:7093"],
+    "jmx_port": 8099,
+    "host": "192.168.0.1",
+    "timestamp": "1627289710439",
+    "port": -1,
+    "version": 4
+}
+```
+
 ### 2. Solution
 
 Only the more generally applicable fixes are covered here; if you know a better approach, please tell us about it.
@@ -76,26 +91,36 @@ fi
 
 If you jumped straight to this section, read the previous one, `2. Solution`, first to make sure the `JMX` configuration itself is correct.
 
-With the JMX configuration in order, if the connection still fails because of authentication, the method described below can be used to solve it.
+With the `JMX` configuration in order, if the connection still fails because of authentication, you can enter your `JMX` credentials on the cluster-access page.
+
+<img src='http://img-ys011.didistatic.com/static/dc2img/do1_EUU352qMEX1Jdp7pxizp' width=350>
+
 
-**This part of the backend has only just been finished and may still be rough; contact us any time if you hit problems.**
 
-`Logi-KafkaManager 2.2.0+` supports authenticated `JMX` connections in the backend, but there is no UI for it yet, so write the `JMX` credentials into the `jmx_properties` column of the `cluster` table.
+### 4. Solution: JMX on a specific network
 
-The value is a `json`-formatted string, for example:
+You can manually add a `useWhichEndpoint` field to the `jmx_properties` column of the `ks_km_physical_cluster` table to control which JMX IP and port `KnowStreaming` connects to.
 
+`jmx_properties` format:
 ```json
 {
-    "maxConn": 10,       # maximum number of JMX connections from KM to a single Broker
-    "username": "xxxxx", # username
-    "password": "xxxx",  # password
+    "maxConn": 100,      # maximum number of JMX connections from KM to a single Broker
+    "username": "xxxxx", # username, optional
+    "password": "xxxx",  # password, optional
     "openSSL": true,     # SSL: true enables SSL, false disables it
+    "useWhichEndpoint": "EXTERNAL" # name of the network to connect to; EXTERNAL selects the EXTERNAL address in endpoints
 }
 ```
 
 &nbsp;
 
-SQL example
+SQL example:
 ```sql
-UPDATE cluster SET jmx_properties='{ "maxConn": 10, "username": "xxxxx", "password": "xxxx", "openSSL": false }' where id={xxx};
-```
+UPDATE ks_km_physical_cluster SET jmx_properties='{ "maxConn": 10, "username": "xxxxx", "password": "xxxx", "openSSL": false , "useWhichEndpoint": "xxx"}' where id={xxx};
+```
+
+Note:
+
++ This feature currently only supports Kafka clusters that use `ZK` as the coordination service.
+
````
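The `useWhichEndpoint` selection described in the doc above can be sketched in plain Java. This is a hypothetical helper, not KnowStreaming's actual implementation: given the `endpoints` list and `jmx_port` from the Broker's ZK registration, pick the host registered under the requested listener name and build the standard JMX RMI service URL for it.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch: resolve the JMX address for a broker that is registered
// on several networks, honoring a useWhichEndpoint-style setting.
public class JmxEndpointPicker {

    /** endpoints entries look like "INTERNAL://192.168.0.2:7093". */
    static String pickHost(List<String> endpoints, String useWhichEndpoint) {
        for (String endpoint : endpoints) {
            String[] parts = endpoint.split("://", 2);
            if (parts.length == 2 && parts[0].equals(useWhichEndpoint)) {
                // strip the trailing ":port": JMX uses jmx_port, not the listener port
                String hostPort = parts[1];
                return hostPort.substring(0, hostPort.lastIndexOf(':'));
            }
        }
        return null; // listener name not registered for this broker
    }

    static String jmxUrl(String host, int jmxPort) {
        return String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, jmxPort);
    }

    public static void main(String[] args) {
        List<String> endpoints = Arrays.asList(
                "EXTERNAL://192.168.0.1:7092", "INTERNAL://192.168.0.2:7093");
        String host = pickHost(endpoints, "INTERNAL");
        System.out.println(jmxUrl(host, 8099));
        // prints service:jmx:rmi:///jndi/rmi://192.168.0.2:8099/jmxrmi
    }
}
```

With the ZK data from the example above, requesting `INTERNAL` yields `192.168.0.2`, and combining it with `jmx_port` 8099 gives the URL a JMX client would actually dial.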

docs/install_guide/版本升级手册.md

Lines changed: 25 additions & 1 deletion

````diff
@@ -4,7 +4,31 @@
 
 ### 6.2.0: upgrading to the `master` version
 
-None yet
+**SQL changes**
+
+```sql
+-- Multi-cluster management permissions, added 2022-09-06
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2000', '多集群管理查看', '1593', '1', '2', '多集群管理查看', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2002', 'Topic-迁移副本', '1593', '1', '2', 'Topic-迁移副本', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2004', 'Topic-扩缩副本', '1593', '1', '2', 'Topic-扩缩副本', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2006', 'Cluster-LoadReBalance-周期均衡', '1593', '1', '2', 'Cluster-LoadReBalance-周期均衡', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2008', 'Cluster-LoadReBalance-立即均衡', '1593', '1', '2', 'Cluster-LoadReBalance-立即均衡', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2010', 'Cluster-LoadReBalance-设置集群规格', '1593', '1', '2', 'Cluster-LoadReBalance-设置集群规格', '0', 'know-streaming');
+
+
+-- System management permissions, added 2022-09-06
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('3000', '系统管理查看', '1595', '1', '2', '系统管理查看', '0', 'know-streaming');
+
+
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2000', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2002', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2004', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2006', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2008', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2010', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '3000', '0', 'know-streaming');
+
+```
 
 ---
 
````

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java

Lines changed: 13 additions & 4 deletions

```diff
@@ -14,6 +14,7 @@
 import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
+import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
@@ -71,14 +72,18 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
         Topic groupTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME);
         Topic transactionTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME);
 
+        // fetch controller info
+        KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
+
         // format conversion
         return PaginationResult.buildSuc(
                 this.convert2ClusterBrokersOverviewVOList(
                         paginationResult.getData().getBizData(),
                         brokerList,
                         metricsResult.getData(),
                         groupTopic,
-                        transactionTopic
+                        transactionTopic,
+                        kafkaController
                 ),
                 paginationResult
         );
@@ -159,7 +164,8 @@ private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List
                                                                             List<Broker> brokerList,
                                                                             List<BrokerMetrics> metricsList,
                                                                             Topic groupTopic,
-                                                                            Topic transactionTopic) {
+                                                                            Topic transactionTopic,
+                                                                            KafkaController kafkaController) {
         Map<Integer, BrokerMetrics> metricsMap = metricsList == null? new HashMap<>(): metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity()));
 
         Map<Integer, Broker> brokerMap = brokerList == null? new HashMap<>(): brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
@@ -169,12 +175,12 @@ private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List
             Broker broker = brokerMap.get(brokerId);
             BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
 
-            voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic));
+            voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController));
         }
         return voList;
     }
 
-    private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic) {
+    private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) {
         ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO();
         clusterBrokersOverviewVO.setBrokerId(brokerId);
         if (broker != null) {
@@ -192,6 +198,9 @@ private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer broker
         if (transactionTopic != null && transactionTopic.getBrokerIdSet().contains(brokerId)) {
             clusterBrokersOverviewVO.getKafkaRoleList().add(transactionTopic.getTopicName());
         }
+        if (kafkaController != null && kafkaController.getBrokerId().equals(brokerId)) {
+            clusterBrokersOverviewVO.getKafkaRoleList().add(KafkaConstant.CONTROLLER_ROLE);
+        }
 
         clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics);
         return clusterBrokersOverviewVO;
```
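The change above threads the active controller through the conversion chain so a broker's role list can include `controller` alongside the internal topics it hosts. A stripped-down sketch of that tagging logic, with plain sets and an `Integer` standing in for KnowStreaming's `Topic` and `KafkaController` beans (hypothetical helper, not the project's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified role-tagging sketch: a broker is tagged with each internal topic
// it hosts a partition of, plus "controller" when it is the active controller.
public class BrokerRoleTagger {
    static final String CONTROLLER_ROLE = "controller";

    static List<String> rolesOf(int brokerId,
                                Set<Integer> groupTopicBrokers,   // brokers hosting __consumer_offsets
                                Set<Integer> txnTopicBrokers,     // brokers hosting __transaction_state
                                Integer controllerBrokerId) {     // null when no controller is known
        List<String> roles = new ArrayList<>();
        if (groupTopicBrokers != null && groupTopicBrokers.contains(brokerId)) {
            roles.add("__consumer_offsets");
        }
        if (txnTopicBrokers != null && txnTopicBrokers.contains(brokerId)) {
            roles.add("__transaction_state");
        }
        // null-check first, exactly as the patch does, so a cluster without a
        // recorded controller still converts cleanly
        if (controllerBrokerId != null && controllerBrokerId.equals(brokerId)) {
            roles.add(CONTROLLER_ROLE);
        }
        return roles;
    }
}
```

The null guards mirror the patch's own checks: every input may legitimately be absent, and the conversion must not fail for a broker that has no special role.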

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -75,7 +75,7 @@ public PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhy
         }
 
         if (!paginationResult.hasData()) {
-            return PaginationResult.buildSuc(dto);
+            return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
         }
 
         // fetch the metrics
```

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java

Lines changed: 2 additions & 0 deletions

```diff
@@ -41,6 +41,8 @@ public class KafkaConstant {
 
     public static final Long POLL_ONCE_TIMEOUT_UNIT_MS = 2000L;
 
+    public static final String CONTROLLER_ROLE = "controller";
+
     public static final Map<String, ConfigDef.ConfigKey> KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>();
 
     static {
```

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java

Lines changed: 9 additions & 0 deletions

```diff
@@ -27,11 +27,13 @@
 import kafka.zk.TopicsZNode;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -84,6 +86,13 @@ public Map<Integer, List<Integer>> getTopicPartitionMapFromKafka(Long clusterPhy
             }
 
             return partitionMap;
+        } catch (ExecutionException e) {
+            log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e);
+            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
+                throw new AdminOperateException(String.format("Kafka does not host Topic:[%s]", topicName), e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED);
+            }
+
+            throw new AdminOperateException("get topic info from kafka failed", e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED);
         } catch (Exception e) {
             log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e);
             throw new AdminOperateException("get topic info from kafka failed", e, ResultStatus.KAFKA_OPERATE_FAILED);
```
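The new catch branch distinguishes "topic does not exist" from other failures by inspecting the `ExecutionException`'s cause rather than the wrapper itself, since Kafka `AdminClient` futures wrap the real error. A self-contained sketch of the same unwrapping pattern, using `CompletableFuture` and `IllegalStateException` as stand-ins for the AdminClient future and `UnknownTopicOrPartitionException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Sketch of the cause-unwrapping pattern the patch applies: the interesting
// exception is e.getCause(), not the ExecutionException wrapper.
public class CauseUnwrapDemo {

    static String classify(Runnable asyncCall) {
        try {
            CompletableFuture.runAsync(asyncCall).get();
            return "ok";
        } catch (ExecutionException e) {
            // unwrap: decide based on the underlying failure
            if (e.getCause() instanceof IllegalStateException) {
                return "unknown-topic";
            }
            return "other-failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> { throw new IllegalStateException("no such topic"); }));
        // prints unknown-topic
    }
}
```

Matching `instanceof` on the wrapper instead of the cause would never fire, which is why the patch also passes `e.getCause()` (not `e`) into the rethrown `AdminOperateException`.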

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/PartitionMetricVersionItems.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -37,12 +37,12 @@ public List<VersionMetricControlItem> init(){
 
         // LogEndOffset metric
         itemList.add( buildAllVersionsItem()
-                .name(PARTITION_METRIC_LOG_END_OFFSET).unit("").desc("Partition中Leader副本的LogEndOffset")
+                .name(PARTITION_METRIC_LOG_END_OFFSET).unit("").desc("Partition中Leader副本的LogEndOffset")
                 .extendMethod(PARTITION_METHOD_GET_OFFSET_RELEVANT_METRICS));
 
         // LogStartOffset metric
         itemList.add( buildAllVersionsItem()
-                .name(PARTITION_METRIC_LOG_START_OFFSET).unit("").desc("Partition中Leader副本的LogStartOffset")
+                .name(PARTITION_METRIC_LOG_START_OFFSET).unit("").desc("Partition中Leader副本的LogStartOffset")
                 .extendMethod(PARTITION_METHOD_GET_OFFSET_RELEVANT_METRICS));
 
         // Messages
```

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ReplicaMetricVersionItems.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -36,13 +36,13 @@ public List<VersionMetricControlItem> init(){
 
         // LogEndOffset metric
         itemList.add(buildAllVersionsItem()
-                .name(REPLICATION_METRIC_LOG_END_OFFSET).unit("").desc("副本的LogEndOffset")
+                .name(REPLICATION_METRIC_LOG_END_OFFSET).unit("").desc("副本的LogEndOffset")
                 .extend(buildJMXMethodExtend(REPLICATION_METHOD_GET_METRIC_FROM_JMX )
                         .jmxObjectName( JMX_LOG_LOG_END_OFFSET ).jmxAttribute(VALUE)));
 
         // LogStartOffset metric
         itemList.add(buildAllVersionsItem()
-                .name( REPLICATION_METRIC_LOG_START_OFFSET ).unit("").desc("副本的LogStartOffset")
+                .name( REPLICATION_METRIC_LOG_START_OFFSET ).unit("").desc("副本的LogStartOffset")
                 .extend(buildJMXMethodExtend(REPLICATION_METHOD_GET_METRIC_FROM_JMX )
                         .jmxObjectName( JMX_LOG_LOG_START_OFFSET ).jmxAttribute(VALUE)));
 
```
