Skip to content

Commit dee3337

Browse files
authored
clustercatalog_controller: hacky, but more correct status updating in reconciler (#424)
Signed-off-by: Joe Lanford <[email protected]>
1 parent 8137da0 commit dee3337

File tree

5 files changed

+287
-191
lines changed

5 files changed

+287
-191
lines changed

cmd/manager/main.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -226,17 +226,10 @@ func main() {
226226
os.Exit(1)
227227
}
228228

229-
clusterCatalogFinalizers, err := corecontrollers.NewFinalizers(localStorage, unpacker)
230-
if err != nil {
231-
setupLog.Error(err, "unable to configure finalizers")
232-
os.Exit(1)
233-
}
234-
235229
if err = (&corecontrollers.ClusterCatalogReconciler{
236-
Client: mgr.GetClient(),
237-
Unpacker: unpacker,
238-
Storage: localStorage,
239-
Finalizers: clusterCatalogFinalizers,
230+
Client: mgr.GetClient(),
231+
Unpacker: unpacker,
232+
Storage: localStorage,
240233
}).SetupWithManager(mgr); err != nil {
241234
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
242235
os.Exit(1)

internal/controllers/core/clustercatalog_controller.go

Lines changed: 152 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import (
2020
"context" // #nosec
2121
"errors"
2222
"fmt"
23+
"slices"
24+
"sync"
2325
"time"
2426

2527
"k8s.io/apimachinery/pkg/api/equality"
2628
"k8s.io/apimachinery/pkg/api/meta"
2729
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/util/sets"
2831
"k8s.io/apimachinery/pkg/util/wait"
2932
ctrl "sigs.k8s.io/controller-runtime"
3033
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -47,9 +50,22 @@ const (
4750
// ClusterCatalogReconciler reconciles a Catalog object
4851
type ClusterCatalogReconciler struct {
4952
client.Client
50-
Unpacker source.Unpacker
51-
Storage storage.Instance
52-
Finalizers crfinalizer.Finalizers
53+
Unpacker source.Unpacker
54+
Storage storage.Instance
55+
56+
finalizers crfinalizer.Finalizers
57+
58+
// TODO: The below storedCatalogs fields are used for a quick a hack that helps
59+
// us correctly populate a ClusterCatalog's status. The fact that we need
60+
// these is indicative of a larger problem with the design of one or both
61+
// of the Unpacker and Storage interfaces. We should fix this.
62+
storedCatalogsMu sync.RWMutex
63+
storedCatalogs map[string]storedCatalogData
64+
}
65+
66+
type storedCatalogData struct {
67+
observedGeneration int64
68+
unpackResult source.Result
5369
}
5470

5571
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch;create;update;patch;delete
@@ -75,6 +91,14 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
7591
reconciledCatsrc := existingCatsrc.DeepCopy()
7692
res, reconcileErr := r.reconcile(ctx, reconciledCatsrc)
7793

94+
// If we encounter an error, we should delete the stored catalog metadata
95+
// which represents the state of a successfully unpacked catalog. Deleting
96+
// this state ensures that we will continue retrying the unpacking process
97+
// until it succeeds.
98+
if reconcileErr != nil {
99+
r.deleteStoredCatalog(reconciledCatsrc.Name)
100+
}
101+
78102
// Do checks before any Update()s, as Update() may modify the resource structure!
79103
updateStatus := !equality.Semantic.DeepEqual(existingCatsrc.Status, reconciledCatsrc.Status)
80104
updateFinalizers := !equality.Semantic.DeepEqual(existingCatsrc.Finalizers, reconciledCatsrc.Finalizers)
@@ -109,6 +133,14 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
109133

110134
// SetupWithManager sets up the controller with the Manager.
111135
func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
136+
r.storedCatalogsMu.Lock()
137+
defer r.storedCatalogsMu.Unlock()
138+
r.storedCatalogs = make(map[string]storedCatalogData)
139+
140+
if err := r.setupFinalizers(); err != nil {
141+
return fmt.Errorf("failed to setup finalizers: %v", err)
142+
}
143+
112144
return ctrl.NewControllerManagedBy(mgr).
113145
For(&v1alpha1.ClusterCatalog{}).
114146
Complete(r)
@@ -123,7 +155,9 @@ func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
123155
// linting from the linter that was fussing about this.
124156
// nolint:unparam
125157
func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *v1alpha1.ClusterCatalog) (ctrl.Result, error) {
126-
finalizeResult, err := r.Finalizers.Finalize(ctx, catalog)
158+
l := log.FromContext(ctx)
159+
160+
finalizeResult, err := r.finalizers.Finalize(ctx, catalog)
127161
if err != nil {
128162
return ctrl.Result{}, err
129163
}
@@ -133,55 +167,125 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *v1alp
133167
return ctrl.Result{}, nil
134168
}
135169

136-
if !r.needsUnpacking(catalog) {
137-
return ctrl.Result{}, nil
170+
// TODO: The below algorithm to get the current state based on an in-memory
171+
// storedCatalogs map is a hack that helps us keep the ClusterCatalog's
172+
// status up-to-date. The fact that we need this setup is indicative of
173+
// a larger problem with the design of one or both of the Unpacker and
174+
// Storage interfaces and/or their interactions. We should fix this.
175+
expectedStatus, storedCatalog, hasStoredCatalog := r.getCurrentState(catalog)
176+
177+
// If any of the following are true, we need to unpack the catalog:
178+
// - we don't have a stored catalog in the map
179+
// - we have a stored catalog, but the content doesn't exist on disk
180+
// - we have a stored catalog, the content exists, but the expected status differs from the actual status
181+
// - we have a stored catalog, the content exists, the status looks correct, but the catalog generation is different from the observed generation in the stored catalog
182+
// - we have a stored catalog, the content exists, the status looks correct and reflects the catalog generation, but it is time to poll again
183+
needsUnpack := false
184+
switch {
185+
case !hasStoredCatalog:
186+
l.Info("unpack required: no cached catalog metadata found for this catalog")
187+
needsUnpack = true
188+
case !r.Storage.ContentExists(catalog.Name):
189+
l.Info("unpack required: no stored content found for this catalog")
190+
needsUnpack = true
191+
case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus):
192+
l.Info("unpack required: current ClusterCatalog status differs from expected status")
193+
needsUnpack = true
194+
case catalog.Generation != storedCatalog.observedGeneration:
195+
l.Info("unpack required: catalog generation differs from observed generation")
196+
needsUnpack = true
197+
case r.needsPoll(storedCatalog.unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog):
198+
l.Info("unpack required: poll duration has elapsed")
199+
needsUnpack = true
200+
}
201+
202+
if !needsUnpack {
203+
// No need to update the status because we've already checked
204+
// that it is set correctly. Otherwise, we'd be unpacking again.
205+
return nextPollResult(storedCatalog.unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog), nil
138206
}
139207

140208
unpackResult, err := r.Unpacker.Unpack(ctx, catalog)
141209
if err != nil {
142210
unpackErr := fmt.Errorf("source catalog content: %w", err)
143-
updateStatusProgressing(catalog, unpackErr)
211+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), unpackErr)
144212
return ctrl.Result{}, unpackErr
145213
}
146214

147215
switch unpackResult.State {
148216
case source.StateUnpacked:
149-
contentURL := ""
150217
// TODO: We should check to see if the unpacked result has the same content
151218
// as the already unpacked content. If it does, we should skip this rest
152219
// of the unpacking steps.
153220
err := r.Storage.Store(ctx, catalog.Name, unpackResult.FS)
154221
if err != nil {
155222
storageErr := fmt.Errorf("error storing fbc: %v", err)
156-
updateStatusProgressing(catalog, storageErr)
223+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
157224
return ctrl.Result{}, storageErr
158225
}
159-
contentURL = r.Storage.ContentURL(catalog.Name)
160-
161-
updateStatusProgressing(catalog, nil)
162-
updateStatusServing(&catalog.Status, unpackResult, contentURL, catalog.GetGeneration())
163-
164-
var requeueAfter time.Duration
165-
switch catalog.Spec.Source.Type {
166-
case v1alpha1.SourceTypeImage:
167-
if catalog.Spec.Source.Image != nil && catalog.Spec.Source.Image.PollInterval != nil {
168-
requeueAfter = wait.Jitter(catalog.Spec.Source.Image.PollInterval.Duration, requeueJitterMaxFactor)
169-
}
170-
}
226+
contentURL := r.Storage.ContentURL(catalog.Name)
171227

172-
return ctrl.Result{RequeueAfter: requeueAfter}, nil
228+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
229+
updateStatusServing(&catalog.Status, *unpackResult, contentURL, catalog.GetGeneration())
173230
default:
174231
panic(fmt.Sprintf("unknown unpack state %q", unpackResult.State))
175232
}
233+
234+
r.storedCatalogsMu.Lock()
235+
r.storedCatalogs[catalog.Name] = storedCatalogData{
236+
unpackResult: *unpackResult,
237+
observedGeneration: catalog.GetGeneration(),
238+
}
239+
r.storedCatalogsMu.Unlock()
240+
return nextPollResult(unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog), nil
176241
}
177242

