Skip to content

Commit 0a406ee

Browse files
authored
Merge pull request kubernetes#126723 from pwschuurman/revert-125372-pvcScalabilityFix
Revert "Improve PVC protection controller's scalability by batch-processing PVCs by namespace & caching live pod list results"
2 parents 0b3b733 + dbcbdbf commit 0a406ee

File tree

4 files changed: 52 additions (+52) and 511 deletions (-511).

4 files changed

+52
-511
lines changed

pkg/controller/volume/pvcprotection/pvc_protection_controller.go

Lines changed: 31 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package pvcprotection
1919
import (
2020
"context"
2121
"fmt"
22-
"sync"
2322
"time"
2423

2524
v1 "k8s.io/api/core/v1"
@@ -42,65 +41,6 @@ import (
4241

4342
// Controller is a controller that removes PVCProtectionFinalizer
4443
// from PVCs that are used by no pods.
45-
46-
type LazyLivePodList struct {
47-
cache []v1.Pod
48-
controller *Controller
49-
}
50-
51-
func (ll *LazyLivePodList) getCache() []v1.Pod {
52-
return ll.cache
53-
}
54-
55-
func (ll *LazyLivePodList) setCache(pods []v1.Pod) {
56-
ll.cache = pods
57-
}
58-
59-
type pvcData struct {
60-
pvcKey string
61-
pvcName string
62-
}
63-
64-
type pvcProcessingStore struct {
65-
namespaceToPVCsMap map[string][]pvcData
66-
namespaceQueue workqueue.TypedInterface[string]
67-
mu sync.Mutex
68-
}
69-
70-
func NewPVCProcessingStore() *pvcProcessingStore {
71-
return &pvcProcessingStore{
72-
namespaceToPVCsMap: make(map[string][]pvcData),
73-
namespaceQueue: workqueue.NewTyped[string](),
74-
}
75-
}
76-
77-
func (m *pvcProcessingStore) addOrUpdate(namespace string, pvcKey, pvcName string) {
78-
m.mu.Lock()
79-
defer m.mu.Unlock()
80-
if _, exists := m.namespaceToPVCsMap[namespace]; !exists {
81-
m.namespaceToPVCsMap[namespace] = make([]pvcData, 0)
82-
m.namespaceQueue.Add(namespace)
83-
}
84-
m.namespaceToPVCsMap[namespace] = append(m.namespaceToPVCsMap[namespace], pvcData{pvcKey: pvcKey, pvcName: pvcName})
85-
}
86-
87-
// Returns a list of pvcs and the associated namespace to be processed downstream
88-
func (m *pvcProcessingStore) flushNextPVCsByNamespace() ([]pvcData, string) {
89-
90-
nextNamespace, quit := m.namespaceQueue.Get()
91-
if quit {
92-
return nil, nextNamespace
93-
}
94-
95-
m.mu.Lock()
96-
defer m.mu.Unlock()
97-
pvcs := m.namespaceToPVCsMap[nextNamespace]
98-
99-
delete(m.namespaceToPVCsMap, nextNamespace)
100-
m.namespaceQueue.Done(nextNamespace)
101-
return pvcs, nextNamespace
102-
}
103-
10444
type Controller struct {
10545
client clientset.Interface
10646

@@ -111,8 +51,7 @@ type Controller struct {
11151
podListerSynced cache.InformerSynced
11252
podIndexer cache.Indexer
11353

114-
queue workqueue.TypedRateLimitingInterface[string]
115-
pvcProcessingStore *pvcProcessingStore
54+
queue workqueue.TypedRateLimitingInterface[string]
11655
}
11756

11857
// NewPVCProtectionController returns a new instance of PVCProtectionController.
@@ -123,7 +62,6 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
12362
workqueue.DefaultTypedControllerRateLimiter[string](),
12463
workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"},
12564
),
126-
pvcProcessingStore: NewPVCProcessingStore(),
12765
}
12866

12967
e.pvcLister = pvcInformer.Lister()
@@ -162,7 +100,6 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
162100
func (c *Controller) Run(ctx context.Context, workers int) {
163101
defer utilruntime.HandleCrash()
164102
defer c.queue.ShutDown()
165-
defer c.pvcProcessingStore.namespaceQueue.ShutDown()
166103

167104
logger := klog.FromContext(ctx)
168105
logger.Info("Starting PVC protection controller")
@@ -172,64 +109,45 @@ func (c *Controller) Run(ctx context.Context, workers int) {
172109
return
173110
}
174111

175-
go wait.UntilWithContext(ctx, c.runMainWorker, time.Second)
176112
for i := 0; i < workers; i++ {
177-
go wait.UntilWithContext(ctx, c.runProcessNamespaceWorker, time.Second)
113+
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
178114
}
179115

180116
<-ctx.Done()
181117
}
182118

183-
// Main worker batch-pulls PVC items off informer's work queue and populates namespace queue and namespace-PVCs map
184-
func (c *Controller) runMainWorker(ctx context.Context) {
185-
for c.processNextWorkItem() {
119+
func (c *Controller) runWorker(ctx context.Context) {
120+
for c.processNextWorkItem(ctx) {
186121
}
187122
}
188123

189-
// Consumer worker pulls items off namespace queue and processes associated PVCs
190-
func (c *Controller) runProcessNamespaceWorker(ctx context.Context) {
191-
for c.processPVCsByNamespace(ctx) {
124+
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
125+
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
126+
pvcKey, quit := c.queue.Get()
127+
if quit {
128+
return false
192129
}
193-
}
130+
defer c.queue.Done(pvcKey)
194131

195-
func (c *Controller) processNextWorkItem() bool {
196-
queueLength := c.queue.Len()
197-
for i := 0; i < queueLength; i++ {
198-
pvcKey, quit := c.queue.Get()
199-
if quit {
200-
return false
201-
}
202-
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
203-
if err != nil {
204-
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err))
205-
}
206-
c.pvcProcessingStore.addOrUpdate(pvcNamespace, pvcKey, pvcName)
132+
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
133+
if err != nil {
134+
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err))
135+
return true
207136
}
208-
return !c.queue.ShuttingDown()
209-
}
210137

211-
func (c *Controller) processPVCsByNamespace(ctx context.Context) bool {
212-
pvcList, namespace := c.pvcProcessingStore.flushNextPVCsByNamespace()
213-
if pvcList == nil {
214-
return false
138+
err = c.processPVC(ctx, pvcNamespace, pvcName)
139+
if err == nil {
140+
c.queue.Forget(pvcKey)
141+
return true
215142
}
216143

217-
lazyLivePodList := &LazyLivePodList{controller: c}
218-
for _, item := range pvcList {
219-
pvcKey, pvcName := item.pvcKey, item.pvcName
220-
err := c.processPVC(ctx, namespace, pvcName, lazyLivePodList)
221-
if err == nil {
222-
c.queue.Forget(pvcKey)
223-
} else {
224-
c.queue.AddRateLimited(pvcKey)
225-
utilruntime.HandleError(fmt.Errorf("PVC %v in namespace %v failed with: %w", pvcName, namespace, err))
226-
}
227-
c.queue.Done(pvcKey)
228-
}
144+
utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %w", pvcKey, err))
145+
c.queue.AddRateLimited(pvcKey)
146+
229147
return true
230148
}
231149

232-
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string, lazyLivePodList *LazyLivePodList) error {
150+
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error {
233151
logger := klog.FromContext(ctx)
234152
logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
235153
startTime := time.Now()
@@ -249,7 +167,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin
249167
if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
250168
// PVC should be deleted. Check if it's used and remove finalizer if
251169
// it's not.
252-
isUsed, err := c.isBeingUsed(ctx, pvc, lazyLivePodList)
170+
isUsed, err := c.isBeingUsed(ctx, pvc)
253171
if err != nil {
254172
return err
255173
}
@@ -291,11 +209,11 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu
291209
logger.Error(err, "Error removing protection finalizer from PVC", "PVC", klog.KObj(pvc))
292210
return err
293211
}
294-
logger.Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc))
212+
logger.V(3).Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc))
295213
return nil
296214
}
297215

298-
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
216+
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
299217
// Look for a Pod using pvc in the Informer's cache. If one is found the
300218
// correct decision to keep pvc is taken without doing an expensive live
301219
// list.
@@ -311,9 +229,7 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl
311229
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
312230
// be 100% confident that it is safe to delete pvc make sure no Pod is using
313231
// it among those returned by a live list.
314-
315-
// Use lazy live pod list instead of directly calling API server
316-
return c.askAPIServer(ctx, pvc, lazyLivePodList)
232+
return c.askAPIServer(ctx, pvc)
317233
}
318234

319235
func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) {
@@ -342,24 +258,16 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla
342258
return false, nil
343259
}
344260

345-
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
261+
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
346262
logger := klog.FromContext(ctx)
347263
logger.V(4).Info("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc))
348-
if lazyLivePodList.getCache() == nil {
349-
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
350264

351-
if err != nil {
352-
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
353-
}
354-
355-
if podsList.Items == nil {
356-
lazyLivePodList.setCache(make([]v1.Pod, 0))
357-
} else {
358-
lazyLivePodList.setCache(podsList.Items)
359-
}
265+
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
266+
if err != nil {
267+
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
360268
}
361269

362-
for _, pod := range lazyLivePodList.getCache() {
270+
for _, pod := range podsList.Items {
363271
if c.podUsesPVC(logger, &pod, pvc) {
364272
return true, nil
365273
}

0 commit comments

Comments
 (0)