Skip to content

Commit a86699a

Browse files
authored
Merge pull request #1536 from fluxcd/cancel-hc-requeue
Cancel health checks on new reconciliation request
2 parents 0b68a45 + 00f24ff commit a86699a

File tree

7 files changed

+128
-103
lines changed

7 files changed

+128
-103
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ require (
2323
github.com/fluxcd/pkg/apis/acl v0.9.0
2424
github.com/fluxcd/pkg/apis/event v0.21.0
2525
github.com/fluxcd/pkg/apis/kustomize v1.14.0
26-
github.com/fluxcd/pkg/apis/meta v1.23.0
26+
github.com/fluxcd/pkg/apis/meta v1.24.0
2727
github.com/fluxcd/pkg/auth v0.33.0
2828
github.com/fluxcd/pkg/cache v0.12.0
2929
github.com/fluxcd/pkg/http/fetch v0.21.0
3030
github.com/fluxcd/pkg/kustomize v1.24.0
31-
github.com/fluxcd/pkg/runtime v0.91.0
31+
github.com/fluxcd/pkg/runtime v0.93.0
3232
github.com/fluxcd/pkg/ssa v0.61.0
3333
github.com/fluxcd/pkg/tar v0.16.0
3434
github.com/fluxcd/pkg/testserver v0.13.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ github.com/fluxcd/pkg/apis/event v0.21.0 h1:VVl0WmgDXJwDS3Pivkk+31h3fWHbq+BpbNLU
194194
github.com/fluxcd/pkg/apis/event v0.21.0/go.mod h1:jacQdE6DdxoBsUOLMzEZNtpd4TqtYaiH1DWoyHMSUSo=
195195
github.com/fluxcd/pkg/apis/kustomize v1.14.0 h1:PmWqMpRX0v7/aCAUNWfohe4o1qa9G3Cg/vVr5PCedI4=
196196
github.com/fluxcd/pkg/apis/kustomize v1.14.0/go.mod h1:CGRpU9Od4ht5+MHL6QlMfWaW87U9UTfGVM5CM4PZ28I=
197-
github.com/fluxcd/pkg/apis/meta v1.23.0 h1:fLis5YcHnOsyKYptzBtituBm5EWNx13I0bXQsy0FG4s=
198-
github.com/fluxcd/pkg/apis/meta v1.23.0/go.mod h1:UWsIbBPCxYvoVklr2mV2uLFBf/n17dNAmKFjRfApdDo=
197+
github.com/fluxcd/pkg/apis/meta v1.24.0 h1:+e33T4OL9oqMWZSltsgImvi+/Punx42X9NqFlPesH6o=
198+
github.com/fluxcd/pkg/apis/meta v1.24.0/go.mod h1:UWsIbBPCxYvoVklr2mV2uLFBf/n17dNAmKFjRfApdDo=
199199
github.com/fluxcd/pkg/auth v0.33.0 h1:3ccwqpBr8uWEQgl15b7S0PwJ9EgtcKObg4J1jnaof2w=
200200
github.com/fluxcd/pkg/auth v0.33.0/go.mod h1:ZAFC8pNZxhe+7RV2cQO1K9X62HM8BbRBnCE118oY/0A=
201201
github.com/fluxcd/pkg/cache v0.12.0 h1:mabABT3jIfuo84VbIW+qvfqMZ7PbM5tXQgQvA2uo2rc=
@@ -206,8 +206,8 @@ github.com/fluxcd/pkg/http/fetch v0.21.0 h1:/vHWc+3BIk9q5HFA8khl0NEBb/XFXzZOqpnU
206206
github.com/fluxcd/pkg/http/fetch v0.21.0/go.mod h1:aFUPa2DLpUHE/dXkhIdaHakVIiZ6GVCWvp5tWkDKSEM=
207207
github.com/fluxcd/pkg/kustomize v1.24.0 h1:ckFB7hh9FpJA1Oy3bYl88p9On/zsZZTbwlLBgP6eUkA=
208208
github.com/fluxcd/pkg/kustomize v1.24.0/go.mod h1:cydG0vKpDuUaoP5STpKfxY3zqgzaARv5HsWDOFyt5nA=
209-
github.com/fluxcd/pkg/runtime v0.91.0 h1:Z92sOLsJXa+0RIi/vNl87zF5qnsBUdOb60d2a0b4Ulo=
210-
github.com/fluxcd/pkg/runtime v0.91.0/go.mod h1:D/gUsaSpyw6Od2QEL7MELi5m+oUmwokuxUVZ+vKQxdo=
209+
github.com/fluxcd/pkg/runtime v0.93.0 h1:fgd1O1xC7RRK5XJjBCocgg6MkDHS56Q73F0h5qCIvVk=
210+
github.com/fluxcd/pkg/runtime v0.93.0/go.mod h1:/E4dT1pdSkidyRTR5ghSzoyHEUcEJw3ipvJt597ArOA=
211211
github.com/fluxcd/pkg/sourceignore v0.15.0 h1:tB30fuk4jlB3UGlR7ppJguZ3zaJh1iwuTCEufs91jSM=
212212
github.com/fluxcd/pkg/sourceignore v0.15.0/go.mod h1:mZ9X6gNtNkq9ZsD35LebEYjePc7DRvB2JdowMNoj6IU=
213213
github.com/fluxcd/pkg/ssa v0.61.0 h1:GeueQfZVrjPLEzmEkq6gpFTBr1MDcqUihCQDf6AaIo8=

internal/controller/kustomization_controller.go

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,11 @@ type KustomizationReconciler struct {
114114

115115
// Feature gates
116116

117-
AdditiveCELDependencyCheck bool
118-
AllowExternalArtifact bool
119-
CancelHealthCheckOnNewRevision bool
120-
FailFast bool
121-
GroupChangeLog bool
122-
StrictSubstitutions bool
117+
AdditiveCELDependencyCheck bool
118+
AllowExternalArtifact bool
119+
FailFast bool
120+
GroupChangeLog bool
121+
StrictSubstitutions bool
123122
}
124123

125124
func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
@@ -271,6 +270,19 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
271270
return ctrl.Result{RequeueAfter: r.DependencyRequeueInterval}, nil
272271
}
273272

