Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ public static WebClient buildWebClient(DataSize maxBuffSize,
.configureSsl(
truststoreConfig,
new ClustersProperties.KeystoreConfig(
config.getKeystoreType(),
config.getKeystoreCertificate(),
config.getKeystoreLocation(),
config.getKeystorePassword()
)
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.kafbat.ui.model.MetricsScrapeProperties.JMX_METRICS_TYPE;

import io.kafbat.ui.api.model.SecurityProtocol;
import jakarta.annotation.PostConstruct;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
Expand Down Expand Up @@ -63,7 +64,9 @@ public static class Cluster {
@NotBlank(message = "field bootstrapServers for for cluster could not be blank")
String bootstrapServers;

SecurityProtocol securityProtocol;
TruststoreConfig ssl;
KeystoreConfig kafkaSsl;

String schemaRegistry;
SchemaRegistryAuth schemaRegistryAuth;
Expand Down Expand Up @@ -108,6 +111,8 @@ public static class MetricsConfig {
Boolean ssl;
String username;
String password;
StoreType keystoreType;
String keystoreCertificate;
String keystoreLocation;
String keystorePassword;

Expand Down Expand Up @@ -143,6 +148,8 @@ public static class ConnectCluster {
String address;
String username;
String password;
StoreType keystoreType;
String keystoreCertificate;
String keystoreLocation;
String keystorePassword;
}
Expand All @@ -154,9 +161,14 @@ public static class SchemaRegistryAuth {
String password;
}

public enum StoreType {
JKS, PKCS12, PEM
}

@Data
@ToString(exclude = {"truststorePassword"})
public static class TruststoreConfig {
StoreType truststoreType;
String truststoreLocation;
String truststorePassword;
boolean verifySsl = true;
Expand All @@ -167,6 +179,8 @@ public static class TruststoreConfig {
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
StoreType keystoreType;
String keystoreCertificate;
String keystoreLocation;
String keystorePassword;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,24 @@ public class MetricsScrapeProperties {

public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster) {
var metrics = Objects.requireNonNull(cluster.getMetrics());

KeystoreConfig keystoreConfig = null;
if (metrics.getKeystoreLocation() != null) {
keystoreConfig = new KeystoreConfig(
metrics.getKeystoreType(),
metrics.getKeystoreCertificate(),
metrics.getKeystoreLocation(),
metrics.getKeystorePassword()
);
}

return MetricsScrapeProperties.builder()
.port(metrics.getPort())
.ssl(Optional.ofNullable(metrics.getSsl()).orElse(false))
.username(metrics.getUsername())
.password(metrics.getPassword())
.truststoreConfig(cluster.getSsl())
.keystoreConfig(
metrics.getKeystoreLocation() != null
? new KeystoreConfig(metrics.getKeystoreLocation(), metrics.getKeystorePassword())
: null
)
.keystoreConfig(keystoreConfig)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.fromSupplier(() -> {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(
cluster.getOriginalProperties().getSsl(),
cluster.getOriginalProperties().getKafkaSsl(),
cluster.getOriginalProperties().getSecurityProtocol(),
properties
);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,12 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(
cluster.getOriginalProperties().getSsl(),
cluster.getOriginalProperties().getKafkaSsl(),
cluster.getOriginalProperties().getSecurityProtocol(),
props
);
props.putAll(cluster.getProperties());
props.putAll(cluster.getConsumerProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import static io.kafbat.ui.util.KafkaServicesValidation.validateKsql;
import static io.kafbat.ui.util.KafkaServicesValidation.validatePrometheusStore;
import static io.kafbat.ui.util.KafkaServicesValidation.validateSchemaRegistry;
import static io.kafbat.ui.util.KafkaServicesValidation.validateTruststore;
import static io.kafbat.ui.util.KafkaServicesValidation.validateSslBundle;

import io.kafbat.ui.client.RetryingKafkaConnectClient;
import io.kafbat.ui.config.ClustersProperties;
Expand Down Expand Up @@ -105,20 +105,22 @@ public KafkaCluster create(ClustersProperties properties,

public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
if (clusterProperties.getSsl() != null) {
Optional<String> errMsg = validateTruststore(clusterProperties.getSsl());
Optional<String> errMsg = validateSslBundle(clusterProperties.getSsl(), clusterProperties.getKafkaSsl());
if (errMsg.isPresent()) {
return Mono.just(new ClusterConfigValidationDTO()
.kafka(new ApplicationPropertyValidationDTO()
.error(true)
.errorMessage("Truststore not valid: " + errMsg.get())));
.errorMessage("Truststore/Keystore not valid: " + errMsg.get())));
}
}

return Mono.zip(
validateClusterConnection(
clusterProperties.getBootstrapServers(),
convertProperties(clusterProperties.getProperties()),
clusterProperties.getSsl()
clusterProperties.getSsl(),
clusterProperties.getKafkaSsl(),
clusterProperties.getSecurityProtocol()
),
schemaRegistryConfigured(clusterProperties)
? validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
public static KafkaProducer<byte[], byte[]> createProducer(ClustersProperties.Cluster cluster,
Map<String, Object> additionalProps) {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(
cluster.getSsl(),
cluster.getKafkaSsl(),
cluster.getSecurityProtocol(),
properties
);
properties.putAll(cluster.getProperties());
properties.putAll(cluster.getProducerProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,7 @@ private Map<String, Object> prepareJmxEnvAndSetThreadLocal(MetricsScrapeProperti
if (isSslJmxEndpoint(scrapeProperties)) {
var truststoreConfig = scrapeProperties.getTruststoreConfig();
var keystoreConfig = scrapeProperties.getKeystoreConfig();
JmxSslSocketFactory.setSslContextThreadLocal(
truststoreConfig != null ? truststoreConfig.getTruststoreLocation() : null,
truststoreConfig != null ? truststoreConfig.getTruststorePassword() : null,
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null
);
JmxSslSocketFactory.setSslContextThreadLocal(truststoreConfig, keystoreConfig);
JmxSslSocketFactory.editJmxConnectorEnv(env);
}

Expand Down Expand Up @@ -144,4 +139,3 @@ private List<RawMetric> extractObjectMetrics(ObjectName objectName, MBeanServerC
}

}

Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package io.kafbat.ui.service.metrics.scrape.jmx;

import com.google.common.base.Preconditions;
import java.io.FileInputStream;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.util.SslBundleUtil;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ResourceUtils;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.springframework.boot.ssl.SslBundle;

/*
* Purpose of this class to provide an ability to connect to different JMX endpoints using different keystores.
Expand Down Expand Up @@ -79,18 +77,13 @@ public static boolean initialized() {
private record HostAndPort(String host, int port) {
}

private record Ssl(@Nullable String truststoreLocation,
@Nullable String truststorePassword,
@Nullable String keystoreLocation,
@Nullable String keystorePassword) {
private record Ssl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
}

public static void setSslContextThreadLocal(@Nullable String truststoreLocation,
@Nullable String truststorePassword,
@Nullable String keystoreLocation,
@Nullable String keystorePassword) {
SSL_CONTEXT_THREAD_LOCAL.set(
new Ssl(truststoreLocation, truststorePassword, keystoreLocation, keystorePassword));
public static void setSslContextThreadLocal(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
SSL_CONTEXT_THREAD_LOCAL.set(new Ssl(truststoreConfig, keystoreConfig));
}

// should be called when (host:port) -> factory cache should be invalidated (ex. on app config reload)
Expand Down Expand Up @@ -118,33 +111,16 @@ public JmxSslSocketFactory() {
@SneakyThrows
private javax.net.ssl.SSLSocketFactory createFactoryFromThreadLocalCtx() {
Ssl ssl = Preconditions.checkNotNull(SSL_CONTEXT_THREAD_LOCAL.get());

var trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
if (ssl.truststoreLocation() != null && ssl.truststorePassword() != null) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(
new FileInputStream((ResourceUtils.getFile(ssl.truststoreLocation()))),
ssl.truststorePassword().toCharArray()
);
trustManagerFactory.init(trustStore);
}

var keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
if (ssl.keystoreLocation() != null && ssl.keystorePassword() != null) {
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(
new FileInputStream(ResourceUtils.getFile(ssl.keystoreLocation())),
ssl.keystorePassword().toCharArray()
);
keyManagerFactory.init(keyStore, ssl.keystorePassword().toCharArray());
SslBundle bundle = SslBundleUtil.loadBundle(ssl.truststoreConfig(), ssl.keystoreConfig);

SSLContext ctx;
if (bundle != null) {
ctx = bundle.createSslContext();
} else {
ctx = SSLContext.getInstance("TLS");
ctx.init(null, null, null);
}

SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(
keyManagerFactory.getKeyManagers(),
trustManagerFactory.getTrustManagers(),
null
);
return ctx.getSocketFactory();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
package io.kafbat.ui.util;

import io.kafbat.ui.api.model.SecurityProtocol;
import io.kafbat.ui.config.ClustersProperties;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory;
import org.springframework.boot.ssl.SslBundle;

public final class KafkaClientSslPropertiesUtil {

private KafkaClientSslPropertiesUtil() {
}

public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig,
@Nullable SecurityProtocol securityProtocol,
Properties sink) {
if (truststoreConfig == null) {
return;
if (securityProtocol != null) {
sink.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
}

if (!truststoreConfig.isVerifySsl()) {
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}

if (truststoreConfig.getTruststoreLocation() == null) {
SslBundle bundle = SslBundleUtil.loadBundle(truststoreConfig, keystoreConfig);
if (bundle == null) {
return;
}

sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());

if (truststoreConfig.getTruststorePassword() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
}

sink.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, SslBundleSslEngineFactory.class);
sink.put(SslBundle.class.getName(), bundle);
}
}
Loading
Loading