1919import org .slf4j .LoggerFactory ;
2020
2121import io .fabric8 .kubernetes .api .model .ConfigMap ;
22+ import io .fabric8 .kubernetes .api .model .HasMetadata ;
2223import io .fabric8 .kubernetes .api .model .OwnerReference ;
2324import io .fabric8 .kubernetes .api .model .Secret ;
2425import io .fabric8 .kubernetes .api .model .Service ;
@@ -306,10 +307,7 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
306307 .withName (SERVICES_EVENT_SOURCE_NAME )
307308 .withPrimaryToSecondaryMapper ((VirtualKafkaCluster cluster ) -> ResourcesUtil .localRefAsResourceId (cluster ,
308309 cluster .getSpec ().getTargetKafkaServiceRef ()))
309- .withSecondaryToPrimaryMapper (service -> ResourcesUtil .findReferrers (context ,
310- service ,
311- VirtualKafkaCluster .class ,
312- cluster -> Optional .of (cluster .getSpec ().getTargetKafkaServiceRef ())))
310+ .withSecondaryToPrimaryMapper (kafkaServiceSecondaryToPrimaryMapper (context ))
313311 .build ();
314312
315313 InformerEventSourceConfiguration <KafkaProxyIngress > clusterToIngresses = InformerEventSourceConfiguration .from (
@@ -321,10 +319,7 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
321319 .map (VirtualKafkaClusterSpec ::getIngresses )
322320 .stream ().flatMap (List ::stream )
323321 .map (Ingresses ::getIngressRef ).toList ()))
324- .withSecondaryToPrimaryMapper (ingress -> ResourcesUtil .findReferrersMulti (context ,
325- ingress ,
326- VirtualKafkaCluster .class ,
327- cluster -> cluster .getSpec ().getIngresses ().stream ().map (Ingresses ::getIngressRef ).toList ()))
322+ .withSecondaryToPrimaryMapper (ingressSecondaryToPrimaryMapper (context ))
328323 .build ();
329324
330325 InformerEventSourceConfiguration <KafkaProtocolFilter > clusterToFilters = InformerEventSourceConfiguration .from (
@@ -333,10 +328,7 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
333328 .withName (FILTERS_EVENT_SOURCE_NAME )
334329 .withPrimaryToSecondaryMapper ((VirtualKafkaCluster cluster ) -> ResourcesUtil .localRefsAsResourceIds (cluster ,
335330 Optional .ofNullable (cluster .getSpec ()).map (VirtualKafkaClusterSpec ::getFilterRefs ).orElse (List .of ())))
336- .withSecondaryToPrimaryMapper (filter -> ResourcesUtil .findReferrersMulti (context ,
337- filter ,
338- VirtualKafkaCluster .class ,
339- cluster -> cluster .getSpec ().getFilterRefs ()))
331+ .withSecondaryToPrimaryMapper (filterSecondaryToPrimaryMapper (context ))
340332 .build ();
341333
342334 InformerEventSourceConfiguration <Service > clusterToKubeService = InformerEventSourceConfiguration .from (
@@ -394,6 +386,54 @@ static SecondaryToPrimaryMapper<Secret> secretToVirtualKafkaCluster(EventSourceC
394386 .map (Tls ::getCertificateRef ).toList ());
395387 }
396388
389+ @ VisibleForTesting
390+ static SecondaryToPrimaryMapper <KafkaProtocolFilter > filterSecondaryToPrimaryMapper (EventSourceContext <VirtualKafkaCluster > context ) {
391+ return filter -> {
392+ if (!ResourcesUtil .isStatusFresh (filter )) {
393+ logIgnoredEvent (filter );
394+ return Set .of ();
395+ }
396+ return ResourcesUtil .findReferrersMulti (context ,
397+ filter ,
398+ VirtualKafkaCluster .class ,
399+ cluster -> cluster .getSpec ().getFilterRefs ());
400+ };
401+ }
402+
403+ @ VisibleForTesting
404+ static SecondaryToPrimaryMapper <KafkaProxyIngress > ingressSecondaryToPrimaryMapper (EventSourceContext <VirtualKafkaCluster > context ) {
405+ return ingress -> {
406+ if (!ResourcesUtil .isStatusFresh (ingress )) {
407+ logIgnoredEvent (ingress );
408+ return Set .of ();
409+ }
410+ return ResourcesUtil .findReferrersMulti (context ,
411+ ingress ,
412+ VirtualKafkaCluster .class ,
413+ cluster -> cluster .getSpec ().getIngresses ().stream ().map (Ingresses ::getIngressRef ).toList ());
414+ };
415+ }
416+
417+ @ VisibleForTesting
418+ static SecondaryToPrimaryMapper <KafkaService > kafkaServiceSecondaryToPrimaryMapper (EventSourceContext <VirtualKafkaCluster > context ) {
419+ return service -> {
420+ if (!ResourcesUtil .isStatusFresh (service )) {
421+ logIgnoredEvent (service );
422+ return Set .of ();
423+ }
424+ return ResourcesUtil .findReferrers (context ,
425+ service ,
426+ VirtualKafkaCluster .class ,
427+ cluster -> Optional .of (cluster .getSpec ().getTargetKafkaServiceRef ()));
428+ };
429+ }
430+
431+ private static void logIgnoredEvent (HasMetadata hasMetadata ) {
432+ if (LOGGER .isDebugEnabled ()) {
433+ LOGGER .debug ("Ignoring event from {} with stale status: {}" , HasMetadata .getKind (hasMetadata .getClass ()), ResourcesUtil .toLocalRef (hasMetadata ));
434+ }
435+ }
436+
397437 @ Override
398438 public ErrorStatusUpdateControl <VirtualKafkaCluster > updateErrorStatus (VirtualKafkaCluster cluster , Context <VirtualKafkaCluster > context , Exception e ) {
399439 // ResolvedRefs to UNKNOWN
0 commit comments