Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package v1alpha1

import (
cmv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
Expand All @@ -35,6 +37,12 @@ const (
// IntegrationSinkConditionEventPoliciesReady has status True when all the applying EventPolicies for this
// IntegrationSink are ready.
IntegrationSinkConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"

// IntegrationSinkConditionCertificateReady has status True when the IntegrationSink's certificate is ready.
IntegrationSinkConditionCertificateReady apis.ConditionType = "CertificateReady"

// Certificate related condition reasons
IntegrationSinkCertificateNotReady string = "CertificateNotReady"
)

var IntegrationSinkCondSet = apis.NewLivingConditionSet(
Expand Down Expand Up @@ -112,6 +120,37 @@ func (s *IntegrationSinkStatus) PropagateDeploymentStatus(d *appsv1.DeploymentSt
}
}

func (s *IntegrationSinkStatus) PropagateCertificateStatus(cs cmv1.CertificateStatus) bool {
var topLevel *cmv1.CertificateCondition
for _, cond := range cs.Conditions {
if cond.Type == cmv1.CertificateConditionReady {
topLevel = &cond
break
}
}

if topLevel == nil {
IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionCertificateReady,
IntegrationSinkCertificateNotReady, "Certificate is progressing")
return false
}

if topLevel.Status == cmmeta.ConditionUnknown {
IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionCertificateReady,
IntegrationSinkCertificateNotReady, "Certificate is progressing, "+topLevel.Reason+" Message: "+topLevel.Message)
return false
}

if topLevel.Status == cmmeta.ConditionFalse {
IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionCertificateReady,
IntegrationSinkCertificateNotReady, "Certificate is not ready, "+topLevel.Reason+" Message: "+topLevel.Message)
return false
}

IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionCertificateReady)
return true
}

func (s *IntegrationSinkStatus) SetAddress(address *duckv1.Addressable) {
s.Address = address
if address == nil || address.URL.IsEmpty() {
Expand Down
126 changes: 101 additions & 25 deletions pkg/reconciler/integration/sink/integrationsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
deploymentUpdated = "DeploymentUpdated"
serviceCreated = "ServiceCreated"
certificateCreated = "CertificateCreated"
certificateUpdated = "CertificateUpdated"
serviceUpdated = "ServiceUpdated"
)

Expand All @@ -87,31 +88,35 @@ func newReconciledNormal(namespace, name string) reconciler.Event {

func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationSink) reconciler.Event {
featureFlags := feature.FromContext(ctx)
logger := logging.FromContext(ctx)

if featureFlags.IsPermissiveTransportEncryption() || featureFlags.IsStrictTransportEncryption() {
_, err := r.reconcileCMCertificate(ctx, sink)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Certificate", zap.Error(err))
return err
}
logger.Debugw("Reconciling IntegrationSink Certificate")
_, err := r.reconcileIntegrationSinkCertificate(ctx, sink)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Certificate", zap.Error(err))
return err
}

_, err := r.reconcileDeployment(ctx, sink, featureFlags)
logger.Debugw("Reconciling IntegrationSink Deployment")
_, err = r.reconcileDeployment(ctx, sink, featureFlags)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Pod", zap.Error(err))
return err
}

logger.Debugw("Reconciling IntegrationSink Service")
_, err = r.reconcileService(ctx, sink)
if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Service", zap.Error(err))
return err
}

logger.Debugw("Reconciling IntegrationSink address")
if err := r.reconcileAddress(ctx, sink); err != nil {
return fmt.Errorf("failed to reconcile address: %w", err)
}

logger.Debugw("Updating IntegrationSink status with EventPolicies")
err = auth.UpdateStatusWithEventPolicies(featureFlags, &sink.Status.AppliedEventPoliciesStatus, &sink.Status, r.eventPolicyLister, sinks.SchemeGroupVersion.WithKind("IntegrationSink"), sink.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update IntegrationSink status with EventPolicies: %v", err)
Expand Down Expand Up @@ -171,34 +176,78 @@ func (r *Reconciler) reconcileService(ctx context.Context, sink *sinks.Integrati
return svc, nil
}

func (r *Reconciler) reconcileCMCertificate(ctx context.Context, sink *sinks.IntegrationSink) (*cmv1.Certificate, error) {
func (r *Reconciler) reconcileIntegrationSinkCertificate(ctx context.Context, sink *sinks.IntegrationSink) (*cmv1.Certificate, error) {
if f := feature.FromContext(ctx); !f.IsStrictTransportEncryption() && !f.IsPermissiveTransportEncryption() {
return nil, r.deleteIntegrationSinkCertificate(ctx, sink)
}

expected := certificates.MakeCertificate(sink, certificates.WithDNSNames(
network.GetServiceHostname(resources.DeploymentName(sink.GetName()), sink.GetNamespace()),
fmt.Sprintf("%s.%s.svc", resources.DeploymentName(sink.GetName()), sink.GetNamespace()),
))
expected := integrationSinkCertificate(sink)

lister := r.cmCertificateLister.Load()
if lister == nil || *lister == nil {
cmCertificateLister := r.cmCertificateLister.Load()
if cmCertificateLister == nil || *cmCertificateLister == nil {
return nil, fmt.Errorf("no cert-manager certificate lister created yet, this should rarely happen and recover")
}

cert, err := (*lister).Certificates(sink.Namespace).Get(expected.Name)
curr, err := (*cmCertificateLister).Certificates(expected.GetNamespace()).Get(expected.GetName())
if apierrors.IsNotFound(err) {
cert, err := r.certManagerClient.CertmanagerV1().Certificates(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{})
created, err := r.createCertificate(ctx, sink, expected)
if err != nil {
return nil, fmt.Errorf("creating new Certificate: %v", err)
return nil, err
}
controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, certificateCreated, "Certificate created %q", cert.Name)
} else if err != nil {
return nil, fmt.Errorf("getting Certificate: %v", err)
} else if !metav1.IsControlledBy(cert, sink) {
return nil, fmt.Errorf("Certificate %q is not owned by IntegrationSink %q", cert.Name, sink.Name)
} else {
logging.FromContext(ctx).Debugw("Reusing existing Certificate", zap.Any("Certificate", cert))
if !sink.Status.PropagateCertificateStatus(created.Status) {
// Wait for Certificate to become ready before continuing.
return nil, controller.NewSkipKey("")
}
return created, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get certificate %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}

return cert, nil
if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) &&
equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) &&
equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) {
if !sink.Status.PropagateCertificateStatus(curr.Status) {
// Wait for Certificate to become ready before continuing.
return nil, controller.NewSkipKey("")
}
return curr, nil
}
expected.ResourceVersion = curr.ResourceVersion
updated, err := r.updateCertificate(ctx, sink, expected)
if err != nil {
return nil, err
}
if !sink.Status.PropagateCertificateStatus(updated.Status) {
// Wait for Certificate to become ready before continuing.
return nil, controller.NewSkipKey("")
}
return updated, nil
}

