Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions internal/controller/dependency_predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/fluxcd/pkg/runtime/conditions"

kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
)

type KustomizationReadyChangePredicate struct {
predicate.Funcs
}

func (KustomizationReadyChangePredicate) Update(e event.UpdateEvent) bool {
if e.ObjectNew == nil || e.ObjectOld == nil {
return false
}

newKs, ok := e.ObjectNew.(*kustomizev1.Kustomization)
if !ok {
return false
}
oldKs, ok := e.ObjectOld.(*kustomizev1.Kustomization)
if !ok {
return false
}

if !conditions.IsReady(newKs) {
return false
}
if !conditions.IsReady(oldKs) {
return true
}

return oldKs.Status.LastAppliedRevision != newKs.Status.LastAppliedRevision
}
24 changes: 21 additions & 3 deletions internal/controller/kustomization_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type KustomizationReconciler struct {
type KustomizationReconcilerOptions struct {
HTTPRetry int
DependencyRequeueInterval time.Duration
EnableDependencyQueueing bool
RateLimiter workqueue.TypedRateLimiter[reconcile.Request]
}

Expand Down Expand Up @@ -142,7 +143,7 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl
r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName)
r.artifactFetchRetries = opts.HTTPRetry

return ctrl.NewControllerManagedBy(mgr).
controllerBuilder := ctrl.NewControllerManagedBy(mgr).
For(&kustomizev1.Kustomization{}, builder.WithPredicates(
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
)).
Expand All @@ -160,7 +161,24 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl
&sourcev1.Bucket{},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(bucketIndexKey)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
)

if opts.EnableDependencyQueueing {
// Index the Kustomizations by the dependsOn references they (may) point at.
if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, dependsOnIndexKey,
r.indexDependsOn); err != nil {
return fmt.Errorf("failed setting index fields: %w", err)
}

controllerBuilder = controllerBuilder.
Watches(
&kustomizev1.Kustomization{},
handler.EnqueueRequestsFromMapFunc(r.requestsForDependents),
builder.WithPredicates(KustomizationReadyChangePredicate{}),
)
}

return controllerBuilder.
WithOptions(controller.Options{
RateLimiter: opts.RateLimiter,
}).
Expand Down Expand Up @@ -272,7 +290,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if err := r.checkDependencies(ctx, obj, artifactSource); err != nil {
conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err)
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
log.Info(msg)
log.Info(msg, "err", err)
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}
Expand Down
23 changes: 21 additions & 2 deletions internal/controller/kustomization_dependson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ spec:
},
}

dependencyKey := types.NamespacedName{
Name: fmt.Sprintf("dep-%s", randStringRunes(5)),
Namespace: id,
}
dependencyKs := kustomization.DeepCopy()
dependencyKs.ObjectMeta.Name = dependencyKey.Name
dependencyKs.ObjectMeta.Namespace = dependencyKey.Namespace

g.Expect(k8sClient.Create(context.Background(), kustomization)).To(Succeed())

resultK := &kustomizev1.Kustomization{}
Expand Down Expand Up @@ -170,8 +178,8 @@ spec:
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
resultK.Spec.DependsOn = []meta.NamespacedObjectReference{
{
Namespace: id,
Name: "root",
Namespace: dependencyKey.Namespace,
Name: dependencyKey.Name,
},
}
return k8sClient.Update(context.Background(), resultK)
Expand All @@ -183,4 +191,15 @@ spec:
return ready.Reason == meta.DependencyNotReadyReason
}, timeout, time.Second).Should(BeTrue())
})

t.Run("reconciles once dependency becomes ready", func(t *testing.T) {
g := NewWithT(t)
g.Expect(k8sClient.Create(context.Background(), dependencyKs)).To(Succeed())

g.Eventually(func() bool {
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
ready := apimeta.FindStatusCondition(resultK.Status.Conditions, meta.ReadyCondition)
return ready.Reason == meta.ReconciliationSucceededReason
}, timeout, time.Second).Should(BeTrue())
})
}
72 changes: 72 additions & 0 deletions internal/controller/kustomization_indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@ import (
"fmt"

"github.com/fluxcd/pkg/runtime/conditions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/dependency"
sourcev1 "github.com/fluxcd/source-controller/api/v1"

kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
)

const (
dependsOnIndexKey string = ".metadata.dependsOn"
)

func (r *KustomizationReconciler) requestsForRevisionChangeOf(indexKey string) handler.MapFunc {
return func(ctx context.Context, obj client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)
Expand Down Expand Up @@ -78,6 +85,53 @@ func (r *KustomizationReconciler) requestsForRevisionChangeOf(indexKey string) h
}
}

func isNotReadyForDependency(k *kustomizev1.Kustomization) bool {
c := conditions.Get(k, meta.ReadyCondition)
if c == nil {
return false
}
return c.Status == metav1.ConditionFalse && c.Reason == meta.DependencyNotReadyReason
}

func (r *KustomizationReconciler) requestsForDependents(ctx context.Context, obj client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)

