Skip to content

Commit 30b4193

Browse files
authored
Merge pull request kroxylicious#1904 from robobario/hasmetadata-util
Operator Refactor: add utils for accessing common metadata
2 parents 49b670c + 4c6c77a commit 30b4193

File tree

11 files changed

+171
-48
lines changed

11 files changed

+171
-48
lines changed

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ClusterService.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
2727

2828
import static io.kroxylicious.kubernetes.operator.Labels.standardLabels;
29+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
30+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
2931

3032
/**
3133
* Generates the Kube {@code Service} for a single virtual cluster.
@@ -46,14 +48,14 @@ public ClusterService() {
4648
*/
4749
static String serviceName(VirtualKafkaCluster cluster) {
4850
Objects.requireNonNull(cluster);
49-
return cluster.getMetadata().getName();
51+
return name(cluster);
5052
}
5153

5254
/**
5355
* @return the fully qualified service hostname
5456
*/
5557
static String absoluteServiceHost(KafkaProxy primary, VirtualKafkaCluster cluster) {
56-
return serviceName(cluster) + "." + primary.getMetadata().getNamespace() + ".svc.cluster.local";
58+
return serviceName(cluster) + "." + namespace(primary) + ".svc.cluster.local";
5759
}
5860

5961
/**
@@ -62,13 +64,13 @@ static String absoluteServiceHost(KafkaProxy primary, VirtualKafkaCluster cluste
6264
* @return The name of the cluster corresponding to the given Service
6365
*/
6466
static String clusterName(Service service) {
65-
return service.getMetadata().getName();
67+
return name(service);
6668
}
6769

