Skip to content

Commit 66afaa4

Browse files
yazebochanRoman Nedzvetskiygermanosin
authored
Added begin and end offset param (#78)
* Added begin and end offset param * moved consumer to try with resources block * Fixed some problems * Moved to gettopicdetails * Cleanup code Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: German Osin <[email protected]>
1 parent 5ad3f5c commit 66afaa4

File tree

6 files changed

+88
-52
lines changed

6 files changed

+88
-52
lines changed

docker/kafka-clusters-only.yaml

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,39 @@ services:
2323
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
2424
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
2525
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
26-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
27-
JMX_PORT: 9997
28-
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
29-
30-
kafka01:
31-
image: confluentinc/cp-kafka:5.1.0
32-
depends_on:
33-
- zookeeper0
34-
ports:
35-
- 29093:29093
36-
- 9999:9999
37-
environment:
38-
KAFKA_BROKER_ID: 2
39-
KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
40-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
41-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
42-
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
43-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
26+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
4427
JMX_PORT: 9997
4528
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
29+
#
30+
# kafka01:
31+
# image: confluentinc/cp-kafka:5.1.0
32+
# depends_on:
33+
# - zookeeper0
34+
# ports:
35+
# - 29093:29093
36+
# - 9999:9999
37+
# environment:
38+
# KAFKA_BROKER_ID: 2
39+
# KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
40+
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
41+
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
42+
# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
43+
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
44+
# JMX_PORT: 9997
45+
# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
4646

4747
kafka-init-topics0:
4848
image: confluentinc/cp-kafka:5.1.0
4949
depends_on:
5050
- kafka0
51-
command: "bash -c 'echo Waiting for Kafka to be ready... && \
52-
cub kafka-ready -b kafka0:9092 1 20 && \
53-
kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \
54-
kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'"
51+
- kafka1
52+
command:
53+
"kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \
54+
This is message 1 && \
55+
This is message 2 && \
56+
This is message 3 && \
57+
Message 4 && \
58+
Message 5"
5559
environment:
5660
KAFKA_BROKER_ID: ignored
5761
KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -79,7 +83,7 @@ services:
7983
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090
8084
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
8185
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
82-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
86+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
8387
JMX_PORT: 9998
8488
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
8589

@@ -97,21 +101,21 @@ services:
97101
networks:
98102
- default
99103

100-
schemaregistry0:
101-
image: confluentinc/cp-schema-registry:5.1.0
102-
depends_on:
103-
- zookeeper0
104-
- kafka0
105-
- kafka01
106-
ports:
107-
- 8085:8085
108-
environment:
109-
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
110-
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
111-
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
112-
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
113-
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
114-
115-
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
116-
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
117-
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
104+
# schemaregistry0:
105+
# image: confluentinc/cp-schema-registry:5.1.0
106+
# depends_on:
107+
# - zookeeper0
108+
# - kafka0
109+
# - kafka01
110+
# ports:
111+
# - 8085:8085
112+
# environment:
113+
# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
114+
# SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
115+
# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
116+
# SCHEMA_REGISTRY_HOST_NAME: schemaregistry
117+
# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
118+
#
119+
# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
120+
# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
121+
# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.provectus.kafka.ui.cluster.model;
22

3+
import com.provectus.kafka.ui.model.TopicPartitionDto;
34
import lombok.Builder;
45
import lombok.Data;
56
import org.apache.kafka.common.TopicPartition;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ public List<Topic> getTopics(String name) {
5555

5656
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
5757
return clustersStorage.getClusterByName(name)
58-
.map(KafkaCluster::getTopics)
59-
.map(t -> t.get(topicName))
60-
.map(clusterMapper::toTopicDetails);
58+
.map(c -> {
59+
var topic = c.getTopics().get(topicName);
60+
return clusterMapper
61+
.toTopicDetails(topic)
62+
.partitions(kafkaService.partitionDtoList(topic, c));
63+
});
6164
}
6265

6366
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {

kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import com.provectus.kafka.ui.cluster.model.*;
44
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
55
import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
6-
import com.provectus.kafka.ui.model.ConsumerGroup;
7-
import com.provectus.kafka.ui.model.ServerStatus;
8-
import com.provectus.kafka.ui.model.Topic;
9-
import com.provectus.kafka.ui.model.TopicFormData;
6+
import com.provectus.kafka.ui.model.*;
107
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
118
import lombok.RequiredArgsConstructor;
129
import lombok.SneakyThrows;
@@ -26,7 +23,6 @@
2623
import reactor.util.function.Tuple2;
2724
import reactor.util.function.Tuples;
2825

29-
import java.math.BigDecimal;
3026
import java.util.*;
3127
import java.util.concurrent.ConcurrentHashMap;
3228
import java.util.stream.Collectors;
@@ -351,4 +347,26 @@ private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, Intern
351347
})
352348
);
353349
}
350+
351+
public List<TopicPartitionDto> partitionDtoList (InternalTopic topic, KafkaCluster cluster) {
352+
var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
353+
return getTopicPartitionOffset(cluster, topicPartitions);
354+
}
355+
356+
private List<TopicPartitionDto> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions ) {
357+
try (var consumer = createConsumer(c)) {
358+
final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
359+
final Map<TopicPartition, Long> latest = consumer.endOffsets(topicPartitions);
360+
361+
return topicPartitions.stream()
362+
.map( tp -> new TopicPartitionDto()
363+
.topic(tp.topic())
364+
.partition(tp.partition())
365+
.offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
366+
.offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
367+
).collect(Collectors.toList());
368+
} catch (Exception e) {
369+
return Collections.emptyList();
370+
}
371+
}
354372
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.provectus.kafka.ui.cluster.service.ClusterService;
66
import com.provectus.kafka.ui.model.*;
77
import lombok.RequiredArgsConstructor;
8-
98
import org.apache.commons.lang3.tuple.Pair;
109
import org.springframework.http.HttpStatus;
1110
import org.springframework.http.ResponseEntity;
@@ -14,12 +13,11 @@
1413
import reactor.core.publisher.Flux;
1514
import reactor.core.publisher.Mono;
1615

16+
import javax.validation.Valid;
1717
import java.util.Collections;
1818
import java.util.List;
1919
import java.util.function.Function;
2020

21-
import javax.validation.Valid;
22-
2321
@RestController
2422
@RequiredArgsConstructor
2523
public class MetricsRestController implements ApiClustersApi {

kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,10 @@ components:
383383
TopicDetails:
384384
type: object
385385
properties:
386+
partitions:
387+
type: array
388+
items:
389+
$ref: "#/components/schemas/TopicPartitionDto"
386390
partitionCount:
387391
type: integer
388392
replicationFactor:
@@ -486,9 +490,17 @@ components:
486490
type: string
487491
partition:
488492
type: integer
493+
offsetMax:
494+
type: integer
495+
format: int64
496+
offsetMin:
497+
type: integer
498+
format: int64
489499
required:
490500
- topic
491501
- partition
502+
- offsetMax
503+
- offsetMin
492504

493505
ConsumerTopicPartitionDetail:
494506
type: object

0 commit comments

Comments
 (0)