Skip to content

Commit 64f79e3

Browse files
committed
Fixed partitions stats storage in cache
1 parent 70eeb5f commit 64f79e3

File tree

3 files changed

+176
-20
lines changed

3 files changed

+176
-20
lines changed

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.kafbat.ui.model;
22

3+
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
4+
5+
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
36
import java.math.BigDecimal;
47
import java.util.List;
58
import java.util.Map;
@@ -10,6 +13,7 @@
1013
import lombok.Data;
1114
import org.apache.kafka.clients.admin.ConfigEntry;
1215
import org.apache.kafka.clients.admin.TopicDescription;
16+
import org.apache.kafka.common.config.TopicConfig;
1317

1418
@Data
1519
@Builder(toBuilder = true)
@@ -132,7 +136,96 @@ public static InternalTopic from(TopicDescription topicDescription,
132136

133137
topic.cleanUpPolicy(
134138
configs.stream()
135-
.filter(config -> config.name().equals("cleanup.policy"))
139+
.filter(config -> config.name().equals(CLEANUP_POLICY_CONFIG))
140+
.findFirst()
141+
.map(ConfigEntry::value)
142+
.map(CleanupPolicy::fromString)
143+
.orElse(CleanupPolicy.UNKNOWN)
144+
);
145+
146+
return topic.build();
147+
}
148+
149+
public static InternalTopic from(ScrapedClusterState.TopicState topicState,
150+
@Nullable String internalTopicPrefix) {
151+
var topic = InternalTopic.builder();
152+
TopicDescription topicDescription = topicState.description();
153+
154+
internalTopicPrefix = internalTopicPrefix == null || internalTopicPrefix.isEmpty()
155+
? "_"
156+
: internalTopicPrefix;
157+
158+
topic.internal(
159+
topicDescription.isInternal() || topicDescription.name().startsWith(internalTopicPrefix)
160+
);
161+
topic.name(topicDescription.name());
162+
163+
List<InternalPartition> partitions = topicDescription.partitions().stream()
164+
.map(partition -> {
165+
var partitionDto = InternalPartition.builder();
166+
167+
partitionDto.leader(partition.leader() != null ? partition.leader().id() : null);
168+
partitionDto.partition(partition.partition());
169+
partitionDto.inSyncReplicasCount(partition.isr().size());
170+
partitionDto.replicasCount(partition.replicas().size());
171+
List<InternalReplica> replicas = partition.replicas().stream()
172+
.map(r ->
173+
InternalReplica.builder()
174+
.broker(r.id())
175+
.inSync(partition.isr().contains(r))
176+
.leader(partition.leader() != null && partition.leader().id() == r.id())
177+
.build())
178+
.collect(Collectors.toList());
179+
partitionDto.replicas(replicas);
180+
181+
Optional.ofNullable(
182+
topicState.startOffsets().get(partition.partition())
183+
).ifPresent(partitionDto::offsetMin);
184+
185+
Optional.ofNullable(
186+
topicState.endOffsets().get(partition.partition())
187+
).ifPresent(partitionDto::offsetMax);
188+
189+
Optional.ofNullable(topicState.partitionsSegmentStats())
190+
.flatMap(s -> Optional.ofNullable(s.get(partition.partition())))
191+
.ifPresent(stats -> {
192+
partitionDto.segmentCount(stats.getSegmentsCount());
193+
partitionDto.segmentSize(stats.getSegmentSize());
194+
});
195+
196+
197+
return partitionDto.build();
198+
})
199+
.toList();
200+
201+
topic.partitions(partitions.stream().collect(
202+
Collectors.toMap(InternalPartition::getPartition, t -> t)));
203+
204+
var partitionsStats = new PartitionsStats(topicDescription);
205+
topic.replicas(partitionsStats.getReplicasCount());
206+
topic.partitionCount(partitionsStats.getPartitionsCount());
207+
topic.inSyncReplicas(partitionsStats.getInSyncReplicasCount());
208+
topic.underReplicatedPartitions(partitionsStats.getUnderReplicatedPartitionCount());
209+
210+
topic.replicationFactor(
211+
topicDescription.partitions().isEmpty()
212+
? 0
213+
: topicDescription.partitions().get(0).replicas().size()
214+
);
215+
216+
Optional.ofNullable(topicState.segmentStats())
217+
.ifPresent(stats -> {
218+
topic.segmentCount(stats.getSegmentsCount());
219+
topic.segmentSize(stats.getSegmentSize());
220+
});
221+
222+
topic.topicConfigs(
223+
topicState.configs().stream().map(InternalTopicConfig::from).collect(Collectors.toList())
224+
);
225+
226+
topic.cleanUpPolicy(
227+
topicState.configs().stream()
228+
.filter(config -> config.name().equals(CLEANUP_POLICY_CONFIG))
136229
.findFirst()
137230
.map(ConfigEntry::value)
138231
.map(CleanupPolicy::fromString)

api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private static ScrapedClusterState create(ClusterDescription clusterDescription,
210210
}
211211

212212
private static TopicsIndex buildTopicIndex(ClustersProperties clustersProperties,
213-
Map<String, TopicState> topicStates) {
213+
Map<String, TopicState> topicStates) {
214214
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
215215
List<InternalTopic> topics = topicStates.values().stream().map(
216216
topicState -> buildInternalTopic(topicState, clustersProperties)
@@ -233,17 +233,8 @@ private static <T> Map<Integer, T> filterTopic(String topicForFilter, Map<TopicP
233233
.collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue));
234234
}
235235

236-
private static InternalTopic buildInternalTopic(TopicState state, ClustersProperties clustersProperties) {
237-
return InternalTopic.from(
238-
state.description(),
239-
state.configs(),
240-
InternalPartitionsOffsets.empty(),
241-
null,
242-
state.segmentStats(),
243-
state.partitionsSegmentStats(),
244-
clustersProperties.getInternalTopicPrefix()
245-
);
236+
private static InternalTopic buildInternalTopic(TopicState state,
237+
ClustersProperties clustersProperties) {
238+
return InternalTopic.from(state, clustersProperties.getInternalTopicPrefix());
246239
}
247-
248-
249240
}

api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package io.kafbat.ui.service;
22

3+
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
34
import static org.assertj.core.api.Assertions.assertThat;
45
import static org.mockito.ArgumentMatchers.any;
56
import static org.mockito.ArgumentMatchers.anyBoolean;
67
import static org.mockito.ArgumentMatchers.anyList;
78
import static org.mockito.ArgumentMatchers.isA;
8-
import static org.mockito.ArgumentMatchers.isNull;
99
import static org.mockito.Mockito.mock;
1010
import static org.mockito.Mockito.when;
1111

1212
import io.kafbat.ui.config.ClustersProperties;
1313
import io.kafbat.ui.controller.TopicsController;
1414
import io.kafbat.ui.mapper.ClusterMapper;
1515
import io.kafbat.ui.mapper.ClusterMapperImpl;
16-
import io.kafbat.ui.model.InternalLogDirStats;
16+
import io.kafbat.ui.model.CleanupPolicy;
1717
import io.kafbat.ui.model.InternalPartition;
1818
import io.kafbat.ui.model.InternalPartitionsOffsets;
1919
import io.kafbat.ui.model.InternalTopic;
@@ -27,7 +27,6 @@
2727
import io.kafbat.ui.service.audit.AuditService;
2828
import io.kafbat.ui.service.rbac.AccessControlService;
2929
import io.kafbat.ui.util.AccessControlServiceMock;
30-
import java.util.ArrayList;
3130
import java.util.Comparator;
3231
import java.util.List;
3332
import java.util.Map;
@@ -37,10 +36,11 @@
3736
import java.util.function.Function;
3837
import java.util.stream.Collectors;
3938
import java.util.stream.IntStream;
39+
import org.apache.kafka.clients.admin.ConfigEntry;
4040
import org.apache.kafka.clients.admin.TopicDescription;
41+
import org.apache.kafka.common.TopicPartition;
4142
import org.apache.kafka.common.TopicPartitionInfo;
4243
import org.junit.jupiter.api.Test;
43-
import org.mockito.ArgumentMatchers;
4444
import org.mockito.Mockito;
4545
import reactor.core.publisher.Mono;
4646

@@ -70,6 +70,18 @@ class TopicsServicePaginationTest {
7070
private void init(Map<String, InternalTopic> topicsInCache) {
7171
KafkaCluster kafkaCluster = buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME);
7272
statisticsCache.replace(kafkaCluster, Statistics.empty());
73+
74+
Map<TopicPartition, InternalPartitionsOffsets.Offsets> offsets = topicsInCache.values().stream()
75+
.flatMap(t ->
76+
t.getPartitions().values().stream()
77+
.map(p ->
78+
Map.entry(
79+
new TopicPartition(t.getName(), p.getPartition()),
80+
new InternalPartitionsOffsets.Offsets(p.getOffsetMin(), p.getOffsetMax())
81+
)
82+
)
83+
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
84+
7385
statisticsCache.update(
7486
kafkaCluster,
7587
topicsInCache.entrySet().stream().collect(
@@ -78,8 +90,11 @@ private void init(Map<String, InternalTopic> topicsInCache) {
7890
v -> toTopicDescription(v.getValue())
7991
)
8092
),
81-
Map.of(),
82-
new InternalPartitionsOffsets(Map.of()),
93+
topicsInCache.entrySet().stream()
94+
.map(t ->
95+
Map.entry(t.getKey(), List.of(new ConfigEntry(CLEANUP_POLICY_CONFIG, "delete")))
96+
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
97+
new InternalPartitionsOffsets(offsets),
8398
clustersProperties
8499
);
85100
when(adminClientService.get(isA(KafkaCluster.class))).thenReturn(Mono.just(reactiveAdminClient));
@@ -318,4 +333,61 @@ void shouldListTopicsOrderedByPartitionsCount() {
318333
);
319334
}
320335

336+
@Test
337+
void shouldListTopicsOrderedByMessagesCount() {
338+
Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
339+
.map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
340+
IntStream.range(0, i)
341+
.mapToObj(p ->
342+
new TopicPartitionInfo(p, null, List.of(), List.of()))
343+
.collect(Collectors.toList())))
344+
.map(topicDescription ->
345+
InternalTopic.from(topicDescription, List.of(),
346+
new InternalPartitionsOffsets(
347+
topicDescription.partitions().stream()
348+
.map(p -> Map.entry(
349+
new TopicPartition(topicDescription.name(), p.partition()),
350+
new InternalPartitionsOffsets.Offsets(0L, (long) p.partition())
351+
)).collect(Collectors.toMap(
352+
Map.Entry::getKey,
353+
Map.Entry::getValue
354+
))
355+
),
356+
Metrics.empty(), null, null, "_")
357+
.toBuilder().cleanUpPolicy(CleanupPolicy.DELETE).build()
358+
).collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
359+
360+
init(internalTopics);
361+
362+
var topicsSortedAsc = topicsController
363+
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
364+
null, TopicColumnsToSortDTO.MESSAGES_COUNT, null, null).block();
365+
366+
assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
367+
assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
368+
assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
369+
internalTopics.values().stream()
370+
.map(clusterMapper::toTopic)
371+
.sorted(Comparator.comparing(
372+
(t) -> t.getMessagesCount().get()
373+
))
374+
.limit(25)
375+
.collect(Collectors.toList())
376+
);
377+
378+
var topicsSortedDesc = topicsController
379+
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
380+
null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
381+
382+
assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
383+
assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
384+
assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
385+
internalTopics.values().stream()
386+
.map(clusterMapper::toTopic)
387+
.sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
388+
.limit(25)
389+
.collect(Collectors.toList())
390+
);
391+
}
392+
321393
}

0 commit comments

Comments
 (0)