Skip to content

Commit 287f048

Browse files
authored
Merge pull request kroxylicious#1964 from SamBarker/use-kpf
Replace GenericKubernetesResource with KafkaProtocolFilter
2 parents 233fdbe + 8345ebe commit 287f048

File tree

12 files changed

+94
-174
lines changed

12 files changed

+94
-174
lines changed

kroxylicious-operator/pom.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,9 @@
357357
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.Conditions>
358358
io.kroxylicious.kubernetes.api.common.Condition
359359
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.Conditions>
360-
360+
<io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
361+
java.lang.Object
362+
</io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
361363
</existingJavaTypes>
362364
<packageOverrides>
363365
<!-- the default package name ($apiGroup.$apiVersion) doesn't work for us -->

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/OperatorMain.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.io.IOException;
99
import java.net.InetSocketAddress;
1010
import java.time.Duration;
11-
import java.util.List;
1211
import java.util.Map;
1312
import java.util.Properties;
1413
import java.util.function.IntSupplier;
@@ -27,8 +26,6 @@
2726
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
2827
import io.prometheus.metrics.exporter.httpserver.MetricsHandler;
2928

30-
import io.kroxylicious.kubernetes.operator.config.FilterApiDecl;
31-
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;
3229
import io.kroxylicious.kubernetes.operator.management.UnsupportedHttpMethodFilter;
3330
import io.kroxylicious.proxy.service.HostPort;
3431
import io.kroxylicious.proxy.tag.VisibleForTesting;
@@ -81,7 +78,7 @@ public static void main(String[] args) {
8178
*/
8279
void start() {
8380
operator.installShutdownHook(Duration.ofSeconds(10));
84-
operator.register(new ProxyReconciler(runtimeDecl()));
81+
operator.register(new ProxyReconciler());
8582
addHttpGetHandler("/", () -> 404);
8683
managementServer.start();
8784
operator.start();
@@ -120,12 +117,6 @@ void stop() {
120117
LOGGER.info("Operator stopped.");
121118
}
122119

123-
@NonNull
124-
static RuntimeDecl runtimeDecl() {
125-
// TODO read these from some configuration CR
126-
return new RuntimeDecl(List.of(new FilterApiDecl("filter.kroxylicious.io", "v1alpha1", "KafkaProtocolFilter")));
127-
}
128-
129120
private MicrometerMetrics enablePrometheusMetrics() {
130121
return MicrometerMetrics.newPerResourceCollectingMicrometerMetricsBuilder(Metrics.globalRegistry)
131122
.withCleanUpDelayInSeconds(35)

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.LinkedHashSet;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.Objects;
1718
import java.util.Optional;
1819
import java.util.Set;
1920
import java.util.stream.Collectors;
@@ -33,6 +34,7 @@
3334
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
3435
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
3536
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
37+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterSpec;
3638
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
3739
import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder;
3840
import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel;
@@ -203,17 +205,13 @@ private List<NamedFilterDefinition> filterDefinitions(Context<KafkaProxy> contex
203205
String filterDefinitionName = filterDefinitionName(filterCrRef);
204206

205207
var filterCr = resolutionResult.filter(filterCrRef).orElseThrow();
206-
if (filterCr.getAdditionalProperties().get("spec") instanceof Map<?, ?> spec) {
207-
String type = (String) spec.get("type");
208-
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
209-
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
210-
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
211-
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
212-
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());
213-
}
214-
else {
215-
throw new InvalidClusterException(ClusterCondition.filterInvalid(ResourcesUtil.name(cluster), filterDefinitionName, "`spec` was not an `object`."));
216-
}
208+
var spec = filterCr.getSpec();
209+
String type = spec.getType();
210+
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
211+
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
212+
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
213+
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
214+
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());
217215

218216
}).toList();
219217
}
@@ -228,16 +226,9 @@ private static <T> void putOrMerged(ManagedWorkflowAndDependentResourceContext c
228226
}
229227
}
230228

