Skip to content

Commit fc1e8cc

Browse files
committed
refactor: use record classes
1 parent 318bcc9 commit fc1e8cc

37 files changed

+115
-219
lines changed

api/src/main/java/io/kafbat/ui/model/InternalClusterState.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package io.kafbat.ui.model;
22

33
import com.google.common.base.Throwables;
4-
import io.kafbat.ui.model.BrokerDiskUsageDTO;
5-
import io.kafbat.ui.model.MetricsCollectionErrorDTO;
6-
import io.kafbat.ui.model.ServerStatusDTO;
74
import java.math.BigDecimal;
85
import java.util.List;
96
import java.util.Optional;
@@ -40,8 +37,8 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
4037
.stackTrace(Throwables.getStackTraceAsString(e)))
4138
.orElse(null);
4239
topicCount = statistics.getTopicDescriptions().size();
43-
brokerCount = statistics.getClusterDescription().getNodes().size();
44-
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
40+
brokerCount = statistics.getClusterDescription().nodes().size();
41+
activeControllers = Optional.ofNullable(statistics.getClusterDescription().controller())
4542
.map(Node::id)
4643
.orElse(null);
4744
version = statistics.getVersion();

api/src/main/java/io/kafbat/ui/model/InternalPartitionsOffsets.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,11 @@
44
import com.google.common.collect.Table;
55
import java.util.Map;
66
import java.util.Optional;
7-
import lombok.Value;
87
import org.apache.kafka.common.TopicPartition;
98

10-
119
public class InternalPartitionsOffsets {
1210

13-
@Value
14-
public static class Offsets {
15-
Long earliest;
16-
Long latest;
11+
public record Offsets(Long earliest, Long latest) {
1712
}
1813

1914
private final Table<String, Integer, Offsets> offsets = HashBasedTable.create();
Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
package io.kafbat.ui.model;
22

33
import lombok.Builder;
4-
import lombok.Data;
5-
import lombok.RequiredArgsConstructor;
64

7-
@Data
85
@Builder
9-
@RequiredArgsConstructor
10-
public class InternalReplica {
11-
private final int broker;
12-
private final boolean leader;
13-
private final boolean inSync;
6+
public record InternalReplica(int broker, boolean leader, boolean inSync) {
147
}

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public static InternalTopic from(TopicDescription topicDescription,
7777

7878
partitionsOffsets.get(topicDescription.name(), partition.partition())
7979
.ifPresent(offsets -> {
80-
partitionDto.offsetMin(offsets.getEarliest());
81-
partitionDto.offsetMax(offsets.getLatest());
80+
partitionDto.offsetMin(offsets.earliest());
81+
partitionDto.offsetMax(offsets.latest());
8282
});
8383

8484
var segmentStats =

api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec)
7979
}
8080
try {
8181
var deserResult = keyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
82-
message.setKey(deserResult.getResult());
82+
message.setKey(deserResult.result());
8383
message.setKeySerde(keySerdeName);
84-
message.setKeyDeserializeProperties(deserResult.getAdditionalProperties());
84+
message.setKeyDeserializeProperties(deserResult.additionalProperties());
8585
} catch (Exception e) {
8686
log.trace("Error deserializing key for key topic: {}, partition {}, offset {}, with serde {}",
8787
rec.topic(), rec.partition(), rec.offset(), keySerdeName, e);
8888
var deserResult = fallbackKeyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
89-
message.setKey(deserResult.getResult());
89+
message.setKey(deserResult.result());
9090
message.setKeySerde(fallbackSerdeName);
9191
}
9292
}
@@ -98,15 +98,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
9898
try {
9999
var deserResult = valueDeserializer.deserialize(
100100
new RecordHeadersImpl(rec.headers()), rec.value().get());
101-
message.setContent(deserResult.getResult());
101+
message.setContent(deserResult.result());
102102
message.setValueSerde(valueSerdeName);
103-
message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
103+
message.setValueDeserializeProperties(deserResult.additionalProperties());
104104
} catch (Exception e) {
105105
log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
106106
rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
107107
var deserResult = fallbackValueDeserializer.deserialize(
108108
new RecordHeadersImpl(rec.headers()), rec.value().get());
109-
message.setContent(deserResult.getResult());
109+
message.setContent(deserResult.result());
110110
message.setValueSerde(fallbackSerdeName);
111111
}
112112
}

