|
27 | 27 | import kafka.zk.TopicsZNode; |
28 | 28 | import org.apache.kafka.clients.admin.*; |
29 | 29 | import org.apache.kafka.common.TopicPartitionInfo; |
| 30 | +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; |
30 | 31 | import org.springframework.beans.factory.annotation.Autowired; |
31 | 32 | import org.springframework.dao.DuplicateKeyException; |
32 | 33 | import org.springframework.stereotype.Service; |
33 | 34 |
|
34 | 35 | import java.util.*; |
| 36 | +import java.util.concurrent.ExecutionException; |
35 | 37 | import java.util.concurrent.TimeUnit; |
36 | 38 | import java.util.function.Function; |
37 | 39 | import java.util.stream.Collectors; |
@@ -84,6 +86,13 @@ public Map<Integer, List<Integer>> getTopicPartitionMapFromKafka(Long clusterPhy |
84 | 86 | } |
85 | 87 |
|
86 | 88 | return partitionMap; |
| 89 | + } catch (ExecutionException e) { |
| 90 | + log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e); |
| 91 | + if (e.getCause() instanceof UnknownTopicOrPartitionException) { |
| 92 | + throw new AdminOperateException(String.format("Kafka does not host Topic:[%s]", topicName), e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED); |
| 93 | + } |
| 94 | + |
| 95 | + throw new AdminOperateException("get topic info from kafka failed", e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED); |
87 | 96 | } catch (Exception e) { |
88 | 97 | log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e); |
89 | 98 | throw new AdminOperateException("get topic info from kafka failed", e, ResultStatus.KAFKA_OPERATE_FAILED); |
|
0 commit comments