231-
private SecureConfigInterpolator.InterpolationResult interpolateConfig(Map<?, ?> spec) {
232-
SecureConfigInterpolator.InterpolationResult result;
233-
Object configTemplate = spec.get("configTemplate");
234-
if (configTemplate != null) {
235-
result = secureConfigInterpolator.interpolate(configTemplate);
236-
}
237-
else {
238-
result = new SecureConfigInterpolator.InterpolationResult(spec.get("config"), Set.of(), Set.of());
239-
}
240-
return result;
229+
private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) {
230+
Object configTemplate = Objects.requireNonNull(spec.getConfigTemplate(), "ConfigTemplate is required in the KafkaProtocolFilterSpec");
231+
return secureConfigInterpolator.interpolate(configTemplate);
241232
}
242233

243234
private static VirtualCluster getVirtualCluster(VirtualKafkaCluster cluster,

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyReconciler.java

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.time.Duration;
99
import java.time.ZoneId;
1010
import java.time.ZonedDateTime;
11-
import java.util.ArrayList;
1211
import java.util.List;
1312
import java.util.Objects;
1413
import java.util.Optional;
@@ -21,12 +20,10 @@
2120
import org.slf4j.LoggerFactory;
2221
import org.slf4j.spi.LoggingEventBuilder;
2322

24-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
2523
import io.fabric8.kubernetes.api.model.HasMetadata;
2624
import io.javaoperatorsdk.operator.AggregatedOperatorException;
2725
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
2826
import io.javaoperatorsdk.operator.api.reconciler.Context;
29-
import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer;
3027
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
3128
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
3229
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
@@ -47,8 +44,7 @@
4744
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
4845
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
4946
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
50-
import io.kroxylicious.kubernetes.operator.config.FilterApiDecl;
51-
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;
47+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
5248
import io.kroxylicious.proxy.tag.VisibleForTesting;
5349

5450
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -79,7 +75,6 @@
7975
})
8076
// @formatter:on
8177
public class ProxyReconciler implements
82-
ContextInitializer<KafkaProxy>,
8378
Reconciler<KafkaProxy> {
8479

8580
private static final Logger LOGGER = LoggerFactory.getLogger(ProxyReconciler.class);
@@ -88,17 +83,7 @@ public class ProxyReconciler implements
8883
public static final String DEPLOYMENT_DEP = "deployment";
8984
public static final String CLUSTERS_DEP = "clusters";
9085

91-
private final RuntimeDecl runtimeDecl;
92-
93-
public ProxyReconciler(RuntimeDecl runtimeDecl) {
94-
this.runtimeDecl = runtimeDecl;
95-
}
96-
97-
@Override
98-
public void initContext(
99-
KafkaProxy primary,
100-
Context<KafkaProxy> context) {
101-
SharedKafkaProxyContext.runtimeDecl(context, runtimeDecl);
86+
public ProxyReconciler() {
10287
}
10388

10489
/**
@@ -291,24 +276,11 @@ private static boolean isReadyEqualsTrue(Condition oldReady) {
291276

292277
@Override
293278
public List<EventSource<?, KafkaProxy>> prepareEventSources(EventSourceContext<KafkaProxy> context) {
294-
var eventSources = new ArrayList<EventSource<?, KafkaProxy>>(this.runtimeDecl.filterApis().size());
295-
for (var filterKind : this.runtimeDecl.filterApis()) {
296-
try {
297-
eventSources.add(eventSourceForFilter(context, filterKind));
298-
}
299-
catch (Exception e) {
300-
throw new OperatorConfigurationException("EventSource for " + filterKind + " could not be created.\n"
301-
+ "Hints:\n"
302-
+ "1. Check the Kind '" + filterKind.kind() + "' is present in the output of "
303-
+ "`kubectl api-resources --api-group=" + filterKind.group() + "`.\n"
304-
+ "2. Check access controls allow the operator to 'get,list,watch' this API.",
305-
e);
306-
}
307-
}
308-
eventSources.add(buildVirtualKafkaClusterInformer(context));
309-
eventSources.add(buildKafkaServiceInformer(context));
310-
eventSources.add(buildKafkaProxyIngressInformer(context));
311-
return eventSources;
279+
return List.of(
280+
eventSourceForFilter(context),
281+
buildVirtualKafkaClusterInformer(context),
282+
buildKafkaServiceInformer(context),
283+
buildKafkaProxyIngressInformer(context));
312284
}
313285

314286
private static InformerEventSource<?, KafkaProxy> buildVirtualKafkaClusterInformer(EventSourceContext<KafkaProxy> context) {
@@ -375,11 +347,9 @@ private static InformerEventSource<?, KafkaProxy> buildKafkaServiceInformer(Even
375347
}
376348

377349
@NonNull
378-
private static InformerEventSource<GenericKubernetesResource, KafkaProxy> eventSourceForFilter(
379-
EventSourceContext<KafkaProxy> context,
380-
FilterApiDecl filterApiDecl) {
350+
private static InformerEventSource<KafkaProtocolFilter, KafkaProxy> eventSourceForFilter(EventSourceContext<KafkaProxy> context) {
381351

382-
var configuration = InformerEventSourceConfiguration.from(filterApiDecl.groupVersionKind(), KafkaProxy.class)
352+
var configuration = InformerEventSourceConfiguration.from(KafkaProtocolFilter.class, KafkaProxy.class)
383353
.withSecondaryToPrimaryMapper(filterToProxy(context))
384354
.withPrimaryToSecondaryMapper(proxyToFilters(context))
385355
.build();
@@ -443,8 +413,8 @@ private static InformerEventSource<GenericKubernetesResource, KafkaProxy> eventS
443413
}
444414

445415
@VisibleForTesting
446-
static @NonNull SecondaryToPrimaryMapper<GenericKubernetesResource> filterToProxy(EventSourceContext<KafkaProxy> context) {
447-
return (GenericKubernetesResource filter) -> {
416+
static @NonNull SecondaryToPrimaryMapper<KafkaProtocolFilter> filterToProxy(EventSourceContext<KafkaProxy> context) {
417+
return (KafkaProtocolFilter filter) -> {
448418
// filters don't point to a proxy, but must be in the same namespace as the proxy/proxies which reference the,
449419
// so when a filter changes we reconcile all the proxies in the same namespace
450420
Set<ResourceID> proxiesInFilterNamespace = filteredResourceIdsInSameNamespace(context, filter, KafkaProxy.class, proxy -> true);

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/SharedKafkaProxyContext.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
1717
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
18-
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;
1918

2019
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
2120

@@ -28,16 +27,8 @@ public class SharedKafkaProxyContext {
2827
private SharedKafkaProxyContext() {
2928
}
3029

31-
static final String RUNTIME_DECL_KEY = "runtime";
3230
static final String CLUSTER_CONDITIONS_KEY = "cluster_conditions";
3331

34-
/**
35-
* Set the RuntimeDecl
36-
*/
37-
static void runtimeDecl(Context<KafkaProxy> context, RuntimeDecl runtimeDecl) {
38-
context.managedWorkflowAndDependentResourceContext().put(RUNTIME_DECL_KEY, runtimeDecl);
39-
}
40-
4132
/**
4233
* Associate a condition with a specific cluster.
4334
*/

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/config/RuntimeDecl.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.stream.Collectors;
1616
import java.util.stream.Stream;
1717

18-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
1918
import io.javaoperatorsdk.operator.api.reconciler.Context;
2019

2120
import io.kroxylicious.kubernetes.api.common.FilterRef;
@@ -25,6 +24,7 @@
2524
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
2625
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
2726
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
27+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
2828
import io.kroxylicious.kubernetes.operator.ResourcesUtil;
2929
import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult;
3030

@@ -52,7 +52,7 @@ public ResolutionResult deepResolve(Context<KafkaProxy> context, UnresolvedDepen
5252
.collect(ResourcesUtil.toByLocalRefMap());
5353
Map<LocalRef<KafkaService>, KafkaService> clusterRefs = context.getSecondaryResources(KafkaService.class).stream()
5454
.collect(ResourcesUtil.toByLocalRefMap());
55-
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters = context.getSecondaryResources(GenericKubernetesResource.class).stream()
55+
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters = context.getSecondaryResources(KafkaProtocolFilter.class).stream()
5656
.collect(ResourcesUtil.toByLocalRefMap());
5757
var resolutionResult = virtualKafkaClusters.stream().map(cluster -> determineUnresolvedDependencies(cluster, ingresses, clusterRefs, filters))
5858
.collect(Collectors.toSet());
@@ -64,7 +64,7 @@ public ResolutionResult deepResolve(Context<KafkaProxy> context, UnresolvedDepen
6464
private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaCluster cluster,
6565
Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> ingresses,
6666
Map<LocalRef<KafkaService>, KafkaService> clusterRefs,
67-
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters) {
67+
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters) {
6868
VirtualKafkaClusterSpec spec = cluster.getSpec();
6969
Set<LocalRef<?>> unresolvedDependencies = new HashSet<>();
7070
determineUnresolvedIngresses(spec, ingresses).forEach(unresolvedDependencies::add);
@@ -74,7 +74,7 @@ private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaClus
7474
}
7575

7676
private Stream<? extends LocalRef<?>> determineUnresolvedFilters(VirtualKafkaClusterSpec spec,
77-
Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters) {
77+
Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters) {
7878
List<FilterRef> filtersList = spec.getFilterRefs();
7979
if (filtersList == null) {
8080
return Stream.empty();
@@ -102,7 +102,7 @@ private static Stream<? extends LocalRef<?>> determineUnresolvedIngresses(Virtua
102102
.filter(ref -> !ingresses.containsKey(ref));
103103
}
104104

105-
private static boolean filterResourceMatchesRef(FilterRef filterRef, GenericKubernetesResource filterResource) {
105+
private static boolean filterResourceMatchesRef(FilterRef filterRef, KafkaProtocolFilter filterResource) {
106106
String apiVersion = filterResource.getApiVersion();
107107
var filterResourceGroup = apiVersion.substring(0, apiVersion.indexOf("/"));
108108
return filterResourceGroup.equals(filterRef.getGroup())

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
import java.util.Set;
1616
import java.util.function.Predicate;
1717

18-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
19-
2018
import io.kroxylicious.kubernetes.api.common.FilterRef;
2119
import io.kroxylicious.kubernetes.api.common.LocalRef;
2220
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
2321
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
2422
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
23+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
2524
import io.kroxylicious.kubernetes.operator.ResourcesUtil;
2625

2726
import static java.util.Comparator.comparing;
@@ -32,7 +31,7 @@
3231
* describes which dependencies could not be resolved per VirtualKafkaCluster.
3332
*/
3433
public class ResolutionResult {
35-
private final Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters;
34+
private final Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters;
3635
private final Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> kafkaProxyIngresses;
3736
private final Map<LocalRef<KafkaService>, KafkaService> kafkaServiceRefs;
3837
private final Set<ClusterResolutionResult> clusterResolutionResults;
@@ -53,7 +52,7 @@ public boolean isFullyResolved() {
5352

5453
}
5554

56-
ResolutionResult(Map<LocalRef<GenericKubernetesResource>, GenericKubernetesResource> filters,
55+
ResolutionResult(Map<LocalRef<KafkaProtocolFilter>, KafkaProtocolFilter> filters,
5756
Map<LocalRef<KafkaProxyIngress>, KafkaProxyIngress> kafkaProxyIngresses,
5857
Map<LocalRef<KafkaService>, KafkaService> kafkaServiceRefs,
5958
Set<ClusterResolutionResult> clusterResolutionResults) {
@@ -127,15 +126,15 @@ public Optional<KafkaService> kafkaServiceRef(VirtualKafkaCluster cluster) {
127126
* Get all resolved Filters
128127
* @return filters
129128
*/
130-
public Collection<GenericKubernetesResource> filters() {
129+
public Collection<KafkaProtocolFilter> filters() {
131130
return filters.values();
132131
}
133132

134133
/**
135134
* Get the resolved GenericKubernetesResource for a filterRef
136135
* @return optional containing the resource if resolved, else empty
137136
*/
138-
public Optional<GenericKubernetesResource> filter(FilterRef filterRef) {
137+
public Optional<KafkaProtocolFilter> filter(FilterRef filterRef) {
139138
return filters().stream()
140139
.filter(filterResource -> {
141140
String apiVersion = filterResource.getApiVersion();

0 commit comments

Comments
 (0)