273+
// Handle health check cancellation.
274+
if qes := new(runtimeCtrl.QueueEventSource); errors.As(reconcileErr, &qes) {
275+
conditions.MarkFalse(obj,
276+
meta.ReadyCondition,
277+
meta.HealthCheckCanceledReason,
278+
"New reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name)
279+
ctrl.LoggerFrom(ctx).Info("New reconciliation triggered, canceling health checks", "trigger", qes)
280+
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo,
281+
fmt.Sprintf("Health checks canceled due to new reconciliation triggered by %s/%s/%s",
282+
qes.Kind, qes.Namespace, qes.Name), nil)
283+
return ctrl.Result{}, nil
284+
}
285+
274286
// Broadcast the reconciliation failure and requeue at the specified retry interval.
275287
if reconcileErr != nil {
276288
log.Error(reconcileErr, fmt.Sprintf("Reconciliation failed after %s, next try in %s",
@@ -504,6 +516,11 @@ func (r *KustomizationReconciler) reconcile(
504516
isNewRevision,
505517
drifted,
506518
changeSet.ToObjMetadataSet()); err != nil {
519+
520+
if errors.Is(err, &runtimeCtrl.QueueEventSource{}) {
521+
return err
522+
}
523+
507524
obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.HealthCheckFailedReason, historyMeta)
508525
conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err)
509526
return err
@@ -990,43 +1007,15 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
9901007
}
9911008

9921009
// Check the health with a default timeout of 30sec shorter than the reconciliation interval.
993-
healthCtx := ctx
994-
if r.CancelHealthCheckOnNewRevision {
995-
// Create a cancellable context for health checks that monitors for new revisions
996-
var cancel context.CancelFunc
997-
healthCtx, cancel = context.WithCancel(ctx)
998-
defer cancel()
999-
1000-
// Start monitoring for new revisions to allow early cancellation
1001-
go func() {
1002-
ticker := time.NewTicker(5 * time.Second)
1003-
defer ticker.Stop()
1004-
1005-
for {
1006-
select {
1007-
case <-healthCtx.Done():
1008-
return
1009-
case <-ticker.C:
1010-
// Get the latest source artifact
1011-
latestSrc, err := r.getSource(ctx, obj)
1012-
if err == nil && latestSrc.GetArtifact() != nil {
1013-
if newRevision := latestSrc.GetArtifact().Revision; newRevision != revision {
1014-
const msg = "New revision detected during health check, cancelling"
1015-
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
1016-
ctrl.LoggerFrom(ctx).Info(msg, "current", revision, "new", newRevision)
1017-
cancel()
1018-
return
1019-
}
1020-
}
1021-
}
1022-
}
1023-
}()
1024-
}
1010+
healthCtx := runtimeCtrl.GetInterruptContext(ctx)
10251011
if err := manager.WaitForSetWithContext(healthCtx, toCheck, ssa.WaitOptions{
10261012
Interval: 5 * time.Second,
10271013
Timeout: obj.GetTimeout(),
10281014
FailFast: r.FailFast,
10291015
}); err != nil {
1016+
if is, err := runtimeCtrl.IsObjectEnqueued(ctx); is {
1017+
return err
1018+
}
10301019
conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err)
10311020
conditions.MarkFalse(obj, meta.HealthyCondition, meta.HealthCheckFailedReason, "%s", err)
10321021
return fmt.Errorf("health check failed after %s: %w", time.Since(checkStart).String(), err)
@@ -1223,12 +1212,12 @@ func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization,
12231212
reason = r
12241213
}
12251214

