Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import io.kafbat.ui.model.KafkaCluster;
import reactor.core.publisher.Mono;

public interface AdminClientService {
/**
 * Provides cached, per-cluster {@link ReactiveAdminClient} instances.
 *
 * <p>Extends {@link AutoCloseable} so the implementation can release all
 * cached admin clients on shutdown.
 */
public interface AdminClientService extends AutoCloseable {

  /**
   * Returns the admin client for the given cluster, creating and caching it on first use.
   *
   * @param cluster the target Kafka cluster
   * @return a Mono emitting the (possibly cached) admin client
   */
  Mono<ReactiveAdminClient> get(KafkaCluster cluster);

  /**
   * Evicts the cached admin client for the given cluster if the supplied error
   * indicates the client is no longer usable; a subsequent {@link #get(KafkaCluster)}
   * will then create a fresh client.
   *
   * @param cluster the cluster whose cached client may be evicted
   * @param e the error that triggered invalidation
   */
  void invalidate(KafkaCluster cluster, Throwable e);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

@Service
@Slf4j
public class AdminClientServiceImpl implements AdminClientService, Closeable {
public class AdminClientServiceImpl implements AdminClientService {

private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;

Expand Down Expand Up @@ -62,6 +62,21 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
}

@Override
public void invalidate(KafkaCluster cluster, Throwable e) {
  // Only Kafka client-level errors (org.apache.kafka.common.errors.*) warrant
  // recreating the AdminClient; application errors leave the cache intact.
  // Use getName() rather than getCanonicalName(): getCanonicalName() returns
  // null for anonymous/local classes, which would NPE inside this error path.
  // The trailing dot pins the match to the exact package.
  if (e.getClass().getName().startsWith("org.apache.kafka.common.errors.")) {
    log.warn("AdminClient for the cluster {} is invalidated due to {}", cluster.getName(), e.getMessage());
    ReactiveAdminClient client = adminClientCache.remove(cluster.getName());
    if (client != null) {
      try {
        client.close();
      } catch (Exception ce) {
        // Best-effort close: the client is already evicted, so just record the failure.
        log.info("Error while closing AdminClient for the cluster {}", cluster.getName(), ce);
      }
    }
  }
}

@Override
public void close() {
adminClientCache.values().forEach(ReactiveAdminClient::close);
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/io/kafbat/ui/service/BrokerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
.map(description -> description.getNodes().stream()
.map(node -> new InternalBroker(node, partitionsDistribution, stats.getMetrics()))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
.flatMapMany(Flux::fromIterable)
.doOnError(e -> adminClientService.invalidate(cluster, e));
}

public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
)
)
).doOnError(e ->
log.error("Failed to collect cluster {} info", cluster.getName(), e))
log.error("Failed to collect cluster {} info", cluster.getName(), e)
).doOnError(e -> adminClientService.invalidate(cluster, e))
.onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
}

Expand Down
7 changes: 5 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/TopicsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topi
.flatMap(ac -> ac.describeTopic(topicName)
.switchIfEmpty(Mono.error(new TopicNotFoundException()))
.then(ac.getTopicsConfig(List.of(topicName), true))
.map(m -> m.values().stream().findFirst().orElse(List.of())));
.map(m -> m.values().stream().findFirst().orElse(List.of())))
.doOnError(e -> adminClientService.invalidate(cluster, e));
}

private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
Expand All @@ -178,6 +179,7 @@ private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient admi
topicData.getPartitions(),
topicData.getReplicationFactor(),
topicData.getConfigs())
.doOnError(e -> adminClientService.invalidate(c, e))
.thenReturn(topicData)
.onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
.then(loadTopicAfterCreation(c, topicData.getName()));
Expand Down Expand Up @@ -492,7 +494,8 @@ private Mono<List<InternalTopic>> filterExisting(KafkaCluster cluster, Collectio
.map(existing -> topics
.stream()
.filter(s -> existing.contains(s.getName()))
.collect(toList()));
.collect(toList())
).doOnError(e -> adminClientService.invalidate(cluster, e));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.kafbat.ui.service;

import static org.assertj.core.api.Assertions.assertThat;

import io.kafbat.ui.AbstractIntegrationTest;
import io.kafbat.ui.model.KafkaCluster;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.jupiter.api.Test;

@Slf4j
class AdminClientServiceTest extends AbstractIntegrationTest {

  /**
   * Verifies the admin-client cache contract: repeated lookups return the same
   * cached instance, and invalidating with a Kafka error evicts it so the next
   * lookup creates a fresh client.
   */
  @Test
  void testInvalidateOnFailure() {
    AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
    ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
    // orElseThrow() instead of unchecked get(): fails fast with a clear error
    // if the LOCAL cluster is missing from the test context.
    KafkaCluster cluster = clustersStorage.getClusterByName(LOCAL).orElseThrow();
    ReactiveAdminClient clientBefore = adminClientService.get(cluster).block();
    ReactiveAdminClient clientBeforeRepeat = adminClientService.get(cluster).block();
    // Identity assertions (isSameAs/isNotSameAs): the cache contract is about
    // object identity, not equals()-equality.
    assertThat(clientBeforeRepeat).isSameAs(clientBefore);
    adminClientService.invalidate(cluster, new TimeoutException());
    ReactiveAdminClient clientAfter = adminClientService.get(cluster).block();
    assertThat(clientAfter).isNotSameAs(clientBefore);
  }
}
Loading