diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index 32ebc2936..aaa9a55d9 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -1,5 +1,8 @@ package io.kafbat.ui.model; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; + +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; import java.math.BigDecimal; import java.util.List; import java.util.Map; @@ -10,6 +13,8 @@ import lombok.Data; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; @Data @Builder(toBuilder = true) @@ -132,7 +137,7 @@ public static InternalTopic from(TopicDescription topicDescription, topic.cleanUpPolicy( configs.stream() - .filter(config -> config.name().equals("cleanup.policy")) + .filter(config -> config.name().equals(CLEANUP_POLICY_CONFIG)) .findFirst() .map(ConfigEntry::value) .map(CleanupPolicy::fromString) @@ -142,6 +147,31 @@ public static InternalTopic from(TopicDescription topicDescription, return topic.build(); } + public static InternalTopic from(ScrapedClusterState.TopicState topicState, + @Nullable String internalTopicPrefix) { + Map offsets = + topicState.description().partitions().stream().map(p -> Map.entry( + new TopicPartition(topicState.name(), p.partition()), + new InternalPartitionsOffsets.Offsets( + topicState.startOffsets().get(p.partition()), + topicState.endOffsets().get(p.partition()) + ) + ) + ).filter(e -> + e.getValue().getEarliest() != null && e.getValue().getLatest() != null + ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return from( + topicState.description(), + topicState.configs(), + new InternalPartitionsOffsets(offsets), + null, + topicState.segmentStats(), + topicState.partitionsSegmentStats(), + internalTopicPrefix + ); + } + public @Nullable Long getMessagesCount() { Long result = null; if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) { diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java index fe8c2e6fc..685aa2a0d 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java @@ -210,7 +210,7 @@ private static ScrapedClusterState create(ClusterDescription clusterDescription, } private static TopicsIndex buildTopicIndex(ClustersProperties clustersProperties, - Map topicStates) { + Map topicStates) { ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts(); List topics = topicStates.values().stream().map( topicState -> buildInternalTopic(topicState, clustersProperties) @@ -233,17 +233,8 @@ private static Map filterTopic(String topicForFilter, Map e.getKey().partition(), Map.Entry::getValue)); } - private static InternalTopic buildInternalTopic(TopicState state, ClustersProperties clustersProperties) { - return InternalTopic.from( - state.description(), - state.configs(), - InternalPartitionsOffsets.empty(), - null, - state.segmentStats(), - state.partitionsSegmentStats(), - clustersProperties.getInternalTopicPrefix() - ); + private static InternalTopic buildInternalTopic(TopicState state, + ClustersProperties clustersProperties) { + return InternalTopic.from(state, clustersProperties.getInternalTopicPrefix()); } - - } diff --git a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java index 9ad9c651f..ff3a621ed 100644 --- a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java @@ -1,11 +1,11 @@ package io.kafbat.ui.service; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -13,7 +13,7 @@ import io.kafbat.ui.controller.TopicsController; import io.kafbat.ui.mapper.ClusterMapper; import io.kafbat.ui.mapper.ClusterMapperImpl; -import io.kafbat.ui.model.InternalLogDirStats; +import io.kafbat.ui.model.CleanupPolicy; import io.kafbat.ui.model.InternalPartition; import io.kafbat.ui.model.InternalPartitionsOffsets; import io.kafbat.ui.model.InternalTopic; @@ -27,7 +27,6 @@ import io.kafbat.ui.service.audit.AuditService; import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.AccessControlServiceMock; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -37,10 +36,11 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import reactor.core.publisher.Mono; @@ -70,6 +70,20 @@ class TopicsServicePaginationTest { private void init(Map topicsInCache) { KafkaCluster kafkaCluster = buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME); statisticsCache.replace(kafkaCluster, Statistics.empty()); + + Map offsets = topicsInCache.values().stream() + .flatMap(t -> + t.getPartitions().values().stream() + .map(p -> + Map.entry( + new TopicPartition(t.getName(), p.getPartition()), + new InternalPartitionsOffsets.Offsets(p.getOffsetMin(), p.getOffsetMax()) + ) + ) + ).filter(e -> + e.getValue().getEarliest() != null && e.getValue().getLatest() != null + ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + statisticsCache.update( kafkaCluster, topicsInCache.entrySet().stream().collect( @@ -78,8 +92,11 @@ private void init(Map topicsInCache) { v -> toTopicDescription(v.getValue()) ) ), - Map.of(), - new InternalPartitionsOffsets(Map.of()), + topicsInCache.entrySet().stream() + .map(t -> + Map.entry(t.getKey(), List.of(new ConfigEntry(CLEANUP_POLICY_CONFIG, "delete"))) + ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + new InternalPartitionsOffsets(offsets), clustersProperties ); when(adminClientService.get(isA(KafkaCluster.class))).thenReturn(Mono.just(reactiveAdminClient)); @@ -318,4 +335,61 @@ void shouldListTopicsOrderedByPartitionsCount() { ); } + @Test + void shouldListTopicsOrderedByMessagesCount() { + Map internalTopics = IntStream.rangeClosed(1, 100).boxed() + .map(i -> new TopicDescription(UUID.randomUUID().toString(), false, + IntStream.range(0, i) + .mapToObj(p -> + new TopicPartitionInfo(p, null, List.of(), List.of())) + .collect(Collectors.toList()))) + .map(topicDescription -> + InternalTopic.from(topicDescription, List.of(), + new InternalPartitionsOffsets( + topicDescription.partitions().stream() + .map(p -> Map.entry( + new TopicPartition(topicDescription.name(), p.partition()), + new InternalPartitionsOffsets.Offsets(0L, (long) p.partition()) + )).collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue + )) + ), + Metrics.empty(), null, null, "_") + .toBuilder().cleanUpPolicy(CleanupPolicy.DELETE).build() + ).collect(Collectors.toMap(InternalTopic::getName, Function.identity())); + + init(internalTopics); + + var topicsSortedAsc = topicsController + .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, + null, TopicColumnsToSortDTO.MESSAGES_COUNT, null, null).block(); + + assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4); + assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25); + assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf( + internalTopics.values().stream() + .map(clusterMapper::toTopic) + .sorted(Comparator.comparing( + (t) -> t.getMessagesCount().get() + )) + .limit(25) + .collect(Collectors.toList()) + ); + + var topicsSortedDesc = topicsController + .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, + null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block(); + + assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4); + assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25); + assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf( + internalTopics.values().stream() + .map(clusterMapper::toTopic) + .sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed()) + .limit(25) + .collect(Collectors.toList()) + ); + } + }