1226-
eventtype := "Normal"
1215+
eventType := corev1.EventTypeNormal
12271216
if severity == eventv1.EventSeverityError {
1228-
eventtype = "Warning"
1217+
eventType = corev1.EventTypeWarning
12291218
}
12301219

1231-
r.EventRecorder.AnnotatedEventf(obj, metadata, eventtype, reason, msg)
1220+
r.EventRecorder.AnnotatedEventf(obj, metadata, eventType, reason, msg)
12321221
}
12331222

12341223
func (r *KustomizationReconciler) finalizeStatus(ctx context.Context,

internal/controller/kustomization_manager.go

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"sigs.k8s.io/controller-runtime/pkg/predicate"
3131
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3232

33+
runtimeCtrl "github.com/fluxcd/pkg/runtime/controller"
3334
"github.com/fluxcd/pkg/runtime/predicates"
3435
sourcev1 "github.com/fluxcd/source-controller/api/v1"
3536

@@ -38,10 +39,11 @@ import (
3839

3940
// KustomizationReconcilerOptions contains options for the KustomizationReconciler.
4041
type KustomizationReconcilerOptions struct {
41-
RateLimiter workqueue.TypedRateLimiter[reconcile.Request]
42-
WatchConfigs bool
43-
WatchConfigsPredicate predicate.Predicate
44-
WatchExternalArtifacts bool
42+
RateLimiter workqueue.TypedRateLimiter[reconcile.Request]
43+
WatchConfigs bool
44+
WatchConfigsPredicate predicate.Predicate
45+
WatchExternalArtifacts bool
46+
CancelHealthCheckOnRequeue bool
4547
}
4648

4749
// SetupWithManager sets up the controller with the Manager.
@@ -130,47 +132,68 @@ func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl
130132
return fmt.Errorf("failed creating index %s: %w", indexSecret, err)
131133
}
132134

133-
ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
134-
For(&kustomizev1.Kustomization{}, builder.WithPredicates(
135-
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
136-
)).
135+
var blder *builder.Builder
136+
var toComplete reconcile.TypedReconciler[reconcile.Request]
137+
var enqueueRequestsFromMapFunc func(objKind string, fn handler.MapFunc) handler.EventHandler
138+
139+
ksPredicate := predicate.Or(
140+
predicate.GenerationChangedPredicate{},
141+
predicates.ReconcileRequestedPredicate{},
142+
)
143+
144+
if !opts.CancelHealthCheckOnRequeue {
145+
toComplete = r
146+
enqueueRequestsFromMapFunc = func(objKind string, fn handler.MapFunc) handler.EventHandler {
147+
return handler.EnqueueRequestsFromMapFunc(fn)
148+
}
149+
blder = ctrl.NewControllerManagedBy(mgr).
150+
For(&kustomizev1.Kustomization{}, builder.WithPredicates(ksPredicate))
151+
} else {
152+
wr := runtimeCtrl.WrapReconciler(r)
153+
toComplete = wr
154+
enqueueRequestsFromMapFunc = wr.EnqueueRequestsFromMapFunc
155+
blder = runtimeCtrl.NewControllerManagedBy(mgr, wr).
156+
For(&kustomizev1.Kustomization{}, ksPredicate).Builder
157+
}
158+
159+
blder.
137160
Watches(
138161
&sourcev1.OCIRepository{},
139-
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexOCIRepository)),
162+
enqueueRequestsFromMapFunc(sourcev1.OCIRepositoryKind, r.requestsForRevisionChangeOf(indexOCIRepository)),
140163
builder.WithPredicates(SourceRevisionChangePredicate{}),
141164
).
142165
Watches(
143166
&sourcev1.GitRepository{},
144-
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexGitRepository)),
167+
enqueueRequestsFromMapFunc(sourcev1.GitRepositoryKind, r.requestsForRevisionChangeOf(indexGitRepository)),
145168
builder.WithPredicates(SourceRevisionChangePredicate{}),
146169
).
147170
Watches(
148171
&sourcev1.Bucket{},
149-
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexBucket)),
172+
enqueueRequestsFromMapFunc(sourcev1.BucketKind, r.requestsForRevisionChangeOf(indexBucket)),
150173
builder.WithPredicates(SourceRevisionChangePredicate{}),
151174
)
152175

