Skip to content

Commit d31aada

Browse files
authored
Merge pull request kroxylicious#2067 from robobario/proxy-reconciler-flip-out-on-other-referents-stale-status
KafkaProxyReconciler error on stale Referent status
2 parents a3a376b + c68ee2a commit d31aada

File tree

38 files changed

+638
-145
lines changed

38 files changed

+638
-145
lines changed

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyReconciler.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -312,16 +312,21 @@ private static InformerEventSource<KafkaService, KafkaProxy> buildKafkaServiceEv
312312

313313
@VisibleForTesting
314314
static SecondaryToPrimaryMapper<KafkaService> kafkaServiceRefToProxyMapper(EventSourceContext<KafkaProxy> context) {
315-
return kafkaServiceRef -> {
315+
return kafkaService -> {
316+
// we do not want to trigger reconciliation of any proxy if the ingress has not been reconciled
317+
if (!ResourcesUtil.isStatusFresh(kafkaService)) {
318+
LOGGER.debug("Ignoring event from KafkaService with stale status: {}", ResourcesUtil.toLocalRef(kafkaService));
319+
return Set.of();
320+
}
316321
// find all virtual clusters that reference this kafkaServiceRef
317322

318-
Set<? extends LocalRef<KafkaProxy>> proxyRefs = ResourcesUtil.resourcesInSameNamespace(context, kafkaServiceRef, VirtualKafkaCluster.class)
319-
.filter(vkc -> vkc.getSpec().getTargetKafkaServiceRef().equals(ResourcesUtil.toLocalRef(kafkaServiceRef)))
323+
Set<? extends LocalRef<KafkaProxy>> proxyRefs = ResourcesUtil.resourcesInSameNamespace(context, kafkaService, VirtualKafkaCluster.class)
324+
.filter(vkc -> vkc.getSpec().getTargetKafkaServiceRef().equals(ResourcesUtil.toLocalRef(kafkaService)))
320325
.map(VirtualKafkaCluster::getSpec)
321326
.map(VirtualKafkaClusterSpec::getProxyRef)
322327
.collect(Collectors.toSet());
323328

324-
Set<ResourceID> proxyIds = ResourcesUtil.filteredResourceIdsInSameNamespace(context, kafkaServiceRef, KafkaProxy.class,
329+
Set<ResourceID> proxyIds = ResourcesUtil.filteredResourceIdsInSameNamespace(context, kafkaService, KafkaProxy.class,
325330
proxy -> proxyRefs.contains(toLocalRef(proxy)));
326331
LOGGER.debug("Event source KafkaService SecondaryToPrimaryMapper got {}", proxyIds);
327332
return proxyIds;
@@ -404,6 +409,11 @@ static SecondaryToPrimaryMapper<VirtualKafkaCluster> clusterToProxyMapper(EventS
404409
@VisibleForTesting
405410
static SecondaryToPrimaryMapper<KafkaProxyIngress> ingressToProxyMapper(EventSourceContext<KafkaProxy> context) {
406411
return ingress -> {
412+
// we do not want to trigger reconciliation of any proxy if the ingress has not been reconciled
413+
if (!ResourcesUtil.isStatusFresh(ingress)) {
414+
LOGGER.debug("Ignoring event from ingress with stale status: {}", ResourcesUtil.toLocalRef(ingress));
415+
return Set.of();
416+
}
407417
// we need to reconcile all proxies when a kafka proxy ingress changes in case the proxyRef is updated, we need to update
408418
// the previously referenced proxy too.
409419
Set<ResourceID> proxyIds = ResourcesUtil.filteredResourceIdsInSameNamespace(context, ingress, KafkaProxy.class, proxy -> true);
@@ -425,6 +435,11 @@ static PrimaryToSecondaryMapper<KafkaProxy> proxyToIngressMapper(EventSourceCont
425435
@VisibleForTesting
426436
static SecondaryToPrimaryMapper<KafkaProtocolFilter> filterToProxy(EventSourceContext<KafkaProxy> context) {
427437
return (KafkaProtocolFilter filter) -> {
438+
// we do not want to trigger reconciliation of any proxy if the filter has not been reconciled
439+
if (!ResourcesUtil.isStatusFresh(filter)) {
440+
LOGGER.debug("Ignoring event from filter with stale status: {}", ResourcesUtil.toLocalRef(filter));
441+
return Set.of();
442+
}
428443
// filters don't point to a proxy, but must be in the same namespace as the proxy/proxies which reference the,
429444
// so when a filter changes we reconcile all the proxies in the same namespace
430445
Set<ResourceID> proxiesInFilterNamespace = ResourcesUtil.filteredResourceIdsInSameNamespace(context, filter, KafkaProxy.class, proxy -> true);

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ResourcesUtil.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,14 @@
2828

2929
import io.kroxylicious.kubernetes.api.common.AnyLocalRefBuilder;
3030
import io.kroxylicious.kubernetes.api.common.LocalRef;
31+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
32+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngressStatus;
33+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
34+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceStatus;
3135
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
3236
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterStatus;
37+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
38+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterStatus;
3339

3440
public class ResourcesUtil {
3541

@@ -280,6 +286,36 @@ public static boolean isStatusFresh(VirtualKafkaCluster cluster) {
280286
return isStatusFresh(cluster, c -> Optional.ofNullable(c.getStatus()).map(VirtualKafkaClusterStatus::getObservedGeneration).orElse(null));
281287
}
282288

289+
/**
290+
* Checks that the status observedGeneration is equal to the metadata generation. Indicating
291+
* that the current {@code spec} of the resource has been reconciled.
292+
* @param ingress ingress
293+
* @return true if status observedGeneration is equal to the metadata generation
294+
*/
295+
public static boolean isStatusFresh(KafkaProxyIngress ingress) {
296+
return isStatusFresh(ingress, i -> Optional.ofNullable(i.getStatus()).map(KafkaProxyIngressStatus::getObservedGeneration).orElse(null));
297+
}
298+
299+
/**
300+
* Checks that the status observedGeneration is equal to the metadata generation. Indicating
301+
* that the current {@code spec} of the resource has been reconciled.
302+
* @param service service
303+
* @return true if status observedGeneration is equal to the metadata generation
304+
*/
305+
public static boolean isStatusFresh(KafkaService service) {
306+
return isStatusFresh(service, i -> Optional.ofNullable(i.getStatus()).map(KafkaServiceStatus::getObservedGeneration).orElse(null));
307+
}
308+
309+
/**
310+
* Checks that the status observedGeneration is equal to the metadata generation. Indicating
311+
* that the current {@code spec} of the resource has been reconciled.
312+
* @param filter filter
313+
* @return true if status observedGeneration is equal to the metadata generation
314+
*/
315+
public static boolean isStatusFresh(KafkaProtocolFilter filter) {
316+
return isStatusFresh(filter, i -> Optional.ofNullable(i.getStatus()).map(KafkaProtocolFilterStatus::getObservedGeneration).orElse(null));
317+
}
318+
283319
private static <T extends HasMetadata> boolean isStatusFresh(T resource, Function<T, Long> observedGenerationFunc) {
284320
Long observedGeneration = observedGenerationFunc.apply(resource);
285321
Long generation = resource.getMetadata().getGeneration();

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolver.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,20 @@ private ClusterResolutionResult discoverProblemsAndBuildResolutionResult(Virtual
142142
Set<KafkaProxy> proxies) {
143143
Set<DanglingReference> danglingReferences = determineDanglingRefs(result, proxies, cluster);
144144
Set<LocalRef<?>> resolvedRefsFalse = determineResolvedRefsFalse(result);
145-
return new ClusterResolutionResult(cluster, danglingReferences, resolvedRefsFalse, Set.of());
145+
Set<LocalRef<?>> referentsWithStaleStatus = determineReferentsWithStaleStatus(result);
146+
return new ClusterResolutionResult(cluster, danglingReferences, resolvedRefsFalse, referentsWithStaleStatus);
147+
}
148+
149+
private static Set<LocalRef<?>> determineReferentsWithStaleStatus(CommonDependencies cluster) {
150+
Stream<LocalRef<?>> ingressesWithStaleStatus = cluster.ingresses().entrySet().stream().filter(e -> !ResourcesUtil.isStatusFresh(e.getValue()))
151+
.map(Map.Entry::getKey);
152+
Stream<LocalRef<?>> servicesWithStaleStatus = cluster.kafkaServices().entrySet().stream().filter(e -> !ResourcesUtil.isStatusFresh(e.getValue()))
153+
.map(Map.Entry::getKey);
154+
Stream<LocalRef<?>> filtersWithStaleStatus = cluster.filters().entrySet().stream().filter(e -> !ResourcesUtil.isStatusFresh(e.getValue()))
155+
.map(Map.Entry::getKey);
156+
return Stream.of(ingressesWithStaleStatus, servicesWithStaleStatus, filtersWithStaleStatus)
157+
.flatMap(Function.identity())
158+
.collect(Collectors.toSet());
146159
}
147160

148161
private static Set<LocalRef<?>> determineReferentsWithStaleStatus(VirtualKafkaCluster cluster) {

0 commit comments

Comments
 (0)