diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientService.java b/api/src/main/java/io/kafbat/ui/service/AdminClientService.java
index f73a6bf6c..efaeba143 100644
--- a/api/src/main/java/io/kafbat/ui/service/AdminClientService.java
+++ b/api/src/main/java/io/kafbat/ui/service/AdminClientService.java
@@ -3,8 +3,14 @@
 import io.kafbat.ui.model.KafkaCluster;
 import reactor.core.publisher.Mono;
 
-public interface AdminClientService {
+public interface AdminClientService extends AutoCloseable {
 
   Mono<ReactiveAdminClient> get(KafkaCluster cluster);
 
+  /**
+   * Drops the cached AdminClient for the given cluster when the error indicates a broken
+   * client, so the next {@link #get(KafkaCluster)} creates a fresh one.
+   */
+  void invalidate(KafkaCluster cluster, Throwable e);
+
 }
diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java
index a2cf96578..b6ff631b6 100644
--- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java
+++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java
@@ -19,7 +19,7 @@
 
 @Service
 @Slf4j
-public class AdminClientServiceImpl implements AdminClientService, Closeable {
+public class AdminClientServiceImpl implements AdminClientService {
 
   private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
 
@@ -62,6 +62,23 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
             "Error while creating AdminClient for the cluster " + cluster.getName(), th));
   }
 
+  @Override
+  public void invalidate(KafkaCluster cluster, Throwable e) {
+    // getCanonicalName() may return null (anonymous/local classes) - guard against NPE
+    String errorClass = e.getClass().getCanonicalName();
+    if (errorClass != null && errorClass.startsWith("org.apache.kafka.common.errors")) {
+      log.warn("AdminClient for the cluster {} is invalidated due to {}", cluster.getName(), e.getMessage());
+      ReactiveAdminClient client = adminClientCache.remove(cluster.getName());
+      if (client != null) {
+        try {
+          client.close();
+        } catch (Exception ce) {
+          log.info("Error while closing AdminClient for the cluster {}", cluster.getName(), ce);
+        }
+      }
+    }
+  }
+
   @Override
   public void close() {
     adminClientCache.values().forEach(ReactiveAdminClient::close);
diff --git a/api/src/main/java/io/kafbat/ui/service/BrokerService.java b/api/src/main/java/io/kafbat/ui/service/BrokerService.java
index c1ae5fb1d..1b7bf047b 100644
--- a/api/src/main/java/io/kafbat/ui/service/BrokerService.java
+++ b/api/src/main/java/io/kafbat/ui/service/BrokerService.java
@@ -73,7 +73,8 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
         .map(description -> description.getNodes().stream()
             .map(node -> new InternalBroker(node, partitionsDistribution, stats.getMetrics()))
             .collect(Collectors.toList()))
-        .flatMapMany(Flux::fromIterable);
+        .flatMapMany(Flux::fromIterable)
+        .doOnError(e -> adminClientService.invalidate(cluster, e));
   }
 
   public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,
diff --git a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java
index 985c95784..f773bd917 100644
--- a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java
+++ b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java
@@ -49,7 +49,8 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
                 )
             )
         ).doOnError(e ->
-            log.error("Failed to collect cluster {} info", cluster.getName(), e))
+            log.error("Failed to collect cluster {} info", cluster.getName(), e)
+        ).doOnError(e -> adminClientService.invalidate(cluster, e))
             .onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
   }
 
diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java
index 8f15d01e8..4bb5a8fbf 100644
--- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java
+++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java
@@ -169,7 +169,8 @@ public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
         .flatMap(ac -> ac.describeTopic(topicName)
             .switchIfEmpty(Mono.error(new TopicNotFoundException()))
             .then(ac.getTopicsConfig(List.of(topicName), true))
-            .map(m -> m.values().stream().findFirst().orElse(List.of())));
+            .map(m -> m.values().stream().findFirst().orElse(List.of())))
+        .doOnError(e -> adminClientService.invalidate(cluster, e));
   }
 
   private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
@@ -178,6 +179,7 @@ private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
         topicData.getPartitions(),
         topicData.getReplicationFactor(),
         topicData.getConfigs())
+        .doOnError(e -> adminClientService.invalidate(c, e))
         .thenReturn(topicData)
         .onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
         .then(loadTopicAfterCreation(c, topicData.getName()));
@@ -492,7 +494,8 @@ private Mono<List<InternalTopic>> filterExisting(KafkaCluster cluster, Collection<InternalTopic> topics) {
         .map(existing -> topics
             .stream()
             .filter(s -> existing.contains(s.getName()))
-            .collect(toList()));
+            .collect(toList())
+        ).doOnError(e -> adminClientService.invalidate(cluster, e));
   }
 
 }
diff --git a/api/src/test/java/io/kafbat/ui/service/AdminClientServiceTest.java b/api/src/test/java/io/kafbat/ui/service/AdminClientServiceTest.java
new file mode 100644
index 000000000..5fbec3b4c
--- /dev/null
+++ b/api/src/test/java/io/kafbat/ui/service/AdminClientServiceTest.java
@@ -0,0 +1,25 @@
+package io.kafbat.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.kafbat.ui.AbstractIntegrationTest;
+import io.kafbat.ui.model.KafkaCluster;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+class AdminClientServiceTest extends AbstractIntegrationTest {
+  @Test
+  void testInvalidateOnFailure() {
+    AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
+    ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
+    KafkaCluster cluster = clustersStorage.getClusterByName(LOCAL).orElseThrow();
+    ReactiveAdminClient clientBefore = adminClientService.get(cluster).block();
+    ReactiveAdminClient clientBeforeRepeat = adminClientService.get(cluster).block();
+    // caching contract is about identity, not equals(): same instance must be returned
+    assertThat(clientBeforeRepeat).isSameAs(clientBefore);
+    adminClientService.invalidate(cluster, new TimeoutException());
+    ReactiveAdminClient clientAfter = adminClientService.get(cluster).block();
+    assertThat(clientAfter).isNotSameAs(clientBefore);
+  }
+}