Skip to content

Commit 7699acf

Browse files
authored
Merge pull request #531 from didi/dev_v3.0.0
1、后端补充leader选举能力;2、图片链接调整;3、健康检查文案调整;4、版本列表增加排序;5、指标采集缓存时间调整;
2 parents 9189a54 + 6e05824 commit 7699acf

File tree

17 files changed

+230
-77
lines changed

17 files changed

+230
-77
lines changed

docs/dev_guide/解决连接JMX失败.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
未开启时,直接到`2、解决方法`查看如何开启即可。
2121

22-
![check_jmx_opened](./assets/connect_jmx_failed/check_jmx_opened.jpg)
22+
![check_jmx_opened](http://img-ys011.didistatic.com/static/dc2img/do1_dRX6UHE2IUSHqsN95DGb)
2323

2424

2525
**类型二:配置错误**
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
import org.apache.kafka.common.TopicPartition;
7+
8+
import java.util.List;
9+
10+
@Data
11+
@NoArgsConstructor
12+
public class BatchPartitionParam extends ClusterPhyParam {
13+
private List<TopicPartition> tpList;
14+
15+
public BatchPartitionParam(Long clusterPhyId, List<TopicPartition> tpList) {
16+
super(clusterPhyId);
17+
this.tpList = tpList;
18+
}
19+
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,31 +26,31 @@ public enum HealthCheckNameEnum {
2626
HealthCheckDimensionEnum.CLUSTER,
2727
"Controller",
2828
Constant.HC_CONFIG_NAME_PREFIX + "CLUSTER_NO_CONTROLLER",
29-
"集群Controller数错误",
29+
"集群Controller数正常",
3030
HealthCompareValueConfig.class
3131
),
3232

3333
BROKER_REQUEST_QUEUE_FULL(
3434
HealthCheckDimensionEnum.BROKER,
3535
"RequestQueueSize",
3636
Constant.HC_CONFIG_NAME_PREFIX + "BROKER_REQUEST_QUEUE_FULL",
37-
"Broker-RequestQueueSize被打满",
37+
"Broker-RequestQueueSize指标",
3838
HealthCompareValueConfig.class
3939
),
4040

4141
BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW(
4242
HealthCheckDimensionEnum.BROKER,
4343
"NetworkProcessorAvgIdlePercent",
4444
Constant.HC_CONFIG_NAME_PREFIX + "BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW",
45-
"Broker-NetworkProcessorAvgIdlePercent的Idle过低",
45+
"Broker-NetworkProcessorAvgIdlePercent指标",
4646
HealthCompareValueConfig.class
4747
),
4848

4949
GROUP_RE_BALANCE_TOO_FREQUENTLY(
5050
HealthCheckDimensionEnum.GROUP,
5151
"Group Re-Balance",
5252
Constant.HC_CONFIG_NAME_PREFIX + "GROUP_RE_BALANCE_TOO_FREQUENTLY",
53-
"Group re-balance太频繁",
53+
"Group re-balance频率",
5454
HealthDetectedInLatestMinutesConfig.class
5555
),
5656

@@ -66,7 +66,7 @@ public enum HealthCheckNameEnum {
6666
HealthCheckDimensionEnum.TOPIC,
6767
"UnderReplicaTooLong",
6868
Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_UNDER_REPLICA_TOO_LONG",
69-
"Topic 长期处于未同步状态",
69+
"Topic 未同步持续时间",
7070
HealthDetectedInLatestMinutesConfig.class
7171
),
7272

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ public enum VersionItemTypeEnum {
3131

3232

3333
SERVICE_OP_PARTITION(320, "service_partition_operation"),
34+
SERVICE_OP_PARTITION_LEADER(321, "service_partition-leader_operation"),
3435

3536
SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
3637

38+
3739
/**
3840
* 前端操作
3941
*/

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010

1111
public class CollectedMetricsLocalCache {
1212
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
13-
.expireAfterWrite(60, TimeUnit.SECONDS)
14-
.maximumSize(2000)
13+
.expireAfterWrite(90, TimeUnit.SECONDS)
14+
.maximumSize(10000)
1515
.build();
1616

1717
private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
1818
.expireAfterWrite(90, TimeUnit.SECONDS)
19-
.maximumSize(5000)
19+
.maximumSize(10000)
2020
.build();
2121

2222
private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
2929
.maximumSize(20000)
3030
.build();
3131

32-
public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
33-
return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
32+
public static Float getBrokerMetrics(String brokerMetricKey) {
33+
return brokerMetricsCache.getIfPresent(brokerMetricKey);
3434
}
3535

36-
public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
36+
public static void putBrokerMetrics(String brokerMetricKey, Float value) {
3737
if (value == null) {
3838
return;
3939
}
40-
brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
40+
41+
brokerMetricsCache.put(brokerMetricKey, value);
4142
}
4243

43-
public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
44-
return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
44+
public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
45+
return topicMetricsCache.getIfPresent(topicMetricKey);
4546
}
4647

47-
public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
48+
public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
4849
if (metricsList == null) {
4950
return;
5051
}
51-
topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
52+
53+
topicMetricsCache.put(topicMetricKey, metricsList);
5254
}
5355

54-
public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
55-
return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
56+
public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
57+
return partitionMetricsCache.getIfPresent(partitionMetricKey);
5658
}
5759

58-
public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
60+
public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
5961
if (metricsList == null) {
6062
return;
6163
}
62-
partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
64+
partitionMetricsCache.put(partitionMetricsKey, metricsList);
6365
}
6466

