Skip to content

Commit 4543a33

Browse files
WYAOBOZQKC
authored andcommitted
[Bugfix]修复job更新中的数组越界报错(#744)
1 parent 1c4fbef commit 4543a33

File tree

3 files changed

+50
-17
lines changed

3 files changed

+50
-17
lines changed

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.xiaojukeji.know.streaming.km.common.bean.entity.reassign;
22

3-
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
43
import lombok.Data;
54
import org.apache.kafka.common.TopicPartition;
65

@@ -20,10 +19,4 @@ public boolean checkPartitionFinished(String topicName, Integer partitionId) {
2019

2120
return state.isDone();
2221
}
23-
24-
public boolean checkPreferredReplicaElectionUnNeed(String reassignBrokerIds, String originalBrokerIds) {
25-
Integer targetLeader = CommonUtils.string2IntList(reassignBrokerIds).get(0);
26-
Integer originalLeader = CommonUtils.string2IntList(originalBrokerIds).get(0);
27-
return originalLeader.equals(targetLeader);
28-
}
2922
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/CommonUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,4 +261,20 @@ public static String getWorkerId(String url){
261261
return null;
262262
}
263263
}
264+
265+
266+
/**
267+
* 校验两个list的第一个元素是否相等,以","分隔元素。
268+
* @param str1
269+
* @param str2
270+
* @return
271+
*/
272+
public static boolean checkFirstElementIsEquals(String str1, String str2) {
273+
if (ValidateUtils.anyBlank(str1, str2)) {
274+
return false;
275+
}
276+
Integer targetLeader = CommonUtils.string2IntList(str1).get(0);
277+
Integer originalLeader = CommonUtils.string2IntList(str2).get(0);
278+
return originalLeader.equals(targetLeader);
279+
}
264280
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
2929
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
3030
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
31+
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReassignUtil;
3132
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
3233
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
3334
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
@@ -385,11 +386,13 @@ public Result<Void> verifyAndUpdateStatue(Long jobId) {
385386

386387
// 更新任务状态
387388
rv = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
388-
if (rv.successful()){
389+
390+
//如果任务还未完成,先返回,不必考虑优先副本的重新选举。
391+
if (!rv.successful()) {
389392
return Result.buildFromIgnoreData(rv);
390393
}
391394

392-
//已完成
395+
//任务已完成,检查是否需要重新选举,并进行选举。
393396
rv = this.preferredReplicaElection(jobId);
394397

395398

@@ -500,10 +503,8 @@ public Result<Void> preferredReplicaElection(Long jobId) {
500503
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobId);
501504
List<TopicPartition> topicPartitions = new ArrayList<>();
502505
subJobPOList.stream().forEach(reassignPO -> {
503-
Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0);
504-
Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0);
505506
//替换过leader的添加到优先副本选举任务列表
506-
if (!originalLeader.equals(targetLeader)){
507+
if (!CommonUtils.checkFirstElementIsEquals(reassignPO.getReassignBrokerIds(), reassignPO.getOriginalBrokerIds())) {
507508
topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId()));
508509
}
509510
});
@@ -534,8 +535,12 @@ private Result<Void> modifyAll(ReassignJobPO jobPO, ReplaceReassignJob replaceRe
534535
if (dbSubPO == null) {
535536
// DB中不存在
536537
reassignSubJobDAO.insert(elem);
538+
return;
537539
}
538540

541+
//补全缺失信息
542+
this.completeInfo(elem,dbSubPO);
543+
539544
// 已存在则进行更新
540545
elem.setId(dbSubPO.getId());
541546
reassignSubJobDAO.updateById(elem);
@@ -565,13 +570,10 @@ public Result<Void> checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignRe
565570
long now = System.currentTimeMillis();
566571

567572
boolean existNotFinished = false;
568-
boolean unNeedPreferredReplicaElection = true;
573+
boolean jobSucceed = false;
569574
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobPO.getId());
570575

571576
for (ReassignSubJobPO subJobPO: subJobPOList) {
572-
if (!reassignmentResult.checkPreferredReplicaElectionUnNeed(subJobPO.getReassignBrokerIds(),subJobPO.getOriginalBrokerIds())) {
573-
unNeedPreferredReplicaElection = false;
574-
}
575577

576578
if (!reassignmentResult.checkPartitionFinished(subJobPO.getTopicName(), subJobPO.getPartitionId())) {
577579
existNotFinished = true;
@@ -591,12 +593,13 @@ public Result<Void> checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignRe
591593
// 当前没有分区处于迁移中, 并且没有任务并不处于执行中
592594
ReassignJobPO newJobPO = new ReassignJobPO();
593595
newJobPO.setId(jobPO.getId());
596+
jobSucceed = true;
594597
newJobPO.setStatus(JobStatusEnum.SUCCESS.getStatus());
595598
newJobPO.setFinishedTime(new Date(now));
596599
reassignJobDAO.updateById(newJobPO);
597600
}
598601

599-
return Result.build(unNeedPreferredReplicaElection);
602+
return Result.build(jobSucceed);
600603
}
601604

602605
private Result<List<ReassignSubJobPO>> setJobInRunning(ReassignJobPO jobPO) {
@@ -861,4 +864,25 @@ private Result<Void> modifyRetentionTime(Long clusterPhyId, List<ReassignSubJobP
861864

862865
return returnRV;
863866
}
867+
868+
private void completeInfo(ReassignSubJobPO newPO, ReassignSubJobPO dbPO) {
869+
if (newPO.getJobId() == null) {
870+
newPO.setJobId(dbPO.getJobId());
871+
}
872+
if (newPO.getTopicName() == null) {
873+
newPO.setTopicName(dbPO.getTopicName());
874+
}
875+
if (newPO.getClusterPhyId() == null) {
876+
newPO.setClusterPhyId(dbPO.getClusterPhyId());
877+
}
878+
if (newPO.getPartitionId() == null) {
879+
newPO.setPartitionId(dbPO.getPartitionId());
880+
}
881+
if (newPO.getOriginalBrokerIds() == null || newPO.getOriginalBrokerIds().isEmpty()) {
882+
newPO.setOriginalBrokerIds(dbPO.getOriginalBrokerIds());
883+
}
884+
if (newPO.getReassignBrokerIds() == null || newPO.getReassignBrokerIds().isEmpty()) {
885+
newPO.setReassignBrokerIds(dbPO.getReassignBrokerIds());
886+
}
887+
}
864888
}

0 commit comments

Comments
 (0)