From 1871839bebf03fedcedff5d6f9652ae4622985ee Mon Sep 17 00:00:00 2001 From: farhad salehi Date: Thu, 21 Aug 2025 05:58:20 +0330 Subject: [PATCH 1/2] Update ApplicationConfigController.java --- .../ApplicationConfigController.java | 311 +++++++++++++++++- 1 file changed, 303 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java b/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java index 32c7d0432..6a6c292cc 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java @@ -19,9 +19,16 @@ import io.kafbat.ui.util.ApplicationRestarter; import io.kafbat.ui.util.DynamicConfigOperations; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationContext; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.Resource; import org.springframework.http.ResponseEntity; import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.Part; @@ -31,17 +38,251 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; +import org.yaml.snakeyaml.Yaml; +import java.io.InputStream; +import java.util.Set; @Slf4j @RestController @RequiredArgsConstructor -public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi { +public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi, HealthIndicator { private final DynamicConfigOperations dynamicConfigOperations; private final ApplicationRestarter restarter; private final KafkaClusterFactory kafkaClusterFactory; private final ApplicationInfoService applicationInfoService; private final DynamicConfigMapper configMapper; + private final ApplicationContext applicationContext; + + private final AtomicBoolean configValid = new AtomicBoolean(false); + private final AtomicBoolean validationInProgress = new AtomicBoolean(false); + + @jakarta.annotation.PostConstruct + public void validateInitialConfig() { + try { + log.info("Starting initial configuration validation..."); + validateConfigOnStartup(); + configValid.set(true); + log.info("Configuration validation passed"); + } catch (Exception e) { + configValid.set(false); + log.error("CRITICAL: Initial configuration validation failed. 
Application will exit.", e); + System.exit(1); + } + } + + @Override + public Health health() { + if (!configValid.get()) { + return Health.down() + .withDetail("reason", "Configuration validation failed") + .withDetail("action", "Pod will restart automatically") + .build(); + } + + if (validationInProgress.get()) { + return Health.down() + .withDetail("reason", "Configuration validation in progress") + .build(); + } + + return Health.up().build(); + } + + @EventListener(ApplicationReadyEvent.class) + public void validateConfigOnStartup() { + validationInProgress.set(true); + try { + log.info("Performing comprehensive configuration validation..."); + + // Validate YAML structure first (catches typos like 'rabc' instead of 'rbac') + validateYamlStructure(); + + DynamicConfigOperations.PropertiesStructure currentConfig = dynamicConfigOperations.getCurrentProperties(); + validateRequiredSections(currentConfig); + + if (currentConfig.getKafka() != null) { + ClustersProperties clustersProperties = convertToClustersProperties(currentConfig.getKafka()); + validateClustersConfig(clustersProperties) + .doOnNext(validations -> { + validations.forEach((clusterName, validation) -> { + if (validation != null && !isValidationSuccessful(validation)) { + throw new IllegalStateException("Cluster validation failed for: " + clusterName); + } + }); + }) + .block(); + } + + validateRbacConfig(currentConfig); + + log.info("Configuration validation completed successfully"); + configValid.set(true); + + } catch (Exception e) { + configValid.set(false); + log.error("Configuration validation failed: {}", e.getMessage(), e); + throw new RuntimeException("Configuration validation failed", e); + } finally { + validationInProgress.set(false); + } + } + + // Enhanced YAML structure validation to catch typos + private void validateYamlStructure() { + try { + Yaml yaml = new Yaml(); + // Try multiple possible config file locations + String[] configPaths = { + "file:./kafka-ui/config.yml" + }; + + for (String configPath : configPaths) { + try { + Resource configResource = applicationContext.getResource(configPath); + if (configResource.exists()) { + try (InputStream inputStream = configResource.getInputStream()) { + Map configMap = yaml.load(inputStream); + + // Check for correct section names + if (!configMap.containsKey("rbac")) { + // Look for common typos + boolean foundTypo = false; + for (String key : configMap.keySet()) { + if (key.toLowerCase().contains("rbac") || key.toLowerCase().contains("role") || + key.toLowerCase().contains("access") || key.toLowerCase().contains("auth")) { + if (!key.equals("rbac")) { + foundTypo = true; + throw new IllegalArgumentException("Configuration error: Found section '" + key + + "' instead of 'rbac'. Please correct the section name to 'rbac'. " + + "Available sections: " + configMap.keySet()); + } + } + } + + if (!foundTypo) { + throw new IllegalArgumentException("Missing 'rbac' section in configuration. 
" + + "Available sections: " + configMap.keySet()); + } + } + + // Validate other required sections + if (!configMap.containsKey("auth")) { + throw new IllegalArgumentException("Missing 'auth' section in configuration"); + } + + if (!configMap.containsKey("kafka")) { + throw new IllegalArgumentException("Missing 'kafka' section in configuration"); + } + + log.debug("YAML structure validation passed for: {}", configPath); + return; // Stop after first successful validation + } + } + } catch (Exception e) { + if (e instanceof IllegalArgumentException) { + throw e; // Re-throw validation errors + } + // Continue to next config path if this one fails + log.debug("Config path {} not available: {}", configPath, e.getMessage()); + } + } + + log.warn("Could not find config file for YAML structure validation"); + + } catch (IllegalArgumentException e) { + throw e; // Re-throw validation errors + } catch (Exception e) { + log.warn("YAML structure validation failed: {}", e.getMessage()); + // Don't fail completely - rely on object validation as fallback + } + } + + // Helper method to check if validation was successful + private boolean isValidationSuccessful(ClusterConfigValidationDTO validation) { + try { + // Try to use reflection to check validation status + // First try isValid() method + try { + return (Boolean) validation.getClass().getMethod("isValid").invoke(validation); + } catch (NoSuchMethodException e) { + // If isValid() doesn't exist, try getValid() method + try { + return (Boolean) validation.getClass().getMethod("getValid").invoke(validation); + } catch (NoSuchMethodException ex) { + // If neither method exists, check for error fields + try { + Object errors = validation.getClass().getMethod("getErrors").invoke(validation); + if (errors instanceof java.util.Collection) { + return ((java.util.Collection) errors).isEmpty(); + } + } catch (NoSuchMethodException exc) { + // If no validation methods found, assume it's valid + return true; + } + } + } + } catch (Exception e) { + log.warn("Failed to check validation status: {}", e.getMessage()); + return false; + } + return true; + } + + private ClustersProperties convertToClustersProperties(Object kafkaProperties) { + if (kafkaProperties instanceof ClustersProperties) { + return (ClustersProperties) kafkaProperties; + } + return new ClustersProperties(); + } + + private void validateRbacConfig(DynamicConfigOperations.PropertiesStructure config) { + if (config.getRbac() == null) { + throw new IllegalArgumentException("Missing required section: rbac"); + } + + // Enhanced RBAC content validation + try { + Object rbac = config.getRbac(); + + // Check if RBAC has roles + boolean hasRoles = false; + try { + Object roles = rbac.getClass().getMethod("getRoles").invoke(rbac); + if (roles instanceof java.util.Collection) { + hasRoles = !((java.util.Collection) roles).isEmpty(); + if (!hasRoles) { + throw new IllegalArgumentException("RBAC section must contain at least one role definition"); + } + } + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("RBAC section is missing required 'roles' property"); + } + + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new IllegalArgumentException("Invalid RBAC configuration structure: " + e.getMessage(), e); + } + + log.debug("RBAC configuration validation passed"); + } + + private void validateRequiredSections(DynamicConfigOperations.PropertiesStructure config) { + if (config.getAuth() == null) { + throw new IllegalArgumentException("Missing required 
section: auth"); + } + + if (config.getKafka() == null) { + throw new IllegalArgumentException("Missing required section: kafka"); + } + + if (config.getRbac() == null) { + throw new IllegalArgumentException("Missing required section: rbac"); + } + } + + // Your existing methods below @Override public Mono> getApplicationInfo(ServerWebExchange exchange) { @@ -79,8 +320,34 @@ public Mono> restartWithConfig(Mono rest return validateAccess(context) .then(restartRequestDto) .doOnNext(restartDto -> { - var newConfig = configMapper.fromDto(restartDto.getConfig().getProperties()); - dynamicConfigOperations.persist(newConfig); + validationInProgress.set(true); + try { + var newConfig = configMapper.fromDto(restartDto.getConfig().getProperties()); + validateRequiredSections(newConfig); + validateRbacConfig(newConfig); + + ClustersProperties clustersProperties = convertToClustersProperties(newConfig.getKafka()); + validateClustersConfig(clustersProperties) + .doOnNext(validations -> { + boolean allValid = validations.values().stream() + .allMatch(validation -> validation != null && isValidationSuccessful(validation)); + + if (!allValid) { + throw new IllegalArgumentException("Cluster validation failed"); + } + }) + .block(); + + dynamicConfigOperations.persist(newConfig); + configValid.set(true); + + } catch (Exception e) { + configValid.set(false); + log.error("Config validation failed: {}", e.getMessage(), e); + throw new RuntimeException("Configuration validation failed", e); + } finally { + validationInProgress.set(false); + } }) .doOnEach(sig -> audit(context, sig)) .doOnSuccess(dto -> restarter.requestRestart()) @@ -110,16 +377,44 @@ public Mono> validateConfig(Mono< .applicationConfigActions(EDIT) .operationName("validateConfig") .build(); + return validateAccess(context) .then(configDto) .flatMap(config -> { - DynamicConfigOperations.PropertiesStructure newConfig = configMapper.fromDto(config.getProperties()); - ClustersProperties clustersProperties = newConfig.getKafka(); - return validateClustersConfig(clustersProperties) - .map(validations -> new ApplicationConfigValidationDTO().clusters(validations)); + validationInProgress.set(true); + try { + DynamicConfigOperations.PropertiesStructure newConfig = configMapper.fromDto(config.getProperties()); + validateRequiredSections(newConfig); + validateRbacConfig(newConfig); + + ClustersProperties clustersProperties = convertToClustersProperties(newConfig.getKafka()); + return validateClustersConfig(clustersProperties) + .map(validations -> { + boolean allValid = validations.values().stream() + .allMatch(validation -> validation != null && isValidationSuccessful(validation)); + + ApplicationConfigValidationDTO result = new ApplicationConfigValidationDTO() + .clusters(validations); + + // Set valid field if it exists in your DTO + try { + result.getClass().getMethod("setValid", Boolean.class).invoke(result, allValid); + } catch (Exception e) { + // If setValid method doesn't exist, ignore + } + + return result; + }); + } finally { + validationInProgress.set(false); + } }) .map(ResponseEntity::ok) - .doOnEach(sig -> audit(context, sig)); + .doOnEach(sig -> audit(context, sig)) + .onErrorResume(e -> { + log.error("Configuration validation failed: {}", e.getMessage(), e); + return Mono.error(e); + }); } private Mono> validateClustersConfig( From 03f8e16f393804c561d4b54e16421c389a060e19 Mon Sep 17 00:00:00 2001 From: farhad salehi Date: Mon, 25 Aug 2025 10:46:42 +0000 Subject: [PATCH 2/2] add config validation --- 
.../ConfigValidationHealthIndicator.java | 200 +++++++++++ .../ui/config/validation/ConfigValidator.java | 34 ++ .../ApplicationConfigController.java | 311 +----------------- api/src/main/resources/application.yml | 3 +- 4 files changed, 244 insertions(+), 304 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/config/validation/ConfigValidationHealthIndicator.java create mode 100644 api/src/main/java/io/kafbat/ui/config/validation/ConfigValidator.java diff --git a/api/src/main/java/io/kafbat/ui/config/validation/ConfigValidationHealthIndicator.java b/api/src/main/java/io/kafbat/ui/config/validation/ConfigValidationHealthIndicator.java new file mode 100644 index 000000000..f2a7d0f06 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/config/validation/ConfigValidationHealthIndicator.java @@ -0,0 +1,200 @@ +package io.kafbat.ui.config.validation; + +import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.model.ClusterConfigValidationDTO; +import io.kafbat.ui.service.KafkaClusterFactory; +import io.kafbat.ui.util.DynamicConfigOperations; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.ReactiveHealthIndicator; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ConfigValidationHealthIndicator implements ReactiveHealthIndicator { + + private final DynamicConfigOperations dynamicConfigOperations; + private final KafkaClusterFactory kafkaClusterFactory; + + private final AtomicBoolean configValid = new AtomicBoolean(false); + private final AtomicReference validationError = new AtomicReference<>("Validation not yet performed"); + + @Override + public Mono health() { + if (!configValid.get()) { + return Mono.just(Health.down() + .withDetail("reason", "Configuration validation failed") + .withDetail("error", validationError.get()) + .build()); + } + + return Mono.just(Health.up().build()); + } + + public Mono validateConfiguration() { + try { + // getCurrentProperties() returns the object directly, not a Mono + DynamicConfigOperations.PropertiesStructure currentConfig = dynamicConfigOperations.getCurrentProperties(); + boolean isValid = validateConfigStructure(currentConfig); + + configValid.set(isValid); + if (isValid) { + validationError.set(null); + log.info("Configuration validation passed"); + } + + return Mono.just(isValid); + + } catch (Exception e) { + configValid.set(false); + validationError.set(e.getMessage()); + log.error("Configuration validation failed: {}", e.getMessage()); + return Mono.just(false); + } + } + + private boolean validateConfigStructure(DynamicConfigOperations.PropertiesStructure config) { + try { + // Validate required sections + validateRequiredSections(config); + + // Validate clusters + boolean clustersValid = validateClusters(config.getKafka()); + if (!clustersValid) { + throw new IllegalArgumentException("Cluster validation failed"); + } + + return true; + + } catch (Exception e) { + throw new RuntimeException("Configuration validation failed: " + e.getMessage(), e); + } + } + + private void validateRequiredSections(DynamicConfigOperations.PropertiesStructure config) { + if (config.getAuth() == null) { + throw new IllegalArgumentException("Missing required section: auth"); + } 
+ + if (config.getKafka() == null) { + throw new IllegalArgumentException("Missing required section: kafka"); + } + + if (config.getRbac() == null) { + throw new IllegalArgumentException("Missing required section: rbac"); + } + + // Validate that sections have basic structure + validateSectionStructure("auth", config.getAuth()); + validateSectionStructure("kafka", config.getKafka()); + validateSectionStructure("rbac", config.getRbac()); + } + + private void validateSectionStructure(String sectionName, Object sectionConfig) { + if (sectionConfig == null) { + throw new IllegalArgumentException(sectionName + " section is null"); + } + + // Basic validation based on section type + switch (sectionName) { + case "kafka": + validateKafkaStructure(sectionConfig); + break; + case "rbac": + validateRbacStructure(sectionConfig); + break; + case "auth": + validateAuthStructure(sectionConfig); + break; + } + } + + private void validateAuthStructure(Object authConfig) { + // Basic validation - check if auth config has basic structure + try { + String authString = authConfig.toString(); + if (authString.contains("null") || authString.isEmpty()) { + throw new IllegalArgumentException("Auth configuration appears to be empty or invalid"); + } + } catch (Exception e) { + throw new IllegalArgumentException("Invalid auth configuration: " + e.getMessage()); + } + } + + private void validateKafkaStructure(Object kafkaConfig) { + if (!(kafkaConfig instanceof ClustersProperties)) { + throw new IllegalArgumentException("Invalid kafka configuration type"); + } + + ClustersProperties kafka = (ClustersProperties) kafkaConfig; + + // Failure-fast: explicit error if clusters are missing or empty + if (kafka.getClusters() == null) { + throw new IllegalArgumentException("Kafka configuration missing 'clusters' property"); + } + + if (kafka.getClusters().isEmpty()) { + throw new IllegalArgumentException("Kafka clusters list cannot be empty"); + } + + // Validate each cluster has required properties + kafka.getClusters().forEach(cluster -> { + if (cluster.getName() == null || cluster.getName().trim().isEmpty()) { + throw new IllegalArgumentException("Kafka cluster missing 'name' property"); + } + if (cluster.getBootstrapServers() == null || cluster.getBootstrapServers().trim().isEmpty()) { + throw new IllegalArgumentException("Kafka cluster '" + cluster.getName() + "' missing 'bootstrapServers' property"); + } + }); + } + + private void validateRbacStructure(Object rbacConfig) { + // Basic validation using toString checks to avoid reflection + try { + String rbacString = rbacConfig.toString(); + if (rbacString.contains("roles=null") || !rbacString.contains("roles")) { + throw new IllegalArgumentException("RBAC section missing 'roles' property"); + } + + if (rbacString.contains("roles=[]") || rbacString.contains("roles=[]")) { + throw new IllegalArgumentException("RBAC roles list cannot be empty"); + } + } catch (Exception e) { + throw new IllegalArgumentException("Invalid RBAC configuration: " + e.getMessage()); + } + } + + private boolean validateClusters(Object kafkaProperties) { + if (!(kafkaProperties instanceof ClustersProperties)) { + return false; + } + + ClustersProperties clustersProperties = (ClustersProperties) kafkaProperties; + + if (clustersProperties.getClusters() == null || clustersProperties.getClusters().isEmpty()) { + return false; + } + + clustersProperties.validateAndSetDefaults(); + + // For synchronous validation, we can't use reactive streams easily + // This is a simplified synchronous validation + 
try { + clustersProperties.getClusters().forEach(cluster -> { + // This will throw an exception if validation fails + kafkaClusterFactory.validate(cluster).block(); + }); + return true; + } catch (Exception e) { + log.warn("Cluster validation failed: {}", e.getMessage()); + return false; + } + } +} diff --git a/api/src/main/java/io/kafbat/ui/config/validation/ConfigValidator.java b/api/src/main/java/io/kafbat/ui/config/validation/ConfigValidator.java new file mode 100644 index 000000000..63abecc8d --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/config/validation/ConfigValidator.java @@ -0,0 +1,34 @@ +package io.kafbat.ui.config.validation; + +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ConfigValidator { + + private final ConfigValidationHealthIndicator healthIndicator; + + @PostConstruct + public void init() { + log.info("Configuration validator initialized"); + } + + @EventListener(ApplicationReadyEvent.class) + public void validateOnStartup(ApplicationReadyEvent event) { + healthIndicator.validateConfiguration() + .subscribe(valid -> { + if (!valid) { + log.error("Application started with invalid configuration. " + + "Check health endpoint for details. Application may not function correctly."); + } else { + log.info("Application started with valid configuration"); + } + }); + } +} diff --git a/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java b/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java index 6a6c292cc..32c7d0432 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java @@ -19,16 +19,9 @@ import io.kafbat.ui.util.ApplicationRestarter; import io.kafbat.ui.util.DynamicConfigOperations; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.ApplicationContext; -import org.springframework.context.event.EventListener; -import org.springframework.core.io.Resource; import org.springframework.http.ResponseEntity; import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.Part; @@ -38,251 +31,17 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; -import org.yaml.snakeyaml.Yaml; -import java.io.InputStream; -import java.util.Set; @Slf4j @RestController @RequiredArgsConstructor -public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi, HealthIndicator { +public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi { private final DynamicConfigOperations dynamicConfigOperations; private final ApplicationRestarter restarter; private final KafkaClusterFactory kafkaClusterFactory; private final ApplicationInfoService applicationInfoService; private final DynamicConfigMapper configMapper; - private 
final ApplicationContext applicationContext; - - private final AtomicBoolean configValid = new AtomicBoolean(false); - private final AtomicBoolean validationInProgress = new AtomicBoolean(false); - - @jakarta.annotation.PostConstruct - public void validateInitialConfig() { - try { - log.info("Starting initial configuration validation..."); - validateConfigOnStartup(); - configValid.set(true); - log.info("Configuration validation passed"); - } catch (Exception e) { - configValid.set(false); - log.error("CRITICAL: Initial configuration validation failed. Application will exit.", e); - System.exit(1); - } - } - - @Override - public Health health() { - if (!configValid.get()) { - return Health.down() - .withDetail("reason", "Configuration validation failed") - .withDetail("action", "Pod will restart automatically") - .build(); - } - - if (validationInProgress.get()) { - return Health.down() - .withDetail("reason", "Configuration validation in progress") - .build(); - } - - return Health.up().build(); - } - - @EventListener(ApplicationReadyEvent.class) - public void validateConfigOnStartup() { - validationInProgress.set(true); - try { - log.info("Performing comprehensive configuration validation..."); - - // Validate YAML structure first (catches typos like 'rabc' instead of 'rbac') - validateYamlStructure(); - - DynamicConfigOperations.PropertiesStructure currentConfig = dynamicConfigOperations.getCurrentProperties(); - validateRequiredSections(currentConfig); - - if (currentConfig.getKafka() != null) { - ClustersProperties clustersProperties = convertToClustersProperties(currentConfig.getKafka()); - validateClustersConfig(clustersProperties) - .doOnNext(validations -> { - validations.forEach((clusterName, validation) -> { - if (validation != null && !isValidationSuccessful(validation)) { - throw new IllegalStateException("Cluster validation failed for: " + clusterName); - } - }); - }) - .block(); - } - - validateRbacConfig(currentConfig); - - log.info("Configuration validation completed successfully"); - configValid.set(true); - - } catch (Exception e) { - configValid.set(false); - log.error("Configuration validation failed: {}", e.getMessage(), e); - throw new RuntimeException("Configuration validation failed", e); - } finally { - validationInProgress.set(false); - } - } - - // Enhanced YAML structure validation to catch typos - private void validateYamlStructure() { - try { - Yaml yaml = new Yaml(); - // Try multiple possible config file locations - String[] configPaths = { - "file:./kafka-ui/config.yml" - }; - - for (String configPath : configPaths) { - try { - Resource configResource = applicationContext.getResource(configPath); - if (configResource.exists()) { - try (InputStream inputStream = configResource.getInputStream()) { - Map configMap = yaml.load(inputStream); - - // Check for correct section names - if (!configMap.containsKey("rbac")) { - // Look for common typos - boolean foundTypo = false; - for (String key : configMap.keySet()) { - if (key.toLowerCase().contains("rbac") || key.toLowerCase().contains("role") || - key.toLowerCase().contains("access") || key.toLowerCase().contains("auth")) { - if (!key.equals("rbac")) { - foundTypo = true; - throw new IllegalArgumentException("Configuration error: Found section '" + key + - "' instead of 'rbac'. Please correct the section name to 'rbac'. " + - "Available sections: " + configMap.keySet()); - } - } - } - - if (!foundTypo) { - throw new IllegalArgumentException("Missing 'rbac' section in configuration. 
" + - "Available sections: " + configMap.keySet()); - } - } - - // Validate other required sections - if (!configMap.containsKey("auth")) { - throw new IllegalArgumentException("Missing 'auth' section in configuration"); - } - - if (!configMap.containsKey("kafka")) { - throw new IllegalArgumentException("Missing 'kafka' section in configuration"); - } - - log.debug("YAML structure validation passed for: {}", configPath); - return; // Stop after first successful validation - } - } - } catch (Exception e) { - if (e instanceof IllegalArgumentException) { - throw e; // Re-throw validation errors - } - // Continue to next config path if this one fails - log.debug("Config path {} not available: {}", configPath, e.getMessage()); - } - } - - log.warn("Could not find config file for YAML structure validation"); - - } catch (IllegalArgumentException e) { - throw e; // Re-throw validation errors - } catch (Exception e) { - log.warn("YAML structure validation failed: {}", e.getMessage()); - // Don't fail completely - rely on object validation as fallback - } - } - - // Helper method to check if validation was successful - private boolean isValidationSuccessful(ClusterConfigValidationDTO validation) { - try { - // Try to use reflection to check validation status - // First try isValid() method - try { - return (Boolean) validation.getClass().getMethod("isValid").invoke(validation); - } catch (NoSuchMethodException e) { - // If isValid() doesn't exist, try getValid() method - try { - return (Boolean) validation.getClass().getMethod("getValid").invoke(validation); - } catch (NoSuchMethodException ex) { - // If neither method exists, check for error fields - try { - Object errors = validation.getClass().getMethod("getErrors").invoke(validation); - if (errors instanceof java.util.Collection) { - return ((java.util.Collection) errors).isEmpty(); - } - } catch (NoSuchMethodException exc) { - // If no validation methods found, assume it's valid - return true; - } - } - } - } catch (Exception e) { - log.warn("Failed to check validation status: {}", e.getMessage()); - return false; - } - return true; - } - - private ClustersProperties convertToClustersProperties(Object kafkaProperties) { - if (kafkaProperties instanceof ClustersProperties) { - return (ClustersProperties) kafkaProperties; - } - return new ClustersProperties(); - } - - private void validateRbacConfig(DynamicConfigOperations.PropertiesStructure config) { - if (config.getRbac() == null) { - throw new IllegalArgumentException("Missing required section: rbac"); - } - - // Enhanced RBAC content validation - try { - Object rbac = config.getRbac(); - - // Check if RBAC has roles - boolean hasRoles = false; - try { - Object roles = rbac.getClass().getMethod("getRoles").invoke(rbac); - if (roles instanceof java.util.Collection) { - hasRoles = !((java.util.Collection) roles).isEmpty(); - if (!hasRoles) { - throw new IllegalArgumentException("RBAC section must contain at least one role definition"); - } - } - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("RBAC section is missing required 'roles' property"); - } - - } catch (IllegalArgumentException e) { - throw e; - } catch (Exception e) { - throw new IllegalArgumentException("Invalid RBAC configuration structure: " + e.getMessage(), e); - } - - log.debug("RBAC configuration validation passed"); - } - - private void validateRequiredSections(DynamicConfigOperations.PropertiesStructure config) { - if (config.getAuth() == null) { - throw new IllegalArgumentException("Missing required 
section: auth"); - } - - if (config.getKafka() == null) { - throw new IllegalArgumentException("Missing required section: kafka"); - } - - if (config.getRbac() == null) { - throw new IllegalArgumentException("Missing required section: rbac"); - } - } - - // Your existing methods below @Override public Mono> getApplicationInfo(ServerWebExchange exchange) { @@ -320,34 +79,8 @@ public Mono> restartWithConfig(Mono rest return validateAccess(context) .then(restartRequestDto) .doOnNext(restartDto -> { - validationInProgress.set(true); - try { - var newConfig = configMapper.fromDto(restartDto.getConfig().getProperties()); - validateRequiredSections(newConfig); - validateRbacConfig(newConfig); - - ClustersProperties clustersProperties = convertToClustersProperties(newConfig.getKafka()); - validateClustersConfig(clustersProperties) - .doOnNext(validations -> { - boolean allValid = validations.values().stream() - .allMatch(validation -> validation != null && isValidationSuccessful(validation)); - - if (!allValid) { - throw new IllegalArgumentException("Cluster validation failed"); - } - }) - .block(); - - dynamicConfigOperations.persist(newConfig); - configValid.set(true); - - } catch (Exception e) { - configValid.set(false); - log.error("Config validation failed: {}", e.getMessage(), e); - throw new RuntimeException("Configuration validation failed", e); - } finally { - validationInProgress.set(false); - } + var newConfig = configMapper.fromDto(restartDto.getConfig().getProperties()); + dynamicConfigOperations.persist(newConfig); }) .doOnEach(sig -> audit(context, sig)) .doOnSuccess(dto -> restarter.requestRestart()) @@ -377,44 +110,16 @@ public Mono> validateConfig(Mono< .applicationConfigActions(EDIT) .operationName("validateConfig") .build(); - return validateAccess(context) .then(configDto) .flatMap(config -> { - validationInProgress.set(true); - try { - DynamicConfigOperations.PropertiesStructure newConfig = configMapper.fromDto(config.getProperties()); - validateRequiredSections(newConfig); - validateRbacConfig(newConfig); - - ClustersProperties clustersProperties = convertToClustersProperties(newConfig.getKafka()); - return validateClustersConfig(clustersProperties) - .map(validations -> { - boolean allValid = validations.values().stream() - .allMatch(validation -> validation != null && isValidationSuccessful(validation)); - - ApplicationConfigValidationDTO result = new ApplicationConfigValidationDTO() - .clusters(validations); - - // Set valid field if it exists in your DTO - try { - result.getClass().getMethod("setValid", Boolean.class).invoke(result, allValid); - } catch (Exception e) { - // If setValid method doesn't exist, ignore - } - - return result; - }); - } finally { - validationInProgress.set(false); - } + DynamicConfigOperations.PropertiesStructure newConfig = configMapper.fromDto(config.getProperties()); + ClustersProperties clustersProperties = newConfig.getKafka(); + return validateClustersConfig(clustersProperties) + .map(validations -> new ApplicationConfigValidationDTO().clusters(validations)); }) .map(ResponseEntity::ok) - .doOnEach(sig -> audit(context, sig)) - .onErrorResume(e -> { - log.error("Configuration validation failed: {}", e.getMessage(), e); - return Mono.error(e); - }); + .doOnEach(sig -> audit(context, sig)); } private Mono> validateClustersConfig( diff --git a/api/src/main/resources/application.yml b/api/src/main/resources/application.yml index ba26c1f9c..1c3357700 100644 --- a/api/src/main/resources/application.yml +++ 
b/api/src/main/resources/application.yml
@@ -7,6 +7,8 @@ management:
       enabled: true
     health:
       enabled: true
+      show-details: always
+      show-components: always
   endpoints:
     web:
       exposure:
@@ -18,4 +20,3 @@ logging:
     io.kafbat.ui: DEBUG
     reactor.netty.http.server.AccessLog: INFO
     org.hibernate.validator: WARN
-
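
Note (not part of the patch): the second commit leaves ConfigValidationHealthIndicator reporting DOWN until ConfigValidator triggers validateConfiguration() on ApplicationReadyEvent. A minimal JUnit 5 sketch of that pre-validation behaviour follows; it assumes Reactor's StepVerifier and AssertJ are available on the test classpath, and it passes nulls for the two constructor collaborators because health() does not touch them on this code path. The test class name and location are illustrative only.

package io.kafbat.ui.config.validation;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.health.Status;
import reactor.test.StepVerifier;

class ConfigValidationHealthIndicatorTest {

  @Test
  void reportsDownUntilValidationHasRun() {
    // configValid starts as false and validationError as "Validation not yet performed",
    // so health() should publish DOWN before validateConfiguration() has ever been invoked.
    // The two collaborators are only read inside validateConfiguration(), so nulls suffice here.
    var indicator = new ConfigValidationHealthIndicator(null, null);

    StepVerifier.create(indicator.health())
        .assertNext(health -> {
          assertThat(health.getStatus()).isEqualTo(Status.DOWN);
          assertThat(health.getDetails())
              .containsEntry("reason", "Configuration validation failed")
              .containsEntry("error", "Validation not yet performed");
        })
        .verifyComplete();
  }
}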