Skip to content

Commit af1bb2c

Browse files
zengqiaoZQKC
authored and committed
[Optimize] 删除Replica指标采集任务
1、当集群存在较多副本时,指标采集的性能会严重降低; 2、Replica的指标基本上都是在实时获取时才需要,因此当前先将Replica指标采集任务关闭,后续依据产品需要再看是否开启;
1 parent 714e9a5 commit af1bb2c

File tree

7 files changed

+101
-84
lines changed

7 files changed

+101
-84
lines changed

km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ReplicaMetricCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private ReplicationMetrics collectMetrics(Long clusterPhyId, ReplicationMetrics
9191
continue;
9292
}
9393

94-
Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
94+
Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafka(
9595
clusterPhyId,
9696
metrics.getTopic(),
9797
metrics.getBrokerId(),

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@ public class CollectedMetricsLocalCache {
2424
.maximumSize(10000)
2525
.build();
2626

27-
private static final Cache<String, Float> replicaMetricsValueCache = Caffeine.newBuilder()
28-
.expireAfterWrite(90, TimeUnit.SECONDS)
29-
.maximumSize(20000)
30-
.build();
31-
3227
public static Float getBrokerMetrics(String brokerMetricKey) {
3328
return brokerMetricsCache.getIfPresent(brokerMetricKey);
3429
}
@@ -64,17 +59,6 @@ public static void putPartitionMetricsList(String partitionMetricsKey, List<Part
6459
partitionMetricsCache.put(partitionMetricsKey, metricsList);
6560
}
6661

67-
public static Float getReplicaMetrics(String replicaMetricsKey) {
68-
return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
69-
}
70-
71-
public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
72-
if (value == null) {
73-
return;
74-
}
75-
replicaMetricsValueCache.put(replicaMetricsKey, value);
76-
}
77-
7862
public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
7963
return clusterPhyId + "@" + brokerId + "@" + metricName;
8064
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ReplicaMetricVersionItems;
3838
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BrokerMetricESDAO;
3939
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
40+
import org.apache.kafka.clients.admin.LogDirDescription;
41+
import org.apache.kafka.clients.admin.ReplicaInfo;
4042
import org.springframework.beans.factory.annotation.Autowired;
4143
import org.springframework.stereotype.Service;
4244
import org.springframework.util.CollectionUtils;
@@ -49,6 +51,7 @@
4951
import java.util.stream.Collectors;
5052

5153
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
54+
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
5255

5356
/**
5457
* @author didi
@@ -105,7 +108,11 @@ protected void initRegisterVCHandler(){
105108
registerVCHandler( BROKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore);
106109
registerVCHandler( BROKER_METHOD_GET_PARTITIONS_SKEW, this::getPartitionsSkew);
107110
registerVCHandler( BROKER_METHOD_GET_LEADERS_SKEW, this::getLeadersSkew);
108-
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
111+
// registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
112+
113+
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_0_10_0_0, V_1_0_0, "getLogSizeFromJmx", this::getLogSizeFromJmx);
114+
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_1_0_0, V_MAX, "getLogSizeFromClient", this::getLogSizeFromClient);
115+
109116
registerVCHandler( BROKER_METHOD_IS_BROKER_ALIVE, this::isBrokerAlive);
110117
}
111118

@@ -351,7 +358,7 @@ private Result<BrokerMetrics> getPartitionsSkew(VersionItemParam metricParam) {
351358
);
352359
}
353360

354-
private Result<BrokerMetrics> getLogSize(VersionItemParam metricParam) {
361+
private Result<BrokerMetrics> getLogSizeFromJmx(VersionItemParam metricParam) {
355362
BrokerMetricParam param = (BrokerMetricParam)metricParam;
356363

357364
String metric = param.getMetric();
@@ -360,19 +367,17 @@ private Result<BrokerMetrics> getLogSize(VersionItemParam metricParam) {
360367

361368
List<Partition> partitions = partitionService.listPartitionByBroker(clusterId, brokerId);
362369

363-
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, brokerId);
364-
if (ValidateUtils.isNull(jmxConnectorWrap)){return Result.buildFailure(VC_JMX_INIT_ERROR);}
365-
366370
Float logSizeSum = 0f;
367371
for(Partition p : partitions) {
368372
try {
369-
Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
373+
Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafka(
370374
clusterId,
371375
p.getTopicName(),
372376
brokerId,
373377
p.getPartitionId(),
374378
ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
375379
);
380+
376381
if(null == metricsResult || metricsResult.failed() || null == metricsResult.getData()) {
377382
continue;
378383
}
@@ -391,6 +396,28 @@ private Result<BrokerMetrics> getLogSize(VersionItemParam metricParam) {
391396
return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
392397
}
393398

399+
private Result<BrokerMetrics> getLogSizeFromClient(VersionItemParam metricParam) {
400+
BrokerMetricParam param = (BrokerMetricParam)metricParam;
401+
402+
String metric = param.getMetric();
403+
Long clusterId = param.getClusterId();
404+
Integer brokerId = param.getBrokerId();
405+
406+
Result<Map<String, LogDirDescription>> descriptionMapResult = brokerService.getBrokerLogDirDescFromKafka(clusterId, brokerId);
407+
if(null == descriptionMapResult || descriptionMapResult.failed() || null == descriptionMapResult.getData()) {
408+
return Result.buildFromIgnoreData(descriptionMapResult);
409+
}
410+
411+
Float logSizeSum = 0f;
412+
for (LogDirDescription logDirDescription: descriptionMapResult.getData().values()) {
413+
for (ReplicaInfo replicaInfo: logDirDescription.replicaInfos().values()) {
414+
logSizeSum += replicaInfo.size();
415+
}
416+
}
417+
418+
return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
419+
}
420+
394421
private Result<BrokerMetrics> getLeadersSkew(VersionItemParam metricParam) {
395422
BrokerMetricParam param = (BrokerMetricParam)metricParam;
396423

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ public interface ReplicaMetricService {
1313
* 从kafka中采集指标
1414
*/
1515
Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topic, Integer partitionId, Integer brokerId, String metric);
16-
Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic, Integer brokerId, Integer partitionId, String metric);
16+
Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList);
1717

