77package io .kroxylicious .kubernetes .operator ;
88
99import java .time .Clock ;
10+ import java .util .Collection ;
1011import java .util .List ;
12+ import java .util .Optional ;
13+ import java .util .TreeSet ;
14+ import java .util .function .Function ;
15+ import java .util .stream .Collectors ;
16+ import java .util .stream .Stream ;
1117
1218import org .slf4j .Logger ;
1319import org .slf4j .LoggerFactory ;
1420
21+ import io .fabric8 .kubernetes .client .CustomResource ;
1522import io .javaoperatorsdk .operator .api .config .informer .InformerEventSourceConfiguration ;
1623import io .javaoperatorsdk .operator .api .reconciler .Context ;
1724import io .javaoperatorsdk .operator .api .reconciler .ErrorStatusUpdateControl ;
2229import io .javaoperatorsdk .operator .processing .event .source .informer .InformerEventSource ;
2330
2431import io .kroxylicious .kubernetes .api .common .Condition ;
32+ import io .kroxylicious .kubernetes .api .common .LocalRef ;
2533import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxy ;
34+ import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxyIngress ;
35+ import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxyIngressStatus ;
36+ import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaService ;
37+ import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaServiceStatus ;
2638import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaCluster ;
39+ import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaClusterSpec ;
40+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilter ;
41+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilterStatus ;
42+
43+ import edu .umd .cs .findbugs .annotations .NonNull ;
2744
2845import static io .kroxylicious .kubernetes .operator .ResourcesUtil .name ;
2946import static io .kroxylicious .kubernetes .operator .ResourcesUtil .namespace ;
3047
48+ /**
49+ * Reconciles a {@link VirtualKafkaCluster} by checking whether the resources
50+ * referenced by the {@code spec.proxyRef.name}, {@code spec.targetClusterRef.name},
51+ * {@code spec.ingressRefs[].name} and {@code spec.filterRefs[].name} actually exist,
52+ * setting a {@link Condition.Type#ResolvedRefs} {@link Condition} accordingly.
53+ */
3154public final class VirtualKafkaClusterReconciler implements
3255 Reconciler <VirtualKafkaCluster > {
3356
3457 private static final Logger LOGGER = LoggerFactory .getLogger (VirtualKafkaClusterReconciler .class );
3558 public static final String PROXY_EVENT_SOURCE_NAME = "proxy" ;
59+ public static final String SERVICES_EVENT_SOURCE_NAME = "services" ;
60+ public static final String INGRESSES_EVENT_SOURCE_NAME = "ingresses" ;
61+ public static final String FILTERS_EVENT_SOURCE_NAME = "filters" ;
62+ public static final String TRANSITIVELY_REFERENCED_RESOURCES_NOT_FOUND = "TransitivelyReferencedResourcesNotFound" ;
63+ public static final String REFERENCED_RESOURCES_NOT_FOUND = "ReferencedResourcesNotFound" ;
3664
3765 private final Clock clock ;
3866
@@ -42,18 +70,85 @@ public VirtualKafkaClusterReconciler(Clock clock) {
4270
4371 @ Override
4472 public UpdateControl <VirtualKafkaCluster > reconcile (VirtualKafkaCluster cluster , Context <VirtualKafkaCluster > context ) {
45- var proxyOpt = context .getSecondaryResource (KafkaProxy .class , PROXY_EVENT_SOURCE_NAME );
46- LOGGER .debug ("spec.proxyRef.name resolves to: {}" , proxyOpt );
73+ var existingProxies = context .getSecondaryResource (KafkaProxy .class , PROXY_EVENT_SOURCE_NAME ).stream ().collect (Collectors .toSet ());
74+ TreeSet <LocalRef <KafkaProxy >> missingProxies = Optional .ofNullable (cluster .getSpec ())
75+ .map (VirtualKafkaClusterSpec ::getProxyRef )
76+ .stream ()
77+ .collect (Collectors .toCollection (TreeSet ::new ));
78+ missingProxies .removeAll (existingProxies .stream ()
79+ .map (ResourcesUtil ::toLocalRef )
80+ .toList ());
81+
82+ var existingServices = context .getSecondaryResource (KafkaService .class , SERVICES_EVENT_SOURCE_NAME ).stream ().collect (Collectors .toSet ());
83+ TreeSet <LocalRef <KafkaService >> missingServices = Optional .ofNullable (cluster .getSpec ())
84+ .map (VirtualKafkaClusterSpec ::getTargetKafkaServiceRef )
85+ .stream ()
86+ .collect (Collectors .toCollection (TreeSet ::new ));
87+ missingServices .removeAll (existingServices .stream ()
88+ .map (ResourcesUtil ::toLocalRef )
89+ .toList ());
90+
91+ var existingIngresses = context .getSecondaryResources (KafkaProxyIngress .class );
92+ TreeSet <LocalRef <KafkaProxyIngress >> missingIngresses = Optional .ofNullable (cluster .getSpec ())
93+ .map (VirtualKafkaClusterSpec ::getIngressRefs )
94+ .stream ().flatMap (Collection ::stream )
95+ .collect (Collectors .toCollection (TreeSet ::new ));
96+ missingIngresses .removeAll (existingIngresses .stream ()
97+ .map (ResourcesUtil ::toLocalRef )
98+ .toList ());
99+
100+ var existingFilters = context .getSecondaryResources (KafkaProtocolFilter .class );
101+ TreeSet <LocalRef <KafkaProtocolFilter >> missingFilters = Optional .ofNullable (cluster .getSpec ())
102+ .map (VirtualKafkaClusterSpec ::getFilterRefs )
103+ .stream ().flatMap (Collection ::stream )
104+ .collect (Collectors .toCollection (TreeSet ::new ));
105+ missingFilters .removeAll (existingFilters .stream ()
106+ .map (ResourcesUtil ::toLocalRef )
107+ .toList ());
47108
48109 Condition condition ;
49- if (proxyOpt .isPresent ()) {
50- condition = ResourcesUtil .newResolvedRefsTrue (clock , cluster );
110+ if (missingProxies .isEmpty ()
111+ && missingServices .isEmpty ()
112+ && missingIngresses .isEmpty ()
113+ && missingFilters .isEmpty ()) {
114+ var unresolvedServices = existingServices .stream ()
115+ .filter (ks -> hasAnyResolvedRefsFalse (Optional .ofNullable (ks .getStatus ()).map (KafkaServiceStatus ::getConditions ).orElse (List .of ())))
116+ .map (ResourcesUtil ::toLocalRef )
117+ .collect (Collectors .toCollection (TreeSet ::new ));
118+ var unresolvedIngresses = existingIngresses .stream ()
119+ .filter (kafkaProxyIngress -> hasAnyResolvedRefsFalse (
120+ Optional .ofNullable (kafkaProxyIngress .getStatus ()).map (KafkaProxyIngressStatus ::getConditions ).orElse (List .of ())))
121+ .map (ResourcesUtil ::toLocalRef )
122+ .collect (Collectors .toCollection (TreeSet ::new ));
123+ var unresolvedFilters = existingFilters .stream ()
124+ .filter (kpf -> hasAnyResolvedRefsFalse (Optional .ofNullable (kpf .getStatus ()).map (KafkaProtocolFilterStatus ::getConditions ).orElse (List .of ())))
125+ .map (ResourcesUtil ::toLocalRef )
126+ .collect (Collectors .toCollection (TreeSet ::new ));
127+ if (unresolvedServices .isEmpty ()
128+ && unresolvedIngresses .isEmpty ()
129+ && unresolvedFilters .isEmpty ()) {
130+ condition = ResourcesUtil .newResolvedRefsTrue (clock , cluster );
131+ }
132+ else {
133+ Stream <String > serviceMsg = refsMessage ("spec.targetKafkaServiceRef references " , KafkaService .class , unresolvedServices );
134+ Stream <String > ingressMsg = refsMessage ("spec.ingressRefs references " , KafkaProxyIngress .class , unresolvedIngresses );
135+ Stream <String > filterMsg = refsMessage ("spec.filterRefs references " , KafkaProtocolFilter .class , unresolvedFilters );
136+ condition = ResourcesUtil .newResolvedRefsFalse (clock ,
137+ cluster ,
138+ TRANSITIVELY_REFERENCED_RESOURCES_NOT_FOUND ,
139+ joiningMessages (serviceMsg , ingressMsg , filterMsg ));
140+ }
51141 }
52142 else {
143+ Stream <String > proxyMsg = refsMessage ("spec.proxyRef references " , KafkaProxy .class , missingProxies );
144+ Stream <String > serviceMsg = refsMessage ("spec.targetKafkaServiceRef references " , KafkaService .class , missingServices );
145+ Stream <String > ingressMsg = refsMessage ("spec.ingressRefs references " , KafkaProxyIngress .class , missingIngresses );
146+ Stream <String > filterMsg = refsMessage ("spec.filterRefs references " , KafkaProtocolFilter .class , missingFilters );
147+
53148 condition = ResourcesUtil .newResolvedRefsFalse (clock ,
54149 cluster ,
55- "spec.proxyRef.name" ,
56- "KafkaProxy not found" );
150+ REFERENCED_RESOURCES_NOT_FOUND ,
151+ joiningMessages ( proxyMsg , serviceMsg , ingressMsg , filterMsg ) );
57152 }
58153
59154 UpdateControl <VirtualKafkaCluster > uc = UpdateControl .patchStatus (ResourcesUtil .patchWithCondition (cluster , condition ));
@@ -63,9 +158,33 @@ public UpdateControl<VirtualKafkaCluster> reconcile(VirtualKafkaCluster cluster,
63158 return uc ;
64159 }
65160
161+ @ NonNull
162+ private static String joiningMessages (
163+ Stream <String >... serviceMsg ) {
164+ return Stream .of (serviceMsg ).flatMap (Function .identity ()).collect (Collectors .joining ("; " ));
165+ }
166+
167+ private static boolean hasAnyResolvedRefsFalse (List <Condition > conditions ) {
168+ return conditions .stream ()
169+ .anyMatch (c -> Condition .Type .ResolvedRefs .equals (c .getType ())
170+ && Condition .Status .FALSE .equals (c .getStatus ()));
171+ }
172+
173+ @ NonNull
174+ private static <R extends CustomResource <?, ?>> Stream <String > refsMessage (
175+ String prefix ,
176+ Class <R > crdClass ,
177+ TreeSet <? extends LocalRef <R >> refs ) {
178+ return refs .isEmpty () ? Stream .of ()
179+ : Stream .of (
180+ prefix + refs .stream ()
181+ .map (ref -> ResourcesUtil .slug (crdClass , ref .getName ()))
182+ .collect (Collectors .joining (", " )));
183+ }
184+
66185 @ Override
67186 public List <EventSource <?, VirtualKafkaCluster >> prepareEventSources (EventSourceContext <VirtualKafkaCluster > context ) {
68- InformerEventSourceConfiguration <KafkaProxy > configuration = InformerEventSourceConfiguration .from (
187+ InformerEventSourceConfiguration <KafkaProxy > clusterToProxy = InformerEventSourceConfiguration .from (
69188 KafkaProxy .class ,
70189 VirtualKafkaCluster .class )
71190 .withName (PROXY_EVENT_SOURCE_NAME )
@@ -75,7 +194,48 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
75194 VirtualKafkaCluster .class ,
76195 cluster -> cluster .getSpec ().getProxyRef ()))
77196 .build ();
78- return List .of (new InformerEventSource <>(configuration , context ));
197+
198+ InformerEventSourceConfiguration <KafkaService > clusterToService = InformerEventSourceConfiguration .from (
199+ KafkaService .class ,
200+ VirtualKafkaCluster .class )
201+ .withName (SERVICES_EVENT_SOURCE_NAME )
202+ .withPrimaryToSecondaryMapper ((VirtualKafkaCluster cluster ) -> ResourcesUtil .localRefAsResourceId (cluster ,
203+ cluster .getSpec ().getTargetKafkaServiceRef ()))
204+ .withSecondaryToPrimaryMapper (service -> ResourcesUtil .findReferrers (context ,
205+ service ,
206+ VirtualKafkaCluster .class ,
207+ cluster -> cluster .getSpec ().getTargetKafkaServiceRef ()))
208+ .build ();
209+
210+ InformerEventSourceConfiguration <KafkaProxyIngress > clusterToIngresses = InformerEventSourceConfiguration .from (
211+ KafkaProxyIngress .class ,
212+ VirtualKafkaCluster .class )
213+ .withName (INGRESSES_EVENT_SOURCE_NAME )
214+ .withPrimaryToSecondaryMapper ((VirtualKafkaCluster cluster ) -> ResourcesUtil .localRefsAsResourceIds (cluster ,
215+ Optional .ofNullable (cluster .getSpec ()).map (VirtualKafkaClusterSpec ::getIngressRefs )))
216+ .withSecondaryToPrimaryMapper (ingress -> ResourcesUtil .findReferrersMulti (context ,
217+ ingress ,
218+ VirtualKafkaCluster .class ,
219+ cluster -> cluster .getSpec ().getIngressRefs ()))
220+ .build ();
221+
222+ InformerEventSourceConfiguration <KafkaProtocolFilter > clusterToFilters = InformerEventSourceConfiguration .from (
223+ KafkaProtocolFilter .class ,
224+ VirtualKafkaCluster .class )
225+ .withName (FILTERS_EVENT_SOURCE_NAME )
226+ .withPrimaryToSecondaryMapper ((VirtualKafkaCluster cluster ) -> ResourcesUtil .localRefsAsResourceIds (cluster ,
227+ Optional .ofNullable (cluster .getSpec ()).map (VirtualKafkaClusterSpec ::getFilterRefs )))
228+ .withSecondaryToPrimaryMapper (filter -> ResourcesUtil .findReferrersMulti (context ,
229+ filter ,
230+ VirtualKafkaCluster .class ,
231+ cluster -> cluster .getSpec ().getFilterRefs ()))
232+ .build ();
233+
234+ return List .of (
235+ new InformerEventSource <>(clusterToProxy , context ),
236+ new InformerEventSource <>(clusterToIngresses , context ),
237+ new InformerEventSource <>(clusterToService , context ),
238+ new InformerEventSource <>(clusterToFilters , context ));
79239 }
80240
81241 @ Override
0 commit comments