Skip to content

Commit 9e3c4dc

Browse files
author
zengqiao
committed
解决raft集群controller信息不断记录问题
1 parent 507abc1 commit 9e3c4dc

File tree

6 files changed

+130
-45
lines changed

6 files changed

+130
-45
lines changed

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,5 @@ private Constant() {}
6363
public static final String COLLECT_METRICS_COST_TIME_METRICS_NAME = "CollectMetricsCostTimeUnitSec";
6464
public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
6565

66+
public static final Integer DEFAULT_RETRY_TIME = 3;
6667
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public interface BrokerService {
4444
* 获取具体Broker
4545
*/
4646
Broker getBroker(Long clusterPhyId, Integer brokerId);
47+
Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId);
4748

4849
/**
4950
* 获取BrokerLog-Dir信息

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,16 @@ public Result<List<MetricMultiLinesVO>> listBrokerMetricsFromES(Long clusterId,
178178

179179
@Override
180180
public Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, MetricDTO dto) {
181-
Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(clusterPhyId, brokerId,
182-
dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime());
181+
Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(
182+
clusterPhyId,
183+
brokerId,
184+
dto.getMetricsNames(),
185+
dto.getAggType(),
186+
dto.getStartTime(),
187+
dto.getEndTime()
188+
);
183189

184-
List<MetricPointVO> metricPoints = new ArrayList<>(metricPointMap.values());
185-
return Result.buildSuc(metricPoints);
190+
return Result.buildSuc(new ArrayList<>(metricPointMap.values()));
186191
}
187192

188193
@Override
@@ -199,8 +204,10 @@ public Result<List<BrokerMetrics>> getLatestMetricsFromES(Long clusterPhyId, Lis
199204

200205
brokerMetrics.add(ConvertUtil.obj2Obj(brokerMetricPO, BrokerMetrics.class));
201206
} catch (Exception e) {
202-
LOGGER.error("method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
203-
clusterPhyId, brokerId, e);
207+
LOGGER.error(
208+
"method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
209+
clusterPhyId, brokerId, e
210+
);
204211
}
205212
}
206213

@@ -219,6 +226,7 @@ public boolean isMetricName(String str) {
219226
}
220227

221228
/**************************************************** private method ****************************************************/
229+
222230
private List<Long> listTopNBrokerIds(Long clusterId, Integer topN){
223231
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
224232
if(CollectionUtils.isEmpty(brokers)){return new ArrayList<>();}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,22 @@ public Broker getBroker(Long clusterPhyId, Integer brokerId) {
206206
return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
207207
}
208208

209+
@Override
210+
public Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId) {
211+
List<Broker> brokerList = this.listAliveBrokersFromCacheFirst(clusterPhyId);
212+
if (brokerList == null) {
213+
return null;
214+
}
215+
216+
for (Broker broker: brokerList) {
217+
if (brokerId.equals(broker.getBrokerId())) {
218+
return broker;
219+
}
220+
}
221+
222+
return null;
223+
}
224+
209225
@Override
210226
public Result<Map<String, LogDirDescription>> getBrokerLogDirDescFromKafka(Long clusterPhyId, Integer brokerId) {
211227
try {

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Result<KafkaController> getControllerFromKafka(ClusterPhy clusterPhy) {
5656
@Override
5757
public int insertAndIgnoreDuplicateException(KafkaController kafkaController) {
5858
try {
59-
Broker broker = brokerService.getBroker(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
59+
Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
6060

6161
KafkaControllerPO kafkaControllerPO = new KafkaControllerPO();
6262
kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId());
@@ -136,34 +136,56 @@ public IPage<KafkaControllerPO> pagingControllerHistories(Long clusterPhyId, Int
136136
/**************************************************** private method ****************************************************/
137137

138138
private Result<KafkaController> getControllerFromAdminClient(ClusterPhy clusterPhy) {
139+
AdminClient adminClient = null;
139140
try {
140-
AdminClient adminClient = null;
141-
try {
142-
adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
143-
} catch (Exception e) {
144-
log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
141+
adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
142+
} catch (Exception e) {
143+
log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
145144

146-
// 集群已经加载进来,但是创建admin-client失败,则设置无controller
147-
return Result.buildSuc();
148-
}
145+
// 集群已经加载进来,但是创建admin-client失败,则设置无controller
146+
return Result.buildSuc();
147+
}
149148

150-
DescribeClusterResult describeClusterResult = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
149+
// 先从DB获取该集群controller
150+
KafkaController dbKafkaController = null;
151151

152-
Node controllerNode = describeClusterResult.controller().get();
153-
if (controllerNode == null) {
154-
return Result.buildSuc();
152+
for (int i = 1; i <= Constant.DEFAULT_RETRY_TIME; ++i) {
153+
try {
154+
if (i == 1) {
155+
// 获取DB中的controller信息
156+
dbKafkaController = this.getKafkaControllerFromDB(clusterPhy.getId());
157+
}
158+
159+
DescribeClusterResult describeClusterResult = adminClient.describeCluster(
160+
new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
161+
);
162+
163+
Node controllerNode = describeClusterResult.controller().get();
164+
if (controllerNode == null) {
165+
return Result.buildSuc();
166+
}
167+
168+
if (dbKafkaController != null && controllerNode.id() == dbKafkaController.getBrokerId()) {
169+
// ID没有变化,直接返回原先的
170+
return Result.buildSuc(dbKafkaController);
171+
}
172+
173+
// 发生了变化
174+
return Result.buildSuc(new KafkaController(
175+
clusterPhy.getId(),
176+
controllerNode.id(),
177+
System.currentTimeMillis()
178+
));
179+
} catch (Exception e) {
180+
log.error(
181+
"class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||tryTime={}||errMsg=exception",
182+
clusterPhy.getId(), i, e
183+
);
155184
}
156-
157-
return Result.buildSuc(new KafkaController(
158-
clusterPhy.getId(),
159-
controllerNode.id(),
160-
System.currentTimeMillis()
161-
));
162-
} catch (Exception e) {
163-
log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
164-
165-
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
166185
}
186+
187+
// 三次出错,则直接返回无controller
188+
return Result.buildSuc();
167189
}
168190

169191
private Result<KafkaController> getControllerFromZKClient(ClusterPhy clusterPhy) {

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,24 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
4141
DslsConstant.GET_BROKER_LATEST_METRICS, clusterId, brokerId, startTime, endTime);
4242

4343
BrokerMetricPO brokerMetricPO = esOpClient.performRequestAndTakeFirst(
44-
brokerId.toString(), realIndex(startTime, endTime), dsl, BrokerMetricPO.class);
44+
brokerId.toString(),
45+
realIndex(startTime, endTime),
46+
dsl,
47+
BrokerMetricPO.class
48+
);
4549

4650
return (null == brokerMetricPO) ? new BrokerMetricPO(clusterId, brokerId) : brokerMetricPO;
4751
}
4852

4953
/**
5054
* 获取集群 clusterPhyId 中每个 metric 的指定 broker 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值
5155
*/
52-
public Map<String/*metric*/, MetricPointVO> getBrokerMetricsPoint(Long clusterPhyId, Integer brokerId, List<String> metrics,
53-
String aggType, Long startTime, Long endTime){
56+
public Map<String/*metric*/, MetricPointVO> getBrokerMetricsPoint(Long clusterPhyId,
57+
Integer brokerId,
58+
List<String> metrics,
59+
String aggType,
60+
Long startTime,
61+
Long endTime) {
5462
//1、获取需要查下的索引
5563
String realIndex = realIndex(startTime, endTime);
5664

@@ -60,8 +68,13 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
6068
String dsl = dslLoaderUtil.getFormatDslByFileName(
6169
DslsConstant.GET_BROKER_AGG_SINGLE_METRICS, clusterPhyId, brokerId, startTime, endTime, aggDsl);
6270

63-
return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl,
64-
s -> handleSingleESQueryResponse(s, metrics, aggType), 3);
71+
return esOpClient.performRequestWithRouting(
72+
String.valueOf(brokerId),
73+
realIndex,
74+
dsl,
75+
s -> handleSingleESQueryResponse(s, metrics, aggType),
76+
3
77+
);
6578
}
6679

6780
/**
@@ -75,10 +88,19 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
7588
Map<String, List<Long>> metricBrokerIds = getTopNBrokerIds(clusterPhyId, metrics, aggType, topN, startTime, endTime);
7689

7790
Table<String, Long, List<MetricPointVO>> table = HashBasedTable.create();
91+
7892
//2、查询指标
7993
for(String metric : metricBrokerIds.keySet()){
80-
table.putAll(listBrokerMetricsByBrokerIds(clusterPhyId, Arrays.asList(metric),
81-
aggType, metricBrokerIds.getOrDefault(metric, brokerIds), startTime, endTime));
94+
table.putAll(
95+
this.listBrokerMetricsByBrokerIds(
96+
clusterPhyId,
97+
Arrays.asList(metric),
98+
aggType,
99+
metricBrokerIds.getOrDefault(metric, brokerIds),
100+
startTime,
101+
endTime
102+
)
103+
);
82104
}
83105

84106
return table;
@@ -87,9 +109,12 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
87109
/**
88110
* 获取集群 clusterPhyId 中每个 metric 的指定 brokers 在指定时间[startTime、endTime]区间内所有的指标
89111
*/
90-
public Table<String/*metric*/, Long/*brokerId*/, List<MetricPointVO>> listBrokerMetricsByBrokerIds(Long clusterPhyId, List<String> metrics,
91-
String aggType, List<Long> brokerIds,
92-
Long startTime, Long endTime){
112+
public Table<String/*metric*/, Long/*brokerId*/, List<MetricPointVO>> listBrokerMetricsByBrokerIds(Long clusterPhyId,
113+
List<String> metrics,
114+
String aggType,
115+
List<Long> brokerIds,
116+
Long startTime,
117+
Long endTime){
93118
//1、获取需要查下的索引
94119
String realIndex = realIndex(startTime, endTime);
95120

@@ -105,22 +130,34 @@ public BrokerMetricPO getBrokerLatestMetrics(Long clusterId, Integer brokerId){
105130
for(Long brokerId : brokerIds){
106131
try {
107132
String dsl = dslLoaderUtil.getFormatDslByFileName(
108-
DslsConstant.GET_BROKER_AGG_LIST_METRICS, clusterPhyId, brokerId, startTime, endTime, interval, aggDsl);
133+
DslsConstant.GET_BROKER_AGG_LIST_METRICS,
134+
clusterPhyId,
135+
brokerId,
136+
startTime,
137+
endTime,
138+
interval,
139+
aggDsl
140+
);
109141

110142
queryFuture.runnableTask(
111143
String.format("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d", clusterPhyId),
112144
5000,
113145
() -> {
114-
Map<String, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl,
115-
s -> handleListESQueryResponse(s, metrics, aggType), 3);
116-
117-
synchronized (table){
146+
Map<String, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
147+
String.valueOf(brokerId),
148+
realIndex,
149+
dsl,
150+
s -> handleListESQueryResponse(s, metrics, aggType),
151+
3
152+
);
153+
154+
synchronized (table) {
118155
for(String metric : metricMap.keySet()){
119156
table.put(metric, brokerId, metricMap.get(metric));
120157
}
121158
}
122159
});
123-
}catch (Exception e){
160+
} catch (Exception e){
124161
LOGGER.error("method=listBrokerMetricsByBrokerIds||clusterPhyId={}||brokerId{}||errMsg=exception!", clusterPhyId, brokerId, e);
125162
}
126163
}

0 commit comments

Comments
 (0)