Skip to content

Commit 69beb63

Browse files
authored
Merge pull request kroxylicious#1889 from k-wall/impl-kafkaclusterrefs
Implementing basic KafkaClusterRef
2 parents 75baab3 + 5c7220a commit 69beb63

30 files changed

+704
-103
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#
2+
# Copyright Kroxylicious Authors.
3+
#
4+
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
#
6+
7+
---
8+
kind: KafkaClusterRef
9+
apiVersion: kroxylicious.io/v1alpha1
10+
metadata:
11+
name: my-cluster
12+
namespace: my-proxy
13+
spec:
14+
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092

kroxylicious-operator/examples/record-encryption/03.VirtualKafkaCluster.my-cluster.yaml renamed to kroxylicious-operator/examples/record-encryption/04.VirtualKafkaCluster.my-cluster.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ spec:
1414
proxyRef:
1515
name: simple
1616
targetCluster:
17-
bootstrapping:
18-
bootstrapAddress:
19-
my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
17+
clusterRef:
18+
name: my-cluster
2019
filters:
2120
- group: filter.kroxylicious.io
2221
kind: KafkaProtocolFilter
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#
2+
# Copyright Kroxylicious Authors.
3+
#
4+
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
#
6+
7+
---
8+
kind: KafkaClusterRef
9+
apiVersion: kroxylicious.io/v1alpha1
10+
metadata:
11+
name: my-cluster
12+
namespace: my-proxy
13+
spec:
14+
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092

kroxylicious-operator/examples/record-validation/03.VirtualKafkaCluster.my-cluster.yaml renamed to kroxylicious-operator/examples/record-validation/04.VirtualKafkaCluster.my-cluster.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ spec:
1414
proxyRef:
1515
name: simple
1616
targetCluster:
17-
bootstrapping:
18-
bootstrapAddress: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
17+
clusterRef:
18+
name: my-cluster
1919
filters:
2020
- group: filter.kroxylicious.io
2121
kind: RecordValidation
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#
2+
# Copyright Kroxylicious Authors.
3+
#
4+
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
#
6+
7+
---
8+
kind: KafkaClusterRef
9+
apiVersion: kroxylicious.io/v1alpha1
10+
metadata:
11+
name: my-cluster
12+
namespace: my-proxy
13+
spec:
14+
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092

kroxylicious-operator/examples/simple/02.VirtualKafkaCluster.my-cluster.yaml renamed to kroxylicious-operator/examples/simple/03.VirtualKafkaCluster.my-cluster.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ spec:
1414
proxyRef:
1515
name: simple
1616
targetCluster:
17-
bootstrapping:
18-
bootstrapAddress: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
17+
clusterRef:
18+
name: my-cluster

kroxylicious-operator/install/01.ClusterRole.kroxylicious-operator-watched.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ rules:
1919
resources:
2020
- kafkaproxies
2121
- virtualkafkaclusters
22+
- kafkaclusterrefs
2223
verbs:
2324
- get
2425
- list

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigSecret.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
2929
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource;
3030
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
31+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3132

