Skip to content

Commit 32b9a7c

Browse files
committed
WIP: Add cert-manager support to Kafka Exporter
Signed-off-by: Kate Stanley <[email protected]>
1 parent 765cbeb commit 32b9a7c

File tree

6 files changed

+624
-15
lines changed

6 files changed

+624
-15
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CertManagerUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,26 @@ public static Certificate buildCertManagerCertificate(String namespace,
104104
* Used when cert-manager has issued the CA Secret.
105105
*
106106
* @param clusterCa The Cluster CA
107+
* @param certManagerSecret cert-manager Secret containing the Cluster CA
108+
* @param namespace Namespace for the Secret
109+
* @param secretName Name of the Secret
110+
* @param keyCertName Key under which the certificate will be stored in the new Secret
111+
* @param labels Labels
112+
* @param ownerReference Owner reference
113+
* @return Newly built Secret
114+
*/
115+
public static Secret buildTrustedCertificateSecretFromCertManager(ClusterCa clusterCa, Secret certManagerSecret, String namespace,
116+
String secretName, String keyCertName, Labels labels,
117+
OwnerReference ownerReference) {
118+
// TO, UO, KE etc do not track the clientsCa generation annotation on their Secret, so pass null
119+
return buildTrustedCertificateSecretFromCertManager(clusterCa, null, certManagerSecret, namespace, secretName, keyCertName, labels, ownerReference);
120+
}
121+
122+
/**
123+
* Builds a certificate Secret based on the cert-manager provided Secret.
124+
* Used when cert-manager has issued the CA Secret.
125+
*
126+
* @param clusterCa The Cluster CA
107127
* @param clientsCa The Clients CA. If this is not null the Clients CA generation is added
108128
* @param certManagerSecret cert-manager Secret containing the Cluster CA
109129
* @param namespace Namespace for the Secret

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaExporter.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package io.strimzi.operator.cluster.model;
66

7+
import io.fabric8.certmanager.api.model.v1.Certificate;
78
import io.fabric8.kubernetes.api.model.Container;
89
import io.fabric8.kubernetes.api.model.ContainerPort;
910
import io.fabric8.kubernetes.api.model.EnvVar;
@@ -33,13 +34,18 @@
3334
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
3435
import io.strimzi.operator.common.Reconciliation;
3536
import io.strimzi.operator.common.Util;
37+
import io.strimzi.operator.common.auth.PemTrustSet;
38+
import io.strimzi.operator.common.model.Ca;
3639

40+
import java.security.cert.CertificateException;
41+
import java.security.cert.X509Certificate;
3742
import java.util.ArrayList;
3843
import java.util.HashMap;
3944
import java.util.List;
4045
import java.util.Map;
4146

4247
import static io.strimzi.api.kafka.model.common.template.DeploymentStrategy.ROLLING_UPDATE;
48+
import static io.strimzi.operator.common.model.Ca.x509Certificate;
4349

4450
/**
4551
* Kafka Exporter model
@@ -280,9 +286,25 @@ private List<VolumeMount> getVolumeMounts() {
280286
return volumeList;
281287
}
282288

289+
/**
290+
* Creates the Certificate resource for the Kafka Exporter used when cert-manager is issuing certificates
291+
*
292+
* @param clusterCa The CA for cluster certificates
293+
*
294+
* @return List of Certificate resources
295+
*/
296+
public Certificate generateCertificateResource(ClusterCa clusterCa) {
297+
return CertManagerUtils.buildCertManagerCertificate(namespace,
298+
KafkaExporterResources.secretName(cluster),
299+
clusterCa.getCertManagerCert(componentName, Ca.IO_STRIMZI),
300+
labels,
301+
ownerReference);
302+
}
303+
283304
/**
284305
* Generate the Secret containing the Kafka Exporter certificate signed by the cluster CA certificate used for TLS based
285306
* internal communication with Kafka. It also contains the related Kafka Exporter private key.
307+
* Used when Strimzi is issuing certificates.
286308
*
287309
* @param clusterCa The cluster CA.
288310
* @param existingSecret The existing secret with Kafka certificates
@@ -291,11 +313,60 @@ private List<VolumeMount> getVolumeMounts() {
291313
*
292314
* @return The generated Secret.
293315
*/
294-
public Secret generateCertificatesSecret(ClusterCa clusterCa, Secret existingSecret, boolean isMaintenanceTimeWindowsSatisfied) {
316+
public Secret generateCertificatesSecretForStrimziCa(ClusterCa clusterCa, Secret existingSecret, boolean isMaintenanceTimeWindowsSatisfied) {
295317
return CertUtils.buildTrustedCertificateSecret(reconciliation, clusterCa, existingSecret, namespace, KafkaExporterResources.secretName(cluster), componentName,
296318
COMPONENT_TYPE, labels, ownerReference, isMaintenanceTimeWindowsSatisfied);
297319
}
298320

321+
/**
322+
* Generate the Secret containing the Kafka Exporter certificate signed by the cluster CA certificate used for TLS based
323+
* internal communication with Kafka. It also contains the related Kafka Exporter private key.
324+
* Used when cert-manager is issuing certificates.
325+
*
326+
* @param clusterCa The cluster CA.
327+
* @param existingSecret Existing Secret.
328+
* @param certManagerSecret Secret managed by cert-manager, may be null.
329+
* @param pemTrustSet Trust set to use to determine if new certificates are trusted
330+
*
331+
* @return The generated Secret.
332+
*/
333+
public Secret generateCertificatesSecretForCertManagerCA(ClusterCa clusterCa, Secret existingSecret, Secret certManagerSecret, PemTrustSet pemTrustSet) {
334+
Secret newSecret = CertManagerUtils.buildTrustedCertificateSecretFromCertManager(clusterCa, certManagerSecret, namespace, KafkaExporterResources.secretName(cluster),
335+
COMPONENT_TYPE, labels, ownerReference);
336+
if (existingSecret == null) {
337+
return newSecret;
338+
} else if (CertManagerUtils.certManagerCertUpdated(existingSecret, newSecret)) {
339+
if (certManagerSecretNotTrusted(pemTrustSet, existingSecret)) {
340+
LOGGER.infoCr(reconciliation, "New certificate for Kafka Exporter, but not trusted yet so keeping existing certificate Secret.");
341+
return existingSecret;
342+
} else {
343+
LOGGER.infoCr(reconciliation, "New certificate for Kafka Exporter, updating Secret {}/{}", namespace, existingSecret.getMetadata().getName());
344+
return newSecret;
345+
}
346+
} else {
347+
// Certificate has no changed
348+
return existingSecret;
349+
}
350+
}
351+
352+
/**
353+
* Checks if the cert-manager Secret is trusted by the current CA cert
354+
*
355+
* @param certManagerSecret Secret containing cert-manager provided cert
356+
* @return Whether the cert is trusted
357+
*/
358+
private boolean certManagerSecretNotTrusted(PemTrustSet pemTrustSet, Secret certManagerSecret) {
359+
X509Certificate x509CaCert;
360+
X509Certificate certManagerCert;
361+
try {
362+
x509CaCert = x509Certificate(pemTrustSet.trustedCertificatesPemBytes());
363+
certManagerCert = x509Certificate(Util.decodeBytesFromBase64(certManagerSecret.getData().get("tls.crt")));
364+
} catch (CertificateException e) {
365+
throw new RuntimeException(e);
366+
}
367+
return !CertUtils.certIsTrusted(reconciliation, certManagerCert, x509CaCert);
368+
}
369+
299370
/**
300371
* Generates the NetworkPolicies relevant for Kafka Exporter
301372
*

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,6 @@ Future<Void> reconcileClusterOperatorSecret(Clock clock, Secret certManagerSecre
430430
if (clusterCaCertManagerType == CertificateManagerType.CERT_MANAGER_IO) {
431431
Secret newCoSecret = CertManagerUtils.buildTrustedCertificateSecretFromCertManager(
432432
clusterCa,
433-
null, //we don't need the Clients CA generation present on this Secret
434433
certManagerSecret,
435434
reconciliation.namespace(),
436435
KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()),

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaExporterReconciler.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@
77
import io.fabric8.kubernetes.api.model.LocalObjectReference;
88
import io.fabric8.kubernetes.api.model.Secret;
99
import io.fabric8.kubernetes.api.model.apps.Deployment;
10+
import io.strimzi.api.kafka.model.common.CertificateManagerType;
1011
import io.strimzi.api.kafka.model.kafka.Kafka;
1112
import io.strimzi.api.kafka.model.kafka.exporter.KafkaExporterResources;
1213
import io.strimzi.operator.cluster.ClusterOperatorConfig;
14+
import io.strimzi.operator.cluster.model.CertManagerUtils;
1315
import io.strimzi.operator.cluster.model.CertUtils;
1416
import io.strimzi.operator.cluster.model.ClusterCa;
1517
import io.strimzi.operator.cluster.model.ImagePullPolicy;
1618
import io.strimzi.operator.cluster.model.KafkaExporter;
1719
import io.strimzi.operator.cluster.model.KafkaVersion;
1820
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
21+
import io.strimzi.operator.cluster.operator.resource.kubernetes.CertManagerCertificateOperator;
1922
import io.strimzi.operator.cluster.operator.resource.kubernetes.DeploymentOperator;
2023
import io.strimzi.operator.cluster.operator.resource.kubernetes.NetworkPolicyOperator;
2124
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
@@ -49,6 +52,7 @@ public class KafkaExporterReconciler {
4952
private final ServiceAccountOperator serviceAccountOperator;
5053
private final NetworkPolicyOperator networkPolicyOperator;
5154
private final PodDisruptionBudgetOperator podDisruptionBudgetOperator;
55+
private final CertManagerCertificateOperator certManagerCertificateOperator;
5256

5357
private String certificateHash = "";
5458

@@ -83,6 +87,7 @@ public KafkaExporterReconciler(
8387
this.serviceAccountOperator = supplier.serviceAccountOperations;
8488
this.networkPolicyOperator = supplier.networkPolicyOperator;
8589
this.podDisruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
90+
this.certManagerCertificateOperator = supplier.certManagerCertificateOperator;
8691
}
8792

8893
/**
@@ -99,7 +104,8 @@ public KafkaExporterReconciler(
99104
*/
100105
public Future<Void> reconcile(boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets, Clock clock) {
101106
return serviceAccount()
102-
.compose(i -> certificatesSecret(clock))
107+
.compose(i -> maybeReconcileCertManagerCertificates())
108+
.compose(secret -> certificatesSecret(clock, secret))
103109
.compose(i -> networkPolicy())
104110
.compose(i -> podDisruptionBudget())
105111
.compose(i -> deployment(isOpenShift, imagePullPolicy, imagePullSecrets))
@@ -121,27 +127,51 @@ private Future<Void> serviceAccount() {
121127
).mapEmpty();
122128
}
123129

130+
/**
131+
* Manages the Certificate object that is used when cert-manager is the Certificate issuer
132+
*
133+
* @return Completes when the Certificate object was successfully created, deleted or updated and returns the related Secret
134+
*/
135+
protected Future<Secret> maybeReconcileCertManagerCertificates() {
136+
//TODO handle empty reconciles when kafka exporter not enabled
137+
if (CertificateManagerType.CERT_MANAGER_IO.equals(clusterCa.getType())) {
138+
return certManagerCertificateOperator.reconcile(reconciliation, reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name()), kafkaExporter.generateCertificateResource(clusterCa))
139+
.compose(v -> certManagerCertificateOperator.waitForReady(reconciliation, reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name())))
140+
.compose(v -> secretOperator.getAsync(reconciliation.namespace(), CertManagerUtils.certManagerSecretName(KafkaExporterResources.secretName(reconciliation.name()))));
141+
} else {
142+
return Future.succeededFuture();
143+
}
144+
}
145+
124146
/**
125147
* Manages the Kafka Exporter Secret with certificates.
126148
*
127-
* @param clock The clock for supplying the reconciler with the time instant of each reconciliation cycle.
128-
* That time is used for checking maintenance windows
149+
* @param clock The clock for supplying the reconciler with the time instant of each reconciliation cycle.
150+
* That time is used for checking maintenance windows
151+
* @param certManagerSecret Secret managed by cert-manager containing the Kafka Exporter certificate, may be null if Strimzi is issuing certificates.
129152
*
130-
* @return Future which completes when the reconciliation is done
153+
* @return Future which completes when the reconciliation is done
131154
*/
132-
private Future<Void> certificatesSecret(Clock clock) {
155+
private Future<Void> certificatesSecret(Clock clock, Secret certManagerSecret) {
133156
if (kafkaExporter != null) {
134157
return secretOperator.getAsync(reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name()))
135158
.compose(oldSecret -> {
136-
Secret newSecret = kafkaExporter.generateCertificatesSecret(clusterCa, oldSecret, Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));
137-
138-
return secretOperator
139-
.reconcile(reconciliation, reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name()), newSecret)
140-
.compose(i -> {
141-
certificateHash = CertUtils.getCertificateShortThumbprint(newSecret, Ca.SecretEntry.CRT.asKey(KafkaExporter.COMPONENT_TYPE));
159+
Future<Secret> secretFuture;
160+
if (CertificateManagerType.CERT_MANAGER_IO.equals(clusterCa.getType())) {
161+
secretFuture = ReconcilerUtils.clusterCaPemTrustSet(reconciliation, secretOperator)
162+
.map(pemTrustSet -> kafkaExporter.generateCertificatesSecretForCertManagerCA(clusterCa, oldSecret, certManagerSecret, pemTrustSet));
163+
} else {
164+
secretFuture = Future.succeededFuture(kafkaExporter.generateCertificatesSecretForStrimziCa(clusterCa, oldSecret, Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant())));
165+
}
142166

167+
return secretFuture.compose(secret -> secretOperator.reconcile(reconciliation,
168+
reconciliation.namespace(),
169+
KafkaExporterResources.secretName(reconciliation.name()),
170+
secret)
171+
.compose(result -> {
172+
certificateHash = CertUtils.getCertificateShortThumbprint(secret, Ca.SecretEntry.CRT.asKey(KafkaExporter.COMPONENT_TYPE));
143173
return Future.succeededFuture();
144-
});
174+
}));
145175
});
146176
} else {
147177
return secretOperator

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ReconcilerUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public static Future<TlsPemIdentity> coTlsPemIdentity(Reconciliation reconciliat
135135
*
136136
* @return Future containing the trust set to use for client authentication.
137137
*/
138-
private static Future<PemTrustSet> clusterCaPemTrustSet(Reconciliation reconciliation, SecretOperator secretOperator) {
138+
public static Future<PemTrustSet> clusterCaPemTrustSet(Reconciliation reconciliation, SecretOperator secretOperator) {
139139
return getSecret(secretOperator, reconciliation.namespace(), KafkaResources.clusterCaCertificateSecretName(reconciliation.name()))
140140
.map(PemTrustSet::new);
141141
}

0 commit comments

Comments
 (0)