Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ require (
github.com/fluxcd/pkg/apis/acl v0.9.0
github.com/fluxcd/pkg/apis/event v0.20.0
github.com/fluxcd/pkg/apis/kustomize v1.13.0
github.com/fluxcd/pkg/apis/meta v1.22.0
github.com/fluxcd/pkg/apis/meta v1.22.1-0.20251010220540-14b66b2de1ab
github.com/fluxcd/pkg/auth v0.32.0
github.com/fluxcd/pkg/cache v0.12.0
github.com/fluxcd/pkg/http/fetch v0.20.0
github.com/fluxcd/pkg/kustomize v1.23.0
github.com/fluxcd/pkg/runtime v0.88.0
github.com/fluxcd/pkg/runtime v0.89.1-0.20251010220540-14b66b2de1ab
github.com/fluxcd/pkg/ssa v0.60.0
github.com/fluxcd/pkg/tar v0.15.0
github.com/fluxcd/pkg/testserver v0.13.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ github.com/fluxcd/pkg/apis/event v0.20.0 h1:Vxd1kkS/CsQNPHTbmlL4qOcCmUmavEtaEOod
github.com/fluxcd/pkg/apis/event v0.20.0/go.mod h1:wyY+8BHicfFP7sXzhMrKpZTQeojCsSpK9idAidjv61c=
github.com/fluxcd/pkg/apis/kustomize v1.13.0 h1:GGf0UBVRIku+gebY944icVeEIhyg1P/KE3IrhOyJJnE=
github.com/fluxcd/pkg/apis/kustomize v1.13.0/go.mod h1:TLKVqbtnzkhDuhWnAsN35977HvRfIjs+lgMuNro/LEc=
github.com/fluxcd/pkg/apis/meta v1.22.0 h1:EHWQH5ZWml7i8eZ/AMjm1jxid3j/PQ31p+hIwCt6crM=
github.com/fluxcd/pkg/apis/meta v1.22.0/go.mod h1:Kc1+bWe5p0doROzuV9XiTfV/oL3ddsemYXt8ZYWdVVg=
github.com/fluxcd/pkg/apis/meta v1.22.1-0.20251010220540-14b66b2de1ab h1:qV92SLmuKonakYhMSTX3//eRaJHVfCAGcmbdb/7tBrA=
github.com/fluxcd/pkg/apis/meta v1.22.1-0.20251010220540-14b66b2de1ab/go.mod h1:Kc1+bWe5p0doROzuV9XiTfV/oL3ddsemYXt8ZYWdVVg=
github.com/fluxcd/pkg/auth v0.32.0 h1:D0RkbWlT2gqcEaEr6GLnm1XP1KDIYQI8zWzuZVnsE5I=
github.com/fluxcd/pkg/auth v0.32.0/go.mod h1:Yhe6p3/wTUj80yrOqhpsbA48hQRM14OKwo3Qr4199XM=
github.com/fluxcd/pkg/cache v0.12.0 h1:mabABT3jIfuo84VbIW+qvfqMZ7PbM5tXQgQvA2uo2rc=
Expand All @@ -206,8 +206,8 @@ github.com/fluxcd/pkg/http/fetch v0.20.0 h1:/Lvcu1JzABBLuQYuLKYh1K02a+RqbP4b5wIZ
github.com/fluxcd/pkg/http/fetch v0.20.0/go.mod h1:9inwDiGOpuo14Rp06TpcgsYSkvp4YM+uWCjgDmpXMNk=
github.com/fluxcd/pkg/kustomize v1.23.0 h1:4tNh30OsIj96YRfVP7qP0Fv3QTwdBo/udfZIcccL6NI=
github.com/fluxcd/pkg/kustomize v1.23.0/go.mod h1:ZojUvmI4RiHk3BH3L3mBQ4ZDbNkiWfX9LvOMBjKq5Tc=
github.com/fluxcd/pkg/runtime v0.88.0 h1:EFPJ0jnRino6yUEwiNtQTpUNyCf96N2MJb+S7LVG648=
github.com/fluxcd/pkg/runtime v0.88.0/go.mod h1:qkmPX009tgiWufQ2Vj0QhyNgEU+0Cnz7Xy/naihLM10=
github.com/fluxcd/pkg/runtime v0.89.1-0.20251010220540-14b66b2de1ab h1:MuF0vVykSXQoVP9vTYjP6xUoj41TUhuIXyL8NQG5fJU=
github.com/fluxcd/pkg/runtime v0.89.1-0.20251010220540-14b66b2de1ab/go.mod h1:qkmPX009tgiWufQ2Vj0QhyNgEU+0Cnz7Xy/naihLM10=
github.com/fluxcd/pkg/sourceignore v0.15.0 h1:tB30fuk4jlB3UGlR7ppJguZ3zaJh1iwuTCEufs91jSM=
github.com/fluxcd/pkg/sourceignore v0.15.0/go.mod h1:mZ9X6gNtNkq9ZsD35LebEYjePc7DRvB2JdowMNoj6IU=
github.com/fluxcd/pkg/ssa v0.60.0 h1:ikA78TWSLDmIc8I/goGAU/buYF6jto/gswE5hnOfWGk=
Expand Down
63 changes: 25 additions & 38 deletions internal/controller/kustomization_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,11 @@ type KustomizationReconciler struct {

// Feature gates

AdditiveCELDependencyCheck bool
AllowExternalArtifact bool
CancelHealthCheckOnNewRevision bool
FailFast bool
GroupChangeLog bool
StrictSubstitutions bool
AdditiveCELDependencyCheck bool
AllowExternalArtifact bool
FailFast bool
GroupChangeLog bool
StrictSubstitutions bool
}

func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
Expand Down Expand Up @@ -271,6 +270,17 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{RequeueAfter: r.DependencyRequeueInterval}, nil
}