33+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaClusterRef;
3234
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
3335
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
36+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
3437
import io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.Filters;
3538
import io.kroxylicious.proxy.config.ConfigParser;
3639
import io.kroxylicious.proxy.config.Configuration;
@@ -130,9 +133,11 @@ String generateProxyConfig(KafkaProxy primary,
130133

131134
List<VirtualKafkaCluster> virtualKafkaClusters = ResourcesUtil.clustersInNameOrder(context).toList();
132135

136+
Map<ResourceID, KafkaClusterRef> clusterRefs = ResourcesUtil.clusterRefs(context);
137+
133138
List<NamedFilterDefinition> filterDefinitions = buildFilterDefinitions(context, virtualKafkaClusters);
134139

135-
var virtualClusters = buildVirtualClusters(primary, context, virtualKafkaClusters);
140+
var virtualClusters = buildVirtualClusters(primary, context, virtualKafkaClusters, clusterRefs);
136141

137142
Configuration configuration = new Configuration(
138143
new AdminHttpConfiguration(null, null, new EndpointsConfiguration(new PrometheusMetricsConfig())), filterDefinitions,
@@ -146,11 +151,12 @@ String generateProxyConfig(KafkaProxy primary,
146151
}
147152

148153
@NonNull
149-
private static List<VirtualCluster> buildVirtualClusters(KafkaProxy primary, Context<KafkaProxy> context, List<VirtualKafkaCluster> clusters) {
154+
private static List<VirtualCluster> buildVirtualClusters(KafkaProxy primary, Context<KafkaProxy> context, List<VirtualKafkaCluster> clusters,
155+
Map<ResourceID, KafkaClusterRef> clusterRefs) {
150156
AtomicInteger clusterNum = new AtomicInteger(0);
151157
return clusters.stream()
152158
.filter(cluster -> !SharedKafkaProxyContext.isBroken(context, cluster))
153-
.map(cluster -> getVirtualCluster(primary, cluster, clusterNum.getAndIncrement()))
159+
.map(cluster -> getVirtualCluster(primary, cluster, clusterNum.getAndIncrement(), clusterRefs))
154160
.toList();
155161
}
156162

@@ -259,8 +265,16 @@ private static GenericKubernetesResource filterResourceFromRef(VirtualKafkaClust
259265

260266
private static VirtualCluster getVirtualCluster(KafkaProxy primary,
261267
VirtualKafkaCluster cluster,
262-
int clusterNum) {
263-
String bootstrap = cluster.getSpec().getTargetCluster().getBootstrapping().getBootstrapAddress();
268+
int clusterNum, Map<ResourceID, KafkaClusterRef> clusterRefs) {
269+
270+
var kafkaClusterRef = clusterTargetClusterResourceID(cluster).map(clusterRefs::get);
271+
272+
if (kafkaClusterRef.isEmpty()) {
273+
// I should be a condition
274+
throw new IllegalStateException("boom!");
275+
}
276+
277+
String bootstrap = kafkaClusterRef.get().getSpec().getBootstrapServers();
264278
return new VirtualCluster(
265279
cluster.getMetadata().getName(), new TargetCluster(bootstrap, Optional.empty()),
266280
null,
@@ -273,4 +287,12 @@ private static VirtualCluster getVirtualCluster(KafkaProxy primary,
273287
false, false,
274288
filterNamesForCluster(cluster));
275289
}
290+
291+
private static Optional<ResourceID> clusterTargetClusterResourceID(VirtualKafkaCluster cluster) {
292+
var namespace = cluster.getMetadata().getNamespace();
293+
return Optional.ofNullable(cluster.getSpec())
294+
.map(VirtualKafkaClusterSpec::getTargetCluster)
295+
.map(io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.TargetCluster::getClusterRef)
296+
.map(r -> new ResourceID(r.getName(), namespace));
297+
}
276298
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyReconciler.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,16 @@
4141
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
4242
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
4343

44+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaClusterRef;
4445
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
4546
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyBuilder;
4647
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
48+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
4749
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Conditions;
4850
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.ConditionsBuilder;
51+
import io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.ProxyRef;
52+
import io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.TargetCluster;
53+
import io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.targetcluster.ClusterRef;
4954
import io.kroxylicious.kubernetes.operator.config.FilterApiDecl;
5055
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;
5156
import io.kroxylicious.proxy.tag.VisibleForTesting;
@@ -299,6 +304,7 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<KafkaProx
299304
}
300305
}
301306
eventSources.add(buildVirtualKafkaClusterInformer(context));
307+
eventSources.add(buildKafkaClusterRefInformer(context));
302308
return EventSourceInitializer.nameEventSources(eventSources.toArray(new EventSource[0]));
303309
}
304310

@@ -310,6 +316,56 @@ private static InformerEventSource<?, KafkaProxy> buildVirtualKafkaClusterInform
310316
return new InformerEventSource<>(configuration, context);
311317
}
312318

319+
private static InformerEventSource<?, KafkaProxy> buildKafkaClusterRefInformer(EventSourceContext<KafkaProxy> context) {
320+
InformerConfiguration<KafkaClusterRef> configuration = InformerConfiguration.from(KafkaClusterRef.class)
321+
.withSecondaryToPrimaryMapper(kafkaClusterRefToProxyMapper(context))
322+
.withPrimaryToSecondaryMapper(proxyToKafkaClusterRefMapper(context))
323+
.build();
324+
return new InformerEventSource<>(configuration, context);
325+
}
326+
327+
@VisibleForTesting
328+
static @NonNull SecondaryToPrimaryMapper<KafkaClusterRef> kafkaClusterRefToProxyMapper(EventSourceContext<KafkaProxy> context) {
329+
return kafkaClusterRef -> {
330+
// find all virtual clusters that reference this kafkaClusterRef
331+
332+
var proxyNames = resourcesInSameNamespace(context, kafkaClusterRef, VirtualKafkaCluster.class)
333+
.filter(vkc -> vkc.getSpec().getTargetCluster().getClusterRef().getName().equals(kafkaClusterRef.getMetadata().getName()))
334+
.map(VirtualKafkaCluster::getSpec)
335+
.map(VirtualKafkaClusterSpec::getProxyRef)
336+
.map(ProxyRef::getName)
337+
.collect(Collectors.toSet());
338+
339+
Set<ResourceID> proxyIds = filteredResourceIdsInSameNamespace(context, kafkaClusterRef, KafkaProxy.class,
340+
proxy -> proxyNames.contains(proxy.getMetadata().getName()));
341+
LOGGER.debug("Event source KafkaClusterRef SecondaryToPrimaryMapper got {}", proxyIds);
342+
return proxyIds;
343+
};
344+
}
345+
346+
/**
347+
* @param context context
348+
* @return mapper
349+
*/
350+
@VisibleForTesting
351+
static @NonNull PrimaryToSecondaryMapper<HasMetadata> proxyToKafkaClusterRefMapper(EventSourceContext<KafkaProxy> context) {
352+
return primary -> {
353+
// Load all the virtual clusters for the KafkaProxy, then extract all the referenced KafkaClusterRef resource ids.
354+
var clusterRefNames = resourcesInSameNamespace(context, primary, VirtualKafkaCluster.class)
355+
.filter(vkc -> vkc.getSpec().getProxyRef().getName().equals(primary.getMetadata().getName()))
356+
.map(VirtualKafkaCluster::getSpec)
357+
.map(VirtualKafkaClusterSpec::getTargetCluster)
358+
.map(TargetCluster::getClusterRef)
359+
.map(ClusterRef::getName)
360+
.collect(Collectors.toSet());
361+
362+
Set<ResourceID> kafkaClusterRefs = filteredResourceIdsInSameNamespace(context, primary, KafkaClusterRef.class,
363+
cluster -> clusterRefNames.contains(cluster.getMetadata().getName()));
364+
LOGGER.debug("Event source KafkaClusterRef PrimaryToSecondaryMapper got {}", kafkaClusterRefs);
365+
return kafkaClusterRefs;
366+
};
367+
}
368+
313369
@NonNull
314370
private static InformerEventSource<GenericKubernetesResource, KafkaProxy> eventSourceForFilter(
315371
EventSourceContext<KafkaProxy> context,

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ResourcesUtil.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@
77
package io.kroxylicious.kubernetes.operator;
88

99
import java.util.Comparator;
10+
import java.util.Map;
11+
import java.util.function.Function;
12+
import java.util.stream.Collectors;
1013
import java.util.stream.Stream;
1114

1215
import io.fabric8.kubernetes.api.model.HasMetadata;
1316
import io.fabric8.kubernetes.api.model.OwnerReference;
1417
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
1518
import io.javaoperatorsdk.operator.api.reconciler.Context;
19+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1620

21+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaClusterRef;
1722
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
1823
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
1924

@@ -76,7 +81,14 @@ static <O extends HasMetadata> OwnerReference ownerReferenceTo(O owner) {
7681

7782
static Stream<VirtualKafkaCluster> clustersInNameOrder(Context<KafkaProxy> context) {
7883
return context.getSecondaryResources(VirtualKafkaCluster.class)
79-
.stream().sorted(Comparator.comparing(virtualKafkaCluster -> virtualKafkaCluster.getMetadata().getName()));
84+
.stream()
85+
.sorted(Comparator.comparing(virtualKafkaCluster -> virtualKafkaCluster.getMetadata().getName()));
86+
}
87+
88+
static Map<ResourceID, KafkaClusterRef> clusterRefs(Context<KafkaProxy> context) {
89+
return context.getSecondaryResources(KafkaClusterRef.class)
90+
.stream()
91+
.collect(Collectors.toMap(ResourceID::fromResource, Function.identity()));
8092
}
8193

8294
}

0 commit comments

Comments
 (0)