var list kustomizev1.KustomizationList
if err := r.List(ctx, &list, client.MatchingFields{
dependsOnIndexKey: client.ObjectKeyFromObject(obj).String(),
}); err != nil {
log.Error(err, "failed to list objects for dependency change")
return nil
}
var dd []dependency.Dependent
for _, d := range list.Items {
if isNotReadyForDependency(&d) {
dd = append(dd, &d)
}
}
sorted, err := dependency.Sort(dd)
if err != nil {
log.Error(err, "failed to sort dependents for dependency change")
return nil
}
reqs := make([]reconcile.Request, 0, len(sorted))
debugLog := log.V(1).WithValues("dependency", map[string]string{
"name": obj.GetName(),
"namespace": obj.GetNamespace(),
})
for _, d := range sorted {
debugLog.Info("requesting reconciliation of dependent", "dependent", map[string]string{
"name": d.Name,
"namespace": d.Namespace,
})
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{
Name: d.Name,
Namespace: d.Namespace,
}})
}
return reqs
}

func (r *KustomizationReconciler) indexBy(kind string) func(o client.Object) []string {
return func(o client.Object) []string {
k, ok := o.(*kustomizev1.Kustomization)
Expand All @@ -96,3 +150,21 @@ func (r *KustomizationReconciler) indexBy(kind string) func(o client.Object) []s
return nil
}
}

func (r *KustomizationReconciler) indexDependsOn(o client.Object) []string {
k, ok := o.(*kustomizev1.Kustomization)
if !ok {
panic(fmt.Sprintf("Expected a Kustomization, got %T", o))
}

deps := make([]string, len(k.Spec.DependsOn))
for i, dep := range k.Spec.DependsOn {
namespace := k.GetNamespace()
if dep.Namespace != "" {
namespace = dep.Namespace
}
deps[i] = fmt.Sprintf("%s/%s", namespace, dep.Name)
}

return deps
}
1 change: 1 addition & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func TestMain(m *testing.M) {
}
if err := (reconciler).SetupWithManager(ctx, testEnv, KustomizationReconcilerOptions{
DependencyRequeueInterval: 2 * time.Second,
EnableDependencyQueueing: true,
Comment on lines 187 to +188
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DependencyRequeueInterval: 2 * time.Second,
EnableDependencyQueueing: true,
DependencyRequeueInterval: time.Minute,
EnableDependencyQueueing: true,

Let's set here the requeue interval to 1m, this should cause the test to fail if the watcher doesn't work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I was trying to write above (#1412 (comment)), the requeue interval is currently used also for retries that are not related to readiness.

For example the test TestKustomizationReconciler_ArtifactDownload/recovers_after_not_found_errors fails with that, because it is explicitly setting some "invalid" statuses on the resources, that cannot be covered by the predicates that are used to filter watchers.
It is possible to change the test to simulate the conditions that would match the readiness predicates...
... but I suppose it is better to change the controller logic to avoid mixing retries due to unexpected errors together with expected (watchable) non-ready states.

In my opinion those kind of retries should be handled either with delay of obj.GetRetryInterval(), or left as return ..{}, err for the runtime framework to handle.

}); err != nil {
panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err))
}
Expand Down
7 changes: 7 additions & 0 deletions internal/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ const (
// GroupChangelog controls groups kubernetes objects names on log output
// reduces cardinality of logs when logging to elasticsearch
GroupChangeLog = "GroupChangeLog"

// EnableDependencyQueueing controls whether reconciliation of a kustomization
// should be queued once one of its dependencies becomes ready, or if only
// time-based retries with reque-dependecy delays should be attempted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// time-based retries with reque-dependecy delays should be attempted
// time-based retries with requeue-dependency delays should be attempted

EnableDependencyQueueing = "EnableDependencyQueueing"
)

var features = map[string]bool{
Expand All @@ -66,6 +71,8 @@ var features = map[string]bool{
// GroupChangeLog
// opt-in from v1.5
GroupChangeLog: false,
// EnableDependencyQueueing
EnableDependencyQueueing: false,
Comment on lines +74 to +75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// EnableDependencyQueueing
EnableDependencyQueueing: false,
// EnableDependencyQueueing
// opt-in from v1.6
EnableDependencyQueueing: false,

}

// FeatureGates contains a list of all supported feature gates and
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ func main() {
os.Exit(1)
}

enableDependencyQueueing, err := features.Enabled(features.EnableDependencyQueueing)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+features.EnableDependencyQueueing)
os.Exit(1)
}

if err = (&controller.KustomizationReconciler{
ControllerName: controllerName,
DefaultServiceAccount: defaultServiceAccount,
Expand All @@ -259,6 +265,7 @@ func main() {
GroupChangeLog: groupChangeLog,
}).SetupWithManager(ctx, mgr, controller.KustomizationReconcilerOptions{
DependencyRequeueInterval: requeueDependency,
EnableDependencyQueueing: enableDependencyQueueing,
HTTPRetry: httpRetry,
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
}); err != nil {
Expand Down
Loading