Skip to content

Commit 4114777

Browse files
author
zengqiao
committed
补充leader选举能力
1 parent 9e3c4dc commit 4114777

File tree

3 files changed

+152
-0
lines changed

3 files changed

+152
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
import org.apache.kafka.common.TopicPartition;
7+
8+
import java.util.List;
9+
10+
@Data
11+
@NoArgsConstructor
12+
public class BatchPartitionParam extends ClusterPhyParam {
13+
private List<TopicPartition> tpList;
14+
15+
public BatchPartitionParam(Long clusterPhyId, List<TopicPartition> tpList) {
16+
super(clusterPhyId);
17+
this.tpList = tpList;
18+
}
19+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.xiaojukeji.know.streaming.km.core.service.partition;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
4+
import org.apache.kafka.common.TopicPartition;
5+
6+
import java.util.List;
7+
8+
public interface OpPartitionService {
9+
10+
/**
11+
* 优先副本选举
12+
*/
13+
Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList);
14+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
6+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam;
7+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
8+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
9+
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
10+
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
11+
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
12+
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
13+
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
14+
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
15+
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
16+
import kafka.zk.KafkaZkClient;
17+
import org.apache.kafka.clients.admin.AdminClient;
18+
import org.apache.kafka.clients.admin.ElectLeadersOptions;
19+
import org.apache.kafka.clients.admin.ElectLeadersResult;
20+
import org.apache.kafka.common.ElectionType;
21+
import org.apache.kafka.common.TopicPartition;
22+
import org.springframework.beans.factory.annotation.Autowired;
23+
import org.springframework.stereotype.Service;
24+
import scala.jdk.javaapi.CollectionConverters;
25+
26+
import javax.annotation.PostConstruct;
27+
import java.util.HashSet;
28+
import java.util.List;
29+
30+
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST;
31+
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
32+
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER;
33+
34+
35+
/**
36+
* @author didi
37+
*/
38+
@Service
39+
public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService {
40+
private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class);
41+
42+
@Autowired
43+
private KafkaAdminClient kafkaAdminClient;
44+
45+
@Autowired
46+
private KafkaAdminZKClient kafkaAdminZKClient;
47+
48+
public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection";
49+
50+
@Override
51+
protected VersionItemTypeEnum getVersionItemType() {
52+
return SERVICE_OP_PARTITION_LEADER;
53+
}
54+
55+
@PostConstruct
56+
private void init() {
57+
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient);
58+
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient);
59+
}
60+
61+
@Override
62+
public Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList) {
63+
try {
64+
return (Result<Void>) doVCHandler(
65+
clusterPhyId,
66+
PREFERRED_REPLICA_ELECTION,
67+
new BatchPartitionParam(clusterPhyId, tpList)
68+
);
69+
} catch (VCHandlerNotExistException e) {
70+
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
71+
}
72+
}
73+
74+
/**************************************************** private method ****************************************************/
75+
76+
private Result<Void> preferredReplicaElectionByZKClient(VersionItemParam itemParam) {
77+
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
78+
79+
try {
80+
KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId());
81+
82+
kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet());
83+
84+
return Result.buildSuc();
85+
} catch (Exception e) {
86+
LOGGER.error(
87+
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception",
88+
partitionParam.getClusterPhyId(), e
89+
);
90+
91+
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
92+
}
93+
}
94+
95+
private Result<Void> preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) {
96+
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
97+
98+
try {
99+
AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId());
100+
101+
ElectLeadersResult electLeadersResult = adminClient.electLeaders(
102+
ElectionType.PREFERRED,
103+
new HashSet<>(partitionParam.getTpList()),
104+
new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
105+
);
106+
107+
electLeadersResult.all().get();
108+
109+
return Result.buildSuc();
110+
} catch (Exception e) {
111+
LOGGER.error(
112+
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception",
113+
partitionParam.getClusterPhyId(), e
114+
);
115+
116+
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)