Skip to content

Commit 980a436

Browse files
committed
Replace GenericKubernetesResource with KafkaProtocolFilter
Signed-off-by: Sam Barker <[email protected]> rh-pre-commit.version: 2.0.1 rh-pre-commit.check-secrets: ENABLED
1 parent 233fdbe commit 980a436

File tree

6 files changed

+81
-51
lines changed

6 files changed

+81
-51
lines changed

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Map;
1717
import java.util.Optional;
1818
import java.util.Set;
19+
import java.util.TreeMap;
1920
import java.util.stream.Collectors;
2021

2122
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,6 +34,8 @@
3334
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
3435
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
3536
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
37+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterSpec;
38+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate;
3639
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
3740
import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder;
3841
import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel;
@@ -46,11 +49,11 @@
4649
import io.kroxylicious.proxy.config.admin.ManagementConfiguration;
4750
import io.kroxylicious.proxy.config.admin.PrometheusMetricsConfig;
4851

49-
import edu.umd.cs.findbugs.annotations.NonNull;
50-
5152
import static io.kroxylicious.kubernetes.operator.Labels.standardLabels;
5253
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
5354

55+
import edu.umd.cs.findbugs.annotations.NonNull;
56+
5457
/**
5558
* Generates a Kube {@code ConfigMap} containing the proxy config YAML.
5659
*/
@@ -203,17 +206,13 @@ private List<NamedFilterDefinition> filterDefinitions(Context<KafkaProxy> contex
203206
String filterDefinitionName = filterDefinitionName(filterCrRef);
204207

205208
var filterCr = resolutionResult.filter(filterCrRef).orElseThrow();
206-
if (filterCr.getAdditionalProperties().get("spec") instanceof Map<?, ?> spec) {
207-
String type = (String) spec.get("type");
208-
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
209-
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
210-
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
211-
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
212-
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());
213-
}
214-
else {
215-
throw new InvalidClusterException(ClusterCondition.filterInvalid(ResourcesUtil.name(cluster), filterDefinitionName, "`spec` was not an `object`."));
216-
}
209+
var spec = filterCr.getSpec();
210+
String type = spec.getType();
211+
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
212+
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
213+
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
214+
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
215+
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());
217216

218217
}).toList();
219218
}
@@ -228,14 +227,17 @@ private static <T> void putOrMerged(ManagedWorkflowAndDependentResourceContext c
228227
}
229228
}
230229