1818
/**
1919
* 从ES中获取指标
2020
*/
21+
@Deprecated
2122
Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto);
2223

24+
@Deprecated
2325
Result<ReplicationMetrics> getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List<String> metricNames);
2426
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
1818
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
1919
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
20-
import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
2120
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
2221
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
2322
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
@@ -77,32 +76,36 @@ protected VersionItemTypeEnum getVersionItemType() {
7776
}
7877

7978
@Override
80-
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
81-
String topic,
82-
Integer brokerId,
83-
Integer partitionId,
84-
String metric) {
85-
String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
86-
87-
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
88-
if(null != keyValue){
89-
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
90-
replicationMetrics.putMetric(metric, keyValue);
91-
return Result.buildSuc(replicationMetrics);
79+
public Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList) {
80+
ReplicationMetrics metrics = new ReplicationMetrics(clusterId, topicName, brokerId, partitionId);
81+
for (String metricName: metricNameList) {
82+
try {
83+
if (metrics.getMetrics().containsKey(metricName)) {
84+
continue;
85+
}
86+
87+
Result<ReplicationMetrics> ret = this.collectReplicaMetricsFromKafka(
88+
clusterId,
89+
metrics.getTopic(),
90+
metrics.getBrokerId(),
91+
metrics.getPartitionId(),
92+
metricName
93+
);
94+
95+
if (null == ret || ret.failed() || null == ret.getData()) {
96+
continue;
97+
}
98+
99+
metrics.putMetric(ret.getData().getMetrics());
100+
} catch (Exception e) {
101+
LOGGER.error(
102+
"method=collectReplicaMetricsFromKafka||clusterPhyId={}||topicName={}||partition={}||brokerId={}||metricName={}||errMsg=exception!",
103+
clusterId, topicName, partitionId, brokerId, e
104+
);
105+
}
92106
}
93107

94-
Result<ReplicationMetrics> ret = collectReplicaMetricsFromKafka(clusterPhyId, topic, partitionId, brokerId, metric);
95-
if(null == ret || ret.failed() || null == ret.getData()){return ret;}
96-
97-
// 更新cache
98-
ret.getData().getMetrics().entrySet().stream().forEach(
99-
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
100-
replicaMetricsKey,
101-
metricNameAndValueEntry.getValue()
102-
)
103-
);
104-
105-
return ret;
108+
return Result.buildSuc(metrics);
106109
}
107110

