diff --git a/go.mod b/go.mod index 2ff6e62a..0f23dd32 100644 --- a/go.mod +++ b/go.mod @@ -23,12 +23,12 @@ require ( github.com/fluxcd/pkg/apis/acl v0.9.0 github.com/fluxcd/pkg/apis/event v0.20.0 github.com/fluxcd/pkg/apis/kustomize v1.13.0 - github.com/fluxcd/pkg/apis/meta v1.22.0 + github.com/fluxcd/pkg/apis/meta v1.22.1-0.20251010220540-14b66b2de1ab github.com/fluxcd/pkg/auth v0.32.0 github.com/fluxcd/pkg/cache v0.12.0 github.com/fluxcd/pkg/http/fetch v0.20.0 github.com/fluxcd/pkg/kustomize v1.23.0 - github.com/fluxcd/pkg/runtime v0.88.0 + github.com/fluxcd/pkg/runtime v0.89.1-0.20251010220540-14b66b2de1ab github.com/fluxcd/pkg/ssa v0.60.0 github.com/fluxcd/pkg/tar v0.15.0 github.com/fluxcd/pkg/testserver v0.13.0 diff --git a/go.sum b/go.sum index 177892f9..6450aa77 100644 --- a/go.sum +++ b/go.sum @@ -194,8 +194,8 @@ github.com/fluxcd/pkg/apis/event v0.20.0 h1:Vxd1kkS/CsQNPHTbmlL4qOcCmUmavEtaEOod github.com/fluxcd/pkg/apis/event v0.20.0/go.mod h1:wyY+8BHicfFP7sXzhMrKpZTQeojCsSpK9idAidjv61c= github.com/fluxcd/pkg/apis/kustomize v1.13.0 h1:GGf0UBVRIku+gebY944icVeEIhyg1P/KE3IrhOyJJnE= github.com/fluxcd/pkg/apis/kustomize v1.13.0/go.mod h1:TLKVqbtnzkhDuhWnAsN35977HvRfIjs+lgMuNro/LEc= -github.com/fluxcd/pkg/apis/meta v1.22.0 h1:EHWQH5ZWml7i8eZ/AMjm1jxid3j/PQ31p+hIwCt6crM= -github.com/fluxcd/pkg/apis/meta v1.22.0/go.mod h1:Kc1+bWe5p0doROzuV9XiTfV/oL3ddsemYXt8ZYWdVVg= +github.com/fluxcd/pkg/apis/meta v1.22.1-0.20251010220540-14b66b2de1ab h1:qV92SLmuKonakYhMSTX3//eRaJHVfCAGcmbdb/7tBrA= +github.com/fluxcd/pkg/apis/meta v1.22.1-0.20251010220540-14b66b2de1ab/go.mod h1:Kc1+bWe5p0doROzuV9XiTfV/oL3ddsemYXt8ZYWdVVg= github.com/fluxcd/pkg/auth v0.32.0 h1:D0RkbWlT2gqcEaEr6GLnm1XP1KDIYQI8zWzuZVnsE5I= github.com/fluxcd/pkg/auth v0.32.0/go.mod h1:Yhe6p3/wTUj80yrOqhpsbA48hQRM14OKwo3Qr4199XM= github.com/fluxcd/pkg/cache v0.12.0 h1:mabABT3jIfuo84VbIW+qvfqMZ7PbM5tXQgQvA2uo2rc= @@ -206,8 +206,8 @@ github.com/fluxcd/pkg/http/fetch v0.20.0 h1:/Lvcu1JzABBLuQYuLKYh1K02a+RqbP4b5wIZ github.com/fluxcd/pkg/http/fetch v0.20.0/go.mod h1:9inwDiGOpuo14Rp06TpcgsYSkvp4YM+uWCjgDmpXMNk= github.com/fluxcd/pkg/kustomize v1.23.0 h1:4tNh30OsIj96YRfVP7qP0Fv3QTwdBo/udfZIcccL6NI= github.com/fluxcd/pkg/kustomize v1.23.0/go.mod h1:ZojUvmI4RiHk3BH3L3mBQ4ZDbNkiWfX9LvOMBjKq5Tc= -github.com/fluxcd/pkg/runtime v0.88.0 h1:EFPJ0jnRino6yUEwiNtQTpUNyCf96N2MJb+S7LVG648= -github.com/fluxcd/pkg/runtime v0.88.0/go.mod h1:qkmPX009tgiWufQ2Vj0QhyNgEU+0Cnz7Xy/naihLM10= +github.com/fluxcd/pkg/runtime v0.89.1-0.20251010220540-14b66b2de1ab h1:MuF0vVykSXQoVP9vTYjP6xUoj41TUhuIXyL8NQG5fJU= +github.com/fluxcd/pkg/runtime v0.89.1-0.20251010220540-14b66b2de1ab/go.mod h1:qkmPX009tgiWufQ2Vj0QhyNgEU+0Cnz7Xy/naihLM10= github.com/fluxcd/pkg/sourceignore v0.15.0 h1:tB30fuk4jlB3UGlR7ppJguZ3zaJh1iwuTCEufs91jSM= github.com/fluxcd/pkg/sourceignore v0.15.0/go.mod h1:mZ9X6gNtNkq9ZsD35LebEYjePc7DRvB2JdowMNoj6IU= github.com/fluxcd/pkg/ssa v0.60.0 h1:ikA78TWSLDmIc8I/goGAU/buYF6jto/gswE5hnOfWGk= diff --git a/internal/controller/kustomization_controller.go b/internal/controller/kustomization_controller.go index e55536d7..f3eb26b6 100644 --- a/internal/controller/kustomization_controller.go +++ b/internal/controller/kustomization_controller.go @@ -114,12 +114,11 @@ type KustomizationReconciler struct { // Feature gates - AdditiveCELDependencyCheck bool - AllowExternalArtifact bool - CancelHealthCheckOnNewRevision bool - FailFast bool - GroupChangeLog bool - StrictSubstitutions bool + AdditiveCELDependencyCheck bool + AllowExternalArtifact bool + FailFast bool + GroupChangeLog bool + StrictSubstitutions bool } func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { @@ -271,6 +270,17 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{RequeueAfter: r.DependencyRequeueInterval}, nil } + // Handle health check cancellation. + if errors.Is(reconcileErr, &runtimeCtrl.QueueEventSource{}) { + qes := reconcileErr.(*runtimeCtrl.QueueEventSource) + ctrl.LoggerFrom(ctx).Info("New reconciliation triggered, canceling health checks", "trigger", qes) + conditions.MarkFalse(obj, + meta.ReadyCondition, + meta.HealthCheckCanceledReason, + "New reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name) + return ctrl.Result{}, nil + } + // Broadcast the reconciliation failure and requeue at the specified retry interval. if reconcileErr != nil { log.Error(reconcileErr, fmt.Sprintf("Reconciliation failed after %s, next try in %s", @@ -498,6 +508,11 @@ func (r *KustomizationReconciler) reconcile( isNewRevision, drifted, changeSet.ToObjMetadataSet()); err != nil { + + if errors.Is(err, &runtimeCtrl.QueueEventSource{}) { + return err + } + obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.HealthCheckFailedReason, historyMeta) conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err) return err @@ -984,43 +999,15 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context, } // Check the health with a default timeout of 30sec shorter than the reconciliation interval. - healthCtx := ctx - if r.CancelHealthCheckOnNewRevision { - // Create a cancellable context for health checks that monitors for new revisions - var cancel context.CancelFunc - healthCtx, cancel = context.WithCancel(ctx) - defer cancel() - - // Start monitoring for new revisions to allow early cancellation - go func() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-healthCtx.Done(): - return - case <-ticker.C: - // Get the latest source artifact - latestSrc, err := r.getSource(ctx, obj) - if err == nil && latestSrc.GetArtifact() != nil { - if newRevision := latestSrc.GetArtifact().Revision; newRevision != revision { - const msg = "New revision detected during health check, cancelling" - r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil) - ctrl.LoggerFrom(ctx).Info(msg, "current", revision, "new", newRevision) - cancel() - return - } - } - } - } - }() - } + healthCtx := runtimeCtrl.GetInterruptContext(ctx) if err := manager.WaitForSetWithContext(healthCtx, toCheck, ssa.WaitOptions{ Interval: 5 * time.Second, Timeout: obj.GetTimeout(), FailFast: r.FailFast, }); err != nil { + if is, err := runtimeCtrl.IsObjectEnqueued(ctx); is { + return err + } conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err) conditions.MarkFalse(obj, meta.HealthyCondition, meta.HealthCheckFailedReason, "%s", err) return fmt.Errorf("health check failed after %s: %w", time.Since(checkStart).String(), err) diff --git a/internal/controller/kustomization_manager.go b/internal/controller/kustomization_manager.go index bd3652bb..2907cc5e 100644 --- a/internal/controller/kustomization_manager.go +++ b/internal/controller/kustomization_manager.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + runtimeCtrl "github.com/fluxcd/pkg/runtime/controller" "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1" @@ -38,9 +39,10 @@ import ( // KustomizationReconcilerOptions contains options for the KustomizationReconciler. type KustomizationReconcilerOptions struct { - RateLimiter workqueue.TypedRateLimiter[reconcile.Request] - WatchConfigsPredicate predicate.Predicate - WatchExternalArtifacts bool + RateLimiter workqueue.TypedRateLimiter[reconcile.Request] + WatchConfigsPredicate predicate.Predicate + WatchExternalArtifacts bool + CancelHealthCheckOnNewRevision bool } // SetupWithManager sets up the controller with the Manager. @@ -129,43 +131,64 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl return fmt.Errorf("failed creating index %s: %w", indexSecret, err) } - ctrlBuilder := ctrl.NewControllerManagedBy(mgr). - For(&kustomizev1.Kustomization{}, builder.WithPredicates( - predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}), - )). + var blder *builder.Builder + var toComplete reconcile.TypedReconciler[reconcile.Request] + var enqueueRequestsFromMapFunc func(objKind string, fn handler.MapFunc) handler.EventHandler + + ksPredicate := predicate.Or( + predicate.GenerationChangedPredicate{}, + predicates.ReconcileRequestedPredicate{}, + ) + + if !opts.CancelHealthCheckOnNewRevision { + toComplete = r + enqueueRequestsFromMapFunc = func(objKind string, fn handler.MapFunc) handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(fn) + } + blder = ctrl.NewControllerManagedBy(mgr). + For(&kustomizev1.Kustomization{}, builder.WithPredicates(ksPredicate)) + } else { + wr := runtimeCtrl.WrapReconciler(r) + toComplete = wr + enqueueRequestsFromMapFunc = wr.EnqueueRequestsFromMapFunc + blder = runtimeCtrl.NewControllerManagedBy(mgr, wr). + For(&kustomizev1.Kustomization{}, ksPredicate).Builder + } + + blder. Watches( &sourcev1.OCIRepository{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexOCIRepository)), + enqueueRequestsFromMapFunc(sourcev1.OCIRepositoryKind, r.requestsForRevisionChangeOf(indexOCIRepository)), builder.WithPredicates(SourceRevisionChangePredicate{}), ). Watches( &sourcev1.GitRepository{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexGitRepository)), + enqueueRequestsFromMapFunc(sourcev1.GitRepositoryKind, r.requestsForRevisionChangeOf(indexGitRepository)), builder.WithPredicates(SourceRevisionChangePredicate{}), ). Watches( &sourcev1.Bucket{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexBucket)), + enqueueRequestsFromMapFunc(sourcev1.BucketKind, r.requestsForRevisionChangeOf(indexBucket)), builder.WithPredicates(SourceRevisionChangePredicate{}), ). WatchesMetadata( &corev1.ConfigMap{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForConfigDependency(indexConfigMap)), + enqueueRequestsFromMapFunc("ConfigMap", r.requestsForConfigDependency(indexConfigMap)), builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}, opts.WatchConfigsPredicate), ). WatchesMetadata( &corev1.Secret{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForConfigDependency(indexSecret)), + enqueueRequestsFromMapFunc("Secret", r.requestsForConfigDependency(indexSecret)), builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}, opts.WatchConfigsPredicate), ) if opts.WatchExternalArtifacts { - ctrlBuilder = ctrlBuilder.Watches( + blder = blder.Watches( &sourcev1.ExternalArtifact{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexExternalArtifact)), + enqueueRequestsFromMapFunc(sourcev1.ExternalArtifactKind, r.requestsForRevisionChangeOf(indexExternalArtifact)), builder.WithPredicates(SourceRevisionChangePredicate{}), ) } - return ctrlBuilder.WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).Complete(r) + return blder.WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).Complete(toComplete) } diff --git a/internal/controller/kustomization_wait_test.go b/internal/controller/kustomization_wait_test.go index d178d8ee..7553867a 100644 --- a/internal/controller/kustomization_wait_test.go +++ b/internal/controller/kustomization_wait_test.go @@ -473,9 +473,6 @@ func TestKustomizationReconciler_CancelHealthCheckOnNewRevision(t *testing.T) { resultK := &kustomizev1.Kustomization{} timeout := 60 * time.Second - reconciler.CancelHealthCheckOnNewRevision = true - t.Cleanup(func() { reconciler.CancelHealthCheckOnNewRevision = false }) - err := createNamespace(id) g.Expect(err).NotTo(HaveOccurred(), "failed to create test namespace") @@ -616,15 +613,4 @@ spec: _ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK) return resultK.Status.LastAttemptedRevision == "main/"+fixedArtifact }, timeout, time.Second).Should(BeTrue()) - - // Check cancellation event was emitted - events := getEvents(resultK.GetName(), nil) - var found bool - for _, e := range events { - if e.Message == "New revision detected during health check, cancelling" { - found = true - break - } - } - g.Expect(found).To(BeTrue(), "did not find event for health check cancellation") } diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 91a9c6e9..60ee7efd 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -187,8 +187,9 @@ func TestMain(m *testing.M) { SOPSAgeSecret: sopsAgeSecret, } if err := (reconciler).SetupWithManager(ctx, testEnv, KustomizationReconcilerOptions{ - WatchConfigsPredicate: predicate.Not(predicate.Funcs{}), - WatchExternalArtifacts: true, + WatchConfigsPredicate: predicate.Not(predicate.Funcs{}), + WatchExternalArtifacts: true, + CancelHealthCheckOnNewRevision: true, }); err != nil { panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err)) } diff --git a/main.go b/main.go index c1a7491e..b906bf91 100644 --- a/main.go +++ b/main.go @@ -313,34 +313,34 @@ func main() { } if err = (&controller.KustomizationReconciler{ - AdditiveCELDependencyCheck: additiveCELDependencyCheck, - AllowExternalArtifact: allowExternalArtifact, - CancelHealthCheckOnNewRevision: cancelHealthCheckOnNewRevision, - APIReader: mgr.GetAPIReader(), - ArtifactFetchRetries: httpRetry, - Client: mgr.GetClient(), - ClusterReader: clusterReader, - ConcurrentSSA: concurrentSSA, - ControllerName: controllerName, - DefaultServiceAccount: defaultServiceAccount, - DependencyRequeueInterval: requeueDependency, - DisallowedFieldManagers: disallowedFieldManagers, - EventRecorder: eventRecorder, - FailFast: failFast, - GroupChangeLog: groupChangeLog, - KubeConfigOpts: kubeConfigOpts, - Mapper: restMapper, - Metrics: metricsH, - NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs, - NoRemoteBases: noRemoteBases, - SOPSAgeSecret: sopsAgeSecret, - StatusManager: fmt.Sprintf("gotk-%s", controllerName), - StrictSubstitutions: strictSubstitutions, - TokenCache: tokenCache, + AdditiveCELDependencyCheck: additiveCELDependencyCheck, + AllowExternalArtifact: allowExternalArtifact, + APIReader: mgr.GetAPIReader(), + ArtifactFetchRetries: httpRetry, + Client: mgr.GetClient(), + ClusterReader: clusterReader, + ConcurrentSSA: concurrentSSA, + ControllerName: controllerName, + DefaultServiceAccount: defaultServiceAccount, + DependencyRequeueInterval: requeueDependency, + DisallowedFieldManagers: disallowedFieldManagers, + EventRecorder: eventRecorder, + FailFast: failFast, + GroupChangeLog: groupChangeLog, + KubeConfigOpts: kubeConfigOpts, + Mapper: restMapper, + Metrics: metricsH, + NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs, + NoRemoteBases: noRemoteBases, + SOPSAgeSecret: sopsAgeSecret, + StatusManager: fmt.Sprintf("gotk-%s", controllerName), + StrictSubstitutions: strictSubstitutions, + TokenCache: tokenCache, }).SetupWithManager(ctx, mgr, controller.KustomizationReconcilerOptions{ - RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions), - WatchConfigsPredicate: watchConfigsPredicate, - WatchExternalArtifacts: allowExternalArtifact, + RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions), + WatchConfigsPredicate: watchConfigsPredicate, + WatchExternalArtifacts: allowExternalArtifact, + CancelHealthCheckOnNewRevision: cancelHealthCheckOnNewRevision, }); err != nil { setupLog.Error(err, "unable to create controller", "controller", controllerName) os.Exit(1)