55 */
66package io .kroxylicious .kubernetes .operator ;
77
8+ import java .nio .file .Path ;
89import java .time .Clock ;
910import java .util .ArrayList ;
1011import java .util .Collection ;
1920import java .util .function .Function ;
2021import java .util .function .Predicate ;
2122import java .util .stream .Collectors ;
23+ import java .util .stream .Stream ;
2224
2325import org .slf4j .Logger ;
2426import org .slf4j .LoggerFactory ;
2527
2628import io .fabric8 .kubernetes .api .model .Volume ;
29+ import io .fabric8 .kubernetes .api .model .VolumeBuilder ;
2730import io .fabric8 .kubernetes .api .model .VolumeMount ;
31+ import io .fabric8 .kubernetes .api .model .VolumeMountBuilder ;
2832import io .javaoperatorsdk .operator .OperatorException ;
2933import io .javaoperatorsdk .operator .api .config .informer .InformerEventSourceConfiguration ;
3034import io .javaoperatorsdk .operator .api .reconciler .Context ;
4145import io .javaoperatorsdk .operator .processing .event .source .SecondaryToPrimaryMapper ;
4246import io .javaoperatorsdk .operator .processing .event .source .informer .InformerEventSource ;
4347
48+ import io .kroxylicious .kubernetes .api .common .AnyLocalRef ;
4449import io .kroxylicious .kubernetes .api .common .Condition ;
4550import io .kroxylicious .kubernetes .api .common .FilterRef ;
4651import io .kroxylicious .kubernetes .api .common .LocalRef ;
4752import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxy ;
4853import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaProxyIngress ;
4954import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaService ;
55+ import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaServiceSpec ;
5056import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaCluster ;
5157import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaClusterSpec ;
58+ import io .kroxylicious .kubernetes .api .v1alpha1 .kafkaservicespec .tls .TrustAnchorRef ;
5259import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilter ;
5360import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilterSpec ;
5461import io .kroxylicious .kubernetes .operator .model .ProxyModel ;
6370import io .kroxylicious .proxy .config .admin .EndpointsConfiguration ;
6471import io .kroxylicious .proxy .config .admin .ManagementConfiguration ;
6572import io .kroxylicious .proxy .config .admin .PrometheusMetricsConfig ;
73+ import io .kroxylicious .proxy .config .tls .AllowDeny ;
74+ import io .kroxylicious .proxy .config .tls .KeyPair ;
75+ import io .kroxylicious .proxy .config .tls .KeyProvider ;
76+ import io .kroxylicious .proxy .config .tls .PlatformTrustProvider ;
77+ import io .kroxylicious .proxy .config .tls .Tls ;
78+ import io .kroxylicious .proxy .config .tls .TrustProvider ;
79+ import io .kroxylicious .proxy .config .tls .TrustStore ;
6680import io .kroxylicious .proxy .tag .VisibleForTesting ;
6781
82+ import edu .umd .cs .findbugs .annotations .Nullable ;
83+
6884import static io .kroxylicious .kubernetes .operator .ResourcesUtil .name ;
6985import static io .kroxylicious .kubernetes .operator .ResourcesUtil .namespace ;
7086import static io .kroxylicious .kubernetes .operator .ResourcesUtil .toLocalRef ;
@@ -104,6 +120,7 @@ public class KafkaProxyReconciler implements
104120 public static final String CONFIG_DEP = "config" ;
105121 public static final String DEPLOYMENT_DEP = "deployment" ;
106122 public static final String CLUSTERS_DEP = "clusters" ;
123+ public static final Path TARGET_CLUSTER_MOUNTS_BASE_DIR = Path .of ("/opt/kroxylicious/target-cluster" );
107124
108125 private final Clock clock ;
109126 private final SecureConfigInterpolator secureConfigInterpolator ;
@@ -145,26 +162,25 @@ private ConfigurationFragment<Configuration> generateProxyConfig(ProxyModel mode
145162
146163 var virtualClusters = buildVirtualClusters (namedDefinitions .keySet (), model );
147164
148- List <NamedFilterDefinition > referencedFilters = virtualClusters .stream ().flatMap (c -> Optional .ofNullable (c .filters ()).stream ().flatMap (Collection ::stream ))
165+ List <NamedFilterDefinition > referencedFilters = virtualClusters .stream ()
166+ .flatMap (vcFragment -> Optional .ofNullable (vcFragment .fragment ().filters ()).stream ().flatMap (Collection ::stream ))
149167 .distinct ()
150168 .map (filterName -> namedDefinitions .get (filterName ).fragment ()).toList ();
151169
152- var allVolumes = allFilterDefinitions .stream ()
170+ var allVolumes = Stream . concat ( allFilterDefinitions .stream (), virtualClusters . stream () )
153171 .flatMap (fd -> fd .volumes ().stream ())
154172 .collect (Collectors .toCollection (() -> new TreeSet <>(Comparator .comparing (Volume ::getName ).reversed ())));
155173
156- var allMounts = allFilterDefinitions .stream ()
174+ var allMounts = Stream . concat ( allFilterDefinitions .stream (), virtualClusters . stream () )
157175 .flatMap (fd -> fd .mounts ().stream ())
158- .collect (Collectors .toCollection (() -> new TreeSet <>(Comparator .comparing (VolumeMount ::getName ).reversed ())));
159-
160- // TODO add the volume & mounts for each KafkaService's spec.tls
176+ .collect (Collectors .toCollection (() -> new TreeSet <>(Comparator .comparing (VolumeMount ::getMountPath ).reversed ())));
161177
162178 return new ConfigurationFragment <>(
163179 new Configuration (
164180 new ManagementConfiguration (null , null , new EndpointsConfiguration (new PrometheusMetricsConfig ())),
165181 referencedFilters ,
166182 null , // no defaultFilters <= each of the virtualClusters specifies its own
167- virtualClusters ,
183+ virtualClusters . stream (). map ( ConfigurationFragment :: fragment ). toList () ,
168184 List .of (),
169185 false ,
170186 // micrometer
@@ -173,11 +189,11 @@ private ConfigurationFragment<Configuration> generateProxyConfig(ProxyModel mode
173189 allMounts );
174190 }
175191
176- private static List <VirtualCluster > buildVirtualClusters (Set <String > successfullyBuiltFilterNames , ProxyModel model ) {
192+ private static List <ConfigurationFragment < VirtualCluster > > buildVirtualClusters (Set <String > successfullyBuiltFilterNames , ProxyModel model ) {
177193 return model .clustersWithValidIngresses ().stream ()
178194 .filter (cluster -> Optional .ofNullable (cluster .getSpec ().getFilterRefs ()).stream ().flatMap (Collection ::stream ).allMatch (
179195 filters -> successfullyBuiltFilterNames .contains (filterDefinitionName (filters ))))
180- .map (cluster -> getVirtualCluster (cluster , model .resolutionResult ().kafkaServiceRef (cluster ).orElseThrow (), model .ingressModel ()))
196+ .map (cluster -> buildVirtualCluster (cluster , model .resolutionResult ().kafkaServiceRef (cluster ).orElseThrow (), model .ingressModel ()))
181197 .toList ();
182198 }
183199
@@ -230,19 +246,95 @@ private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProt
230246 return secureConfigInterpolator .interpolate (configTemplate );
231247 }
232248
233- private static VirtualCluster getVirtualCluster (VirtualKafkaCluster cluster ,
234- KafkaService kafkaServiceRef ,
235- ProxyIngressModel ingressModel ) {
249+ private static ConfigurationFragment < VirtualCluster > buildVirtualCluster (VirtualKafkaCluster cluster ,
250+ KafkaService kafkaServiceRef ,
251+ ProxyIngressModel ingressModel ) {
236252
237253 ProxyIngressModel .VirtualClusterIngressModel virtualClusterIngressModel = ingressModel .clusterIngressModel (cluster ).orElseThrow ();
238- String bootstrap = kafkaServiceRef .getSpec ().getBootstrapServers ();
239- return new VirtualCluster (
240- ResourcesUtil .name (cluster ), new TargetCluster (bootstrap , Optional .empty ()),
254+
255+ return buildTargetCluster (kafkaServiceRef ).map (targetCluster -> new VirtualCluster (
256+ ResourcesUtil .name (cluster ),
257+ targetCluster ,
241258 null ,
242259 Optional .empty (),
243260 virtualClusterIngressModel .gateways (),
244- false , false ,
245- filterNamesForCluster (cluster ));
261+ false ,
262+ false ,
263+ filterNamesForCluster (cluster )));
264+ }
265+
266+ private static ConfigurationFragment <TargetCluster > buildTargetCluster (KafkaService kafkaServiceRef ) {
267+ return buildTargetClusterTls (kafkaServiceRef )
268+ .map (tls -> new TargetCluster (kafkaServiceRef .getSpec ().getBootstrapServers (), tls ));
269+ }
270+
271+ private static ConfigurationFragment <Optional <Tls >> buildTargetClusterTls (KafkaService kafkaServiceRef ) {
272+ return Optional .ofNullable (kafkaServiceRef .getSpec ())
273+ .map (KafkaServiceSpec ::getTls )
274+ .map (serviceTls -> ConfigurationFragment .combine (
275+ buildKeyProvider (serviceTls .getCertificateRef ()),
276+ buildTrustProvider (serviceTls .getTrustAnchorRef ()),
277+ (keyProviderOpt , trustProvider ) -> Optional .of (
278+ new Tls (keyProviderOpt .orElse (null ),
279+ trustProvider ,
280+ Optional .ofNullable (serviceTls .getCipherSuites ())
281+ .map (cipherSuites -> new AllowDeny <>(cipherSuites .getAllowed (), new HashSet <>(cipherSuites .getDenied ())))
282+ .orElse (null ),
283+ Optional .ofNullable (serviceTls .getProtocols ())
284+ .map (protocols -> new AllowDeny <>(protocols .getAllowed (), new HashSet <>(protocols .getDenied ())))
285+ .orElse (null )))))
286+ .orElse (ConfigurationFragment .empty ());
287+ }
288+
289+ private static ConfigurationFragment <Optional <KeyProvider >> buildKeyProvider (@ Nullable AnyLocalRef certificateRef ) {
290+ return Optional .ofNullable (certificateRef )
291+ .filter (ResourcesUtil ::isSecret )
292+ .map (ref -> {
293+ var volume = new VolumeBuilder ()
294+ .withName (ResourcesUtil .volumeName ("" , "secrets" , ref .getName ()))
295+ .withNewSecret ()
296+ .withSecretName (ref .getName ())
297+ .endSecret ()
298+ .build ();
299+ Path mountPath = TARGET_CLUSTER_MOUNTS_BASE_DIR .resolve ("client-certs" ).resolve (ref .getName ());
300+ var mount = new VolumeMountBuilder ()
301+ .withName (ResourcesUtil .volumeName ("" , "secrets" , ref .getName ()))
302+ .withMountPath (mountPath .toString ())
303+ .withReadOnly (true )
304+ .build ();
305+ var keyPath = mountPath .resolve ("tls.key" );
306+ var crtPath = mountPath .resolve ("tls.crt" );
307+ return new ConfigurationFragment <>(
308+ Optional .<KeyProvider > of (new KeyPair (keyPath .toString (), crtPath .toString (), null )),
309+ Set .of (volume ),
310+ Set .of (mount ));
311+ }).orElse (ConfigurationFragment .empty ());
312+ }
313+
314+ private static ConfigurationFragment <TrustProvider > buildTrustProvider (@ Nullable TrustAnchorRef trustAnchorRef ) {
315+ return Optional .ofNullable (trustAnchorRef )
316+ .filter (ResourcesUtil ::isConfigMap )
317+ .map (ref -> {
318+ var volume = new VolumeBuilder ()
319+ .withName (ResourcesUtil .volumeName ("" , "configmaps" , ref .getName ()))
320+ .withNewConfigMap ()
321+ .withName (ref .getName ())
322+ .endConfigMap ()
323+ .build ();
324+ Path mountPath = TARGET_CLUSTER_MOUNTS_BASE_DIR .resolve ("trusted-certs" ).resolve (ref .getName ());
325+ var mount = new VolumeMountBuilder ()
326+ .withName (ResourcesUtil .volumeName ("" , "configmaps" , ref .getName ()))
327+ .withMountPath (mountPath .toString ())
328+ .withReadOnly (true )
329+ .build ();
330+ TrustProvider trustProvider = new TrustStore (
331+ mountPath .resolve (ref .getKey ()).toString (),
332+ null ,
333+ "PEM" );
334+ return new ConfigurationFragment <>(trustProvider ,
335+ Set .of (volume ),
336+ Set .of (mount ));
337+ }).orElse (new ConfigurationFragment <>(PlatformTrustProvider .INSTANCE , Set .of (), Set .of ()));
246338 }
247339
248340 /**
@@ -266,7 +358,9 @@ public ErrorStatusUpdateControl<KafkaProxy> updateErrorStatus(KafkaProxy proxy,
266358 Context <KafkaProxy > context ,
267359 Exception e ) {
268360 if (e instanceof StaleReferentStatusException || e instanceof OperatorException && e .getCause () instanceof StaleReferentStatusException ) {
269- LOGGER .debug ("Completed reconciliation of {}/{} with stale referent" , namespace (proxy ), name (proxy ), e );
361+ if (LOGGER .isDebugEnabled ()) {
362+ LOGGER .debug ("Completed reconciliation of {}/{} with stale referent" , namespace (proxy ), name (proxy ), e );
363+ }
270364 return ErrorStatusUpdateControl .noStatusUpdate ();
271365 }
272366 var uc = ErrorStatusUpdateControl .patchStatus (statusFactory .newUnknownConditionStatusPatch (proxy , Condition .Type .Ready , e ));
0 commit comments