Skip to content

Commit 725ac10

Browse files
author
zengqiao
committed
1、调整KafkaZKDao位置;2、offset信息获取时,过滤掉无leader分区;3、调整验证ZK是否合法时的session超时时间
1 parent 2b76358 commit 725ac10

File tree

27 files changed

+74
-59
lines changed

27 files changed

+74
-59
lines changed

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
66
import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO;
77
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
8-
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata;
98
import lombok.AllArgsConstructor;
109
import lombok.Data;
1110
import lombok.NoArgsConstructor;
@@ -79,20 +78,6 @@ public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp
7978
return metadata;
8079
}
8180

82-
public static Broker buildFrom(Long clusterPhyId, Integer brokerId, BrokerMetadata brokerMetadata) {
83-
Broker metadata = new Broker();
84-
metadata.setClusterPhyId(clusterPhyId);
85-
metadata.setBrokerId(brokerId);
86-
metadata.setHost(brokerMetadata.getHost());
87-
metadata.setPort(brokerMetadata.getPort());
88-
metadata.setJmxPort(brokerMetadata.getJmxPort());
89-
metadata.setStartTimestamp(brokerMetadata.getTimestamp());
90-
metadata.setRack(brokerMetadata.getRack());
91-
metadata.setStatus(1);
92-
metadata.setEndpointMap(brokerMetadata.getEndpointMap());
93-
return metadata;
94-
}
95-
9681
public static Broker buildFrom(BrokerPO brokerPO) {
9782
Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class);
9883
String endpointMapStr = brokerPO.getEndpointMap();

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ public static String getClusterTopicKey(Long clusterPhyId, String topicName) {
5252

5353
/**************************************************** Partition ****************************************************/
5454

55+
public static String getPartitionNoLeader(Long clusterPhyId, String topicName) {
56+
return String.format("集群ID:[%d] Topic名称:[%s] 所有分区NoLeader", clusterPhyId, topicName);
57+
}
58+
5559
public static String getPartitionNotExist(Long clusterPhyId, String topicName) {
5660
return String.format("集群ID:[%d] Topic名称:[%s] 存在非法的分区ID", clusterPhyId, topicName);
5761
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
66
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
77
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
8-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
8+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
99
import kafka.zk.KafkaZkClient;
1010
import org.springframework.beans.factory.annotation.Autowired;
1111

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
88
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
99
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
10-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
10+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
1111

1212

1313
public abstract class AbstractZKHandler {

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
1010
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
1111
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
12-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
12+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
1313
import kafka.zk.BrokerIdsZNode;
1414
import kafka.zookeeper.ZNodeChildChangeHandler;
1515

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
99
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
1010
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
11-
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationBaseData;
12-
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationDataV1;
13-
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationDataV2;
11+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationBaseData;
12+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationDataV1;
13+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationDataV2;
1414
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
15-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
15+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
1616
import kafka.zk.ConfigEntityChangeNotificationZNode;
1717
import kafka.zookeeper.ZNodeChildChangeHandler;
1818
import org.apache.zookeeper.data.Stat;

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
1212
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
1313
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
14-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
14+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
1515
import kafka.zk.ControllerZNode;
1616
import kafka.zookeeper.ZNodeChangeHandler;
1717

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
1010
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
1111
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
12-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
12+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
1313
import kafka.zk.TopicsZNode;
1414
import kafka.zookeeper.ZNodeChildChangeHandler;
1515

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,14 @@
2424
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
2525
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
2626
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
27-
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata;
2827
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
2928
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
3029
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
3130
import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO;
3231
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
3332
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
3433
import com.xiaojukeji.know.streaming.km.persistence.mysql.broker.BrokerDAO;
35-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
36-
import kafka.zk.BrokerIdZNode;
34+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
3735
import kafka.zk.BrokerIdsZNode;
3836
import org.apache.kafka.clients.admin.*;
3937
import org.apache.kafka.common.Node;
@@ -310,9 +308,7 @@ private Result<List<Broker>> getBrokersFromZKClient(ClusterPhy clusterPhy) {
310308

311309
List<String> brokerIdList = kafkaZKDAO.getChildren(clusterPhy.getId(), BrokerIdsZNode.path(), false);
312310
for (String brokerId: brokerIdList) {
313-
BrokerMetadata metadata = kafkaZKDAO.getData(clusterPhy.getId(), BrokerIdZNode.path(Integer.valueOf(brokerId)), BrokerMetadata.class);
314-
BrokerMetadata.parseAndUpdateBrokerMetadata(metadata);
315-
brokerList.add(Broker.buildFrom(clusterPhy.getId(), Integer.valueOf(brokerId), metadata));
311+
brokerList.add(kafkaZKDAO.getBrokerMetadata(clusterPhy.getId(), Integer.valueOf(brokerId)));
316312
}
317313

318314
return Result.buildSuc(brokerList);

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
1414
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterValidateService;
1515
import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO;
16-
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
17-
import com.xiaojukeji.know.streaming.km.persistence.zk.impl.KafkaZKDAOImpl;
16+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
17+
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl.KafkaZKDAOImpl;
1818
import kafka.server.KafkaConfig;
1919
import lombok.extern.slf4j.Slf4j;
2020
import org.apache.kafka.clients.admin.*;

0 commit comments

Comments
 (0)