Skip to content

Commit fd7642c

Browse files
authored
Merge pull request kubernetes#126745 from hungnguyen243/pvcScalabilityFix
Improve PVC protection controller's scalability by batch-processing PVCs by namespace & caching live pod list results [fixed dead loop issue with idle work queue]
2 parents 09596a5 + 152ab36 commit fd7642c

File tree

4 files changed

+511
-46
lines changed

4 files changed

+511
-46
lines changed

pkg/controller/volume/pvcprotection/pvc_protection_controller.go

Lines changed: 119 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pvcprotection
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
v1 "k8s.io/api/core/v1"
@@ -41,6 +42,65 @@ import (
4142

4243
// Controller is controller that removes PVCProtectionFinalizer
4344
// from PVCs that are used by no pods.
45+
46+
type LazyLivePodList struct {
47+
cache []v1.Pod
48+
controller *Controller
49+
}
50+
51+
func (ll *LazyLivePodList) getCache() []v1.Pod {
52+
return ll.cache
53+
}
54+
55+
func (ll *LazyLivePodList) setCache(pods []v1.Pod) {
56+
ll.cache = pods
57+
}
58+
59+
type pvcData struct {
60+
pvcKey string
61+
pvcName string
62+
}
63+
64+
type pvcProcessingStore struct {
65+
namespaceToPVCsMap map[string][]pvcData
66+
namespaceQueue workqueue.TypedInterface[string]
67+
mu sync.Mutex
68+
}
69+
70+
func NewPVCProcessingStore() *pvcProcessingStore {
71+
return &pvcProcessingStore{
72+
namespaceToPVCsMap: make(map[string][]pvcData),
73+
namespaceQueue: workqueue.NewTyped[string](),
74+
}
75+
}
76+
77+
func (m *pvcProcessingStore) addOrUpdate(namespace string, pvcKey, pvcName string) {
78+
m.mu.Lock()
79+
defer m.mu.Unlock()
80+
if _, exists := m.namespaceToPVCsMap[namespace]; !exists {
81+
m.namespaceToPVCsMap[namespace] = make([]pvcData, 0)
82+
m.namespaceQueue.Add(namespace)
83+
}
84+
m.namespaceToPVCsMap[namespace] = append(m.namespaceToPVCsMap[namespace], pvcData{pvcKey: pvcKey, pvcName: pvcName})
85+
}
86+
87+
// Returns a list of pvcs and the associated namespace to be processed downstream
88+
func (m *pvcProcessingStore) flushNextPVCsByNamespace() ([]pvcData, string) {
89+
90+
nextNamespace, quit := m.namespaceQueue.Get()
91+
if quit {
92+
return nil, nextNamespace
93+
}
94+
95+
m.mu.Lock()
96+
defer m.mu.Unlock()
97+
pvcs := m.namespaceToPVCsMap[nextNamespace]
98+
99+
delete(m.namespaceToPVCsMap, nextNamespace)
100+
m.namespaceQueue.Done(nextNamespace)
101+
return pvcs, nextNamespace
102+
}
103+
44104
type Controller struct {
45105
client clientset.Interface
46106

@@ -51,7 +111,8 @@ type Controller struct {
51111
podListerSynced cache.InformerSynced
52112
podIndexer cache.Indexer
53113

54-
queue workqueue.TypedRateLimitingInterface[string]
114+
queue workqueue.TypedRateLimitingInterface[string]
115+
pvcProcessingStore *pvcProcessingStore
55116
}
56117

57118
// NewPVCProtectionController returns a new instance of PVCProtectionController.
@@ -62,6 +123,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
62123
workqueue.DefaultTypedControllerRateLimiter[string](),
63124
workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"},
64125
),
126+
pvcProcessingStore: NewPVCProcessingStore(),
65127
}
66128

67129
e.pvcLister = pvcInformer.Lister()
@@ -100,6 +162,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
100162
func (c *Controller) Run(ctx context.Context, workers int) {
101163
defer utilruntime.HandleCrash()
102164
defer c.queue.ShutDown()
165+
defer c.pvcProcessingStore.namespaceQueue.ShutDown()
103166

104167
logger := klog.FromContext(ctx)
105168
logger.Info("Starting PVC protection controller")
@@ -109,45 +172,64 @@ func (c *Controller) Run(ctx context.Context, workers int) {
109172
return
110173
}
111174

175+
go wait.UntilWithContext(ctx, c.runMainWorker, time.Second)
112176
for i := 0; i < workers; i++ {
113-
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
177+
go wait.UntilWithContext(ctx, c.runProcessNamespaceWorker, time.Second)
114178
}
115179

116180
<-ctx.Done()
117181
}
118182

