Skip to content

Commit dc36271

Browse files
authored
BE: Fixed partitions stats storage in cache (#1345)
1 parent cb3c64e commit dc36271

File tree

3 files changed

+115
-20
lines changed

3 files changed

+115
-20
lines changed

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.kafbat.ui.model;
22

3+
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
4+
5+
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
36
import java.math.BigDecimal;
47
import java.util.List;
58
import java.util.Map;
@@ -10,6 +13,8 @@
1013
import lombok.Data;
1114
import org.apache.kafka.clients.admin.ConfigEntry;
1215
import org.apache.kafka.clients.admin.TopicDescription;
16+
import org.apache.kafka.common.TopicPartition;
17+
import org.apache.kafka.common.config.TopicConfig;
1318

1419
@Data
1520
@Builder(toBuilder = true)
@@ -132,7 +137,7 @@ public static InternalTopic from(TopicDescription topicDescription,
132137

133138
topic.cleanUpPolicy(
134139
configs.stream()
135-
.filter(config -> config.name().equals("cleanup.policy"))
140+
.filter(config -> config.name().equals(CLEANUP_POLICY_CONFIG))
136141
.findFirst()
137142
.map(ConfigEntry::value)
138143
.map(CleanupPolicy::fromString)
@@ -142,6 +147,31 @@ public static InternalTopic from(TopicDescription topicDescription,
142147
return topic.build();
143148
}
144149

150+
public static InternalTopic from(ScrapedClusterState.TopicState topicState,
151+
@Nullable String internalTopicPrefix) {
152+
Map<TopicPartition, InternalPartitionsOffsets.Offsets> offsets =
153+
topicState.description().partitions().stream().map(p -> Map.entry(
154+
new TopicPartition(topicState.name(), p.partition()),
155+
new InternalPartitionsOffsets.Offsets(
156+
topicState.startOffsets().get(p.partition()),
157+
topicState.endOffsets().get(p.partition())
158+
)
159+
)
160+
).filter(e ->
161+
e.getValue().getEarliest() != null && e.getValue().getLatest() != null
162+
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
163+
164+
return from(
165+
topicState.description(),
166+
topicState.configs(),
167+
new InternalPartitionsOffsets(offsets),
168+
null,
169+
topicState.segmentStats(),
170+
topicState.partitionsSegmentStats(),
171+
internalTopicPrefix
172+
);
173+
}
174+
145175
public @Nullable Long getMessagesCount() {
146176
Long result = null;
147177
if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) {

api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private static ScrapedClusterState create(ClusterDescription clusterDescription,
210210
}
211211

212212
private static TopicsIndex buildTopicIndex(ClustersProperties clustersProperties,
213-
Map<String, TopicState> topicStates) {
213+
Map<String, TopicState> topicStates) {
214214
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
215215
List<InternalTopic> topics = topicStates.values().stream().map(
216216
topicState -> buildInternalTopic(topicState, clustersProperties)
@@ -233,17 +233,8 @@ private static <T> Map<Integer, T> filterTopic(String topicForFilter, Map<TopicP
233233
.collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue));
234234
}
235235

236-
private static InternalTopic buildInternalTopic(TopicState state, ClustersProperties clustersProperties) {
237-
return InternalTopic.from(
238-
state.description(),
239-
state.configs(),
240-
InternalPartitionsOffsets.empty(),
241-
null,
242-
state.segmentStats(),
243-
state.partitionsSegmentStats(),
244-
clustersProperties.getInternalTopicPrefix()
245-
);
236+
private static InternalTopic buildInternalTopic(TopicState state,
237+
ClustersProperties clustersProperties) {
238+
return InternalTopic.from(state, clustersProperties.getInternalTopicPrefix());
246239
}
247-
248-
249240
}

api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package io.kafbat.ui.service;
22

3+
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
34
import static org.assertj.core.api.Assertions.assertThat;
45
import static org.mockito.ArgumentMatchers.any;
56
import static org.mockito.ArgumentMatchers.anyBoolean;
67
import static org.mockito.ArgumentMatchers.anyList;
78
import static org.mockito.ArgumentMatchers.isA;
8-
import static org.mockito.ArgumentMatchers.isNull;
99
import static org.mockito.Mockito.mock;
1010
import static org.mockito.Mockito.when;
1111

1212
import io.kafbat.ui.config.ClustersProperties;
1313
import io.kafbat.ui.controller.TopicsController;
1414
import io.kafbat.ui.mapper.ClusterMapper;
1515
import io.kafbat.ui.mapper.ClusterMapperImpl;
16-
import io.kafbat.ui.model.InternalLogDirStats;
16+
import io.kafbat.ui.model.CleanupPolicy;
1717
import io.kafbat.ui.model.InternalPartition;
1818
import io.kafbat.ui.model.InternalPartitionsOffsets;
1919
import io.kafbat.ui.model.InternalTopic;
@@ -27,7 +27,6 @@
2727
import io.kafbat.ui.service.audit.AuditService;
2828
import io.kafbat.ui.service.rbac.AccessControlService;
2929
import io.kafbat.ui.util.AccessControlServiceMock;
30-
import java.util.ArrayList;
3130
import java.util.Comparator;
3231
import java.util.List;
3332
import java.util.Map;
@@ -37,10 +36,11 @@
3736
import java.util.function.Function;
3837
import java.util.stream.Collectors;
3938
import java.util.stream.IntStream;
39+
import org.apache.kafka.clients.admin.ConfigEntry;
4040
import org.apache.kafka.clients.admin.TopicDescription;
41+
import org.apache.kafka.common.TopicPartition;
4142
import org.apache.kafka.common.TopicPartitionInfo;
4243
import org.junit.jupiter.api.Test;
43-
import org.mockito.ArgumentMatchers;
4444
import org.mockito.Mockito;
4545
import reactor.core.publisher.Mono;
4646

@@ -70,6 +70,20 @@ class TopicsServicePaginationTest {
7070
private void init(Map<String, InternalTopic> topicsInCache) {
7171
KafkaCluster kafkaCluster = buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME);
7272
statisticsCache.replace(kafkaCluster, Statistics.empty());
73+
74+
Map<TopicPartition, InternalPartitionsOffsets.Offsets> offsets = topicsInCache.values().stream()
75+
.flatMap(t ->
76+
t.getPartitions().values().stream()
77+
.map(p ->
78+
Map.entry(
79+
new TopicPartition(t.getName(), p.getPartition()),
80+
new InternalPartitionsOffsets.Offsets(p.getOffsetMin(), p.getOffsetMax())
81+
)
82+
)
83+
).filter(e ->
84+
e.getValue().getEarliest() != null && e.getValue().getLatest() != null
85+
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
86+
7387
statisticsCache.update(
7488
kafkaCluster,
7589
topicsInCache.entrySet().stream().collect(
@@ -78,8 +92,11 @@ private void init(Map<String, InternalTopic> topicsInCache) {
7892
v -> toTopicDescription(v.getValue())
7993
)
8094
),
81-
Map.of(),
82-
new InternalPartitionsOffsets(Map.of()),
95+
topicsInCache.entrySet().stream()
96+
.map(t ->
97+
Map.entry(t.getKey(), List.of(new ConfigEntry(CLEANUP_POLICY_CONFIG, "delete")))
98+
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
99+
new InternalPartitionsOffsets(offsets),
83100
clustersProperties
84101
);
85102
when(adminClientService.get(isA(KafkaCluster.class))).thenReturn(Mono.just(reactiveAdminClient));
@@ -318,4 +335,61 @@ void shouldListTopicsOrderedByPartitionsCount() {
318335
);
319336
}
320337

338+
@Test
339+
void shouldListTopicsOrderedByMessagesCount() {
340+
Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
341+
.map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
342+
IntStream.range(0, i)
343+
.mapToObj(p ->
344+
new TopicPartitionInfo(p, null, List.of(), List.of()))
345+
.collect(Collectors.toList())))
346+
.map(topicDescription ->
347+
InternalTopic.from(topicDescription, List.of(),
348+
new InternalPartitionsOffsets(
349+
topicDescription.partitions().stream()
350+
.map(p -> Map.entry(
351+
new TopicPartition(topicDescription.name(), p.partition()),
352+
new InternalPartitionsOffsets.Offsets(0L, (long) p.partition())
353+
)).collect(Collectors.toMap(
354+
Map.Entry::getKey,
355+
Map.Entry::getValue
356+
))
357+
),
358+
Metrics.empty(), null, null, "_")
359+
.toBuilder().cleanUpPolicy(CleanupPolicy.DELETE).build()
360+
).collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
361+
362+
init(internalTopics);
363+
364+
var topicsSortedAsc = topicsController
365+
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
366+
null, TopicColumnsToSortDTO.MESSAGES_COUNT, null, null).block();
367+
368+
assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
369+
assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
370+
assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
371+
internalTopics.values().stream()
372+
.map(clusterMapper::toTopic)
373+
.sorted(Comparator.comparing(
374+
(t) -> t.getMessagesCount().get()
375+
))
376+
.limit(25)
377+
.collect(Collectors.toList())
378+
);
379+
380+
var topicsSortedDesc = topicsController
381+
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
382+
null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
383+
384+
assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
385+
assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
386+
assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
387+
internalTopics.values().stream()
388+
.map(clusterMapper::toTopic)
389+
.sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
390+
.limit(25)
391+
.collect(Collectors.toList())
392+
);
393+
}
394+
321395
}

0 commit comments

Comments
 (0)