Skip to content

Commit e1987f4

Browse files
author
Per Goncalves da Silva
committed
queue item fix
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 87a3269 commit e1987f4

File tree

2 files changed

+24
-19
lines changed

2 files changed

+24
-19
lines changed

pkg/controller/registry/resolver/source_csvs.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,19 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
9393
continue
9494
}
9595

96-
//if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
97-
// // we might be in an incoherent state, so let's check with live clients to make sure
98-
// realSubscriptions, err := s.listSubscriptions(ctx)
99-
// if err != nil {
100-
// return nil, fmt.Errorf("failed to list subscriptions: %w", err)
101-
// }
102-
// for _, realSubscription := range realSubscriptions.Items {
103-
// if realSubscription.Status.InstalledCSV == csv.Name {
104-
// // oops, live cluster state is coherent
105-
// return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
106-
// }
107-
// }
108-
//}
96+
if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
97+
// we might be in an incoherent state, so let's check with live clients to make sure
98+
realSubscriptions, err := s.listSubscriptions(ctx)
99+
if err != nil {
100+
return nil, fmt.Errorf("failed to list subscriptions: %w", err)
101+
}
102+
for _, realSubscription := range realSubscriptions.Items {
103+
if realSubscription.Status.InstalledCSV == csv.Name {
104+
// oops, live cluster state is coherent
105+
return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
106+
}
107+
}
108+
}
109109

110110
if failForwardEnabled {
111111
replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs))

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,18 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
282282
queue.Forget(item)
283283
return true
284284
}
285-
286285
logger = logger.WithField("cache-key", key)
287286

287+
if !ok {
288+
// we should really only (sync) resource event items
289+
// otherwise we run the risk of processing two the same objects concurrently.
290+
// As is, at least, the case for the namespace resolution informer,
291+
// the namespace name gets added to the queue (e.g. nsResolveQueue.Add(obj.GetNamespace())),
292+
// but the QueueInformer's Enqueue method always adds ResourceEvents.
293+
queue.Add(kubestate.NewResourceEvent(kubestate.ResourceUpdated, key))
294+
return true
295+
}
296+
288297
var resource interface{}
289298
if loop.indexer == nil {
290299
resource = event.Resource()
@@ -305,11 +314,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
305314
}
306315
}
307316

308-
if !ok {
309-
event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource)
310-
} else {
311-
event = kubestate.NewResourceEvent(event.Type(), resource)
312-
}
317+
event = kubestate.NewResourceEvent(event.Type(), resource)
313318
}
314319

315320
// Sync and requeue on error (throw out failed deletion syncs)

0 commit comments

Comments
 (0)