119-
func (c *Controller) runWorker(ctx context.Context) {
120-
for c.processNextWorkItem(ctx) {
183+
// Main worker batch-pulls PVC items off informer's work queue and populates namespace queue and namespace-PVCs map
184+
func (c *Controller) runMainWorker(ctx context.Context) {
185+
for c.processNextWorkItem() {
186+
}
187+
}
188+
189+
// Consumer worker pulls items off namespace queue and processes associated PVCs
190+
func (c *Controller) runProcessNamespaceWorker(ctx context.Context) {
191+
for c.processPVCsByNamespace(ctx) {
121192
}
122193
}
123194

124-
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
125-
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
195+
func (c *Controller) processNextWorkItem() bool {
126196
pvcKey, quit := c.queue.Get()
127197
if quit {
128198
return false
129199
}
130-
defer c.queue.Done(pvcKey)
131200

132201
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
133202
if err != nil {
134203
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err))
135204
return true
136205
}
137206

138-
err = c.processPVC(ctx, pvcNamespace, pvcName)
139-
if err == nil {
140-
c.queue.Forget(pvcKey)
141-
return true
142-
}
207+
c.pvcProcessingStore.addOrUpdate(pvcNamespace, pvcKey, pvcName)
208+
return true
209+
}
143210

144-
utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %w", pvcKey, err))
145-
c.queue.AddRateLimited(pvcKey)
211+
func (c *Controller) processPVCsByNamespace(ctx context.Context) bool {
212+
pvcList, namespace := c.pvcProcessingStore.flushNextPVCsByNamespace()
213+
if pvcList == nil {
214+
return false
215+
}
146216

217+
lazyLivePodList := &LazyLivePodList{controller: c}
218+
for _, item := range pvcList {
219+
pvcKey, pvcName := item.pvcKey, item.pvcName
220+
err := c.processPVC(ctx, namespace, pvcName, lazyLivePodList)
221+
if err == nil {
222+
c.queue.Forget(pvcKey)
223+
} else {
224+
c.queue.AddRateLimited(pvcKey)
225+
utilruntime.HandleError(fmt.Errorf("PVC %v/%v failed with: %w", pvcName, namespace, err))
226+
}
227+
c.queue.Done(pvcKey)
228+
}
147229
return true
148230
}
149231

150-
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error {
232+
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string, lazyLivePodList *LazyLivePodList) error {
151233
logger := klog.FromContext(ctx)
152234
logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
153235
startTime := time.Now()
@@ -167,7 +249,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin
167249
if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
168250
// PVC should be deleted. Check if it's used and remove finalizer if
169251
// it's not.
170-
isUsed, err := c.isBeingUsed(ctx, pvc)
252+
isUsed, err := c.isBeingUsed(ctx, pvc, lazyLivePodList)
171253
if err != nil {
172254
return err
173255
}
@@ -213,7 +295,7 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu
213295
return nil
214296
}
215297

216-
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
298+
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
217299
// Look for a Pod using pvc in the Informer's cache. If one is found the
218300
// correct decision to keep pvc is taken without doing an expensive live
219301
// list.
@@ -228,8 +310,12 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl
228310
// Even if no Pod using pvc was found in the Informer's cache it doesn't
229311
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
230312
// be 100% confident that it is safe to delete pvc make sure no Pod is using
231-
// it among those returned by a live list.
232-
return c.askAPIServer(ctx, pvc)
313+
// it among those returned by a "lazy" live list.
314+
315+
// Use a "lazy" live pod list: lazyLivePodList caches the first successful live pod list response,
316+
// so for a large number of PVC deletions in a short duration, subsequent requests can use the cached pod list
317+
// instead of issuing a lot of API requests. The cache is refreshed for each run of processNextWorkItem().
318+
return c.askAPIServer(ctx, pvc, lazyLivePodList)
233319
}
234320

235321
func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) {
@@ -258,16 +344,24 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla
258344
return false, nil
259345
}
260346

261-
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
347+
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
262348
logger := klog.FromContext(ctx)
263-
logger.V(4).Info("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc))
349+
logger.V(4).Info("Looking for Pods using PVC", "PVC", klog.KObj(pvc))
350+
if lazyLivePodList.getCache() == nil {
351+
logger.V(4).Info("Live listing Pods in namespace", "namespace", pvc.Namespace)
352+
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
353+
if err != nil {
354+
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
355+
}
264356

265-
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
266-
if err != nil {
267-
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
357+
if podsList.Items == nil {
358+
lazyLivePodList.setCache(make([]v1.Pod, 0))
359+
} else {
360+
lazyLivePodList.setCache(podsList.Items)
361+
}
268362
}
269363

270-
for _, pod := range podsList.Items {
364+
for _, pod := range lazyLivePodList.getCache() {
271365
if c.podUsesPVC(logger, &pod, pvc) {
272366
return true, nil
273367
}

0 commit comments

Comments
 (0)