Skip to content

Commit d6cbc7f

Browse files
authored
BE: Invalidate Admin client on kafka error (#1468)
1 parent 3a7b70f commit d6cbc7f

File tree

6 files changed

+53
-6
lines changed

6 files changed

+53
-6
lines changed

api/src/main/java/io/kafbat/ui/service/AdminClientService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import io.kafbat.ui.model.KafkaCluster;
44
import reactor.core.publisher.Mono;
55

6-
public interface AdminClientService {
6+
public interface AdminClientService extends AutoCloseable {
77

88
Mono<ReactiveAdminClient> get(KafkaCluster cluster);
99

10+
void invalidate(KafkaCluster cluster, Throwable e);
11+
1012
}

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
@Service
2121
@Slf4j
22-
public class AdminClientServiceImpl implements AdminClientService, Closeable {
22+
public class AdminClientServiceImpl implements AdminClientService {
2323

2424
private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
2525

@@ -62,6 +62,21 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
6262
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
6363
}
6464

65+
@Override
66+
public void invalidate(KafkaCluster cluster, Throwable e) {
67+
if (e.getClass().getCanonicalName().startsWith("org.apache.kafka.common.errors")) {
68+
log.warn("AdminClient for the cluster {} is invalidated due to {}", cluster.getName(), e.getMessage());
69+
ReactiveAdminClient client = adminClientCache.remove(cluster.getName());
70+
if (client != null) {
71+
try {
72+
client.close();
73+
} catch (Exception ce) {
74+
log.info("Error while closing AdminClient for the cluster {}", cluster.getName(), ce);
75+
}
76+
}
77+
}
78+
}
79+
6580
@Override
6681
public void close() {
6782
adminClientCache.values().forEach(ReactiveAdminClient::close);

api/src/main/java/io/kafbat/ui/service/BrokerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
7373
.map(description -> description.getNodes().stream()
7474
.map(node -> new InternalBroker(node, partitionsDistribution, stats.getMetrics()))
7575
.collect(Collectors.toList()))
76-
.flatMapMany(Flux::fromIterable);
76+
.flatMapMany(Flux::fromIterable)
77+
.doOnError(e -> adminClientService.invalidate(cluster, e));
7778
}
7879

7980
public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,

api/src/main/java/io/kafbat/ui/service/StatisticsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
4949
)
5050
)
5151
).doOnError(e ->
52-
log.error("Failed to collect cluster {} info", cluster.getName(), e))
52+
log.error("Failed to collect cluster {} info", cluster.getName(), e)
53+
).doOnError(e -> adminClientService.invalidate(cluster, e))
5354
.onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
5455
}
5556

api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topi
169169
.flatMap(ac -> ac.describeTopic(topicName)
170170
.switchIfEmpty(Mono.error(new TopicNotFoundException()))
171171
.then(ac.getTopicsConfig(List.of(topicName), true))
172-
.map(m -> m.values().stream().findFirst().orElse(List.of())));
172+
.map(m -> m.values().stream().findFirst().orElse(List.of())))
173+
.doOnError(e -> adminClientService.invalidate(cluster, e));
173174
}
174175

175176
private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
@@ -178,6 +179,7 @@ private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient admi
178179
topicData.getPartitions(),
179180
topicData.getReplicationFactor(),
180181
topicData.getConfigs())
182+
.doOnError(e -> adminClientService.invalidate(c, e))
181183
.thenReturn(topicData)
182184
.onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
183185
.then(loadTopicAfterCreation(c, topicData.getName()));
@@ -492,7 +494,8 @@ private Mono<List<InternalTopic>> filterExisting(KafkaCluster cluster, Collectio
492494
.map(existing -> topics
493495
.stream()
494496
.filter(s -> existing.contains(s.getName()))
495-
.collect(toList()));
497+
.collect(toList())
498+
).doOnError(e -> adminClientService.invalidate(cluster, e));
496499
}
497500

498501
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.kafbat.ui.service;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.kafbat.ui.AbstractIntegrationTest;
6+
import io.kafbat.ui.model.KafkaCluster;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.kafka.common.errors.TimeoutException;
9+
import org.junit.jupiter.api.Test;
10+
11+
@Slf4j
12+
class AdminClientServiceTest extends AbstractIntegrationTest {
13+
@Test
14+
void testInvalidateOnFailure() {
15+
AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
16+
ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
17+
KafkaCluster cluster = clustersStorage.getClusterByName(LOCAL).get();
18+
ReactiveAdminClient clientBefore = adminClientService.get(cluster).block();
19+
ReactiveAdminClient clientBeforeRepeat = adminClientService.get(cluster).block();
20+
assertThat(clientBeforeRepeat).isEqualTo(clientBefore);
21+
adminClientService.invalidate(cluster, new TimeoutException());
22+
ReactiveAdminClient clientAfter = adminClientService.get(cluster).block();
23+
assertThat(clientAfter).isNotEqualTo(clientBefore);
24+
}
25+
}

0 commit comments

Comments
 (0)