Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kroxylicious-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.Conditions>
io.kroxylicious.kubernetes.api.common.Condition
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.Conditions>

<io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
java.lang.Object
</io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
</existingJavaTypes>
<packageOverrides>
<!-- the default package name ($apiGroup.$apiVersion) doesn't work for us -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterSpec;
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder;
import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel;
Expand Down Expand Up @@ -203,17 +204,13 @@ private List<NamedFilterDefinition> filterDefinitions(Context<KafkaProxy> contex
String filterDefinitionName = filterDefinitionName(filterCrRef);

var filterCr = resolutionResult.filter(filterCrRef).orElseThrow();
if (filterCr.getAdditionalProperties().get("spec") instanceof Map<?, ?> spec) {
String type = (String) spec.get("type");
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());
}
else {
throw new InvalidClusterException(ClusterCondition.filterInvalid(ResourcesUtil.name(cluster), filterDefinitionName, "`spec` was not an `object`."));
}
var spec = filterCr.getSpec();
String type = spec.getType();
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());

}).toList();
}
Expand All @@ -228,14 +225,14 @@ private static <T> void putOrMerged(ManagedWorkflowAndDependentResourceContext c
}
}

private SecureConfigInterpolator.InterpolationResult interpolateConfig(Map<?, ?> spec) {
private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) {
SecureConfigInterpolator.InterpolationResult result;
Object configTemplate = spec.get("configTemplate");
Object configTemplate = spec.getConfigTemplate();
if (configTemplate != null) {
result = secureConfigInterpolator.interpolate(configTemplate);
}
else {
result = new SecureConfigInterpolator.InterpolationResult(spec.get("config"), Set.of(), Set.of());
throw new IllegalArgumentException("ConfigTemplate is not found in KafkaProtocolFilterSpec");
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.javaoperatorsdk.operator.api.reconciler.Context;

import io.kroxylicious.kubernetes.api.common.FilterRef;
Expand All @@ -25,6 +24,7 @@
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
import io.kroxylicious.kubernetes.operator.ResourcesUtil;
import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult;

Expand Down Expand Up @@ -52,7 +52,7 @@ public ResolutionResult deepResolve(Context<KafkaProxy> context, UnresolvedDepen
.collect(ResourcesUtil.toByLocalRefMap());
Map<LocalRef<KafkaService>, KafkaService> clusterRefs = context.getSecondaryResources(KafkaService.class).stream()
.collect(ResourcesUtil.toByLocalRefMap());
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters = context.getSecondaryResources(GenericKubernetesResource.class).stream()
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters = context.getSecondaryResources(KafkaProtocolFilter.class).stream()
.collect(ResourcesUtil.toByLocalRefMap());
var resolutionResult = virtualKafkaClusters.stream().map(cluster -> determineUnresolvedDependencies(cluster, ingresses, clusterRefs, filters))
.collect(Collectors.toSet());
Expand All @@ -64,7 +64,7 @@ public ResolutionResult deepResolve(Context<KafkaProxy> context, UnresolvedDepen
private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaCluster cluster,
Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> ingresses,
Map<LocalRef<KafkaService>, KafkaService> clusterRefs,
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters) {
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters) {
VirtualKafkaClusterSpec spec = cluster.getSpec();
Set<LocalRef<?>> unresolvedDependencies = new HashSet<>();
determineUnresolvedIngresses(spec, ingresses).forEach(unresolvedDependencies::add);
Expand All @@ -74,7 +74,7 @@ private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaClus
}

private Stream<? extends LocalRef<?>> determineUnresolvedFilters(VirtualKafkaClusterSpec spec,
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters) {
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters) {
List<FilterRef> filtersList = spec.getFilterRefs();
if (filtersList == null) {
return Stream.empty();
Expand Down Expand Up @@ -102,7 +102,7 @@ private static Stream<? extends LocalRef<?>> determineUnresolvedIngresses(Virtua
.filter(ref -> !ingresses.containsKey(ref));
}

private static boolean filterResourceMatchesRef(FilterRef filterRef, GenericKubernetesResource filterResource) {
private static boolean filterResourceMatchesRef(FilterRef filterRef, KafkaProtocolFilter filterResource) {
String apiVersion = filterResource.getApiVersion();
var filterResourceGroup = apiVersion.substring(0, apiVersion.indexOf("/"));
return filterResourceGroup.equals(filterRef.getGroup())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import java.util.Set;
import java.util.function.Predicate;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;

import io.kroxylicious.kubernetes.api.common.FilterRef;
import io.kroxylicious.kubernetes.api.common.LocalRef;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
import io.kroxylicious.kubernetes.operator.ResourcesUtil;

import static java.util.Comparator.comparing;
Expand All @@ -32,7 +31,7 @@
* describes which dependencies could not be resolved per VirtualKafkaCluster.
*/
public class ResolutionResult {
private final Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters;
private final Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters;
private final Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> kafkaProxyIngresses;
private final Map<LocalRef<KafkaService>, KafkaService> kafkaServiceRefs;
private final Set<ClusterResolutionResult> clusterResolutionResults;
Expand All @@ -53,7 +52,7 @@ public boolean isFullyResolved() {

}

ResolutionResult(Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters,
ResolutionResult(Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters,
Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> kafkaProxyIngresses,
Map<LocalRef<KafkaService>, KafkaService> kafkaServiceRefs,
Set<ClusterResolutionResult> clusterResolutionResults) {
Expand Down Expand Up @@ -127,15 +126,15 @@ public Optional<KafkaService> kafkaServiceRef(VirtualKafkaCluster cluster) {
* Get all resolved Filters
* @return filters
*/
public Collection<GenericKubernetesResource> filters() {
public Collection<KafkaProtocolFilter> filters() {
return filters.values();
}

/**
 * Get the resolved KafkaProtocolFilter for a filterRef
 * @return optional containing the resource if resolved, else empty
 */
public Optional<GenericKubernetesResource> filter(FilterRef filterRef) {
public Optional<KafkaProtocolFilter> filter(FilterRef filterRef) {
return filters().stream()
.filter(filterResource -> {
String apiVersion = filterResource.getApiVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.javaoperatorsdk.operator.api.reconciler.Context;
Expand All @@ -48,6 +47,7 @@
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;

import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -363,18 +363,18 @@ private static Context<KafkaProxy> buildContext(Path testDir, List<VirtualKafkaC
doReturn(resourceContext).when(context).managedWorkflowAndDependentResourceContext();

var runtimeDecl = OperatorMain.runtimeDecl();
Set<GenericKubernetesResource> filterInstances = new HashSet<>();
Set<KafkaProtocolFilter> filterInstances = new HashSet<>();
for (var filterApi : runtimeDecl.filterApis()) {
String fileName = "in-" + filterApi.kind() + "-*.yaml";
try (var dirStream = Files.newDirectoryStream(testDir, fileName)) {
for (Path p : dirStream) {
GenericKubernetesResource resource = YAML_MAPPER.readValue(p.toFile(), GenericKubernetesResource.class);
KafkaProtocolFilter resource = YAML_MAPPER.readValue(p.toFile(), KafkaProtocolFilter.class);
assertMinimalMetadata(resource.getMetadata(), fileName);
filterInstances.add(resource);
}
}
}
doReturn(filterInstances).when(context).getSecondaryResources(GenericKubernetesResource.class);
doReturn(filterInstances).when(context).getSecondaryResources(KafkaProtocolFilter.class);
doReturn(Set.copyOf(virtualKafkaClusters)).when(context).getSecondaryResources(VirtualKafkaCluster.class);
doReturn(Set.copyOf(kafkaServiceRefs)).when(context).getSecondaryResources(KafkaService.class);
doReturn(Set.copyOf(ingresses)).when(context).getSecondaryResources(KafkaProxyIngress.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.assertj.core.api.AbstractStringAssert;
Expand Down Expand Up @@ -40,6 +41,8 @@
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceBuilder;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterBuilder;
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder;
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;

import static io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxyingressspec.ClusterIP.Protocol.TCP;
Expand All @@ -56,6 +59,7 @@ class ProxyReconcilerIT {
private static final String PROXY_A = "proxy-a";
private static final String PROXY_B = "proxy-b";
private static final String CLUSTER_FOO_REF = "fooref";
private static final String FILTER_NAME = "validation";
private static final String CLUSTER_FOO = "foo";
private static final String CLUSTER_FOO_CLUSTERIP_INGRESS = "foo-cluster-ip";
private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092";
Expand Down Expand Up @@ -88,6 +92,7 @@ void beforeEach() {
.withAdditionalCustomResourceDefinition(VirtualKafkaCluster.class)
.withAdditionalCustomResourceDefinition(KafkaService.class)
.withAdditionalCustomResourceDefinition(KafkaProxyIngress.class)
.withAdditionalCustomResourceDefinition(KafkaProtocolFilter.class)
.waitForNamespaceDeletion(true)
.withConfigurationService(x -> x.withCloseClientOnStop(false))
.build();
Expand Down Expand Up @@ -119,12 +124,13 @@ public KafkaProxyIngress ingress(String name) {

CreatedResources doCreate() {
KafkaProxy proxy = extension.create(kafkaProxy(PROXY_A));
KafkaProtocolFilter filter = extension.create(filter(FILTER_NAME));
KafkaService barClusterRef = extension.create(clusterRef(CLUSTER_BAR_REF, CLUSTER_BAR_BOOTSTRAP));
KafkaProxyIngress ingressBar = extension.create(clusterIpIngress(CLUSTER_BAR_CLUSTERIP_INGRESS, proxy));
Set<KafkaService> clusterRefs = Set.of(barClusterRef);
VirtualKafkaCluster clusterBar = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxy, barClusterRef, ingressBar));
VirtualKafkaCluster clusterBar = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxy, barClusterRef, ingressBar, filter));
Set<VirtualKafkaCluster> clusters = Set.of(clusterBar);
assertProxyConfigContents(proxy, Set.of(CLUSTER_BAR_BOOTSTRAP), Set.of());
assertProxyConfigContents(proxy, Set.of(CLUSTER_BAR_BOOTSTRAP, filter.getSpec().getType()), Set.of());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a crude assertion that RecordValidation is in the config is enough for your PR. The IT needs a rework to clean up entity creation, and I think to allow for better config validations than contains/not-contains.

assertDeploymentMountsConfigConfigMap(proxy);
assertDeploymentBecomesReady(proxy);
assertServiceTargetsProxyInstances(proxy, clusterBar, ingressBar);
Expand Down Expand Up @@ -274,9 +280,10 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() {

KafkaService fooClusterRef = extension.create(clusterRef(CLUSTER_FOO_REF, CLUSTER_FOO_BOOTSTRAP));
KafkaService barClusterRef = extension.create(clusterRef(CLUSTER_BAR_REF, CLUSTER_BAR_BOOTSTRAP));
KafkaProtocolFilter filter = extension.create(filter(FILTER_NAME));

VirtualKafkaCluster clusterFoo = extension.create(virtualKafkaCluster(CLUSTER_FOO, proxyA, fooClusterRef, ingressFoo));
VirtualKafkaCluster barCluster = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxyB, barClusterRef, ingressBar));
VirtualKafkaCluster clusterFoo = extension.create(virtualKafkaCluster(CLUSTER_FOO, proxyA, fooClusterRef, ingressFoo, filter));
VirtualKafkaCluster barCluster = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxyB, barClusterRef, ingressBar, filter));

assertProxyConfigContents(proxyA, Set.of(CLUSTER_FOO_BOOTSTRAP), Set.of());
assertProxyConfigContents(proxyB, Set.of(CLUSTER_BAR_BOOTSTRAP), Set.of());
Expand Down Expand Up @@ -312,13 +319,13 @@ private AbstractStringAssert<?> assertThatProxyConfigFor(KafkaProxy proxy) {
}

private static VirtualKafkaCluster virtualKafkaCluster(String clusterName, KafkaProxy proxy, KafkaService clusterRef,
KafkaProxyIngress ingress) {
KafkaProxyIngress ingress, KafkaProtocolFilter filter) {
return new VirtualKafkaClusterBuilder().withNewMetadata().withName(clusterName).endMetadata()
.withNewSpec()
.withTargetKafkaServiceRef(new KafkaServiceRefBuilder().withName(name(clusterRef)).build())
.withNewProxyRef().withName(name(proxy)).endProxyRef()
.addNewIngressRef().withName(name(ingress)).endIngressRef()
.withFilterRefs()
.withFilterRefs().addNewFilterRef().withName(name(filter)).endFilterRef()
.endSpec().build();
}

Expand All @@ -329,6 +336,12 @@ private static KafkaService clusterRef(String clusterRefName, String clusterBoot
.endSpec().build();
}

/**
 * Builds a {@code KafkaProtocolFilter} custom resource of type {@code RecordValidation}
 * whose config template rejects null record values.
 *
 * @param name metadata name for the filter resource
 * @return the assembled (not yet created) KafkaProtocolFilter resource
 */
private static KafkaProtocolFilter filter(String name) {
    // Template consumed by the RecordValidation filter: one rule forbidding null values.
    var configTemplate = Map.of("rules", List.of(Map.of("allowNulls", false)));
    return new KafkaProtocolFilterBuilder()
            .withNewMetadata()
                .withName(name)
            .endMetadata()
            .withNewSpec()
                .withType("RecordValidation")
                .withConfigTemplate(configTemplate)
            .endSpec()
            .build();
}

KafkaProxy kafkaProxy(String name) {
// @formatter:off
return new KafkaProxyBuilder()
Expand Down
Loading
Loading