From 2a5fb3a68e72807f3a607f0a2767304d1815c2ce Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Tue, 18 Mar 2025 17:06:32 +1300 Subject: [PATCH 1/4] Replace GenericKubernetesResource with KafkaProtocolFilter Signed-off-by: Sam Barker rh-pre-commit.version: 2.0.1 rh-pre-commit.check-secrets: ENABLED --- .../operator/ProxyConfigConfigMap.java | 36 ++++++++++--------- .../resolver/DependencyResolverImpl.java | 10 +++--- .../operator/resolver/ResolutionResult.java | 11 +++--- .../operator/DerivedResourcesTest.java | 12 +++---- .../SecureConfigInterpolatorTest.java | 29 +++++++++++++++ .../resolver/DependencyResolverImplTest.java | 34 +++++++++--------- 6 files changed, 81 insertions(+), 51 deletions(-) diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java index d0617f58fc..87a91abd7c 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,6 +34,8 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterSpec; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate; import io.kroxylicious.kubernetes.operator.model.ProxyModel; import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder; import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel; @@ -46,11 +49,11 @@ import io.kroxylicious.proxy.config.admin.ManagementConfiguration; import io.kroxylicious.proxy.config.admin.PrometheusMetricsConfig; -import edu.umd.cs.findbugs.annotations.NonNull; - import static io.kroxylicious.kubernetes.operator.Labels.standardLabels; import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Generates a Kube {@code ConfigMap} containing the proxy config YAML. */ @@ -203,17 +206,13 @@ private List filterDefinitions(Context contex String filterDefinitionName = filterDefinitionName(filterCrRef); var filterCr = resolutionResult.filter(filterCrRef).orElseThrow(); - if (filterCr.getAdditionalProperties().get("spec") instanceof Map spec) { - String type = (String) spec.get("type"); - SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec); - ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext(); - putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes()); - putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts()); - return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config()); - } - else { - throw new InvalidClusterException(ClusterCondition.filterInvalid(ResourcesUtil.name(cluster), filterDefinitionName, "`spec` was not an `object`.")); - } + var spec = filterCr.getSpec(); + String type = spec.getType(); + SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec); + ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext(); + putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes()); + putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts()); + return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config()); }).toList(); } @@ -228,14 +227,17 @@ private static void putOrMerged(ManagedWorkflowAndDependentResourceContext c } } - private SecureConfigInterpolator.InterpolationResult interpolateConfig(Map spec) { + private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) { SecureConfigInterpolator.InterpolationResult result; - Object configTemplate = spec.get("configTemplate"); + ConfigTemplate configTemplate = spec.getConfigTemplate(); if (configTemplate != null) { - result = secureConfigInterpolator.interpolate(configTemplate); + //Nasty nsaty hack. Revisit + final TreeMap configProperties = new TreeMap<>(Comparator.nullsFirst(Comparator.naturalOrder())); + configProperties.putAll(configTemplate.getAdditionalProperties()); + result = secureConfigInterpolator.interpolate(configProperties); } else { - result = new SecureConfigInterpolator.InterpolationResult(spec.get("config"), Set.of(), Set.of()); + throw new IllegalArgumentException("ConfigTemplate is not found in KafkaProtocolFilterSpec"); } return result; } diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java index f945b7b274..3c4ce36eac 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java @@ -15,7 +15,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.kroxylicious.kubernetes.api.common.FilterRef; @@ -25,6 +24,7 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.ResourcesUtil; import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult; @@ -52,7 +52,7 @@ public ResolutionResult deepResolve(Context context, UnresolvedDepen .collect(ResourcesUtil.toByLocalRefMap()); Map, KafkaService> clusterRefs = context.getSecondaryResources(KafkaService.class).stream() .collect(ResourcesUtil.toByLocalRefMap()); - Map, GenericKubernetesResource> filters = context.getSecondaryResources(GenericKubernetesResource.class).stream() + Map, KafkaProtocolFilter> filters = context.getSecondaryResources(KafkaProtocolFilter.class).stream() .collect(ResourcesUtil.toByLocalRefMap()); var resolutionResult = virtualKafkaClusters.stream().map(cluster -> determineUnresolvedDependencies(cluster, ingresses, clusterRefs, filters)) .collect(Collectors.toSet()); @@ -64,7 +64,7 @@ public ResolutionResult deepResolve(Context context, UnresolvedDepen private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaCluster cluster, Map, KafkaProxyIngress> ingresses, Map, KafkaService> clusterRefs, - Map, GenericKubernetesResource> filters) { + Map, KafkaProtocolFilter> filters) { VirtualKafkaClusterSpec spec = cluster.getSpec(); Set> unresolvedDependencies = new HashSet<>(); determineUnresolvedIngresses(spec, ingresses).forEach(unresolvedDependencies::add); @@ -74,7 +74,7 @@ private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaClus } private Stream> determineUnresolvedFilters(VirtualKafkaClusterSpec spec, - Map, GenericKubernetesResource> filters) { + Map, KafkaProtocolFilter> filters) { List filtersList = spec.getFilterRefs(); if (filtersList == null) { return Stream.empty(); @@ -102,7 +102,7 @@ private static Stream> determineUnresolvedIngresses(Virtua .filter(ref -> !ingresses.containsKey(ref)); } - private static boolean filterResourceMatchesRef(FilterRef filterRef, GenericKubernetesResource filterResource) { + private static boolean filterResourceMatchesRef(FilterRef filterRef, KafkaProtocolFilter filterResource) { String apiVersion = filterResource.getApiVersion(); var filterResourceGroup = apiVersion.substring(0, apiVersion.indexOf("/")); return filterResourceGroup.equals(filterRef.getGroup()) diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java index c92b6e3130..531c20eee8 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java @@ -15,13 +15,12 @@ import java.util.Set; import java.util.function.Predicate; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; - import io.kroxylicious.kubernetes.api.common.FilterRef; import io.kroxylicious.kubernetes.api.common.LocalRef; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.ResourcesUtil; import static java.util.Comparator.comparing; @@ -32,7 +31,7 @@ * describes which dependencies could not be resolved per VirtualKafkaCluster. */ public class ResolutionResult { - private final Map, GenericKubernetesResource> filters; + private final Map, KafkaProtocolFilter> filters; private final Map, KafkaProxyIngress> kafkaProxyIngresses; private final Map, KafkaService> kafkaServiceRefs; private final Set clusterResolutionResults; @@ -53,7 +52,7 @@ public boolean isFullyResolved() { } - ResolutionResult(Map, GenericKubernetesResource> filters, + ResolutionResult(Map, KafkaProtocolFilter> filters, Map, KafkaProxyIngress> kafkaProxyIngresses, Map, KafkaService> kafkaServiceRefs, Set clusterResolutionResults) { @@ -127,7 +126,7 @@ public Optional kafkaServiceRef(VirtualKafkaCluster cluster) { * Get all resolved Filters * @return filters */ - public Collection filters() { + public Collection filters() { return filters.values(); } @@ -135,7 +134,7 @@ public Collection filters() { * Get the resolved GenericKubernetesResource for a filterRef * @return optional containing the resource if resolved, else empty */ - public Optional filter(FilterRef filterRef) { + public Optional filter(FilterRef filterRef) { return filters().stream() .filter(filterResource -> { String apiVersion = filterResource.getApiVersion(); diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java index 9dfeba1457..4aab862cab 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java @@ -35,7 +35,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -48,10 +47,9 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.config.RuntimeDecl; -import edu.umd.cs.findbugs.annotations.NonNull; - import static io.kroxylicious.kubernetes.operator.ProxyDeployment.KROXYLICIOUS_IMAGE_ENV_VAR; import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name; import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace; @@ -59,6 +57,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import edu.umd.cs.findbugs.annotations.NonNull; + class DerivedResourcesTest { static final YAMLMapper YAML_MAPPER = new YAMLMapper() @@ -363,18 +363,18 @@ private static Context buildContext(Path testDir, List filterInstances = new HashSet<>(); + Set filterInstances = new HashSet<>(); for (var filterApi : runtimeDecl.filterApis()) { String fileName = "in-" + filterApi.kind() + "-*.yaml"; try (var dirStream = Files.newDirectoryStream(testDir, fileName)) { for (Path p : dirStream) { - GenericKubernetesResource resource = YAML_MAPPER.readValue(p.toFile(), GenericKubernetesResource.class); + KafkaProtocolFilter resource = YAML_MAPPER.readValue(p.toFile(), KafkaProtocolFilter.class); assertMinimalMetadata(resource.getMetadata(), fileName); filterInstances.add(resource); } } } - doReturn(filterInstances).when(context).getSecondaryResources(GenericKubernetesResource.class); + doReturn(filterInstances).when(context).getSecondaryResources(KafkaProtocolFilter.class); doReturn(Set.copyOf(virtualKafkaClusters)).when(context).getSecondaryResources(VirtualKafkaCluster.class); doReturn(Set.copyOf(kafkaServiceRefs)).when(context).getSecondaryResources(KafkaService.class); doReturn(Set.copyOf(ingresses)).when(context).getSecondaryResources(KafkaProxyIngress.class); diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java index 0bb00190b1..01d873cf6a 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java @@ -18,6 +18,8 @@ import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -68,6 +70,33 @@ void shouldInterpolateInAnObject() throws JsonProcessingException { Map secret = Map.of("secret", MountedResourceConfigProvider.SECRET_PROVIDER); var i = new SecureConfigInterpolator("/base", secret); + var jsonValue = YAML_MAPPER.readValue(""" + filterOneConfig: true + password: "${secret:my-secret:filter-one}" + """, ConfigTemplate.class); + + // when + var result = i.interpolate(jsonValue); + + // then + assertThat(result.volumes()).singleElement().extracting(Volume::getName).isEqualTo("secrets-my-secret"); + assertThat(result.volumes()).singleElement().extracting(Volume::getSecret).extracting(SecretVolumeSource::getSecretName).isEqualTo("my-secret"); + + assertThat(result.mounts()).singleElement().extracting(VolumeMount::getName).isEqualTo("secrets-filter-one"); + assertThat(result.mounts()).singleElement().extracting(VolumeMount::getMountPath).isEqualTo("/base/secret/filter-one"); + + assertThat(YAML_MAPPER.writeValueAsString(result.config())).isEqualTo(""" + filterOneConfig: true + password: /base/secret/my-secret/filter-one + """); + } + + @Test + void shouldInterpolateInnMap() throws JsonProcessingException { + // given + Map secret = Map.of("secret", MountedResourceConfigProvider.SECRET_PROVIDER); + var i = new SecureConfigInterpolator("/base", secret); + var jsonValue = YAML_MAPPER.readValue(""" kms: AwsKms object: diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java index e9b6de0f9f..917f720a70 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java @@ -9,17 +9,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.stubbing.OngoingStubbing; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; -import io.fabric8.kubernetes.api.model.GenericKubernetesResourceBuilder; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.kroxylicious.kubernetes.api.common.FilterRef; @@ -34,6 +30,8 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceBuilder; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterBuilder; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder; import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult; import static org.assertj.core.api.Assertions.assertThat; @@ -92,7 +90,7 @@ void testNullFiltersOnVirtualClusterTolerated() { @Test void testSingleFilterUnreferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -113,7 +111,7 @@ void testSingleFilterUnreferenced() { @Test void testSingleFilterReferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -134,8 +132,8 @@ void testSingleFilterReferenced() { @Test void testMultipleFiltersReferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); - GenericKubernetesResource filter2 = protocolFilter("filterName2"); + KafkaProtocolFilter filter = protocolFilter("filterName"); + KafkaProtocolFilter filter2 = protocolFilter("filterName2"); givenFiltersInContext(filter, filter2); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -157,7 +155,7 @@ void testMultipleFiltersReferenced() { @Test void testSubsetOfFiltersReferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -179,7 +177,7 @@ void testSubsetOfFiltersReferenced() { @Test void testUnresolvedFilter() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("clusterRef")); givenIngressesInContext(); @@ -320,16 +318,17 @@ private static FilterRef filterRef(String name) { return new FilterRefBuilder().withName(name).build(); } - private static GenericKubernetesResource protocolFilter(String name) { - return new GenericKubernetesResourceBuilder() + private static KafkaProtocolFilter protocolFilter(String name) { + return new KafkaProtocolFilterBuilder() .withApiVersion("filter.kroxylicious.io/v1alpha") .withKind("KafkaProtocolFilter") .withNewMetadata().withName(name) - .endMetadata().build(); + .endMetadata() + .build(); } - private void givenFiltersInContext(GenericKubernetesResource... resources) { - givenSecondaryResourcesInContext(GenericKubernetesResource.class, resources); + private void givenFiltersInContext(KafkaProtocolFilter... resources) { + givenSecondaryResourcesInContext(KafkaProtocolFilter.class, resources); } private void givenIngressesInContext(KafkaProxyIngress... ingresses) { @@ -344,8 +343,9 @@ private void givenClusterRefsInContext(KafkaService... clusterRefs) { givenSecondaryResourcesInContext(KafkaService.class, clusterRefs); } - private OngoingStubbing> givenSecondaryResourcesInContext(Class type, T... resources) { - return when(mockContext.getSecondaryResources(type)).thenReturn(Arrays.stream(resources).collect(Collectors.toSet())); + @SafeVarargs + private void givenSecondaryResourcesInContext(Class type, T... resources) { + when(mockContext.getSecondaryResources(type)).thenReturn(Arrays.stream(resources).collect(Collectors.toSet())); } private static VirtualKafkaCluster virtualCluster(List filterRefs, String clusterRef, List ingressRefs) { From 1e5b30b4d348047ac0dd96d8f3f6a1f48f179749 Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Tue, 18 Mar 2025 17:08:41 +1300 Subject: [PATCH 2/4] fixup! Replace GenericKubernetesResource with KafkaProtocolFilter Signed-off-by: Sam Barker rh-pre-commit.version: 2.0.1 rh-pre-commit.check-secrets: ENABLED --- .../kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java index 87a91abd7c..e0020b7683 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java @@ -231,7 +231,7 @@ private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProt SecureConfigInterpolator.InterpolationResult result; ConfigTemplate configTemplate = spec.getConfigTemplate(); if (configTemplate != null) { - //Nasty nsaty hack. Revisit + // Nasty nasty hack. Revisit final TreeMap configProperties = new TreeMap<>(Comparator.nullsFirst(Comparator.naturalOrder())); configProperties.putAll(configTemplate.getAdditionalProperties()); result = secureConfigInterpolator.interpolate(configProperties); From 3ab0b854555cef8913708965e1b5c499175ff659 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 19 Mar 2025 09:36:55 +1300 Subject: [PATCH 3/4] Use Object for configTemplate Signed-off-by: Tom Bentley --- kroxylicious-operator/pom.xml | 4 ++- .../operator/ProxyConfigConfigMap.java | 13 +++------ .../operator/DerivedResourcesTest.java | 4 +-- .../SecureConfigInterpolatorTest.java | 29 ------------------- 4 files changed, 9 insertions(+), 41 deletions(-) diff --git a/kroxylicious-operator/pom.xml b/kroxylicious-operator/pom.xml index ed3f61b0b1..f51ff5c24e 100644 --- a/kroxylicious-operator/pom.xml +++ b/kroxylicious-operator/pom.xml @@ -357,7 +357,9 @@ io.kroxylicious.kubernetes.api.common.Condition - + + java.lang.Object + diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java index e0020b7683..c69e47ef8d 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java @@ -16,7 +16,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.TreeMap; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,7 +34,6 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterSpec; -import io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate; import io.kroxylicious.kubernetes.operator.model.ProxyModel; import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder; import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel; @@ -49,11 +47,11 @@ import io.kroxylicious.proxy.config.admin.ManagementConfiguration; import io.kroxylicious.proxy.config.admin.PrometheusMetricsConfig; +import edu.umd.cs.findbugs.annotations.NonNull; + import static io.kroxylicious.kubernetes.operator.Labels.standardLabels; import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace; -import edu.umd.cs.findbugs.annotations.NonNull; - /** * Generates a Kube {@code ConfigMap} containing the proxy config YAML. */ @@ -229,12 +227,9 @@ private static void putOrMerged(ManagedWorkflowAndDependentResourceContext c private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) { SecureConfigInterpolator.InterpolationResult result; - ConfigTemplate configTemplate = spec.getConfigTemplate(); + Object configTemplate = spec.getConfigTemplate(); if (configTemplate != null) { - // Nasty nasty hack. Revisit - final TreeMap configProperties = new TreeMap<>(Comparator.nullsFirst(Comparator.naturalOrder())); - configProperties.putAll(configTemplate.getAdditionalProperties()); - result = secureConfigInterpolator.interpolate(configProperties); + result = secureConfigInterpolator.interpolate(configTemplate); } else { throw new IllegalArgumentException("ConfigTemplate is not found in KafkaProtocolFilterSpec"); diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java index 4aab862cab..cc2becadc4 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java @@ -50,6 +50,8 @@ import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.config.RuntimeDecl; +import edu.umd.cs.findbugs.annotations.NonNull; + import static io.kroxylicious.kubernetes.operator.ProxyDeployment.KROXYLICIOUS_IMAGE_ENV_VAR; import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name; import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace; @@ -57,8 +59,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import edu.umd.cs.findbugs.annotations.NonNull; - class DerivedResourcesTest { static final YAMLMapper YAML_MAPPER = new YAMLMapper() diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java index 01d873cf6a..0bb00190b1 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java @@ -18,8 +18,6 @@ import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; -import io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate; - import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -70,33 +68,6 @@ void shouldInterpolateInAnObject() throws JsonProcessingException { Map secret = Map.of("secret", MountedResourceConfigProvider.SECRET_PROVIDER); var i = new SecureConfigInterpolator("/base", secret); - var jsonValue = YAML_MAPPER.readValue(""" - filterOneConfig: true - password: "${secret:my-secret:filter-one}" - """, ConfigTemplate.class); - - // when - var result = i.interpolate(jsonValue); - - // then - assertThat(result.volumes()).singleElement().extracting(Volume::getName).isEqualTo("secrets-my-secret"); - assertThat(result.volumes()).singleElement().extracting(Volume::getSecret).extracting(SecretVolumeSource::getSecretName).isEqualTo("my-secret"); - - assertThat(result.mounts()).singleElement().extracting(VolumeMount::getName).isEqualTo("secrets-filter-one"); - assertThat(result.mounts()).singleElement().extracting(VolumeMount::getMountPath).isEqualTo("/base/secret/filter-one"); - - assertThat(YAML_MAPPER.writeValueAsString(result.config())).isEqualTo(""" - filterOneConfig: true - password: /base/secret/my-secret/filter-one - """); - } - - @Test - void shouldInterpolateInnMap() throws JsonProcessingException { - // given - Map secret = Map.of("secret", MountedResourceConfigProvider.SECRET_PROVIDER); - var i = new SecureConfigInterpolator("/base", secret); - var jsonValue = YAML_MAPPER.readValue(""" kms: AwsKms object: From 6e54025d3da3399d59864a906da369c586bf0e35 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 19 Mar 2025 16:52:39 +1300 Subject: [PATCH 4/4] Include KafkaProtocolFilter in integration tests Filters are what we're all about! Lets add them to the integration tests. Resolves #1968 Signed-off-by: Robert Young --- .../operator/ProxyReconcilerIT.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java index e540df1d1e..7e06ebeeff 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java @@ -8,6 +8,7 @@ import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Set; import org.assertj.core.api.AbstractStringAssert; @@ -40,6 +41,8 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceBuilder; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterBuilder; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder; import io.kroxylicious.kubernetes.operator.config.RuntimeDecl; import static io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxyingressspec.ClusterIP.Protocol.TCP; @@ -56,6 +59,7 @@ class ProxyReconcilerIT { private static final String PROXY_A = "proxy-a"; private static final String PROXY_B = "proxy-b"; private static final String CLUSTER_FOO_REF = "fooref"; + private static final String FILTER_NAME = "validation"; private static final String CLUSTER_FOO = "foo"; private static final String CLUSTER_FOO_CLUSTERIP_INGRESS = "foo-cluster-ip"; private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092"; @@ -88,6 +92,7 @@ void beforeEach() { .withAdditionalCustomResourceDefinition(VirtualKafkaCluster.class) .withAdditionalCustomResourceDefinition(KafkaService.class) .withAdditionalCustomResourceDefinition(KafkaProxyIngress.class) + .withAdditionalCustomResourceDefinition(KafkaProtocolFilter.class) .waitForNamespaceDeletion(true) .withConfigurationService(x -> x.withCloseClientOnStop(false)) .build(); @@ -119,12 +124,13 @@ public KafkaProxyIngress ingress(String name) { CreatedResources doCreate() { KafkaProxy proxy = extension.create(kafkaProxy(PROXY_A)); + KafkaProtocolFilter filter = extension.create(filter(FILTER_NAME)); KafkaService barClusterRef = extension.create(clusterRef(CLUSTER_BAR_REF, CLUSTER_BAR_BOOTSTRAP)); KafkaProxyIngress ingressBar = extension.create(clusterIpIngress(CLUSTER_BAR_CLUSTERIP_INGRESS, proxy)); Set clusterRefs = Set.of(barClusterRef); - VirtualKafkaCluster clusterBar = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxy, barClusterRef, ingressBar)); + VirtualKafkaCluster clusterBar = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxy, barClusterRef, ingressBar, filter)); Set clusters = Set.of(clusterBar); - assertProxyConfigContents(proxy, Set.of(CLUSTER_BAR_BOOTSTRAP), Set.of()); + assertProxyConfigContents(proxy, Set.of(CLUSTER_BAR_BOOTSTRAP, filter.getSpec().getType()), Set.of()); assertDeploymentMountsConfigConfigMap(proxy); assertDeploymentBecomesReady(proxy); assertServiceTargetsProxyInstances(proxy, clusterBar, ingressBar); @@ -274,9 +280,10 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() { KafkaService fooClusterRef = extension.create(clusterRef(CLUSTER_FOO_REF, CLUSTER_FOO_BOOTSTRAP)); KafkaService barClusterRef = extension.create(clusterRef(CLUSTER_BAR_REF, CLUSTER_BAR_BOOTSTRAP)); + KafkaProtocolFilter filter = extension.create(filter(FILTER_NAME)); - VirtualKafkaCluster clusterFoo = extension.create(virtualKafkaCluster(CLUSTER_FOO, proxyA, fooClusterRef, ingressFoo)); - VirtualKafkaCluster barCluster = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxyB, barClusterRef, ingressBar)); + VirtualKafkaCluster clusterFoo = extension.create(virtualKafkaCluster(CLUSTER_FOO, proxyA, fooClusterRef, ingressFoo, filter)); + VirtualKafkaCluster barCluster = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxyB, barClusterRef, ingressBar, filter)); assertProxyConfigContents(proxyA, Set.of(CLUSTER_FOO_BOOTSTRAP), Set.of()); assertProxyConfigContents(proxyB, Set.of(CLUSTER_BAR_BOOTSTRAP), Set.of()); @@ -312,13 +319,13 @@ private AbstractStringAssert assertThatProxyConfigFor(KafkaProxy proxy) { } private static VirtualKafkaCluster virtualKafkaCluster(String clusterName, KafkaProxy proxy, KafkaService clusterRef, - KafkaProxyIngress ingress) { + KafkaProxyIngress ingress, KafkaProtocolFilter filter) { return new VirtualKafkaClusterBuilder().withNewMetadata().withName(clusterName).endMetadata() .withNewSpec() .withTargetKafkaServiceRef(new KafkaServiceRefBuilder().withName(name(clusterRef)).build()) .withNewProxyRef().withName(name(proxy)).endProxyRef() .addNewIngressRef().withName(name(ingress)).endIngressRef() - .withFilterRefs() + .withFilterRefs().addNewFilterRef().withName(name(filter)).endFilterRef() .endSpec().build(); } @@ -329,6 +336,12 @@ private static KafkaService clusterRef(String clusterRefName, String clusterBoot .endSpec().build(); } + private static KafkaProtocolFilter filter(String name) { + return new KafkaProtocolFilterBuilder().withNewMetadata().withName(name).endMetadata() + .withNewSpec().withType("RecordValidation").withConfigTemplate(Map.of("rules", List.of(Map.of("allowNulls", false)))) + .endSpec().build(); + } + KafkaProxy kafkaProxy(String name) { // @formatter:off return new KafkaProxyBuilder()