Skip to content

Commit 92e3d43

Browse files
authored
Merge branch 'main' into fix-null-value-internal-topic
2 parents a901bc2 + 13f545b commit 92e3d43

File tree

17 files changed

+385
-57
lines changed

17 files changed

+385
-57
lines changed

api/src/main/java/io/kafbat/ui/service/AdminClientService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import io.kafbat.ui.model.KafkaCluster;
44
import reactor.core.publisher.Mono;
55

6-
public interface AdminClientService {
6+
public interface AdminClientService extends AutoCloseable {
77

88
Mono<ReactiveAdminClient> get(KafkaCluster cluster);
99

10+
void invalidate(KafkaCluster cluster, Throwable e);
11+
1012
}

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
@Service
2121
@Slf4j
22-
public class AdminClientServiceImpl implements AdminClientService, Closeable {
22+
public class AdminClientServiceImpl implements AdminClientService {
2323

2424
private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
2525

@@ -62,6 +62,21 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
6262
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
6363
}
6464

65+
@Override
66+
public void invalidate(KafkaCluster cluster, Throwable e) {
67+
if (e.getClass().getCanonicalName().startsWith("org.apache.kafka.common.errors")) {
68+
log.warn("AdminClient for the cluster {} is invalidated due to {}", cluster.getName(), e.getMessage());
69+
ReactiveAdminClient client = adminClientCache.remove(cluster.getName());
70+
if (client != null) {
71+
try {
72+
client.close();
73+
} catch (Exception ce) {
74+
log.info("Error while closing AdminClient for the cluster {}", cluster.getName(), ce);
75+
}
76+
}
77+
}
78+
}
79+
6580
@Override
6681
public void close() {
6782
adminClientCache.values().forEach(ReactiveAdminClient::close);

api/src/main/java/io/kafbat/ui/service/BrokerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
7373
.map(description -> description.getNodes().stream()
7474
.map(node -> new InternalBroker(node, partitionsDistribution, stats.getMetrics()))
7575
.collect(Collectors.toList()))
76-
.flatMapMany(Flux::fromIterable);
76+
.flatMapMany(Flux::fromIterable)
77+
.doOnError(e -> adminClientService.invalidate(cluster, e));
7778
}
7879

7980
public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,

api/src/main/java/io/kafbat/ui/service/StatisticsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
4949
)
5050
)
5151
).doOnError(e ->
52-
log.error("Failed to collect cluster {} info", cluster.getName(), e))
52+
log.error("Failed to collect cluster {} info", cluster.getName(), e)
53+
).doOnError(e -> adminClientService.invalidate(cluster, e))
5354
.onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
5455
}
5556

api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topi
169169
.flatMap(ac -> ac.describeTopic(topicName)
170170
.switchIfEmpty(Mono.error(new TopicNotFoundException()))
171171
.then(ac.getTopicsConfig(List.of(topicName), true))
172-
.map(m -> m.values().stream().findFirst().orElse(List.of())));
172+
.map(m -> m.values().stream().findFirst().orElse(List.of())))
173+
.doOnError(e -> adminClientService.invalidate(cluster, e));
173174
}
174175

175176
private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
@@ -178,6 +179,7 @@ private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient admi
178179
topicData.getPartitions(),
179180
topicData.getReplicationFactor(),
180181
topicData.getConfigs())
182+
.doOnError(e -> adminClientService.invalidate(c, e))
181183
.thenReturn(topicData)
182184
.onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
183185
.then(loadTopicAfterCreation(c, topicData.getName()));
@@ -492,7 +494,8 @@ private Mono<List<InternalTopic>> filterExisting(KafkaCluster cluster, Collectio
492494
.map(existing -> topics
493495
.stream()
494496
.filter(s -> existing.contains(s.getName()))
495-
.collect(toList()));
497+
.collect(toList())
498+
).doOnError(e -> adminClientService.invalidate(cluster, e));
496499
}
497500

498501
}

