
Commit cbc54f8

Added custom properties & fixed reset policy (#134)
* Added custom properties support
1 parent dd26014 commit cbc54f8

8 files changed: 29 additions, 9 deletions

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.config;
 
+import java.util.Properties;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
@@ -24,5 +25,6 @@ public static class Cluster {
     String protobufFile;
     String protobufMessageName;
     int jmxPort;
+    Properties properties;
   }
 }
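
The new properties field gives each configured cluster a free-form bag of Kafka client settings that is passed through to the admin and consumer clients further down in this commit. A minimal sketch of what such a bag might hold once bound from per-cluster configuration; the keys below are illustrative assumptions, not values from this commit:

import java.util.Properties;

class CustomPropertiesExample {
    // Hypothetical illustration only: the keys are assumptions, not part of this commit.
    static Properties exampleClusterProperties() {
        Properties custom = new Properties();
        custom.put("security.protocol", "SASL_SSL");
        custom.put("sasl.mechanism", "PLAIN");
        return custom;
    }
}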

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

Lines changed: 2 additions & 1 deletion
@@ -97,7 +97,8 @@ private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
       final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
       if (!versions.isEmpty()) {
         final Integer version = versions.iterator().next();
-        final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false);
+        final String subjectName = String.format(cluster.getSchemaNameTemplate(), record.topic());
+        final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false);
         if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
           try {
             protobufDeserializer.deserialize(record.topic(), record.value().get());
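
The schema lookup now resolves the Schema Registry subject through the cluster's schemaNameTemplate instead of using the raw topic name. A small sketch of the formatting step, assuming the common "%s-value" template (the template is configurable per cluster; "%s-value" is an assumption here, not taken from this diff):

class SubjectNameExample {
    // With a "%s-value" template, topic "users" resolves to subject "users-value".
    static String subjectFor(String schemaNameTemplate, String topic) {
        return String.format(schemaNameTemplate, topic);
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("%s-value", "users")); // prints: users-value
    }
}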

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

Lines changed: 10 additions & 0 deletions
@@ -3,6 +3,7 @@
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.*;
+import java.util.Properties;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
@@ -22,6 +23,7 @@ public interface ClusterMapper {
   @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
   Cluster toCluster(KafkaCluster cluster);
   @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName="resolvePath")
+  @Mapping(target = "properties", source = "properties", qualifiedByName="setProperties")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
   @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
   ClusterStats toClusterStats(InternalClusterMetrics metrics);
@@ -73,4 +75,12 @@ default Path resolvePath(String path) {
     }
   }
 
+  default Properties setProperties(Properties properties) {
+    Properties copy = new Properties();
+    if (properties != null) {
+      copy.putAll(properties);
+    }
+    return copy;
+  }
+
 }
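
The setProperties default method always returns a fresh copy, so a cluster with no custom settings configured ends up with an empty Properties object rather than null on KafkaCluster. That is what lets ClusterService and KafkaService below call putAll(cluster.getProperties()) without any null check.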

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

Lines changed: 2 additions & 0 deletions
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.cluster.model;
 
 import com.provectus.kafka.ui.model.ServerStatus;
+import java.util.Properties;
 import lombok.Builder;
 import lombok.Data;
 
@@ -24,4 +25,5 @@ public class KafkaCluster {
   private final Throwable lastZookeeperException;
   private final Path protobufFile;
   private final String protobufMessageName;
+  private final Properties properties;
 }

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

Lines changed: 1 addition & 0 deletions
@@ -128,6 +128,7 @@ public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster c
 
   public Map<TopicPartition, Long> topicPartitionsEndOffsets(KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
     Properties properties = new Properties();
+    properties.putAll(cluster.getProperties());
     properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
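
The custom properties are merged in before the fixed settings, so keys such as the bootstrap servers and deserializers that are put afterwards win on any clash. A minimal sketch of that ordering with java.util.Properties directly (the clashing key is just an example):

import java.util.Properties;

class MergeOrderExample {
    public static void main(String[] args) {
        Properties custom = new Properties();
        custom.put("bootstrap.servers", "broker-from-custom-config:9092"); // hypothetical clash

        Properties effective = new Properties();
        effective.putAll(custom);                                // custom settings go in first...
        effective.put("bootstrap.servers", "localhost:9092");    // ...then the fixed value overrides it
        System.out.println(effective.getProperty("bootstrap.servers")); // prints: localhost:9092
    }
}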

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

Lines changed: 7 additions & 6 deletions
@@ -88,7 +88,7 @@ private boolean treeContainsValue(JsonNode tree, String query) {
 
   @RequiredArgsConstructor
   private static class RecordEmitter {
-    private static final int MAX_POLLS_COUNT = 30;
+    private static final int MAX_EMPTY_POLLS_COUNT = 3;
     private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
 
     private final KafkaService kafkaService;
@@ -98,15 +98,16 @@ private static class RecordEmitter {
 
     public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
       try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-        // assignPartitions(consumer);
-        // seekOffsets(consumer);
         assignAndSeek(consumer);
-        int pollsCount = 0;
-        while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
+        int emptyPollsCount = 0;
+        log.info("assignment: {}", consumer.assignment());
+        while (!sink.isCancelled()) {
           ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
           log.info("{} records polled", records.count());
-          if (records.count() == 0) {
+          if (records.count() == 0 && emptyPollsCount < MAX_EMPTY_POLLS_COUNT) {
             break;
+          } else {
+            emptyPollsCount++;
           }
           records.iterator()
               .forEachRemaining(sink::next);
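
The fixed MAX_POLLS_COUNT cap of 30 is replaced here by a loop that keeps polling until the Flux sink is cancelled, counting empty polls against MAX_EMPTY_POLLS_COUNT. For reference, the conventional shape of such an empty-poll guard looks roughly like the sketch below; this is a pattern illustration under simplified wiring, not a copy of the committed method:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

class EmptyPollGuardExample {
    // Stop when the sink is cancelled or after a few consecutive empty polls.
    static void drain(KafkaConsumer<Bytes, Bytes> consumer,
                      FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
        final int maxEmptyPolls = 3;                        // mirrors MAX_EMPTY_POLLS_COUNT
        final Duration pollTimeout = Duration.ofMillis(1000);
        int emptyPolls = 0;
        while (!sink.isCancelled() && emptyPolls < maxEmptyPolls) {
            ConsumerRecords<Bytes, Bytes> records = consumer.poll(pollTimeout);
            if (records.isEmpty()) {
                emptyPolls++;                               // allow a few idle polls before giving up
            } else {
                emptyPolls = 0;                             // reset once records arrive
                records.forEach(sink::next);
            }
        }
        sink.complete();
    }
}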

kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

Lines changed: 3 additions & 0 deletions
@@ -209,6 +209,7 @@ public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
 
   public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
     Properties properties = new Properties();
+    properties.putAll(kafkaCluster.getProperties());
     properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
     properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
     AdminClient adminClient = AdminClient.create(properties);
@@ -245,10 +246,12 @@ public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
 
   public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
     Properties props = new Properties();
+    props.putAll(cluster.getProperties());
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
     return new KafkaConsumer<>(props);
   }
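
The explicit auto.offset.reset=earliest is the "fixed reset policy" from the commit title: the Kafka consumer default is latest, so a freshly created consumer with no committed offsets would otherwise start at the end of each partition and could show nothing for topics that are not actively receiving messages. With earliest, the browsing consumer created here starts from the beginning of the assigned partitions. Note also that because the custom cluster properties are merged in first, an auto.offset.reset supplied through them would still be overridden by this fixed value.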

kafka-ui-api/src/main/resources/application.yml

Lines changed: 2 additions & 2 deletions
@@ -1,8 +1,8 @@
 kafka:
   clusters:
     - name: local
-      bootstrapServers: b-1.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:9092
-      zookeeper: z-2.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:2181
+      bootstrapServers: localhost:9092
+      zookeeper: localhost:2181
       schemaRegistry: http://localhost:8081
       admin-client-timeout: 5000
 zookeeper:
