
Commit a8cbb22 (2 parents: 6c22cff + bef996d)

Merge pull request kubernetes#74747 from liggitt/quota-deadlock

quota controller fixes

File tree

28 files changed: +1304 -42 lines


pkg/controller/garbagecollector/garbagecollector_test.go

Lines changed: 0 additions & 1 deletion
@@ -873,7 +873,6 @@ func TestGarbageCollectorSync(t *testing.T) {
 	go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, stopCh)

 	// Wait until the sync discovers the initial resources
-	fmt.Printf("Test output")
 	time.Sleep(1 * time.Second)

 	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)

pkg/controller/resourcequota/BUILD

Lines changed: 3 additions & 0 deletions
@@ -29,6 +29,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/discovery:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
@@ -53,12 +54,14 @@ go_test(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
     ],

pkg/controller/resourcequota/resource_quota_controller.go

Lines changed: 57 additions & 10 deletions
@@ -30,7 +30,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/informers"
@@ -320,22 +322,25 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
 // syncResourceQuota runs a complete sync of resource quota status across all known kinds
 func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQuota) (err error) {
 	// quota is dirty if any part of spec hard limits differs from the status hard limits
-	dirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
+	statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)

 	// dirty tracks if the usage status differs from the previous sync,
 	// if so, we send a new usage with latest status
 	// if this is our first sync, it will be dirty by default, since we need track usage
-	dirty = dirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil
+	dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil

 	used := v1.ResourceList{}
 	if resourceQuota.Status.Used != nil {
 		used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
 	}
 	hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)

+	errors := []error{}
+
 	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
 	if err != nil {
-		return err
+		// if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
+		errors = append(errors, err)
 	}
 	for key, value := range newUsage {
 		used[key] = value
@@ -358,9 +363,11 @@ func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQ
 	// there was a change observed by this controller that requires we update quota
 	if dirty {
 		_, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(usage)
-		return err
+		if err != nil {
+			errors = append(errors, err)
+		}
 	}
-	return nil
+	return utilerrors.NewAggregate(errors)
 }

 // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
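Note: the hunks above move syncResourceQuota from returning on the first failure to collecting errors and returning one aggregate, so a partial result from quota.CalculateUsage still reaches the quota status. Below is a minimal, self-contained sketch of that collect-then-aggregate pattern; calculatePartialUsage and updateStatus are hypothetical stand-ins for the controller's real calls, not part of this commit.

package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// calculatePartialUsage stands in for quota.CalculateUsage: it can return
// partial results together with an error.
func calculatePartialUsage() (map[string]int64, error) {
	return map[string]int64{"pods": 3}, errors.New("failed to list widgets.example.com")
}

// updateStatus stands in for the UpdateStatus call against the API server.
func updateStatus(usage map[string]int64) error {
	fmt.Println("updating status with", usage)
	return nil
}

func sync() error {
	errs := []error{}

	usage, err := calculatePartialUsage()
	if err != nil {
		// remember the error, but keep going so the partial usage
		// below still gets written out
		errs = append(errs, err)
	}

	if err := updateStatus(usage); err != nil {
		errs = append(errs, err)
	}

	// NewAggregate returns nil for an empty list, so the success path
	// is unchanged; otherwise every collected error surfaces to the caller
	return utilerrors.NewAggregate(errs)
}

func main() {
	if err := sync(); err != nil {
		fmt.Println("sync returned:", err)
	}
}

Because utilerrors.NewAggregate returns nil for an empty error list, the happy path behaves exactly as before the change.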
@@ -423,26 +430,66 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
 			return
 		}

-		// Something has changed, so track the new state and perform a sync.
-		klog.V(2).Infof("syncing resource quota controller with updated resources from discovery: %v", newResources)
-		oldResources = newResources
-
 		// Ensure workers are paused to avoid processing events before informers
 		// have resynced.
 		rq.workerLock.Lock()
 		defer rq.workerLock.Unlock()

+		// Something has changed, so track the new state and perform a sync.
+		if klog.V(2) {
+			klog.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
+		}
+
 		// Perform the monitor resync and wait for controllers to report cache sync.
 		if err := rq.resyncMonitors(newResources); err != nil {
 			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
 			return
 		}
-		if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
+		// wait for caches to fill for a while (our sync period).
+		// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
+		// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
+		// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
+		if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
 			utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
+			return
 		}
+
+		// success, remember newly synced resources
+		oldResources = newResources
+		klog.V(2).Infof("synced quota controller")
 	}, period, stopCh)
 }

+// printDiff returns a human-readable summary of what resources were added and removed
+func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
+	removed := sets.NewString()
+	for oldResource := range oldResources {
+		if _, ok := newResources[oldResource]; !ok {
+			removed.Insert(fmt.Sprintf("%+v", oldResource))
+		}
+	}
+	added := sets.NewString()
+	for newResource := range newResources {
+		if _, ok := oldResources[newResource]; !ok {
+			added.Insert(fmt.Sprintf("%+v", newResource))
+		}
+	}
+	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
+}
+
+// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
+func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
+	stopChWithTimeout := make(chan struct{})
+	go func() {
+		defer close(stopChWithTimeout)
+		select {
+		case <-stopCh:
+		case <-time.After(timeout):
+		}
+	}()
+	return stopChWithTimeout
+}
+
 // resyncMonitors starts or stops quota monitors as needed to ensure that all
 // (and only) those resources present in the map are monitored.
 func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
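Note: the deadlock this branch fixes came from holding rq.workerLock while waiting on WaitForCacheSync with only stopCh: if a discovered resource vanished before its informer cache could fill, the wait never returned and the paused workers never resumed. Bounding the wait with waitForStopOrTimeout lets the next sync-period iteration retry instead. Below is a self-contained sketch of that bounded-wait pattern; waitForCacheSync here is a simplified, hypothetical stand-in for controller.WaitForCacheSync.

package main

import (
	"fmt"
	"time"
)

// waitForStopOrTimeout mirrors the helper added in this commit: the returned
// channel closes when stopCh closes or the timeout elapses, whichever is first.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return stopChWithTimeout
}

// waitForCacheSync is a simplified stand-in for controller.WaitForCacheSync:
// it polls synced() until it reports true or stopCh closes.
func waitForCacheSync(stopCh <-chan struct{}, synced func() bool) bool {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return synced()
		case <-ticker.C:
			if synced() {
				return true
			}
		}
	}
}

func main() {
	stopCh := make(chan struct{}) // never closed, like a long-running controller
	neverSynced := func() bool { return false }

	// An unbounded wait here would block forever while the worker lock is
	// held; the timeout wrapper returns control after one period so the
	// caller can retry on the next loop iteration.
	if !waitForCacheSync(waitForStopOrTimeout(stopCh, 100*time.Millisecond), neverSynced) {
		fmt.Println("timed out waiting for caches; will retry next period")
	}
}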
