Skip to content

Commit 4ff3683

Browse files
author
Per Goncalves da Silva
committed
queue item fix
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 87a3269 commit 4ff3683

File tree

2 files changed

+25
-19
lines changed

2 files changed

+25
-19
lines changed

pkg/controller/registry/resolver/source_csvs.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,19 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
9393
continue
9494
}
9595

96-
//if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
97-
// // we might be in an incoherent state, so let's check with live clients to make sure
98-
// realSubscriptions, err := s.listSubscriptions(ctx)
99-
// if err != nil {
100-
// return nil, fmt.Errorf("failed to list subscriptions: %w", err)
101-
// }
102-
// for _, realSubscription := range realSubscriptions.Items {
103-
// if realSubscription.Status.InstalledCSV == csv.Name {
104-
// // oops, live cluster state is coherent
105-
// return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
106-
// }
107-
// }
108-
//}
96+
if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
97+
// we might be in an incoherent state, so let's check with live clients to make sure
98+
realSubscriptions, err := s.listSubscriptions(ctx)
99+
if err != nil {
100+
return nil, fmt.Errorf("failed to list subscriptions: %w", err)
101+
}
102+
for _, realSubscription := range realSubscriptions.Items {
103+
if realSubscription.Status.InstalledCSV == csv.Name {
104+
// oops, live cluster state is coherent
105+
return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
106+
}
107+
}
108+
}
109109

110110
if failForwardEnabled {
111111
replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs))

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,19 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
282282
queue.Forget(item)
283283
return true
284284
}
285-
285+
286286
logger = logger.WithField("cache-key", key)
287287

288+
if !ok {
289+
// we should really only (sync) resource event items
290+
// otherwise we run the risk of processing two the same objects concurrently.
291+
// As is, at least, the case for the namespace resolution informer,
292+
// the namespace name gets added to the queue (e.g. nsResolveQueue.Add(obj.GetNamespace())),
293+
// but the QueueInformer's Enqueue method always adds ResourceEvents.
294+
queue.Add(kubestate.NewResourceEvent(kubestate.ResourceUpdated, key))
295+
return true
296+
}
297+
288298
var resource interface{}
289299
if loop.indexer == nil {
290300
resource = event.Resource()
@@ -305,11 +315,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
305315
}
306316
}
307317

308-
if !ok {
309-
event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource)
310-
} else {
311-
event = kubestate.NewResourceEvent(event.Type(), resource)
312-
}
318+
event = kubestate.NewResourceEvent(event.Type(), resource)
313319
}
314320

315321
// Sync and requeue on error (throw out failed deletion syncs)

0 commit comments

Comments
 (0)