Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ private Comparator<InternalTopic> getComparatorForTopic(
case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor);
case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize);
case MESSAGES_NUMBER -> Comparator.comparing(
InternalTopic::getMessagesNumber,
Comparator.nullsLast(Long::compareTo)
);
default -> defaultComparator;
};
}
Expand Down
9 changes: 9 additions & 0 deletions api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.kafka.common.resource.ResourceType;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.openapitools.jackson.nullable.JsonNullable;

@Mapper(componentModel = "spring")
public interface ClusterMapper {
Expand Down Expand Up @@ -104,6 +105,14 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {

TopicDTO toTopic(InternalTopic topic);

default <T> JsonNullable<T> toJsonNullable(T value) {
if (value == null) {
return JsonNullable.undefined();
} else {
return JsonNullable.of(value);
}
}

PartitionDTO toPartition(InternalPartition topic);

BrokerDTO toBrokerDto(InternalBroker broker);
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/InternalTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;

@Data
@Builder(toBuilder = true)
Expand Down Expand Up @@ -143,4 +144,16 @@ public static InternalTopic from(TopicDescription topicDescription,
return topic.build();
}

public @Nullable Long getMessagesNumber() {
Long result = null;
if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) {
result = 0L;
if (partitions != null && !partitions.isEmpty()) {
for (InternalPartition partition : partitions.values()) {
result += (partition.getOffsetMax() - partition.getOffsetMin());
}
}
}
return result;
}
}
2 changes: 2 additions & 0 deletions contract-typespec/api/topics.tsp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ enum TopicColumnsToSort {
TOTAL_PARTITIONS,
REPLICATION_FACTOR,
SIZE,
MESSAGES_NUMBER
}

model Topic {
Expand All @@ -170,6 +171,7 @@ model Topic {
bytesInPerSec?: float64;
bytesOutPerSec?: float64;
underReplicatedPartitions?: int32;
messagesNumber?: int64 | null;
cleanUpPolicy?: CleanUpPolicy;
partitions?: Partition[];
}
Expand Down
5 changes: 5 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,7 @@ components:
- TOTAL_PARTITIONS
- REPLICATION_FACTOR
- SIZE
- MESSAGES_NUMBER

SchemaColumnsToSort:
type: string
Expand Down Expand Up @@ -2747,6 +2748,10 @@ components:
type: array
items:
$ref: "#/components/schemas/Partition"
messagesNumber:
type: integer
format: int64
nullable: true
required:
- name

Expand Down
Loading