diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index e73029c4f..6fbfce1d7 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -61,6 +61,8 @@ public static class Cluster { MetricsConfigData metrics; Map properties; + Map consumerProperties; + Map producerProperties; boolean readOnly = false; Long pollingThrottleRate; @@ -200,6 +202,8 @@ private void setMetricsDefaults() { private void flattenClusterProperties() { for (Cluster cluster : clusters) { cluster.setProperties(flattenClusterProperties(null, cluster.getProperties())); + cluster.setConsumerProperties(flattenClusterProperties(null, cluster.getConsumerProperties())); + cluster.setProducerProperties(flattenClusterProperties(null, cluster.getProducerProperties())); } } diff --git a/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java b/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java index 94e097373..6e2a00988 100644 --- a/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java +++ b/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java @@ -24,6 +24,8 @@ public class KafkaCluster { private final String version; private final String bootstrapServers; private final Properties properties; + private final Properties consumerProperties; + private final Properties producerProperties; private final boolean readOnly; private final MetricsConfig metricsConfig; private final DataMasking masking; diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 282bdc5b6..f1256cf00 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -266,6 +266,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster, Properties props = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); props.putAll(cluster.getProperties()); + props.putAll(cluster.getConsumerProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index 16e41ff84..a74a33ce4 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -53,6 +53,8 @@ public KafkaCluster create(ClustersProperties properties, builder.name(clusterProperties.getName()); builder.bootstrapServers(clusterProperties.getBootstrapServers()); builder.properties(convertProperties(clusterProperties.getProperties())); + builder.consumerProperties(convertProperties(clusterProperties.getConsumerProperties())); + builder.producerProperties(convertProperties(clusterProperties.getProducerProperties())); builder.readOnly(clusterProperties.isReadOnly()); builder.masking(DataMasking.create(clusterProperties.getMasking())); builder.pollingSettings(PollingSettings.create(clusterProperties, properties)); diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index c94472d56..b33be8b76 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -201,6 +201,7 @@ public static KafkaProducer createProducer(KafkaCluster cluster, Properties properties = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); + properties.putAll(cluster.getProducerProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); diff --git a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java index 9e9e903cc..6c6c73113 100644 --- a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.IsolationLevel; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.function.ThrowingConsumer; import org.junit.jupiter.api.io.TempDir; @@ -102,6 +103,12 @@ public void initialize(@NotNull ConfigurableApplicationContext context) { System.setProperty("kafka.clusters.0.audit.topicAuditEnabled", "true"); System.setProperty("kafka.clusters.0.audit.consoleAuditEnabled", "true"); + System.setProperty("kafka.clusters.0.consumerProperties.request.timeout.ms", "60000"); + System.setProperty("kafka.clusters.0.consumerProperties.isolation.level", + IsolationLevel.READ_COMMITTED.toString()); + System.setProperty("kafka.clusters.0.producerProperties.request.timeout.ms", "45000"); + System.setProperty("kafka.clusters.0.producerProperties.max.block.ms", "80000"); + System.setProperty("kafka.clusters.1.name", SECOND_LOCAL); System.setProperty("kafka.clusters.1.readOnly", "true"); System.setProperty("kafka.clusters.1.bootstrapServers", kafka.getBootstrapServers()); diff --git a/api/src/test/java/io/kafbat/ui/service/ConfigTest.java b/api/src/test/java/io/kafbat/ui/service/ConfigTest.java index 756bd5c91..c436869a2 100644 --- a/api/src/test/java/io/kafbat/ui/service/ConfigTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ConfigTest.java @@ -1,6 +1,7 @@ package io.kafbat.ui.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.kafbat.ui.AbstractIntegrationTest; import io.kafbat.ui.model.BrokerConfigDTO; @@ -10,6 +11,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.IsolationLevel; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -79,6 +84,22 @@ void testAlterReadonlyConfig() { .expectStatus().isBadRequest(); } + @Test + void testKafkaClientCustomProperties() { + KafkaCluster cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).orElseThrow(); + + Properties consumerProps = cluster.getConsumerProperties(); + + assertEquals("60000", consumerProps.getProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); + assertEquals(IsolationLevel.READ_COMMITTED.toString(), + consumerProps.getProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG)); + + Properties producerProps = cluster.getProducerProperties(); + + assertEquals("45000", producerProps.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)); + assertEquals("80000", producerProps.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG)); + } + private Optional getConfig(String name) { List configs = webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/{id}/configs", LOCAL, 1)