diff --git a/internal/controller/dependency_predicate.go b/internal/controller/dependency_predicate.go new file mode 100644 index 00000000..a2a92d65 --- /dev/null +++ b/internal/controller/dependency_predicate.go @@ -0,0 +1,54 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/fluxcd/pkg/runtime/conditions" + + kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1" +) + +type KustomizationReadyChangePredicate struct { + predicate.Funcs +} + +func (KustomizationReadyChangePredicate) Update(e event.UpdateEvent) bool { + if e.ObjectNew == nil || e.ObjectOld == nil { + return false + } + + newKs, ok := e.ObjectNew.(*kustomizev1.Kustomization) + if !ok { + return false + } + oldKs, ok := e.ObjectOld.(*kustomizev1.Kustomization) + if !ok { + return false + } + + if !conditions.IsReady(newKs) { + return false + } + if !conditions.IsReady(oldKs) { + return true + } + + return oldKs.Status.LastAppliedRevision != newKs.Status.LastAppliedRevision +} diff --git a/internal/controller/kustomization_controller.go b/internal/controller/kustomization_controller.go index d65cb6bc..688eca51 100644 --- a/internal/controller/kustomization_controller.go +++ b/internal/controller/kustomization_controller.go @@ -110,6 +110,7 @@ type KustomizationReconciler struct { type KustomizationReconcilerOptions struct { HTTPRetry int DependencyRequeueInterval time.Duration + EnableDependencyQueueing bool RateLimiter workqueue.TypedRateLimiter[reconcile.Request] } @@ -142,7 +143,7 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName) r.artifactFetchRetries = opts.HTTPRetry - return ctrl.NewControllerManagedBy(mgr). + controllerBuilder := ctrl.NewControllerManagedBy(mgr). For(&kustomizev1.Kustomization{}, builder.WithPredicates( predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}), )). @@ -160,7 +161,24 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl &sourcev1.Bucket{}, handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(bucketIndexKey)), builder.WithPredicates(SourceRevisionChangePredicate{}), - ). + ) + + if opts.EnableDependencyQueueing { + // Index the Kustomizations by the dependsOn references they (may) point at. + if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, dependsOnIndexKey, + r.indexDependsOn); err != nil { + return fmt.Errorf("failed setting index fields: %w", err) + } + + controllerBuilder = controllerBuilder. + Watches( + &kustomizev1.Kustomization{}, + handler.EnqueueRequestsFromMapFunc(r.requestsForDependents), + builder.WithPredicates(KustomizationReadyChangePredicate{}), + ) + } + + return controllerBuilder. WithOptions(controller.Options{ RateLimiter: opts.RateLimiter, }). @@ -272,7 +290,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques if err := r.checkDependencies(ctx, obj, artifactSource); err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err) msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String()) - log.Info(msg) + log.Info(msg, "err", err) r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil) return ctrl.Result{RequeueAfter: r.requeueDependency}, nil } diff --git a/internal/controller/kustomization_dependson_test.go b/internal/controller/kustomization_dependson_test.go index 100575e3..60eeea4a 100644 --- a/internal/controller/kustomization_dependson_test.go +++ b/internal/controller/kustomization_dependson_test.go @@ -134,6 +134,14 @@ spec: }, } + dependencyKey := types.NamespacedName{ + Name: fmt.Sprintf("dep-%s", randStringRunes(5)), + Namespace: id, + } + dependencyKs := kustomization.DeepCopy() + dependencyKs.ObjectMeta.Name = dependencyKey.Name + dependencyKs.ObjectMeta.Namespace = dependencyKey.Namespace + g.Expect(k8sClient.Create(context.Background(), kustomization)).To(Succeed()) resultK := &kustomizev1.Kustomization{} @@ -170,8 +178,8 @@ spec: _ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK) resultK.Spec.DependsOn = []meta.NamespacedObjectReference{ { - Namespace: id, - Name: "root", + Namespace: dependencyKey.Namespace, + Name: dependencyKey.Name, }, } return k8sClient.Update(context.Background(), resultK) @@ -183,4 +191,15 @@ spec: return ready.Reason == meta.DependencyNotReadyReason }, timeout, time.Second).Should(BeTrue()) }) + + t.Run("reconciles once dependency becomes ready", func(t *testing.T) { + g := NewWithT(t) + g.Expect(k8sClient.Create(context.Background(), dependencyKs)).To(Succeed()) + + g.Eventually(func() bool { + _ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK) + ready := apimeta.FindStatusCondition(resultK.Status.Conditions, meta.ReadyCondition) + return ready.Reason == meta.ReconciliationSucceededReason + }, timeout, time.Second).Should(BeTrue()) + }) } diff --git a/internal/controller/kustomization_indexers.go b/internal/controller/kustomization_indexers.go index e3c6ce27..bb9338cc 100644 --- a/internal/controller/kustomization_indexers.go +++ b/internal/controller/kustomization_indexers.go @@ -21,17 +21,24 @@ import ( "fmt" "github.com/fluxcd/pkg/runtime/conditions" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/dependency" sourcev1 "github.com/fluxcd/source-controller/api/v1" kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1" ) +const ( + dependsOnIndexKey string = ".metadata.dependsOn" +) + func (r *KustomizationReconciler) requestsForRevisionChangeOf(indexKey string) handler.MapFunc { return func(ctx context.Context, obj client.Object) []reconcile.Request { log := ctrl.LoggerFrom(ctx) @@ -78,6 +85,53 @@ func (r *KustomizationReconciler) requestsForRevisionChangeOf(indexKey string) h } } +func isNotReadyForDependency(k *kustomizev1.Kustomization) bool { + c := conditions.Get(k, meta.ReadyCondition) + if c == nil { + return false + } + return c.Status == metav1.ConditionFalse && c.Reason == meta.DependencyNotReadyReason +} + +func (r *KustomizationReconciler) requestsForDependents(ctx context.Context, obj client.Object) []reconcile.Request { + log := ctrl.LoggerFrom(ctx) + + var list kustomizev1.KustomizationList + if err := r.List(ctx, &list, client.MatchingFields{ + dependsOnIndexKey: client.ObjectKeyFromObject(obj).String(), + }); err != nil { + log.Error(err, "failed to list objects for dependency change") + return nil + } + var dd []dependency.Dependent + for _, d := range list.Items { + if isNotReadyForDependency(&d) { + dd = append(dd, &d) + } + } + sorted, err := dependency.Sort(dd) + if err != nil { + log.Error(err, "failed to sort dependents for dependency change") + return nil + } + reqs := make([]reconcile.Request, 0, len(sorted)) + debugLog := log.V(1).WithValues("dependency", map[string]string{ + "name": obj.GetName(), + "namespace": obj.GetNamespace(), + }) + for _, d := range sorted { + debugLog.Info("requesting reconciliation of dependent", "dependent", map[string]string{ + "name": d.Name, + "namespace": d.Namespace, + }) + reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{ + Name: d.Name, + Namespace: d.Namespace, + }}) + } + return reqs +} + func (r *KustomizationReconciler) indexBy(kind string) func(o client.Object) []string { return func(o client.Object) []string { k, ok := o.(*kustomizev1.Kustomization) @@ -96,3 +150,21 @@ func (r *KustomizationReconciler) indexBy(kind string) func(o client.Object) []s return nil } } + +func (r *KustomizationReconciler) indexDependsOn(o client.Object) []string { + k, ok := o.(*kustomizev1.Kustomization) + if !ok { + panic(fmt.Sprintf("Expected a Kustomization, got %T", o)) + } + + deps := make([]string, len(k.Spec.DependsOn)) + for i, dep := range k.Spec.DependsOn { + namespace := k.GetNamespace() + if dep.Namespace != "" { + namespace = dep.Namespace + } + deps[i] = fmt.Sprintf("%s/%s", namespace, dep.Name) + } + + return deps +} diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 2f7053a7..d731f2de 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -185,6 +185,7 @@ func TestMain(m *testing.M) { } if err := (reconciler).SetupWithManager(ctx, testEnv, KustomizationReconcilerOptions{ DependencyRequeueInterval: 2 * time.Second, + EnableDependencyQueueing: true, }); err != nil { panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err)) } diff --git a/internal/features/features.go b/internal/features/features.go index c52e7566..344b9632 100644 --- a/internal/features/features.go +++ b/internal/features/features.go @@ -48,6 +48,11 @@ const ( // GroupChangelog controls groups kubernetes objects names on log output // reduces cardinality of logs when logging to elasticsearch GroupChangeLog = "GroupChangeLog" + + // EnableDependencyQueueing controls whether reconciliation of a kustomization + // should be queued once one of its dependencies becomes ready, or if only + // time-based retries with reque-dependecy delays should be attempted + EnableDependencyQueueing = "EnableDependencyQueueing" ) var features = map[string]bool{ @@ -66,6 +71,8 @@ var features = map[string]bool{ // GroupChangeLog // opt-in from v1.5 GroupChangeLog: false, + // EnableDependencyQueueing + EnableDependencyQueueing: false, } // FeatureGates contains a list of all supported feature gates and diff --git a/main.go b/main.go index c0f5651d..5aa9ddec 100644 --- a/main.go +++ b/main.go @@ -240,6 +240,12 @@ func main() { os.Exit(1) } + enableDependencyQueueing, err := features.Enabled(features.EnableDependencyQueueing) + if err != nil { + setupLog.Error(err, "unable to check feature gate "+features.EnableDependencyQueueing) + os.Exit(1) + } + if err = (&controller.KustomizationReconciler{ ControllerName: controllerName, DefaultServiceAccount: defaultServiceAccount, @@ -259,6 +265,7 @@ func main() { GroupChangeLog: groupChangeLog, }).SetupWithManager(ctx, mgr, controller.KustomizationReconcilerOptions{ DependencyRequeueInterval: requeueDependency, + EnableDependencyQueueing: enableDependencyQueueing, HTTPRetry: httpRetry, RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions), }); err != nil {