Skip to content

Commit 88d0a60

Browse files
_haoqiZQKC
authored andcommitted
[ISSUE #677] 重启会导致部分信息采集抛出空指针
1 parent 05c52cd commit 88d0a60

File tree

7 files changed

+123
-28
lines changed

7 files changed

+123
-28
lines changed

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,12 @@ public Map<String, List<Long>> getTopNBrokerIds(Long clusterPhyId, List<String>
207207
}
208208

209209
for(String metric : metrics){
210-
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
210+
Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE);
211+
if(null == value){continue;}
211212

212213
MetricPointVO metricPoint = new MetricPointVO();
213214
metricPoint.setAggType(aggType);
214-
metricPoint.setValue(value);
215+
metricPoint.setValue(value.toString());
215216
metricPoint.setName(metric);
216217

217218
metricMap.put(metric, metricPoint);
@@ -243,12 +244,13 @@ private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryRespon
243244
try {
244245
if (null != esBucket.getUnusedMap().get(KEY)) {
245246
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
246-
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
247+
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
248+
if(null == value){return;}
247249

248250
MetricPointVO metricPoint = new MetricPointVO();
249251
metricPoint.setAggType(aggType);
250252
metricPoint.setTimeStamp(timestamp);
251-
metricPoint.setValue(value);
253+
metricPoint.setValue(value.toString());
252254
metricPoint.setName(metric);
253255

254256
metricPoints.add(metricPoint);
@@ -290,13 +292,14 @@ private Map<String, List<Long>> handleTopBrokerESQueryResponse(ESQueryResponse r
290292
try {
291293
if (null != esBucket.getUnusedMap().get(KEY)) {
292294
Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
293-
Double value = Double.valueOf(esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap()
294-
.get(metric).getUnusedMap().get(VALUE).toString());
295+
Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap()
296+
.get(metric).getUnusedMap().get(VALUE);
297+
if(null == value){return;}
295298

296299
List<Tuple<Long, Double>> brokerValue = (null == metricBrokerValueMap.get(metric)) ?
297300
new ArrayList<>() : metricBrokerValueMap.get(metric);
298301

299-
brokerValue.add(new Tuple<>(brokerId, value));
302+
brokerValue.add(new Tuple<>(brokerId, Double.valueOf(value.toString())));
300303
metricBrokerValueMap.put(metric, brokerValue);
301304
}
302305
}catch (Exception e){

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,12 @@ public List<ClusterMetricPO> pagingClusterWithLatestMetrics(List<SearchTerm> ter
169169
}
170170

171171
for(String metric : metrics){
172-
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
172+
Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE);
173+
if(null == value){continue;}
173174

174175
MetricPointVO metricPoint = new MetricPointVO();
175176
metricPoint.setAggType(aggType);
176-
metricPoint.setValue(value);
177+
metricPoint.setValue(value.toString());
177178
metricPoint.setName(metric);
178179

179180
metricMap.put(metric, metricPoint);
@@ -194,12 +195,13 @@ private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryRespon
194195
try {
195196
if (null != esBucket.getUnusedMap().get(KEY)) {
196197
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
197-
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
198+
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
199+
if(null == value){return;}
198200

199201
MetricPointVO metricPoint = new MetricPointVO();
200202
metricPoint.setAggType(aggType);
201203
metricPoint.setTimeStamp(timestamp);
202-
metricPoint.setValue(value);
204+
metricPoint.setValue(value.toString());
203205
metricPoint.setName(metric);
204206

205207
metricPoints.add(metricPoint);

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ private GroupMetricPO handleGroupMetricESQueryResponse(ESQueryResponse response,
173173
}
174174

175175
for(String metric : metrics){
176-
String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString();
177-
groupMetricPO.getMetrics().put(metric, Float.valueOf(value));
176+
Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE);
177+
if(value == null){continue;}
178+
groupMetricPO.getMetrics().put(metric, Float.parseFloat(value.toString()));
178179
}
179180

180181
return groupMetricPO;
@@ -192,12 +193,13 @@ private GroupMetricPO handleGroupMetricESQueryResponse(ESQueryResponse response,
192193
try {
193194
if (null != esBucket.getUnusedMap().get(KEY)) {
194195
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
195-
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
196+
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
197+
if(value == null){return;}
196198

197199
MetricPointVO metricPoint = new MetricPointVO();
198200
metricPoint.setAggType(aggType);
199201
metricPoint.setTimeStamp(timestamp);
200-
metricPoint.setValue(value);
202+
metricPoint.setValue(value.toString());
201203
metricPoint.setName(metric);
202204

203205
metricPoints.add(metricPoint);

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,12 +337,13 @@ private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryRespon
337337
try {
338338
if (null != esBucket.getUnusedMap().get(KEY)) {
339339
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
340-
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
340+
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
341+
if(value == null){return;}
341342

342343
MetricPointVO metricPoint = new MetricPointVO();
343344
metricPoint.setAggType(aggType);
344345
metricPoint.setTimeStamp(timestamp);
345-
metricPoint.setValue(value);
346+
metricPoint.setValue(value.toString());
346347
metricPoint.setName(metric);
347348

348349
metricPoints.add(metricPoint);

km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchPage;
66
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchRange;
77
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
8+
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
9+
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
810
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ClusterMetricESDAO;
911
import org.junit.jupiter.api.Test;
1012
import org.springframework.beans.factory.annotation.Autowired;
1113

12-
import java.util.ArrayList;
13-
import java.util.Arrays;
14-
import java.util.List;
14+
import java.util.*;
1515

1616
public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
1717

@@ -28,6 +28,53 @@ public void listClusterMetricsByClusterIdsTest(){
2828
clusterMetricESDAO.listClusterMetricsByClusterIds(metrics, "avg", clusterIds, startTime, endTime);
2929
}
3030

31+
/**
32+
* 测试
33+
* 获取集群 clusterPhyId 中每个 metric 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值
34+
*/
35+
@Test
36+
public void getClusterMetricsPointTest(){
37+
Long clusterId = 1L;
38+
List<String> metrics = Arrays.asList(
39+
"Connections", "BytesIn_min_15", "PartitionURP",
40+
"HealthScore_Topics", "EventQueueSize", "ActiveControllerCount",
41+
"GroupDeads", "BytesIn_min_5", "HealthCheckTotal_Topics",
42+
"Partitions", "BytesOut", "Groups",
43+
"BytesOut_min_15", "TotalRequestQueueSize", "HealthCheckPassed_Groups",
44+
"TotalProduceRequests", "HealthCheckPassed", "TotalLogSize",
45+
"GroupEmptys", "PartitionNoLeader", "HealthScore_Brokers",
46+
"Messages", "Topics", "PartitionMinISR_E",
47+
"HealthCheckTotal", "Brokers", "Replicas",
48+
"HealthCheckTotal_Groups", "GroupRebalances", "MessageIn",
49+
"HealthScore", "HealthCheckPassed_Topics", "HealthCheckTotal_Brokers",
50+
"PartitionMinISR_S", "BytesIn", "BytesOut_min_5",
51+
"GroupActives", "MessagesIn", "GroupReBalances",
52+
"HealthCheckPassed_Brokers", "HealthScore_Groups", "TotalResponseQueueSize",
53+
"Zookeepers", "LeaderMessages", "HealthScore_Cluster",
54+
"HealthCheckPassed_Cluster", "HealthCheckTotal_Cluster");
55+
Long endTime = System.currentTimeMillis();
56+
Long startTime = endTime - 4 * 60 * 60 * 1000;
57+
58+
Map<String/*metric*/, MetricPointVO> metricPointVOS = clusterMetricESDAO.getClusterMetricsPoint(
59+
clusterId, metrics, "avg", startTime, endTime);
60+
61+
assert null != metricPointVOS;
62+
}
63+
64+
/**
65+
* 测试
66+
* 获取集群 clusterId 最新的统计指标
67+
*/
68+
@Test
69+
public void getClusterLatestMetricsTest(){
70+
Long clusterId = 1L;
71+
List<String> metrics = Collections.emptyList();
72+
73+
ClusterMetricPO clusterLatestMetrics = clusterMetricESDAO.getClusterLatestMetrics(clusterId, metrics);
74+
75+
assert null != clusterLatestMetrics;
76+
}
77+
3178
@Test
3279
public void pagingClusterWithLatestMetricsTest(){
3380
List<Long> clusterIds = new ArrayList<>();

km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
import com.xiaojukeji.know.streaming.km.KnowStreamApplicationTest;
44
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
5+
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
56
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO;
67
import org.junit.jupiter.api.Test;
78
import org.springframework.beans.factory.annotation.Autowired;
89

910
import java.util.ArrayList;
11+
import java.util.Collections;
12+
import java.util.Map;
1013

1114
public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest {
1215

@@ -15,7 +18,7 @@ public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest {
1518

1619
@Test
1720
public void getReplicationLatestMetricsTest(){
18-
Long clusterPhyId = 2l;
21+
Long clusterPhyId = 2L;
1922
Integer brokerId = 1;
2023
String topic = "know-streaming-test-251";
2124
Integer partitionId = 1;
@@ -24,4 +27,22 @@ public void getReplicationLatestMetricsTest(){
2427

2528
assert null != replicationMetricPO;
2629
}
30+
31+
/**
32+
* 测试
33+
* 获取集群 clusterPhyId 中每个 metric 的指定 partitionId 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值
34+
*/
35+
@Test
36+
public void getReplicationMetricsPointTest(){
37+
Long clusterPhyId = 2L;
38+
Integer brokerId = 1;
39+
String topic = "know-streaming-test-251";
40+
Integer partitionId = 1;
41+
Long endTime = System.currentTimeMillis();
42+
Long startTime = endTime - 4 * 60 * 60 * 1000;
43+
Map<String, MetricPointVO> metricPointVOMap = replicationMetricESDAO.getReplicationMetricsPoint(
44+
clusterPhyId, topic, brokerId, partitionId, Collections.emptyList(), "avg", startTime, endTime);
45+
46+
assert null != metricPointVOMap;
47+
}
2748
}

km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.ArrayList;
1616
import java.util.Arrays;
1717
import java.util.List;
18-
import java.util.Map;
1918

2019
public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
2120

@@ -38,14 +37,20 @@ public void listTopicMaxMinMetricsTest(){
3837
@Test
3938
public void getTopicsAggsMetricsValueTest(){
4039
Long clusterId = 2L;
41-
String topic = "know-streaming-test-251";
42-
String topic1 = "topic_test01";
43-
List<String> metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
40+
List<String> topicList = Arrays.asList("know-streaming-test-251", "topic_test01");
41+
List<String> metrics = Arrays.asList(
42+
"Messages", "BytesIn_min_15", "BytesRejected",
43+
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
44+
"CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5",
45+
"HealthScore", "LogSize", "BytesOut",
46+
"FailedProduceRequests", "BytesOut_min_15", "BytesIn",
47+
"BytesOut_min_5", "MessagesIn", "TotalProduceRequests",
48+
"HealthCheckPassed");
4449
Long endTime = System.currentTimeMillis();
4550
Long startTime = endTime - 4 * 60 * 60 * 1000;
4651

4752
Table<String/*topics*/, String/*metric*/, MetricPointVO> ret = topicMetricESDAO.getTopicsAggsMetricsValue(
48-
clusterId, Arrays.asList(topic, topic1), metrics, "max", startTime, endTime);
53+
clusterId, topicList, metrics, "max", startTime, endTime);
4954
assert null != ret;
5055
}
5156

@@ -90,7 +95,14 @@ public void listTopicLatestMetricTest(){
9095
String topic = "know-streaming-test-251";
9196
String topic1 = "know-streaming-123";
9297
String topic2 = "1209test";
93-
List<String> metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
98+
List<String> metrics = Arrays.asList(
99+
"Messages", "BytesIn_min_15", "BytesRejected",
100+
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
101+
"CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5",
102+
"HealthScore", "LogSize", "BytesOut",
103+
"FailedProduceRequests", "BytesOut_min_15", "BytesIn",
104+
"BytesOut_min_5", "MessagesIn", "TotalProduceRequests",
105+
"HealthCheckPassed");
94106

95107

96108
List<TopicMetricPO> topicMetricPO = topicMetricESDAO.listTopicLatestMetric(clusterId, Arrays.asList(topic,topic1,topic2), metrics);
@@ -101,7 +113,14 @@ public void listTopicLatestMetricTest(){
101113
@Test
102114
public void listBrokerMetricsByTopicsTest(){
103115
Long clusterId = 2L;
104-
List<String> metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
116+
List<String> metrics = Arrays.asList(
117+
"Messages", "BytesIn_min_15", "BytesRejected",
118+
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
119+
"CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5",
120+
"HealthScore", "LogSize", "BytesOut",
121+
"FailedProduceRequests", "BytesOut_min_15", "BytesIn",
122+
"BytesOut_min_5", "MessagesIn", "TotalProduceRequests",
123+
"HealthCheckPassed");
105124
List<String> topics = Arrays.asList("QAtest_1_13", "__consumer_offsets");
106125
Long endTime = System.currentTimeMillis();
107126
Long startTime = endTime - 4 * 60 * 60 * 1000;

0 commit comments

Comments
 (0)