88import java .io .IOException ;
99import java .io .UncheckedIOException ;
1010import java .util .ArrayList ;
11+ import java .util .Collection ;
1112import java .util .Comparator ;
1213import java .util .HashSet ;
1314import java .util .LinkedHashSet ;
1415import java .util .List ;
1516import java .util .Map ;
1617import java .util .Optional ;
1718import java .util .Set ;
18- import java .util .function .Function ;
1919import java .util .stream .Collectors ;
2020
2121import com .fasterxml .jackson .databind .ObjectMapper ;
2222
2323import io .fabric8 .kubernetes .api .model .ConfigMap ;
2424import io .fabric8 .kubernetes .api .model .ConfigMapBuilder ;
25- import io .fabric8 .kubernetes .api .model .GenericKubernetesResource ;
2625import io .fabric8 .kubernetes .api .model .Volume ;
2726import io .fabric8 .kubernetes .api .model .VolumeMount ;
2827import io .javaoperatorsdk .operator .api .reconciler .Context ;
2928import io .javaoperatorsdk .operator .api .reconciler .dependent .managed .ManagedWorkflowAndDependentResourceContext ;
3029import io .javaoperatorsdk .operator .processing .dependent .kubernetes .CRUDKubernetesDependentResource ;
3130import io .javaoperatorsdk .operator .processing .dependent .kubernetes .KubernetesDependent ;
32- import io .javaoperatorsdk .operator .processing .event .ResourceID ;
3331
3432import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaClusterRef ;
3533import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxy ;
36- import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxyIngress ;
3734import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaCluster ;
38- import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaClusterSpec ;
3935import io .kroxylicious .kubernetes .api .v1alpha1 .virtualkafkaclusterspec .Filters ;
40- import io .kroxylicious .kubernetes .operator .ingress .IngressAllocator ;
41- import io .kroxylicious .kubernetes .operator .ingress .IngressConflictException ;
42- import io .kroxylicious .kubernetes .operator .ingress .ProxyIngressModel ;
36+ import io .kroxylicious .kubernetes .operator .model .ProxyModel ;
37+ import io .kroxylicious .kubernetes .operator .model .ProxyModelBuilder ;
38+ import io .kroxylicious .kubernetes .operator .model .ingress .ProxyIngressModel ;
39+ import io .kroxylicious .kubernetes .operator .resolver .ResolutionResult ;
4340import io .kroxylicious .proxy .config .ConfigParser ;
4441import io .kroxylicious .proxy .config .Configuration ;
4542import io .kroxylicious .proxy .config .NamedFilterDefinition ;
5249import edu .umd .cs .findbugs .annotations .NonNull ;
5350
5451import static io .kroxylicious .kubernetes .operator .Labels .standardLabels ;
55- import static io .kroxylicious .kubernetes .operator .ResourcesUtil .clusterRefs ;
5652import static io .kroxylicious .kubernetes .operator .ResourcesUtil .namespace ;
5753
5854/**
@@ -132,24 +128,19 @@ protected ConfigMap desired(KafkaProxy primary,
132128
133129 String generateProxyConfig (KafkaProxy primary ,
134130 Context <KafkaProxy > context ) {
131+ ProxyModelBuilder proxyModelBuilder = ProxyModelBuilder .contextBuilder (context );
132+ ProxyModel model = proxyModelBuilder .build (primary , context );
133+ List <NamedFilterDefinition > allFilterDefinitions = buildFilterDefinitions (context , model );
134+ Map <String , NamedFilterDefinition > namedDefinitions = allFilterDefinitions .stream ().collect (Collectors .toMap (NamedFilterDefinition ::name , f -> f ));
135135
136- List <VirtualKafkaCluster > virtualKafkaClusters = ResourcesUtil .clustersInNameOrder (context ).toList ();
137- Set <KafkaProxyIngress > ingresses = context .getSecondaryResources (KafkaProxyIngress .class );
138- ProxyIngressModel ingressModel = getProxyIngressModel (primary , context , virtualKafkaClusters , ingresses );
136+ var virtualClusters = buildVirtualClusters (namedDefinitions .keySet (), model );
139137
140- Map <ResourceID , KafkaClusterRef > clusterRefs = clusterRefs (context );
141-
142- // TODO fix this double invocation of buildFilterDefinitions which is a workaround for https://github.com/kroxylicious/kroxylicious/issues/1916
143- // first invocation rejects some virtual clusters with unresolved refs
144- buildFilterDefinitions (context , virtualKafkaClusters );
145-
146- var virtualClusters = buildVirtualClusters (context , virtualKafkaClusters , clusterRefs , ingressModel );
147-
148- // second invocation excludes filters that belong to clusters broken in `buildVirtualClusters`
149- List <NamedFilterDefinition > filterDefinitions = buildFilterDefinitions (context , virtualKafkaClusters );
138+ List <NamedFilterDefinition > referencedFilters = virtualClusters .stream ().flatMap (c -> Optional .ofNullable (c .filters ()).stream ().flatMap (Collection ::stream ))
139+ .distinct ()
140+ .map (namedDefinitions ::get ).toList ();
150141
151142 Configuration configuration = new Configuration (
152- new ManagementConfiguration (null , null , new EndpointsConfiguration (new PrometheusMetricsConfig ())), filterDefinitions ,
143+ new ManagementConfiguration (null , null , new EndpointsConfiguration (new PrometheusMetricsConfig ())), referencedFilters ,
153144 null , // no defaultFilters <= each of the virtualClusters specifies its own
154145 virtualClusters ,
155146 List .of (), false ,
@@ -159,60 +150,31 @@ String generateProxyConfig(KafkaProxy primary,
159150 return toYaml (configuration );
160151 }
161152
162- private static @ NonNull ProxyIngressModel getProxyIngressModel (KafkaProxy primary , Context <KafkaProxy > context , List <VirtualKafkaCluster > virtualKafkaClusters ,
163- Set <KafkaProxyIngress > ingresses ) {
164- ProxyIngressModel ingressModel = IngressAllocator .allocateProxyIngressModel (primary , virtualKafkaClusters , ingresses );
165- for (ProxyIngressModel .VirtualClusterIngressModel virtualClusterIngressModel : ingressModel .clusters ()) {
166- Set <IngressConflictException > exceptions = virtualClusterIngressModel .ingressExceptions ();
167- if (!exceptions .isEmpty ()) {
168- VirtualKafkaCluster cluster = virtualClusterIngressModel .cluster ();
169- SharedKafkaProxyContext .addClusterCondition (context , cluster , ClusterCondition .ingressConflict (ResourcesUtil .name (cluster ), exceptions ));
170- }
171- }
172- return ingressModel ;
173- }
174-
175153 @ NonNull
176- private static List <VirtualCluster > buildVirtualClusters (Context <KafkaProxy > context ,
177- List <VirtualKafkaCluster > clusters ,
178- Map <ResourceID , KafkaClusterRef > clusterRefs ,
179- ProxyIngressModel ingressModel ) {
180- Map <VirtualKafkaCluster , Optional <KafkaClusterRef >> clusterToRefMap = clusters .stream ()
181- .collect (Collectors .toMap (
182- Function .identity (),
183- cluster -> clusterTargetClusterResourceID (cluster ).map (clusterRefs ::get )));
184-
185- clusterToRefMap .entrySet ().stream ()
186- .filter (e -> e .getValue ().isEmpty ())
187- .forEach (e -> {
188- var cluster = e .getKey ();
189- SharedKafkaProxyContext .addClusterCondition (context , cluster , targetClusterResourceNotFound (cluster ).accepted ());
190- });
191-
192- return clusters .stream ()
193- .filter (cluster -> !SharedKafkaProxyContext .isBroken (context , cluster ))
194- .map (cluster -> getVirtualCluster (cluster , clusterToRefMap .get (cluster ).get (), ingressModel ))
154+ private static List <VirtualCluster > buildVirtualClusters (Set <String > successfullyBuiltFilterNames , ProxyModel model ) {
155+ return model .clustersWithValidIngresses ().stream ()
156+ .filter (cluster -> Optional .ofNullable (cluster .getSpec ().getFilters ()).stream ().flatMap (Collection ::stream ).allMatch (
157+ filters -> successfullyBuiltFilterNames .contains (filterDefinitionName (filters ))))
158+ .map (cluster -> getVirtualCluster (cluster , model .resolutionResult ().kafkaClusterRef (cluster ).orElseThrow (), model .ingressModel ()))
195159 .toList ();
196160 }
197161
198162 @ NonNull
199163
200- private List <NamedFilterDefinition > buildFilterDefinitions (Context <KafkaProxy > context , List <VirtualKafkaCluster > clusters ) {
164+ private List <NamedFilterDefinition > buildFilterDefinitions (Context <KafkaProxy > context ,
165+ ProxyModel model ) {
201166 List <NamedFilterDefinition > filterDefinitions = new ArrayList <>();
202167 Set <NamedFilterDefinition > uniqueValues = new HashSet <>();
203- for (VirtualKafkaCluster cluster1 : clusters ) {
204- if (SharedKafkaProxyContext .isBroken (context , cluster1 )) {
205- continue ;
206- }
168+ for (VirtualKafkaCluster cluster : model .clustersWithValidIngresses ()) {
207169 try {
208- for (NamedFilterDefinition namedFilterDefinition : filterDefinitions (context , cluster1 )) {
170+ for (NamedFilterDefinition namedFilterDefinition : filterDefinitions (context , cluster , model . resolutionResult () )) {
209171 if (uniqueValues .add (namedFilterDefinition )) {
210172 filterDefinitions .add (namedFilterDefinition );
211173 }
212174 }
213175 }
214176 catch (InvalidClusterException e ) {
215- SharedKafkaProxyContext .addClusterCondition (context , cluster1 , e .accepted ());
177+ SharedKafkaProxyContext .addClusterCondition (context , cluster , e .accepted ());
216178 }
217179 }
218180 filterDefinitions .sort (Comparator .comparing (NamedFilterDefinition ::name ));
@@ -233,14 +195,14 @@ private static String filterDefinitionName(Filters filterCrRef) {
233195 }
234196
235197 @ NonNull
236- private List <NamedFilterDefinition > filterDefinitions (Context <KafkaProxy > context , VirtualKafkaCluster cluster )
198+ private List <NamedFilterDefinition > filterDefinitions (Context <KafkaProxy > context , VirtualKafkaCluster cluster , ResolutionResult resolutionResult )
237199 throws InvalidClusterException {
238200
239201 return Optional .ofNullable (cluster .getSpec ().getFilters ()).orElse (List .of ()).stream ().map (filterCrRef -> {
240202
241203 String filterDefinitionName = filterDefinitionName (filterCrRef );
242204
243- var filterCr = filterResourceFromRef ( cluster , context , filterCrRef );
205+ var filterCr = resolutionResult . filter ( filterCrRef ). orElseThrow ( );
244206 if (filterCr .getAdditionalProperties ().get ("spec" ) instanceof Map <?, ?> spec ) {
245207 String type = (String ) spec .get ("type" );
246208 SecureConfigInterpolator .InterpolationResult interpolationResult = interpolateConfig (spec );
@@ -266,7 +228,7 @@ private static <T> void putOrMerged(ManagedWorkflowAndDependentResourceContext c
266228 }
267229 }
268230
269- private @ NonNull SecureConfigInterpolator .InterpolationResult interpolateConfig (Map <?, ?> spec ) {
231+ private SecureConfigInterpolator .InterpolationResult interpolateConfig (Map <?, ?> spec ) {
270232 SecureConfigInterpolator .InterpolationResult result ;
271233 Object configTemplate = spec .get ("configTemplate" );
272234 if (configTemplate != null ) {
@@ -278,34 +240,6 @@ private static <T> void putOrMerged(ManagedWorkflowAndDependentResourceContext c
278240 return result ;
279241 }
280242
281- @ NonNull
282- private static InvalidClusterException filterResourceNotFound (VirtualKafkaCluster cluster , Filters filterRef ) {
283- return new InvalidClusterException (ClusterCondition .filterNotFound (ResourcesUtil .name (cluster ), filterRef .getName ()));
284- }
285-
286- @ NonNull
287- private static InvalidClusterException targetClusterResourceNotFound (VirtualKafkaCluster cluster ) {
288- return new InvalidClusterException (ClusterCondition .targetClusterRefNotFound (ResourcesUtil .name (cluster ), cluster .getSpec ().getTargetCluster ()));
289- }
290-
291- /**
292- * Look up a Filter CR from the group, kind and name given in the cluster.
293- */
294- @ NonNull
295- private static GenericKubernetesResource filterResourceFromRef (VirtualKafkaCluster cluster , Context <KafkaProxy > context , Filters filterRef )
296- throws InvalidClusterException {
297- return context .getSecondaryResources (GenericKubernetesResource .class ).stream ()
298- .filter (filterResource -> {
299- String apiVersion = filterResource .getApiVersion ();
300- var filterResourceGroup = apiVersion .substring (0 , apiVersion .indexOf ("/" ));
301- return filterResourceGroup .equals (filterRef .getGroup ())
302- && filterResource .getKind ().equals (filterRef .getKind ())
303- && ResourcesUtil .name (filterResource ).equals (filterRef .getName ());
304- })
305- .findFirst ()
306- .orElseThrow (() -> filterResourceNotFound (cluster , filterRef ));
307- }
308-
309243 private static VirtualCluster getVirtualCluster (VirtualKafkaCluster cluster ,
310244 KafkaClusterRef kafkaClusterRef ,
311245 ProxyIngressModel ingressModel ) {
@@ -321,10 +255,4 @@ private static VirtualCluster getVirtualCluster(VirtualKafkaCluster cluster,
321255 filterNamesForCluster (cluster ));
322256 }
323257
324- private static Optional <ResourceID > clusterTargetClusterResourceID (VirtualKafkaCluster cluster ) {
325- return Optional .ofNullable (cluster .getSpec ())
326- .map (VirtualKafkaClusterSpec ::getTargetCluster )
327- .map (io .kroxylicious .kubernetes .api .v1alpha1 .virtualkafkaclusterspec .TargetCluster ::getClusterRef )
328- .map (r -> new ResourceID (r .getName (), namespace (cluster )));
329- }
330258}
0 commit comments