231-
private SecureConfigInterpolator.InterpolationResult interpolateConfig(Map<?, ?> spec) {
230+
private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) {
232231
SecureConfigInterpolator.InterpolationResult result;
233-
Object configTemplate = spec.get("configTemplate");
232+
ConfigTemplate configTemplate = spec.getConfigTemplate();
234233
if (configTemplate != null) {
235-
result = secureConfigInterpolator.interpolate(configTemplate);
234+
//Nasty nsaty hack. Revisit
235+
final TreeMap<String, Object> configProperties = new TreeMap<>(Comparator.nullsFirst(Comparator.naturalOrder()));
236+
configProperties.putAll(configTemplate.getAdditionalProperties());
237+
result = secureConfigInterpolator.interpolate(configProperties);
236238
}
237239
else {
238-
result = new SecureConfigInterpolator.InterpolationResult(spec.get("config"), Set.of(), Set.of());
240+
throw new IllegalArgumentException("ConfigTemplate is not found in KafkaProtocolFilterSpec");
239241
}
240242
return result;
241243
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.stream.Collectors;
1616
import java.util.stream.Stream;
1717

18-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
1918
import io.javaoperatorsdk.operator.api.reconciler.Context;
2019

2120
import io.kroxylicious.kubernetes.api.common.FilterRef;
@@ -25,6 +24,7 @@
2524
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
2625
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
2726
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
27+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
2828
import io.kroxylicious.kubernetes.operator.ResourcesUtil;
2929
import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult;
3030

@@ -52,7 +52,7 @@ public ResolutionResult deepResolve(Context<KafkaProxy> context, UnresolvedDepen
5252
.collect(ResourcesUtil.toByLocalRefMap());
5353
Map<LocalRef<KafkaService>, KafkaService> clusterRefs = context.getSecondaryResources(KafkaService.class).stream()
5454
.collect(ResourcesUtil.toByLocalRefMap());
55-
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters = context.getSecondaryResources(GenericKubernetesResource.class).stream()
55+
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters = context.getSecondaryResources(KafkaProtocolFilter.class).stream()
5656
.collect(ResourcesUtil.toByLocalRefMap());
5757
var resolutionResult = virtualKafkaClusters.stream().map(cluster -> determineUnresolvedDependencies(cluster, ingresses, clusterRefs, filters))
5858
.collect(Collectors.toSet());
@@ -64,7 +64,7 @@ public ResolutionResult deepResolve(Context<KafkaProxy> context, UnresolvedDepen
6464
private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaCluster cluster,
6565
Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> ingresses,
6666
Map<LocalRef<KafkaService>, KafkaService> clusterRefs,
67-
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters) {
67+
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters) {
6868
VirtualKafkaClusterSpec spec = cluster.getSpec();
6969
Set<LocalRef<?>> unresolvedDependencies = new HashSet<>();
7070
determineUnresolvedIngresses(spec, ingresses).forEach(unresolvedDependencies::add);
@@ -74,7 +74,7 @@ private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaClus
7474
}
7575

7676
private Stream<? extends LocalRef<?>> determineUnresolvedFilters(VirtualKafkaClusterSpec spec,
77-
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters) {
77+
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters) {
7878
List<FilterRef> filtersList = spec.getFilterRefs();
7979
if (filtersList == null) {
8080
return Stream.empty();
@@ -102,7 +102,7 @@ private static Stream<? extends LocalRef<?>> determineUnresolvedIngresses(Virtua
102102
.filter(ref -> !ingresses.containsKey(ref));
103103
}
104104

105-
private static boolean filterResourceMatchesRef(FilterRef filterRef, GenericKubernetesResource filterResource) {
105+
private static boolean filterResourceMatchesRef(FilterRef filterRef, KafkaProtocolFilter filterResource) {
106106
String apiVersion = filterResource.getApiVersion();
107107
var filterResourceGroup = apiVersion.substring(0, apiVersion.indexOf("/"));
108108
return filterResourceGroup.equals(filterRef.getGroup())

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
import java.util.Set;
1616
import java.util.function.Predicate;
1717

18-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
19-
2018
import io.kroxylicious.kubernetes.api.common.FilterRef;
2119
import io.kroxylicious.kubernetes.api.common.LocalRef;
2220
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
2321
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
2422
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
23+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
2524
import io.kroxylicious.kubernetes.operator.ResourcesUtil;
2625

2726
import static java.util.Comparator.comparing;
@@ -32,7 +31,7 @@
3231
* describes which dependencies could not be resolved per VirtualKafkaCluster.
3332
*/
3433
public class ResolutionResult {
35-
private final Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters;
34+
private final Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters;
3635
private final Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> kafkaProxyIngresses;
3736
private final Map<LocalRef<KafkaService>, KafkaService> kafkaServiceRefs;
3837
private final Set<ClusterResolutionResult> clusterResolutionResults;
@@ -53,7 +52,7 @@ public boolean isFullyResolved() {
5352

5453
}
5554

56-
ResolutionResult(Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters,
55+
ResolutionResult(Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters,
5756
Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> kafkaProxyIngresses,
5857
Map<LocalRef<KafkaService>, KafkaService> kafkaServiceRefs,
5958
Set<ClusterResolutionResult> clusterResolutionResults) {
@@ -127,15 +126,15 @@ public Optional<KafkaService> kafkaServiceRef(VirtualKafkaCluster cluster) {
127126
* Get all resolved Filters
128127
* @return filters
129128
*/
130-
public Collection<GenericKubernetesResource> filters() {
129+
public Collection<KafkaProtocolFilter> filters() {
131130
return filters.values();
132131
}
133132

134133
/**
135134
* Get the resolved GenericKubernetesResource for a filterRef
136135
* @return optional containing the resource if resolved, else empty
137136
*/
138-
public Optional<GenericKubernetesResource> filter(FilterRef filterRef) {
137+
public Optional<KafkaProtocolFilter> filter(FilterRef filterRef) {
139138
return filters().stream()
140139
.filter(filterResource -> {
141140
String apiVersion = filterResource.getApiVersion();

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
3636
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
3737

38-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
3938
import io.fabric8.kubernetes.api.model.HasMetadata;
4039
import io.fabric8.kubernetes.api.model.ObjectMeta;
4140
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -48,17 +47,18 @@
4847
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
4948
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
5049
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
50+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
5151
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;
5252

53-
import edu.umd.cs.findbugs.annotations.NonNull;
54-
5553
import static io.kroxylicious.kubernetes.operator.ProxyDeployment.KROXYLICIOUS_IMAGE_ENV_VAR;
5654
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
5755
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
5856
import static org.assertj.core.api.Assertions.assertThat;
5957
import static org.mockito.Mockito.doReturn;
6058
import static org.mockito.Mockito.mock;
6159

60+
import edu.umd.cs.findbugs.annotations.NonNull;
61+
6262
class DerivedResourcesTest {
6363

6464
static final YAMLMapper YAML_MAPPER = new YAMLMapper()
@@ -363,18 +363,18 @@ private static Context<KafkaProxy> buildContext(Path testDir, List<VirtualKafkaC
363363
doReturn(resourceContext).when(context).managedWorkflowAndDependentResourceContext();
364364

365365
var runtimeDecl = OperatorMain.runtimeDecl();
366-
Set<GenericKubernetesResource> filterInstances = new HashSet<>();
366+
Set<KafkaProtocolFilter> filterInstances = new HashSet<>();
367367
for (var filterApi : runtimeDecl.filterApis()) {
368368
String fileName = "in-" + filterApi.kind() + "-*.yaml";
369369
try (var dirStream = Files.newDirectoryStream(testDir, fileName)) {
370370
for (Path p : dirStream) {
371-
GenericKubernetesResource resource = YAML_MAPPER.readValue(p.toFile(), GenericKubernetesResource.class);
371+
KafkaProtocolFilter resource = YAML_MAPPER.readValue(p.toFile(), KafkaProtocolFilter.class);
372372
assertMinimalMetadata(resource.getMetadata(), fileName);
373373
filterInstances.add(resource);
374374
}
375375
}
376376
}
377-
doReturn(filterInstances).when(context).getSecondaryResources(GenericKubernetesResource.class);
377+
doReturn(filterInstances).when(context).getSecondaryResources(KafkaProtocolFilter.class);
378378
doReturn(Set.copyOf(virtualKafkaClusters)).when(context).getSecondaryResources(VirtualKafkaCluster.class);
379379
doReturn(Set.copyOf(kafkaServiceRefs)).when(context).getSecondaryResources(KafkaService.class);
380380
doReturn(Set.copyOf(ingresses)).when(context).getSecondaryResources(KafkaProxyIngress.class);

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolatorTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.fabric8.kubernetes.api.model.Volume;
1919
import io.fabric8.kubernetes.api.model.VolumeMount;
2020

21+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate;
22+
2123
import static org.assertj.core.api.Assertions.assertThat;
2224
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2325

@@ -68,6 +70,33 @@ void shouldInterpolateInAnObject() throws JsonProcessingException {
6870
Map<String, SecureConfigProvider> secret = Map.of("secret", MountedResourceConfigProvider.SECRET_PROVIDER);
6971
var i = new SecureConfigInterpolator("/base", secret);
7072

73+
var jsonValue = YAML_MAPPER.readValue("""
74+
filterOneConfig: true
75+
password: "${secret:my-secret:filter-one}"
76+
""", ConfigTemplate.class);
77+
78+
// when
79+
var result = i.interpolate(jsonValue);
80+
81+
// then
82+
assertThat(result.volumes()).singleElement().extracting(Volume::getName).isEqualTo("secrets-my-secret");
83+
assertThat(result.volumes()).singleElement().extracting(Volume::getSecret).extracting(SecretVolumeSource::getSecretName).isEqualTo("my-secret");
84+
85+
assertThat(result.mounts()).singleElement().extracting(VolumeMount::getName).isEqualTo("secrets-filter-one");
86+
assertThat(result.mounts()).singleElement().extracting(VolumeMount::getMountPath).isEqualTo("/base/secret/filter-one");
87+
88+
assertThat(YAML_MAPPER.writeValueAsString(result.config())).isEqualTo("""
89+
filterOneConfig: true
90+
password: /base/secret/my-secret/filter-one
91+
""");
92+
}
93+
94+
@Test
95+
void shouldInterpolateInnMap() throws JsonProcessingException {
96+
// given
97+
Map<String, SecureConfigProvider> secret = Map.of("secret", MountedResourceConfigProvider.SECRET_PROVIDER);
98+
var i = new SecureConfigInterpolator("/base", secret);
99+
71100
var jsonValue = YAML_MAPPER.readValue("""
72101
kms: AwsKms
73102
object:

0 commit comments

Comments
 (0)