api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,10 @@
1818
import java.util.concurrent.ConcurrentHashMap;
1919
import java.util.stream.Collectors;
2020
import lombok.SneakyThrows;
21-
import lombok.Value;
22-
2321

2422
class CustomSerdeLoader {
2523

26-
@Value
27-
static class CustomSerde {
28-
Serde serde;
29-
ClassLoader classLoader;
24+
record CustomSerde(Serde serde, ClassLoader classLoader) {
3025
}
3126

3227
// serde location -> classloader

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,10 @@ private SerdeInstance loadAndInitCustomSerde(ClustersProperties.SerdeConfig serd
266266
serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
267267
return new SerdeInstance(
268268
serdeConfig.getName(),
269-
loaded.getSerde(),
269+
loaded.serde(),
270270
nullablePattern(serdeConfig.getTopicKeysPattern()),
271271
nullablePattern(serdeConfig.getTopicValuesPattern()),
272-
loaded.getClassLoader()
272+
loaded.classLoader()
273273
);
274274
}
275275

api/src/main/java/io/kafbat/ui/service/BrokerService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private Mono<List<ConfigEntry>> loadBrokersConfig(
5252
}
5353

5454
private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
55-
if (statisticsCache.get(cluster).getClusterDescription().getNodes()
55+
if (statisticsCache.get(cluster).getClusterDescription().nodes()
5656
.stream().noneMatch(node -> node.id() == brokerId)) {
5757
return Flux.error(
5858
new NotFoundException(String.format("Broker with id %s not found", brokerId)));
@@ -70,7 +70,7 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
7070
return adminClientService
7171
.get(cluster)
7272
.flatMap(ReactiveAdminClient::describeCluster)
73-
.map(description -> description.getNodes().stream()
73+
.map(description -> description.nodes().stream()
7474
.map(node -> new InternalBroker(node, partitionsDistribution, stats))
7575
.collect(Collectors.toList()))
7676
.flatMapMany(Flux::fromIterable);
@@ -113,7 +113,7 @@ private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getC
113113
KafkaCluster cluster, List<Integer> reqBrokers) {
114114
return adminClientService.get(cluster)
115115
.flatMap(admin -> {
116-
List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
116+
List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().nodes()
117117
.stream()
118118
.map(Node::id)
119119
.collect(Collectors.toList());

api/src/main/java/io/kafbat/ui/service/DeserializationService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ private SerdeDescriptionDTO toDto(SerdeInstance serdeInstance,
143143
return new SerdeDescriptionDTO()
144144
.name(serdeInstance.getName())
145145
.description(serdeInstance.description().orElse(null))
146-
.schema(schemaOpt.map(SchemaDescription::getSchema).orElse(null))
147-
.additionalProperties(schemaOpt.map(SchemaDescription::getAdditionalProperties).orElse(null))
146+
.schema(schemaOpt.map(SchemaDescription::schema).orElse(null))
147+
.additionalProperties(schemaOpt.map(SchemaDescription::additionalProperties).orElse(null))
148148
.preferred(preferred);
149149
}
150150

api/src/main/java/io/kafbat/ui/service/FeatureService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
6161
}
6262

6363
private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
64-
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
64+
var authorizedOps = Optional.ofNullable(clusterDescription.authorizedOperations()).orElse(Set.of());
6565
boolean canEdit = aclViewEnabled(adminClient)
6666
&& (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
6767
return canEdit

0 commit comments

Comments
 (0)