108111
@Override
@@ -167,8 +170,8 @@ private Result<ReplicationMetrics> getMetricMessages(VersionItemParam param) {
167170
Integer brokerId = metricParam.getBrokerId();
168171
Integer partitionId = metricParam.getPartitionId();
169172

170-
Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
171-
Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
173+
Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
174+
Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
172175

173176
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId);
174177
if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){

km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ReplicaMetricsController {
2626
@Autowired
2727
private ReplicaMetricService replicationMetricService;
2828

29+
@Deprecated
2930
@ApiOperation(value = "Replica指标-单个Replica")
3031
@PostMapping(value = "clusters/{clusterPhyId}/brokers/{brokerId}/topics/{topicName}/partitions/{partitionId}/metric-points")
3132
@ResponseBody
@@ -45,7 +46,7 @@ public Result<BaseMetrics> getReplicaMetricPoints(@PathVariable Long clusterPhyI
4546
@PathVariable String topicName,
4647
@PathVariable Integer partitionId,
4748
@RequestBody List<String> metricsNames) {
48-
Result<ReplicationMetrics> metricsResult = replicationMetricService.getLatestMetricsFromES(clusterPhyId, brokerId, topicName, partitionId, metricsNames);
49+
Result<ReplicationMetrics> metricsResult = replicationMetricService.collectReplicaMetricsFromKafka(clusterPhyId, topicName, partitionId, brokerId, metricsNames);
4950
if (metricsResult.failed()) {
5051
return Result.buildFromIgnoreData(metricsResult);
5152
}
Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
1-
package com.xiaojukeji.know.streaming.km.task.metrics;
2-
3-
import com.didiglobal.logi.job.annotation.Task;
4-
import com.didiglobal.logi.job.common.TaskResult;
5-
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
6-
import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
7-
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
8-
import lombok.extern.slf4j.Slf4j;
9-
import org.springframework.beans.factory.annotation.Autowired;
10-
11-
/**
12-
* @author didi
13-
*/
14-
@Slf4j
15-
@Task(name = "ReplicaMetricCollectorTask",
16-
description = "Replica指标采集任务",
17-
cron = "0 0/1 * * * ? *",
18-
autoRegister = true,
19-
consensual = ConsensualEnum.BROADCAST,
20-
timeout = 2 * 60)
21-
public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
22-
23-
@Autowired
24-
private ReplicaMetricCollector replicaMetricCollector;
25-
26-
@Override
27-
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
28-
replicaMetricCollector.collectMetrics(clusterPhy);
29-
30-
return TaskResult.SUCCESS;
31-
}
32-
}
1+
//package com.xiaojukeji.know.streaming.km.task.metrics;
2+
//
3+
//import com.didiglobal.logi.job.annotation.Task;
4+
//import com.didiglobal.logi.job.common.TaskResult;
5+
//import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
6+
//import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
7+
//import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
8+
//import lombok.extern.slf4j.Slf4j;
9+
//import org.springframework.beans.factory.annotation.Autowired;
10+
//
11+
///**
12+
// * @author didi
13+
// */
14+
//@Slf4j
15+
//@Task(name = "ReplicaMetricCollectorTask",
16+
// description = "Replica指标采集任务",
17+
// cron = "0 0/1 * * * ? *",
18+
// autoRegister = true,
19+
// consensual = ConsensualEnum.BROADCAST,
20+
// timeout = 2 * 60)
21+
//public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
22+
//
23+
// @Autowired
24+
// private ReplicaMetricCollector replicaMetricCollector;
25+
//
26+
// @Override
27+
// public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
28+
// replicaMetricCollector.collectMetrics(clusterPhy);
29+
//
30+
// return TaskResult.SUCCESS;
31+
// }
32+
//}

0 commit comments

Comments (0)