// Handle health check cancellation.
if errors.Is(reconcileErr, &runtimeCtrl.QueueEventSource{}) {
qes := reconcileErr.(*runtimeCtrl.QueueEventSource)
ctrl.LoggerFrom(ctx).Info("New reconciliation triggered, canceling health checks", "trigger", qes)
conditions.MarkFalse(obj,
meta.ReadyCondition,
meta.HealthCheckCanceledReason,
"New reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name)
return ctrl.Result{}, nil
}

// Broadcast the reconciliation failure and requeue at the specified retry interval.
if reconcileErr != nil {
log.Error(reconcileErr, fmt.Sprintf("Reconciliation failed after %s, next try in %s",
Expand Down Expand Up @@ -498,6 +508,11 @@ func (r *KustomizationReconciler) reconcile(
isNewRevision,
drifted,
changeSet.ToObjMetadataSet()); err != nil {

if errors.Is(err, &runtimeCtrl.QueueEventSource{}) {
return err
}

obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.HealthCheckFailedReason, historyMeta)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err)
return err
Expand Down Expand Up @@ -984,43 +999,15 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
}

// Check the health with a default timeout of 30sec shorter than the reconciliation interval.
healthCtx := ctx
if r.CancelHealthCheckOnNewRevision {
// Create a cancellable context for health checks that monitors for new revisions
var cancel context.CancelFunc
healthCtx, cancel = context.WithCancel(ctx)
defer cancel()

// Start monitoring for new revisions to allow early cancellation
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
// Get the latest source artifact
latestSrc, err := r.getSource(ctx, obj)
if err == nil && latestSrc.GetArtifact() != nil {
if newRevision := latestSrc.GetArtifact().Revision; newRevision != revision {
const msg = "New revision detected during health check, cancelling"
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
ctrl.LoggerFrom(ctx).Info(msg, "current", revision, "new", newRevision)
cancel()
return
}
}
}
}
}()
}
healthCtx := runtimeCtrl.GetInterruptContext(ctx)
if err := manager.WaitForSetWithContext(healthCtx, toCheck, ssa.WaitOptions{
Interval: 5 * time.Second,
Timeout: obj.GetTimeout(),
FailFast: r.FailFast,
}); err != nil {
if is, err := runtimeCtrl.IsObjectEnqueued(ctx); is {
return err
}
conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err)
conditions.MarkFalse(obj, meta.HealthyCondition, meta.HealthCheckFailedReason, "%s", err)
return fmt.Errorf("health check failed after %s: %w", time.Since(checkStart).String(), err)
Expand Down
53 changes: 38 additions & 15 deletions internal/controller/kustomization_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