api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@ void toKafkaConnect() {
4242

4343
ConnectorDTO connectorDto = new ConnectorDTO();
4444
connectorDto.setName(UUID.randomUUID().toString());
45+
46+
String traceMessage = connectorState == ConnectorStateDTO.FAILED
47+
? "Test error trace for failed connector"
48+
: null;
49+
4550
connectorDto.setStatus(
46-
new ConnectorStatusDTO(connectorState, UUID.randomUUID().toString())
51+
new ConnectorStatusDTO(connectorState, UUID.randomUUID().toString(), traceMessage)
4752
);
4853

4954
List<TaskDTO> tasks = new ArrayList<>();
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.kafbat.ui.service;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.kafbat.ui.AbstractIntegrationTest;
6+
import io.kafbat.ui.model.KafkaCluster;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.kafka.common.errors.TimeoutException;
9+
import org.junit.jupiter.api.Test;
10+
11+
@Slf4j
12+
class AdminClientServiceTest extends AbstractIntegrationTest {
13+
@Test
14+
void testInvalidateOnFailure() {
15+
AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
16+
ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
17+
KafkaCluster cluster = clustersStorage.getClusterByName(LOCAL).get();
18+
ReactiveAdminClient clientBefore = adminClientService.get(cluster).block();
19+
ReactiveAdminClient clientBeforeRepeat = adminClientService.get(cluster).block();
20+
assertThat(clientBeforeRepeat).isEqualTo(clientBefore);
21+
adminClientService.invalidate(cluster, new TimeoutException());
22+
ReactiveAdminClient clientAfter = adminClientService.get(cluster).block();
23+
assertThat(clientAfter).isNotEqualTo(clientBefore);
24+
}
25+
}

api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,22 @@ class KafkaConnectNgramFilterTest extends AbstractNgramFilterTest<FullConnectorI
1414

1515
@Override
1616
protected NgramFilter<FullConnectorInfoDTO> buildFilter(List<FullConnectorInfoDTO> items,
17-
boolean enabled,
18-
ClustersProperties.NgramProperties ngramProperties) {
17+
boolean enabled,
18+
ClustersProperties.NgramProperties ngramProperties) {
1919
return new KafkaConnectNgramFilter(items, enabled, ngramProperties);
2020
}
2121

2222
@Override
2323
protected List<FullConnectorInfoDTO> items() {
24-
return IntStream.range(0, 100).mapToObj(i ->
25-
new FullConnectorInfoDTO(
26-
"connect-" + i,
27-
"connector-" + i,
28-
"class",
29-
ConnectorTypeDTO.SINK,
30-
List.of(),
31-
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "reason"),
32-
1,
33-
0
34-
)
35-
).toList();
24+
return IntStream.range(0, 100).mapToObj(i -> new FullConnectorInfoDTO(
25+
"connect-" + i,
26+
"connector-" + i,
27+
"class",
28+
ConnectorTypeDTO.SINK,
29+
List.of(),
30+
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "worker-1", "reason"),
31+
1,
32+
0)).toList();
3633
}
3734

3835
@Override
@@ -55,7 +52,7 @@ protected List<FullConnectorInfoDTO> sortedItems() {
5552
"class",
5653
ConnectorTypeDTO.SINK,
5754
List.of(),
58-
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "reason"),
55+
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"),
5956
1,
6057
0
6158
),
@@ -65,7 +62,7 @@ protected List<FullConnectorInfoDTO> sortedItems() {
6562
"class",
6663
ConnectorTypeDTO.SINK,
6764
List.of(),
68-
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "reason"),
65+
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"),
6966
1,
7067
0
7168
)
@@ -86,20 +83,17 @@ protected List<FullConnectorInfoDTO> sortedResult(List<FullConnectorInfoDTO> ite
8683
"class",
8784
ConnectorTypeDTO.SINK,
8885
List.of(),
89-
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "reason"),
86+
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"),
9087
1,
91-
0
92-
),
88+
0),
9389
new FullConnectorInfoDTO(
9490
"connect-pay",
9591
"connector-pay",
9692
"class",
9793
ConnectorTypeDTO.SINK,
9894
List.of(),
99-
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "reason"),
95+
new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"),
10096
1,
101-
0
102-
)
103-
);
97+
0));
10498
}
10599
}

contract-typespec/api/kafka-connect.tsp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ enum ConnectorState {
217217
model ConnectorStatus {
218218
state: ConnectorState;
219219
workerId?: string;
220+
trace?: string;
220221
}
221222

222223
model Connector {

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3756,6 +3756,8 @@ components:
37563756
$ref: '#/components/schemas/ConnectorState'
37573757
workerId:
37583758
type: string
3759+
trace:
3760+
type: string
37593761
required:
37603762
- state
37613763

0 commit comments

Comments
 (0)