153176
if opts.WatchConfigs {
154-
ctrlBuilder = ctrlBuilder.
177+
blder = blder.
155178
WatchesMetadata(
156179
&corev1.ConfigMap{},
157-
handler.EnqueueRequestsFromMapFunc(r.requestsForConfigDependency(indexConfigMap)),
180+
enqueueRequestsFromMapFunc("ConfigMap", r.requestsForConfigDependency(indexConfigMap)),
158181
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}, opts.WatchConfigsPredicate),
159182
).
160183
WatchesMetadata(
161184
&corev1.Secret{},
162-
handler.EnqueueRequestsFromMapFunc(r.requestsForConfigDependency(indexSecret)),
185+
enqueueRequestsFromMapFunc("Secret", r.requestsForConfigDependency(indexSecret)),
163186
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}, opts.WatchConfigsPredicate),
164187
)
165188
}
166189

167190
if opts.WatchExternalArtifacts {
168-
ctrlBuilder = ctrlBuilder.Watches(
191+
blder = blder.Watches(
169192
&sourcev1.ExternalArtifact{},
170-
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(indexExternalArtifact)),
193+
enqueueRequestsFromMapFunc(sourcev1.ExternalArtifactKind, r.requestsForRevisionChangeOf(indexExternalArtifact)),
171194
builder.WithPredicates(SourceRevisionChangePredicate{}),
172195
)
173196
}
174197

175-
return ctrlBuilder.WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).Complete(r)
198+
return blder.WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).Complete(toComplete)
176199
}

internal/controller/kustomization_wait_test.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
runtimeClient "github.com/fluxcd/pkg/runtime/client"
2626
. "github.com/onsi/gomega"
27+
corev1 "k8s.io/api/core/v1"
2728
apierrors "k8s.io/apimachinery/pkg/api/errors"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -473,9 +474,6 @@ func TestKustomizationReconciler_CancelHealthCheckOnNewRevision(t *testing.T) {
473474
resultK := &kustomizev1.Kustomization{}
474475
timeout := 60 * time.Second
475476

476-
reconciler.CancelHealthCheckOnNewRevision = true
477-
t.Cleanup(func() { reconciler.CancelHealthCheckOnNewRevision = false })
478-
479477
err := createNamespace(id)
480478
g.Expect(err).NotTo(HaveOccurred(), "failed to create test namespace")
481479

@@ -617,14 +615,28 @@ spec:
617615
return resultK.Status.LastAttemptedRevision == "main/"+fixedArtifact
618616
}, timeout, time.Second).Should(BeTrue())
619617

620-
// Check cancellation event was emitted
618+
// Verify the HealthCheckCanceled event was emitted.
619+
g.Eventually(func() bool {
620+
events := getEvents(resultK.GetName(), nil)
621+
for _, event := range events {
622+
if event.Reason == meta.HealthCheckCanceledReason {
623+
t.Logf("Found HealthCheckCanceled event: %s", event.Message)
624+
return true
625+
}
626+
}
627+
return false
628+
}, timeout, time.Second).Should(BeTrue(), "HealthCheckCanceled event should be recorded")
629+
630+
// Verify the event message indicates the trigger source.
621631
events := getEvents(resultK.GetName(), nil)
622-
var found bool
623-
for _, e := range events {
624-
if e.Message == "New revision detected during health check, cancelling" {
625-
found = true
632+
var cancelEvent *corev1.Event
633+
for i := range events {
634+
if events[i].Reason == meta.HealthCheckCanceledReason {
635+
cancelEvent = &events[i]
626636
break
627637
}
628638
}
629-
g.Expect(found).To(BeTrue(), "did not find event for health check cancellation")
639+
g.Expect(cancelEvent).ToNot(BeNil())
640+
g.Expect(cancelEvent.Message).To(ContainSubstring("Health checks canceled"))
641+
g.Expect(cancelEvent.Message).To(ContainSubstring("GitRepository"))
630642
}

internal/controller/suite_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,9 @@ func TestMain(m *testing.M) {
187187
SOPSAgeSecret: sopsAgeSecret,
188188
}
189189
if err := (reconciler).SetupWithManager(ctx, testEnv, KustomizationReconcilerOptions{
190-
WatchConfigsPredicate: predicate.Not(predicate.Funcs{}),
191-
WatchExternalArtifacts: true,
190+
WatchConfigsPredicate: predicate.Not(predicate.Funcs{}),
191+
WatchExternalArtifacts: true,
192+
CancelHealthCheckOnRequeue: true,
192193
}); err != nil {
193194
panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err))
194195
}