runtimeCtrl "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1"

Expand All @@ -38,9 +39,10 @@ import (

// KustomizationReconcilerOptions contains options for the KustomizationReconciler.
type KustomizationReconcilerOptions struct {
RateLimiter workqueue.TypedRateLimiter[reconcile.Request]
WatchConfigsPredicate predicate.Predicate
WatchExternalArtifacts bool
RateLimiter workqueue.TypedRateLimiter[reconcile.Request]
WatchConfigsPredicate predicate.Predicate
WatchExternalArtifacts bool
CancelHealthCheckOnNewRevision bool
}

// SetupWithManager sets up the controller with the Manager.
Expand Down Expand Up @@ -129,43 +131,64 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl
return fmt.Errorf("failed creating index %s: %w", indexSecret, err)
}

ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
For(&kustomizev1.Kustomization{}, builder.WithPredicates(
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
)).
var blder *builder.Builder
var toComplete reconcile.TypedReconciler[reconcile.Request]
var enqueueRequestsFromMapFunc func(objKind string, fn handler.MapFunc) handler.EventHandler

ksPredicate := predicate.Or(
predicate.GenerationChangedPredicate{},
predicates.ReconcileRequestedPredicate{},
)

if !opts.CancelHealthCheckOnNewRevision {
toComplete = r
enqueueRequestsFromMapFunc = func(objKind string, fn handler.MapFunc) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(fn)
}
blder = ctrl.NewControllerManagedBy(mgr).
For(&kustomizev1.Kustomization{}, builder.WithPredicates(ksPredicate))
} else {
wr := runtimeCtrl.WrapReconciler(r)
toComplete = wr
enqueueRequestsFromMapFunc = wr.EnqueueRequestsFromMapFunc
blder = runtimeCtrl.NewControllerManagedBy(mgr, wr).
For(&kustomizev1.Kustomization{}, ksPredicate).Builder
}

blder.
Watches(
&sourcev1.OCIRepository{},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexOCIRepository)),
enqueueRequestsFromMapFunc(sourcev1.OCIRepositoryKind, r.requestsForRevisionChangeOf(indexOCIRepository)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
Watches(
&sourcev1.GitRepository{},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexGitRepository)),
enqueueRequestsFromMapFunc(sourcev1.GitRepositoryKind, r.requestsForRevisionChangeOf(indexGitRepository)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
Watches(
&sourcev1.Bucket{},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexBucket)),
enqueueRequestsFromMapFunc(sourcev1.BucketKind, r.requestsForRevisionChangeOf(indexBucket)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
WatchesMetadata(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(r.requestsForConfigDependency(indexConfigMap)),
enqueueRequestsFromMapFunc("ConfigMap", r.requestsForConfigDependency(indexConfigMap)),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}, opts.WatchConfigsPredicate),
).
WatchesMetadata(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.requestsForConfigDependency(indexSecret)),
enqueueRequestsFromMapFunc("Secret", r.requestsForConfigDependency(indexSecret)),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}, opts.WatchConfigsPredicate),
)

if opts.WatchExternalArtifacts {
ctrlBuilder = ctrlBuilder.Watches(
blder = blder.Watches(
&sourcev1.ExternalArtifact{},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexExternalArtifact)),
enqueueRequestsFromMapFunc(sourcev1.ExternalArtifactKind, r.requestsForRevisionChangeOf(indexExternalArtifact)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
)
}

return ctrlBuilder.WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).Complete(r)
return blder.WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).Complete(toComplete)
}
14 changes: 0 additions & 14 deletions internal/controller/kustomization_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,6 @@ func TestKustomizationReconciler_CancelHealthCheckOnNewRevision(t *testing.T) {
resultK := &kustomizev1.Kustomization{}
timeout := 60 * time.Second

reconciler.CancelHealthCheckOnNewRevision = true
t.Cleanup(func() { reconciler.CancelHealthCheckOnNewRevision = false })

err := createNamespace(id)
g.Expect(err).NotTo(HaveOccurred(), "failed to create test namespace")

Expand Down Expand Up @@ -616,15 +613,4 @@ spec:
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK)
return resultK.Status.LastAttemptedRevision == "main/"+fixedArtifact
}, timeout, time.Second).Should(BeTrue())