178-
func updateStatusProgressing(catalog *v1alpha1.ClusterCatalog, err error) {
243+
func (r *ClusterCatalogReconciler) getCurrentState(catalog *v1alpha1.ClusterCatalog) (*v1alpha1.ClusterCatalogStatus, storedCatalogData, bool) {
244+
r.storedCatalogsMu.RLock()
245+
storedCatalog, hasStoredCatalog := r.storedCatalogs[catalog.Name]
246+
r.storedCatalogsMu.RUnlock()
247+
248+
expectedStatus := catalog.Status.DeepCopy()
249+
250+
// Set expected status based on what we see in the stored catalog
251+
clearUnknownConditions(expectedStatus)
252+
if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
253+
updateStatusServing(expectedStatus, storedCatalog.unpackResult, r.Storage.ContentURL(catalog.Name), storedCatalog.observedGeneration)
254+
updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
255+
}
256+
257+
return expectedStatus, storedCatalog, hasStoredCatalog
258+
}
259+
260+
func nextPollResult(lastSuccessfulPoll time.Time, catalog *v1alpha1.ClusterCatalog) ctrl.Result {
261+
var requeueAfter time.Duration
262+
switch catalog.Spec.Source.Type {
263+
case v1alpha1.SourceTypeImage:
264+
if catalog.Spec.Source.Image != nil && catalog.Spec.Source.Image.PollInterval != nil {
265+
jitteredDuration := wait.Jitter(catalog.Spec.Source.Image.PollInterval.Duration, requeueJitterMaxFactor)
266+
requeueAfter = time.Until(lastSuccessfulPoll.Add(jitteredDuration))
267+
}
268+
}
269+
return ctrl.Result{RequeueAfter: requeueAfter}
270+
}
271+
272+
func clearUnknownConditions(status *v1alpha1.ClusterCatalogStatus) {
273+
knownTypes := sets.New[string](
274+
v1alpha1.TypeServing,
275+
v1alpha1.TypeProgressing,
276+
)
277+
status.Conditions = slices.DeleteFunc(status.Conditions, func(cond metav1.Condition) bool {
278+
return !knownTypes.Has(cond.Type)
279+
})
280+
}
281+
282+
func updateStatusProgressing(status *v1alpha1.ClusterCatalogStatus, generation int64, err error) {
179283
progressingCond := metav1.Condition{
180284
Type: v1alpha1.TypeProgressing,
181285
Status: metav1.ConditionFalse,
182286
Reason: v1alpha1.ReasonSucceeded,
183287
Message: "Successfully unpacked and stored content from resolved source",
184-
ObservedGeneration: catalog.GetGeneration(),
288+
ObservedGeneration: generation,
185289
}
186290

187291
if err != nil {
@@ -195,10 +299,10 @@ func updateStatusProgressing(catalog *v1alpha1.ClusterCatalog, err error) {
195299
progressingCond.Reason = v1alpha1.ReasonBlocked
196300
}
197301

198-
meta.SetStatusCondition(&catalog.Status.Conditions, progressingCond)
302+
meta.SetStatusCondition(&status.Conditions, progressingCond)
199303
}
200304

201-
func updateStatusServing(status *v1alpha1.ClusterCatalogStatus, result *source.Result, contentURL string, generation int64) {
305+
func updateStatusServing(status *v1alpha1.ClusterCatalogStatus, result source.Result, contentURL string, generation int64) {
202306
status.ResolvedSource = result.ResolvedSource
203307
status.ContentURL = contentURL
204308
status.LastUnpacked = metav1.NewTime(result.UnpackTime)
@@ -223,34 +327,15 @@ func updateStatusNotServing(status *v1alpha1.ClusterCatalogStatus, generation in
223327
})
224328
}
225329

226-
func (r *ClusterCatalogReconciler) needsUnpacking(catalog *v1alpha1.ClusterCatalog) bool {
227-
// if ResolvedSource is nil, it indicates that this is the first time we're
228-
// unpacking this catalog.
229-
if catalog.Status.ResolvedSource == nil {
230-
return true
231-
}
232-
if !r.Storage.ContentExists(catalog.Name) {
233-
return true
234-
}
235-
// if there is no spec.Source.Image, don't unpack again
236-
if catalog.Spec.Source.Image == nil {
237-
return false
238-
}
239-
if len(catalog.Status.Conditions) == 0 {
240-
return true
241-
}
242-
for _, c := range catalog.Status.Conditions {
243-
if c.ObservedGeneration != catalog.Generation {
244-
return true
245-
}
246-
}
247-
// if pollInterval is nil, don't unpack again
330+
func (r *ClusterCatalogReconciler) needsPoll(lastSuccessfulPoll time.Time, catalog *v1alpha1.ClusterCatalog) bool {
331+
// If polling is disabled, we don't need to poll.
248332
if catalog.Spec.Source.Image.PollInterval == nil {
249333
return false
250334
}
251-
// if it's not time to poll yet, and the CR wasn't changed don't unpack again
252-
nextPoll := catalog.Status.ResolvedSource.Image.LastSuccessfulPollAttempt.Add(catalog.Spec.Source.Image.PollInterval.Duration)
253-
return !nextPoll.After(time.Now())
335+
336+
// Only poll if the next poll time is in the past.
337+
nextPoll := lastSuccessfulPoll.Add(catalog.Spec.Source.Image.PollInterval.Duration)
338+
return nextPoll.Before(time.Now())
254339
}
255340

256341
// Compare resources - ignoring status & metadata.finalizers
@@ -266,26 +351,35 @@ func (f finalizerFunc) Finalize(ctx context.Context, obj client.Object) (crfinal
266351
return f(ctx, obj)
267352
}
268353

269-
func NewFinalizers(localStorage storage.Instance, unpacker source.Unpacker) (crfinalizer.Finalizers, error) {
354+
func (r *ClusterCatalogReconciler) setupFinalizers() error {
270355
f := crfinalizer.NewFinalizers()
271356
err := f.Register(fbcDeletionFinalizer, finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
272357
catalog, ok := obj.(*v1alpha1.ClusterCatalog)
273358
if !ok {
274359
panic("could not convert object to clusterCatalog")
275360
}
276-
if err := localStorage.Delete(catalog.Name); err != nil {
277-
updateStatusProgressing(catalog, err)
361+
if err := r.Storage.Delete(catalog.Name); err != nil {
362+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
278363
return crfinalizer.Result{StatusUpdated: true}, err
279364
}
280365
updateStatusNotServing(&catalog.Status, catalog.GetGeneration())
281-
if err := unpacker.Cleanup(ctx, catalog); err != nil {
282-
updateStatusProgressing(catalog, err)
366+
if err := r.Unpacker.Cleanup(ctx, catalog); err != nil {
367+
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
283368
return crfinalizer.Result{StatusUpdated: true}, err
284369
}
370+
371+
r.deleteStoredCatalog(catalog.Name)
285372
return crfinalizer.Result{StatusUpdated: true}, nil
286373
}))
287374
if err != nil {
288-
return f, err
375+
return err
289376
}
290-
return f, nil
377+
r.finalizers = f
378+
return nil
379+
}
380+
381+
func (r *ClusterCatalogReconciler) deleteStoredCatalog(catalogName string) {
382+
r.storedCatalogsMu.Lock()
383+
defer r.storedCatalogsMu.Unlock()
384+
delete(r.storedCatalogs, catalogName)
291385
}

0 commit comments

Comments
 (0)