func (r *Reconciler) deleteIntegrationSinkCertificate(ctx context.Context, sink *sinks.IntegrationSink) error {
certificate := integrationSinkCertificate(sink)

cmCertificateLister := r.cmCertificateLister.Load()
if cmCertificateLister != nil && *cmCertificateLister != nil {
_, err := (*cmCertificateLister).Certificates(certificate.GetNamespace()).Get(certificate.GetName())
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to get certificate %s/%s: %w", certificate.GetNamespace(), certificate.GetName(), err)
}
}

err := r.certManagerClient.CertmanagerV1().Certificates(certificate.GetNamespace()).Delete(ctx, certificate.GetName(), metav1.DeleteOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to delete certificate %s/%s: %w", certificate.GetNamespace(), certificate.GetName(), err)
}
controller.GetEventRecorder(ctx).Event(sink, corev1.EventTypeNormal, "IntegrationSinkCertificateDeleted", certificate.GetName())
return nil
}

func (r *Reconciler) reconcileAddress(ctx context.Context, sink *sinks.IntegrationSink) error {
Expand Down Expand Up @@ -303,3 +352,30 @@ func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1
}
return false
}

func integrationSinkCertificate(sink *sinks.IntegrationSink) *cmv1.Certificate {
return certificates.MakeCertificate(sink,
certificates.WithDNSNames(
network.GetServiceHostname(resources.DeploymentName(sink.Name), sink.Namespace),
fmt.Sprintf("%s.%s.svc", resources.DeploymentName(sink.Name), sink.Namespace),
),
)
}

func (r *Reconciler) createCertificate(ctx context.Context, sink *sinks.IntegrationSink, expected *cmv1.Certificate) (*cmv1.Certificate, error) {
created, err := r.certManagerClient.CertmanagerV1().Certificates(expected.GetNamespace()).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("creating new Certificate %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, certificateCreated, "Certificate created %q", expected.GetName())
return created, nil
}

func (r *Reconciler) updateCertificate(ctx context.Context, sink *sinks.IntegrationSink, expected *cmv1.Certificate) (*cmv1.Certificate, error) {
updated, err := r.certManagerClient.CertmanagerV1().Certificates(expected.GetNamespace()).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to update certificate %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
controller.GetEventRecorder(ctx).Event(sink, corev1.EventTypeNormal, certificateUpdated, expected.GetName())
return updated, nil
}
19 changes: 14 additions & 5 deletions pkg/reconciler/integration/sink/integrationsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package sink

import (
"fmt"
"sync/atomic"

cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1"

"knative.dev/eventing/pkg/certificates"

Expand Down Expand Up @@ -151,12 +154,18 @@ func TestReconcile(t *testing.T) {
logger := logtesting.TestLogger(t)
table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
ctx = addressable.WithDuck(ctx)

cmCertificatesListerAtomic := &atomic.Pointer[cmlisters.CertificateLister]{}
cmCertificatesLister := listers.GetCertificateLister()
cmCertificatesListerAtomic.Store(&cmCertificatesLister)

r := &Reconciler{
kubeClientSet: fakekubeclient.Get(ctx),
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
secretLister: listers.GetSecretLister(),
eventPolicyLister: listers.GetEventPolicyLister(),
kubeClientSet: fakekubeclient.Get(ctx),
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
secretLister: listers.GetSecretLister(),
cmCertificateLister: cmCertificatesListerAtomic,
eventPolicyLister: listers.GetEventPolicyLister(),
}

return integrationsink.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetIntegrationSinkLister(), controller.GetEventRecorder(ctx), r)
Expand Down
Loading