// Check cancellation event was emitted
events := getEvents(resultK.GetName(), nil)
var found bool
for _, e := range events {
if e.Message == "New revision detected during health check, cancelling" {
found = true
break
}
}
g.Expect(found).To(BeTrue(), "did not find event for health check cancellation")
}
5 changes: 3 additions & 2 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ func TestMain(m *testing.M) {
SOPSAgeSecret: sopsAgeSecret,
}
if err := (reconciler).SetupWithManager(ctx, testEnv, KustomizationReconcilerOptions{
WatchConfigsPredicate: predicate.Not(predicate.Funcs{}),
WatchExternalArtifacts: true,
WatchConfigsPredicate: predicate.Not(predicate.Funcs{}),
WatchExternalArtifacts: true,
CancelHealthCheckOnNewRevision: true,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to have tests for both true and false here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the lines you just deleted from the test:

	reconciler.CancelHealthCheckOnNewRevision = true
	t.Cleanup(func() { reconciler.CancelHealthCheckOnNewRevision = false })

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the boolean moving from the reconciler to the SetupWithManager options

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, the test framework we have in place does not cope with this, you can not rebuild the controller once it has started.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a separate package like we did before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe test only true since ideally we would not have this feature gate. If this works well we plan to make it opt-out later, and possibly even ignore it with a warning after even more time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah enabling it in tests should be Ok. The e2e test suite has the default gate state, so we cover both cases.

}); err != nil {
panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err))
}
Expand Down
54 changes: 27 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,34 +313,34 @@ func main() {
}

if err = (&controller.KustomizationReconciler{
AdditiveCELDependencyCheck: additiveCELDependencyCheck,
AllowExternalArtifact: allowExternalArtifact,
CancelHealthCheckOnNewRevision: cancelHealthCheckOnNewRevision,
APIReader: mgr.GetAPIReader(),
ArtifactFetchRetries: httpRetry,
Client: mgr.GetClient(),
ClusterReader: clusterReader,
ConcurrentSSA: concurrentSSA,
ControllerName: controllerName,
DefaultServiceAccount: defaultServiceAccount,
DependencyRequeueInterval: requeueDependency,
DisallowedFieldManagers: disallowedFieldManagers,
EventRecorder: eventRecorder,
FailFast: failFast,
GroupChangeLog: groupChangeLog,
KubeConfigOpts: kubeConfigOpts,
Mapper: restMapper,
Metrics: metricsH,
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
NoRemoteBases: noRemoteBases,
SOPSAgeSecret: sopsAgeSecret,
StatusManager: fmt.Sprintf("gotk-%s", controllerName),
StrictSubstitutions: strictSubstitutions,
TokenCache: tokenCache,
AdditiveCELDependencyCheck: additiveCELDependencyCheck,
AllowExternalArtifact: allowExternalArtifact,
APIReader: mgr.GetAPIReader(),
ArtifactFetchRetries: httpRetry,
Client: mgr.GetClient(),
ClusterReader: clusterReader,
ConcurrentSSA: concurrentSSA,
ControllerName: controllerName,
DefaultServiceAccount: defaultServiceAccount,
DependencyRequeueInterval: requeueDependency,
DisallowedFieldManagers: disallowedFieldManagers,
EventRecorder: eventRecorder,
FailFast: failFast,
GroupChangeLog: groupChangeLog,
KubeConfigOpts: kubeConfigOpts,
Mapper: restMapper,
Metrics: metricsH,
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
NoRemoteBases: noRemoteBases,
SOPSAgeSecret: sopsAgeSecret,
StatusManager: fmt.Sprintf("gotk-%s", controllerName),
StrictSubstitutions: strictSubstitutions,
TokenCache: tokenCache,
}).SetupWithManager(ctx, mgr, controller.KustomizationReconcilerOptions{
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
WatchConfigsPredicate: watchConfigsPredicate,
WatchExternalArtifacts: allowExternalArtifact,
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
WatchConfigsPredicate: watchConfigsPredicate,
WatchExternalArtifacts: allowExternalArtifact,
CancelHealthCheckOnNewRevision: cancelHealthCheckOnNewRevision,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", controllerName)
os.Exit(1)
Expand Down