6870
static Map<Integer, String> clusterPorts(Context<KafkaProxy> context, VirtualKafkaCluster cluster) {
6971
List<VirtualKafkaCluster> clusters = ResourcesUtil.clustersInNameOrder(context).toList();
7072
for (int clusterNum = 0; clusterNum < clusters.size(); clusterNum++) {
71-
if (clusters.get(clusterNum).getMetadata().getName().equals(cluster.getMetadata().getName())) {
73+
if (name(clusters.get(clusterNum)).equals(name(cluster))) {
7274
if (SharedKafkaProxyContext.isBroken(context, cluster)) {
7375
return Map.of();
7476
}
@@ -77,14 +79,14 @@ static Map<Integer, String> clusterPorts(Context<KafkaProxy> context, VirtualKaf
7779
return IntStream.range(startPort, startPort + numBrokerPorts).boxed()
7880
.collect(Collectors.<Integer, Integer, String, TreeMap<Integer, String>> toMap(
7981
portNum -> portNum,
80-
portNum -> cluster.getMetadata().getName() + "-" + portNum,
82+
portNum -> name(cluster) + "-" + portNum,
8183
(v1, v2) -> {
8284
throw new IllegalStateException();
8385
},
8486
TreeMap::new));
8587
}
8688
}
87-
throw new IllegalArgumentException("Couldn't find cluster with name " + cluster.getMetadata().getName());
89+
throw new IllegalArgumentException("Couldn't find cluster with name " + name(cluster));
8890
}
8991

9092
protected Service clusterService(KafkaProxy primary,
@@ -94,7 +96,7 @@ protected Service clusterService(KafkaProxy primary,
9496
var serviceSpecBuilder = new ServiceBuilder()
9597
.withNewMetadata()
9698
.withName(serviceName(cluster))
97-
.withNamespace(primary.getMetadata().getNamespace())
99+
.withNamespace(namespace(primary))
98100
.addToLabels(standardLabels(primary))
99101
.addNewOwnerReferenceLike(ResourcesUtil.ownerReferenceTo(primary)).endOwnerReference()
100102
.endMetadata()
@@ -123,7 +125,7 @@ public Map<String, Service> desiredResources(
123125
return ResourcesUtil.clustersInNameOrder(context)
124126
.filter(cluster -> !SharedKafkaProxyContext.isBroken(context, cluster))
125127
.collect(Collectors.toMap(
126-
cluster -> cluster.getMetadata().getName(),
128+
ResourcesUtil::name,
127129
cluster -> clusterService(primary, context, cluster)));
128130
}
129131

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/Labels.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static Map<String, String> standardLabels(KafkaProxy proxy) {
1919
labels.put("app.kubernetes.io/part-of", "kafka");
2020
labels.put("app.kubernetes.io/managed-by", "kroxylicious-operator");
2121
labels.put("app.kubernetes.io/name", "kroxylicious-proxy");
22-
labels.put("app.kubernetes.io/instance", proxy.getMetadata().getName());
22+
labels.put("app.kubernetes.io/instance", ResourcesUtil.name(proxy));
2323
labels.put("app.kubernetes.io/component", "proxy");
2424
return labels;
2525
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigSecret.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import edu.umd.cs.findbugs.annotations.NonNull;
5151

5252
import static io.kroxylicious.kubernetes.operator.Labels.standardLabels;
53+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
54+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
5355

5456
/**
5557
* Generates a Kube {@code Secret} containing the proxy config YAML.
@@ -93,7 +95,7 @@ public ProxyConfigSecret() {
9395
* @return The {@code metadata.name} of the desired Secret {@code Secret}.
9496
*/
9597
static String secretName(KafkaProxy primary) {
96-
return primary.getMetadata().getName();
98+
return name(primary);
9799
}
98100

99101
public static List<Volume> secureVolumes(ManagedDependentResourceContext managedDependentResourceContext) {
@@ -119,7 +121,7 @@ protected Secret desired(KafkaProxy primary,
119121
return new SecretBuilder()
120122
.editOrNewMetadata()
121123
.withName(secretName(primary))
122-
.withNamespace(primary.getMetadata().getNamespace())
124+
.withNamespace(namespace(primary))
123125
.addToLabels(standardLabels(primary))
124126
.addNewOwnerReferenceLike(ResourcesUtil.ownerReferenceTo(primary)).endOwnerReference()
125127
.endMetadata()
@@ -212,7 +214,7 @@ private List<NamedFilterDefinition> filterDefinitions(Context<KafkaProxy> contex
212214
return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config());
213215
}
214216
else {
215-
throw new InvalidClusterException(ClusterCondition.filterInvalid(cluster.getMetadata().getName(), filterDefinitionName, "`spec` was not an `object`."));
217+
throw new InvalidClusterException(ClusterCondition.filterInvalid(name(cluster), filterDefinitionName, "`spec` was not an `object`."));
216218
}
217219

218220
}).toList();
@@ -242,7 +244,7 @@ private static <T> void putOrMerged(ManagedDependentResourceContext ctx, String
242244

243245
@NonNull
244246
private static InvalidClusterException resourceNotFound(VirtualKafkaCluster cluster, Filters filterRef) {
245-
return new InvalidClusterException(ClusterCondition.filterNotExists(cluster.getMetadata().getName(), filterRef.getName()));
247+
return new InvalidClusterException(ClusterCondition.filterNotExists(name(cluster), filterRef.getName()));
246248
}
247249

248250
/**
@@ -257,7 +259,7 @@ private static GenericKubernetesResource filterResourceFromRef(VirtualKafkaClust
257259
var filterResourceGroup = apiVersion.substring(0, apiVersion.indexOf("/"));
258260
return filterResourceGroup.equals(filterRef.getGroup())
259261
&& filterResource.getKind().equals(filterRef.getKind())
260-
&& filterResource.getMetadata().getName().equals(filterRef.getName());
262+
&& name(filterResource).equals(filterRef.getName());
261263
})
262264
.findFirst()
263265
.orElseThrow(() -> resourceNotFound(cluster, filterRef));
@@ -276,7 +278,7 @@ private static VirtualCluster getVirtualCluster(KafkaProxy primary,
276278

277279
String bootstrap = kafkaClusterRef.get().getSpec().getBootstrapServers();
278280
return new VirtualCluster(
279-
cluster.getMetadata().getName(), new TargetCluster(bootstrap, Optional.empty()),
281+
name(cluster), new TargetCluster(bootstrap, Optional.empty()),
280282
null,
281283
Optional.empty(),
282284
List.of(new VirtualClusterGateway("default",
@@ -289,10 +291,9 @@ private static VirtualCluster getVirtualCluster(KafkaProxy primary,
289291
}
290292

291293
private static Optional<ResourceID> clusterTargetClusterResourceID(VirtualKafkaCluster cluster) {
292-
var namespace = cluster.getMetadata().getNamespace();
293294
return Optional.ofNullable(cluster.getSpec())
294295
.map(VirtualKafkaClusterSpec::getTargetCluster)
295296
.map(io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.TargetCluster::getClusterRef)
296-
.map(r -> new ResourceID(r.getName(), namespace));
297+
.map(r -> new ResourceID(r.getName(), namespace(cluster)));
297298
}
298299
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyDeployment.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.kroxylicious.proxy.tag.VisibleForTesting;
3232

3333
import static io.kroxylicious.kubernetes.operator.Labels.standardLabels;
34+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
35+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
3436

3537
/**
3638
* Generates the Kube {@code Deployment} for the proxy
@@ -56,7 +58,7 @@ public ProxyDeployment() {
5658
* @return The {@code metadata.name} of the desired {@code Deployment}.
5759
*/
5860
static String deploymentName(KafkaProxy primary) {
59-
return primary.getMetadata().getName();
61+
return name(primary);
6062
}
6163

6264
@Override
@@ -66,7 +68,7 @@ protected Deployment desired(KafkaProxy primary,
6668
return new DeploymentBuilder()
6769
.editOrNewMetadata()
6870
.withName(deploymentName(primary))
69-
.withNamespace(primary.getMetadata().getNamespace())
71+
.withNamespace(namespace(primary))
7072
.addNewOwnerReferenceLike(ResourcesUtil.ownerReferenceTo(primary)).endOwnerReference()
7173
.addToLabels(APP_KROXY)
7274
.addToLabels(standardLabels(primary))

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyReconciler.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@
5858
import edu.umd.cs.findbugs.annotations.NonNull;
5959
import edu.umd.cs.findbugs.annotations.Nullable;
6060

61+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.generation;
62+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.name;
63+
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.namespace;
64+
6165
// @formatter:off
6266
@ControllerConfiguration(dependents = {
6367
@Dependent(
@@ -142,7 +146,7 @@ private static KafkaProxy buildStatus(KafkaProxy primary,
142146
// @formatter:off
143147
return new KafkaProxyBuilder(primary)
144148
.editOrNewStatus()
145-
.withObservedGeneration(primary.getMetadata().getGeneration())
149+
.withObservedGeneration(generation(primary))
146150
.withConditions(effectiveReadyCondition(now, primary, exception ))
147151
.withClusters(clusterConditions(now, primary, context ))
148152
.endStatus()
@@ -157,7 +161,7 @@ private static List<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Clu
157161
ClusterCondition clusterCondition = SharedKafkaProxyContext.clusterCondition(context, cluster);
158162
var conditions = newClusterCondition(now, primary, clusterCondition);
159163
return new io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.ClustersBuilder()
160-
.withName(cluster.getMetadata().getName())
164+
.withName(name(cluster))
161165
.withConditions(conditions).build();
162166
}).toList();
163167
}
@@ -186,16 +190,16 @@ private static Conditions effectiveReadyCondition(ZonedDateTime now,
186190
return newCondition(now, ConditionType.Ready, primary, exception);
187191
}
188192
else {
189-
oldReady.setObservedGeneration(primary.getMetadata().getGeneration());
193+
oldReady.setObservedGeneration(generation(primary));
190194
return oldReady;
191195
}
192196
}
193197

194198
static LoggingEventBuilder addResourceKeys(KafkaProxy primary, LoggingEventBuilder loggingEventBuilder) {
195199
return loggingEventBuilder.addKeyValue("kind", primary.getKind())
196200
.addKeyValue("group", primary.getGroup())
197-
.addKeyValue("namespace", primary.getMetadata().getNamespace())
198-
.addKeyValue("name", primary.getMetadata().getName());
201+
.addKeyValue("namespace", namespace(primary))
202+
.addKeyValue("name", name(primary));
199203
}
200204

201205
private static void logException(KafkaProxy primary, Exception exception) {
@@ -230,7 +234,7 @@ private static Conditions newCondition(
230234
return new ConditionsBuilder()
231235
.withLastTransitionTime(now)
232236
.withMessage(conditionMessage(exception))
233-
.withObservedGeneration(primary.getMetadata().getGeneration())
237+
.withObservedGeneration(generation(primary))
234238
.withReason(conditionReason(exception))
235239
.withStatus(exception == null ? Conditions.Status.TRUE : Conditions.Status.FALSE)
236240
.withType(conditionType.getValue())
@@ -243,7 +247,7 @@ private static io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters
243247
return new io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.clusters.ConditionsBuilder()
244248
.withLastTransitionTime(now)
245249
.withMessage(clusterCondition.message())
246-
.withObservedGeneration(primary.getMetadata().getGeneration())
250+
.withObservedGeneration(generation(primary))
247251
.withReason(clusterCondition.reason())
248252
.withStatus(clusterCondition.status())
249253
.withType(clusterCondition.type().getValue())
@@ -330,14 +334,14 @@ private static InformerEventSource<?, KafkaProxy> buildKafkaClusterRefInformer(E
330334
// find all virtual clusters that reference this kafkaClusterRef
331335

332336
var proxyNames = resourcesInSameNamespace(context, kafkaClusterRef, VirtualKafkaCluster.class)
333-
.filter(vkc -> vkc.getSpec().getTargetCluster().getClusterRef().getName().equals(kafkaClusterRef.getMetadata().getName()))
337+
.filter(vkc -> vkc.getSpec().getTargetCluster().getClusterRef().getName().equals(name(kafkaClusterRef)))
334338
.map(VirtualKafkaCluster::getSpec)
335339
.map(VirtualKafkaClusterSpec::getProxyRef)
336340
.map(ProxyRef::getName)
337341
.collect(Collectors.toSet());
338342

339343
Set<ResourceID> proxyIds = filteredResourceIdsInSameNamespace(context, kafkaClusterRef, KafkaProxy.class,
340-
proxy -> proxyNames.contains(proxy.getMetadata().getName()));
344+
proxy -> proxyNames.contains(name(proxy)));
341345
LOGGER.debug("Event source KafkaClusterRef SecondaryToPrimaryMapper got {}", proxyIds);
342346
return proxyIds;
343347
};
@@ -352,15 +356,15 @@ private static InformerEventSource<?, KafkaProxy> buildKafkaClusterRefInformer(E
352356
return primary -> {
353357
// Load all the virtual clusters for the KafkaProxy, then extract all the referenced KafkaClusterRef resource ids.
354358
var clusterRefNames = resourcesInSameNamespace(context, primary, VirtualKafkaCluster.class)
355-
.filter(vkc -> vkc.getSpec().getProxyRef().getName().equals(primary.getMetadata().getName()))
359+
.filter(vkc -> vkc.getSpec().getProxyRef().getName().equals(name(primary)))
356360
.map(VirtualKafkaCluster::getSpec)
357361
.map(VirtualKafkaClusterSpec::getTargetCluster)
358362
.map(TargetCluster::getClusterRef)
359363
.map(ClusterRef::getName)
360364
.collect(Collectors.toSet());
361365

362366
Set<ResourceID> kafkaClusterRefs = filteredResourceIdsInSameNamespace(context, primary, KafkaClusterRef.class,
363-
cluster -> clusterRefNames.contains(cluster.getMetadata().getName()));
367+
cluster -> clusterRefNames.contains(name(cluster)));
364368
LOGGER.debug("Event source KafkaClusterRef PrimaryToSecondaryMapper got {}", kafkaClusterRefs);
365369
return kafkaClusterRefs;
366370
};
@@ -385,7 +389,7 @@ private static InformerEventSource<GenericKubernetesResource, KafkaProxy> eventS
385389
Set<ResourceID> filterReferences = resourcesInSameNamespace(context, proxy, VirtualKafkaCluster.class)
386390
.filter(matchesPrimary(proxy))
387391
.flatMap(cluster -> cluster.getSpec().getFilters().stream())
388-
.map(filter -> new ResourceID(filter.getName(), proxy.getMetadata().getNamespace()))
392+
.map(filter -> new ResourceID(filter.getName(), namespace(proxy)))
389393
.collect(Collectors.toSet());
390394
LOGGER.debug("KafkaProxy {} has references to filters {}", ResourceID.fromResource(proxy), filterReferences);
391395
return filterReferences;
@@ -434,14 +438,14 @@ private static <T extends HasMetadata> Set<ResourceID> filteredResourceIdsInSame
434438
private static <T extends HasMetadata> Stream<T> resourcesInSameNamespace(EventSourceContext<KafkaProxy> context, HasMetadata primary, Class<T> clazz) {
435439
return context.getClient()
436440
.resources(clazz)
437-
.inNamespace(primary.getMetadata().getNamespace())
441+
.inNamespace(namespace(primary))
438442
.list()
439443
.getItems()
440444
.stream();
441445
}
442446

443447
private static @NonNull Predicate<VirtualKafkaCluster> matchesPrimary(HasMetadata primary) {
444-
return cluster -> cluster.getSpec().getProxyRef().getName().equals(primary.getMetadata().getName());
448+
return cluster -> cluster.getSpec().getProxyRef().getName().equals(name(primary));
445449
}
446450

447451
}

0 commit comments

Comments
 (0)