65-
public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
66-
return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
67+
public static Float getReplicaMetrics(String replicaMetricsKey) {
68+
return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
6769
}
6870

69-
public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
71+
public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
7072
if (value == null) {
7173
return;
7274
}
73-
replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
75+
replicaMetricsValueCache.put(replicaMetricsKey, value);
7476
}
7577

76-
77-
/**************************************************** private method ****************************************************/
78-
79-
80-
private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
78+
public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
8179
return clusterPhyId + "@" + brokerId + "@" + metricName;
8280
}
8381

84-
private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
82+
public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
8583
return clusterPhyId + "@" + topicName + "@" + metricName;
8684
}
8785

88-
private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
86+
public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
8987
return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
9088
}
89+
90+
/**************************************************** private method ****************************************************/
91+
9192
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ protected void initRegisterVCHandler(){
110110
}
111111

112112
@Override
113-
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
113+
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
114+
String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
114115

115-
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
116+
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
116117
if(null != keyValue) {
117118
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
118119
brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +125,7 @@ public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long cl
124125

125126
Map<String, Float> metricsMap = ret.getData().getMetrics();
126127
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
127-
CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
128+
CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
128129
}
129130

130131
return ret;

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,5 @@ void modifyClusterPhyById(ClusterPhyPO clusterPhyPO, String operator) throws
7373
* 获取系统已存在的kafka版本列表
7474
* @return
7575
*/
76-
Set<String> getClusterVersionSet();
76+
List<String> getClusterVersionList();
7777
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
126126
private TopicMetricService topicMetricService;
127127

128128
@Autowired
129-
private TopicService topicService;
129+
private TopicService topicService;
130130

131131
@Autowired
132132
private PartitionService partitionService;
@@ -728,13 +728,10 @@ private Result<ClusterMetrics> getMetricFromKafkaByTotalBrokersJMX(VersionItemPa
728728
Long clusterId = param.getClusterId();
729729

730730
//1、获取jmx的属性信息
731-
VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
732-
if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
733-
734731
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
735732

736733
float metricVale = 0f;
737-
for(Broker broker : brokers){
734+
for(Broker broker : brokers) {
738735
Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
739736

740737
if(null == ret || ret.failed() || null == ret.getData()){continue;}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import org.springframework.dao.DuplicateKeyException;
2525
import org.springframework.stereotype.Service;
2626

27+
import java.util.ArrayList;
28+
import java.util.Collections;
2729
import java.util.List;
28-
import java.util.Set;
2930
import java.util.stream.Collectors;
3031

3132
/**
@@ -205,9 +206,12 @@ public void modifyClusterPhyById(ClusterPhyPO clusterPhyPO, String operator) thr
205206
}
206207

207208
@Override
208-
public Set<String> getClusterVersionSet() {
209-
List<ClusterPhy> clusterPhyList = listAllClusters();
210-
Set<String> versionSet = clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet());
211-
return versionSet;
209+
public List<String> getClusterVersionList() {
210+
List<ClusterPhy> clusterPhyList = this.listAllClusters();
211+
212+
List<String> versionList = new ArrayList<>(clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet()));
213+
Collections.sort(versionList);
214+
215+
return versionList;
212216
}
213217
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.xiaojukeji.know.streaming.km.core.service.partition;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
4+
import org.apache.kafka.common.TopicPartition;
5+
6+
import java.util.List;
7+
8+
public interface OpPartitionService {
9+
10+
/**
11+
* 优先副本选举
12+
*/
13+
Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList);
14+
}

0 commit comments

Comments
 (0)