main.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -320,35 +320,35 @@ func main() {
320320
watchConfigs := !disableConfigWatchers
321321

322322
if err = (&controller.KustomizationReconciler{
323-
AdditiveCELDependencyCheck: additiveCELDependencyCheck,
324-
AllowExternalArtifact: allowExternalArtifact,
325-
CancelHealthCheckOnNewRevision: cancelHealthCheckOnNewRevision,
326-
APIReader: mgr.GetAPIReader(),
327-
ArtifactFetchRetries: httpRetry,
328-
Client: mgr.GetClient(),
329-
ClusterReader: clusterReader,
330-
ConcurrentSSA: concurrentSSA,
331-
ControllerName: controllerName,
332-
DefaultServiceAccount: defaultServiceAccount,
333-
DependencyRequeueInterval: requeueDependency,
334-
DisallowedFieldManagers: disallowedFieldManagers,
335-
EventRecorder: eventRecorder,
336-
FailFast: failFast,
337-
GroupChangeLog: groupChangeLog,
338-
KubeConfigOpts: kubeConfigOpts,
339-
Mapper: restMapper,
340-
Metrics: metricsH,
341-
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
342-
NoRemoteBases: noRemoteBases,
343-
SOPSAgeSecret: sopsAgeSecret,
344-
StatusManager: fmt.Sprintf("gotk-%s", controllerName),
345-
StrictSubstitutions: strictSubstitutions,
346-
TokenCache: tokenCache,
323+
AdditiveCELDependencyCheck: additiveCELDependencyCheck,
324+
AllowExternalArtifact: allowExternalArtifact,
325+
APIReader: mgr.GetAPIReader(),
326+
ArtifactFetchRetries: httpRetry,
327+
Client: mgr.GetClient(),
328+
ClusterReader: clusterReader,
329+
ConcurrentSSA: concurrentSSA,
330+
ControllerName: controllerName,
331+
DefaultServiceAccount: defaultServiceAccount,
332+
DependencyRequeueInterval: requeueDependency,
333+
DisallowedFieldManagers: disallowedFieldManagers,
334+
EventRecorder: eventRecorder,
335+
FailFast: failFast,
336+
GroupChangeLog: groupChangeLog,
337+
KubeConfigOpts: kubeConfigOpts,
338+
Mapper: restMapper,
339+
Metrics: metricsH,
340+
NoCrossNamespaceRefs: aclOptions.NoCrossNamespaceRefs,
341+
NoRemoteBases: noRemoteBases,
342+
SOPSAgeSecret: sopsAgeSecret,
343+
StatusManager: fmt.Sprintf("gotk-%s", controllerName),
344+
StrictSubstitutions: strictSubstitutions,
345+
TokenCache: tokenCache,
347346
}).SetupWithManager(ctx, mgr, controller.KustomizationReconcilerOptions{
348-
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
349-
WatchConfigs: watchConfigs,
350-
WatchConfigsPredicate: watchConfigsPredicate,
351-
WatchExternalArtifacts: allowExternalArtifact,
347+
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
348+
WatchConfigs: watchConfigs,
349+
WatchConfigsPredicate: watchConfigsPredicate,
350+
WatchExternalArtifacts: allowExternalArtifact,
351+
CancelHealthCheckOnRequeue: cancelHealthCheckOnNewRevision,
352352
}); err != nil {
353353
setupLog.Error(err, "unable to create controller", "controller", controllerName)
354354
os.Exit(1)

0 commit comments

Comments
 (0)