Skip to content

Commit 6e05824

Browse files
author
zengqiao
committed
指标采集缓存时间调整
1 parent f005c6b commit 6e05824

File tree

8 files changed

+53
-56
lines changed

8 files changed

+53
-56
lines changed

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ public enum VersionItemTypeEnum {
3131

3232

3333
SERVICE_OP_PARTITION(320, "service_partition_operation"),
34+
SERVICE_OP_PARTITION_LEADER(321, "service_partition-leader_operation"),
3435

3536
SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
3637

38+
3739
/**
3840
* 前端操作
3941
*/

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010

1111
public class CollectedMetricsLocalCache {
1212
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
13-
.expireAfterWrite(60, TimeUnit.SECONDS)
14-
.maximumSize(2000)
13+
.expireAfterWrite(90, TimeUnit.SECONDS)
14+
.maximumSize(10000)
1515
.build();
1616

1717
private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
1818
.expireAfterWrite(90, TimeUnit.SECONDS)
19-
.maximumSize(5000)
19+
.maximumSize(10000)
2020
.build();
2121

2222
private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
2929
.maximumSize(20000)
3030
.build();
3131

32-
public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
33-
return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
32+
public static Float getBrokerMetrics(String brokerMetricKey) {
33+
return brokerMetricsCache.getIfPresent(brokerMetricKey);
3434
}
3535

36-
public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
36+
public static void putBrokerMetrics(String brokerMetricKey, Float value) {
3737
if (value == null) {
3838
return;
3939
}
40-
brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
40+
41+
brokerMetricsCache.put(brokerMetricKey, value);
4142
}
4243

43-
public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
44-
return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
44+
public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
45+
return topicMetricsCache.getIfPresent(topicMetricKey);
4546
}
4647

47-
public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
48+
public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
4849
if (metricsList == null) {
4950
return;
5051
}
51-
topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
52+
53+
topicMetricsCache.put(topicMetricKey, metricsList);
5254
}
5355

54-
public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
55-
return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
56+
public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
57+
return partitionMetricsCache.getIfPresent(partitionMetricKey);
5658
}
5759

58-
public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
60+
public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
5961
if (metricsList == null) {
6062
return;
6163
}
62-
partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
64+
partitionMetricsCache.put(partitionMetricsKey, metricsList);
6365
}
6466

65-
public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
66-
return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
67+
public static Float getReplicaMetrics(String replicaMetricsKey) {
68+
return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
6769
}
6870

69-
public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
71+
public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
7072
if (value == null) {
7173
return;
7274
}
73-
replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
75+
replicaMetricsValueCache.put(replicaMetricsKey, value);
7476
}
7577

76-
77-
/**************************************************** private method ****************************************************/
78-
79-
80-
private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
78+
public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
8179
return clusterPhyId + "@" + brokerId + "@" + metricName;
8280
}
8381

84-
private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
82+
public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
8583
return clusterPhyId + "@" + topicName + "@" + metricName;
8684
}
8785

88-
private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
86+
public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
8987
return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
9088
}
89+
90+
/**************************************************** private method ****************************************************/
91+
9192
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ protected void initRegisterVCHandler(){
110110
}
111111

112112
@Override
113-
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
113+
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
114+
String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
114115

115-
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
116+
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
116117
if(null != keyValue) {
117118
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
118119
brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +125,7 @@ public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long cl
124125

125126
Map<String, Float> metricsMap = ret.getData().getMetrics();
126127
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
127-
CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
128+
CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
128129
}
129130

130131
return ret;

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
126126
private TopicMetricService topicMetricService;
127127

128128
@Autowired
129-
private TopicService topicService;
129+
private TopicService topicService;
130130

131131
@Autowired
132132
private PartitionService partitionService;
@@ -728,13 +728,10 @@ private Result<ClusterMetrics> getMetricFromKafkaByTotalBrokersJMX(VersionItemPa
728728
Long clusterId = param.getClusterId();
729729

730730
//1、获取jmx的属性信息
731-
VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
732-
if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
733-
734731
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
735732

736733
float metricVale = 0f;
737-
for(Broker broker : brokers){
734+
for(Broker broker : brokers) {
738735
Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
739736

740737
if(null == ret || ret.failed() || null == ret.getData()){continue;}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ protected void initRegisterVCHandler(){
7575

7676
@Override
7777
public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
78-
List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
78+
String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
79+
80+
List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey);
7981
if(null != metricsList) {
8082
return Result.buildSuc(metricsList);
8183
}
@@ -88,12 +90,7 @@ public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache
8890
// 更新cache
8991
PartitionMetrics metrics = metricsResult.getData().get(0);
9092
metrics.getMetrics().entrySet().forEach(
91-
metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(
92-
clusterPhyId,
93-
metrics.getTopic(),
94-
metricEntry.getKey(),
95-
metricsResult.getData()
96-
)
93+
metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData())
9794
);
9895

9996
return metricsResult;

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,14 @@ protected VersionItemTypeEnum getVersionItemType() {
7777
}
7878

7979
@Override
80-
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic,
81-
Integer brokerId, Integer partitionId, String metric){
82-
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
80+
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
81+
String topic,
82+
Integer brokerId,
83+
Integer partitionId,
84+
String metric) {
85+
String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
86+
87+
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
8388
if(null != keyValue){
8489
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
8590
replicationMetrics.putMetric(metric, keyValue);
@@ -92,11 +97,7 @@ public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long c
9297
// 更新cache
9398
ret.getData().getMetrics().entrySet().stream().forEach(
9499
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
95-
clusterPhyId,
96-
brokerId,
97-
topic,
98-
partitionId,
99-
metricNameAndValueEntry.getKey(),
100+
replicaMetricsKey,
100101
metricNameAndValueEntry.getValue()
101102
)
102103
);

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ protected void initRegisterVCHandler(){
120120

121121
@Override
122122
public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
123-
List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
123+
String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
124+
125+
List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey);
124126
if(null != metricsList) {
125127
return Result.buildSuc(metricsList);
126128
}
@@ -133,12 +135,7 @@ public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Lon
133135
// 更新cache
134136
TopicMetrics metrics = metricsResult.getData().get(0);
135137
metrics.getMetrics().entrySet().forEach(
136-
metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(
137-
clusterPhyId,
138-
metrics.getTopic(),
139-
metricEntry.getKey(),
140-
metricsResult.getData()
141-
)
138+
metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData())
142139
);
143140

144141
return metricsResult;

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
3535
public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
3636
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
3737
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
38+
3839
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
3940
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
4041
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";

0 commit comments

Comments
 (0)