Skip to content

Commit eb16aa1

Browse files
committed
improve PVC Protection Controller's processing mechanism with sample performance test
1 parent 9356b0e commit eb16aa1

File tree

4 files changed

+511
-52
lines changed

4 files changed

+511
-52
lines changed

pkg/controller/volume/pvcprotection/pvc_protection_controller.go

Lines changed: 123 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pvcprotection
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
v1 "k8s.io/api/core/v1"
@@ -41,6 +42,65 @@ import (
4142

4243
// Controller is controller that removes PVCProtectionFinalizer
4344
// from PVCs that are used by no pods.
45+
46+
type LazyLivePodList struct {
47+
cache []v1.Pod
48+
controller *Controller
49+
}
50+
51+
func (ll *LazyLivePodList) getCache() []v1.Pod {
52+
return ll.cache
53+
}
54+
55+
func (ll *LazyLivePodList) setCache(pods []v1.Pod) {
56+
ll.cache = pods
57+
}
58+
59+
type pvcData struct {
60+
pvcKey string
61+
pvcName string
62+
}
63+
64+
type pvcProcessingStore struct {
65+
namespaceToPVCsMap map[string][]pvcData
66+
namespaceQueue workqueue.TypedInterface[string]
67+
mu sync.Mutex
68+
}
69+
70+
func NewPVCProcessingStore() *pvcProcessingStore {
71+
return &pvcProcessingStore{
72+
namespaceToPVCsMap: make(map[string][]pvcData),
73+
namespaceQueue: workqueue.NewTyped[string](),
74+
}
75+
}
76+
77+
func (m *pvcProcessingStore) addOrUpdate(namespace string, pvcKey, pvcName string) {
78+
m.mu.Lock()
79+
defer m.mu.Unlock()
80+
if _, exists := m.namespaceToPVCsMap[namespace]; !exists {
81+
m.namespaceToPVCsMap[namespace] = make([]pvcData, 0)
82+
m.namespaceQueue.Add(namespace)
83+
}
84+
m.namespaceToPVCsMap[namespace] = append(m.namespaceToPVCsMap[namespace], pvcData{pvcKey: pvcKey, pvcName: pvcName})
85+
}
86+
87+
// Returns a list of pvcs and the associated namespace to be processed downstream
88+
func (m *pvcProcessingStore) flushNextPVCsByNamespace() ([]pvcData, string) {
89+
90+
nextNamespace, quit := m.namespaceQueue.Get()
91+
if quit {
92+
return nil, nextNamespace
93+
}
94+
95+
m.mu.Lock()
96+
defer m.mu.Unlock()
97+
pvcs := m.namespaceToPVCsMap[nextNamespace]
98+
99+
delete(m.namespaceToPVCsMap, nextNamespace)
100+
m.namespaceQueue.Done(nextNamespace)
101+
return pvcs, nextNamespace
102+
}
103+
44104
type Controller struct {
45105
client clientset.Interface
46106

@@ -51,7 +111,8 @@ type Controller struct {
51111
podListerSynced cache.InformerSynced
52112
podIndexer cache.Indexer
53113

54-
queue workqueue.TypedRateLimitingInterface[string]
114+
queue workqueue.TypedRateLimitingInterface[string]
115+
pvcProcessingStore *pvcProcessingStore
55116
}
56117

57118
// NewPVCProtectionController returns a new instance of PVCProtectionController.
@@ -62,6 +123,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
62123
workqueue.DefaultTypedControllerRateLimiter[string](),
63124
workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"},
64125
),
126+
pvcProcessingStore: NewPVCProcessingStore(),
65127
}
66128

67129
e.pvcLister = pvcInformer.Lister()
@@ -100,6 +162,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
100162
func (c *Controller) Run(ctx context.Context, workers int) {
101163
defer utilruntime.HandleCrash()
102164
defer c.queue.ShutDown()
165+
defer c.pvcProcessingStore.namespaceQueue.ShutDown()
103166

104167
logger := klog.FromContext(ctx)
105168
logger.Info("Starting PVC protection controller")
@@ -109,45 +172,64 @@ func (c *Controller) Run(ctx context.Context, workers int) {
109172
return
110173
}
111174

175+
go wait.UntilWithContext(ctx, c.runMainWorker, time.Second)
112176
for i := 0; i < workers; i++ {
113-
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
177+
go wait.UntilWithContext(ctx, c.runProcessNamespaceWorker, time.Second)
114178
}
115179

116180
<-ctx.Done()
117181
}
118182

119-
func (c *Controller) runWorker(ctx context.Context) {
120-
for c.processNextWorkItem(ctx) {
183+
// Main worker batch-pulls PVC items off informer's work queue and populates namespace queue and namespace-PVCs map
184+
func (c *Controller) runMainWorker(ctx context.Context) {
185+
for c.processNextWorkItem() {
121186
}
122187
}
123188

124-
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
125-
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
126-
pvcKey, quit := c.queue.Get()
127-
if quit {
128-
return false
189+
// Consumer worker pulls items off namespace queue and processes associated PVCs
190+
func (c *Controller) runProcessNamespaceWorker(ctx context.Context) {
191+
for c.processPVCsByNamespace(ctx) {
129192
}
130-
defer c.queue.Done(pvcKey)
193+
}
131194

132-
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
133-
if err != nil {
134-
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %v", pvcKey, err))
135-
return true
195+
func (c *Controller) processNextWorkItem() bool {
196+
queueLength := c.queue.Len()
197+
for i := 0; i < queueLength; i++ {
198+
pvcKey, quit := c.queue.Get()
199+
if quit {
200+
return false
201+
}
202+
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
203+
if err != nil {
204+
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err))
205+
}
206+
c.pvcProcessingStore.addOrUpdate(pvcNamespace, pvcKey, pvcName)
136207
}
208+
return !c.queue.ShuttingDown()
209+
}
137210

138-
err = c.processPVC(ctx, pvcNamespace, pvcName)
139-
if err == nil {
140-
c.queue.Forget(pvcKey)
141-
return true
211+
func (c *Controller) processPVCsByNamespace(ctx context.Context) bool {
212+
pvcList, namespace := c.pvcProcessingStore.flushNextPVCsByNamespace()
213+
if pvcList == nil {
214+
return false
142215
}
143216

144-
utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %v", pvcKey, err))
145-
c.queue.AddRateLimited(pvcKey)
146-
217+
lazyLivePodList := &LazyLivePodList{controller: c}
218+
for _, item := range pvcList {
219+
pvcKey, pvcName := item.pvcKey, item.pvcName
220+
err := c.processPVC(ctx, namespace, pvcName, lazyLivePodList)
221+
if err == nil {
222+
c.queue.Forget(pvcKey)
223+
} else {
224+
c.queue.AddRateLimited(pvcKey)
225+
utilruntime.HandleError(fmt.Errorf("PVC %v in namespace %v failed with: %w", pvcName, namespace, err))
226+
}
227+
c.queue.Done(pvcKey)
228+
}
147229
return true
148230
}
149231

150-
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error {
232+
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string, lazyLivePodList *LazyLivePodList) error {
151233
logger := klog.FromContext(ctx)
152234
logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
153235
startTime := time.Now()
@@ -167,7 +249,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin
167249
if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
168250
// PVC should be deleted. Check if it's used and remove finalizer if
169251
// it's not.
170-
isUsed, err := c.isBeingUsed(ctx, pvc)
252+
isUsed, err := c.isBeingUsed(ctx, pvc, lazyLivePodList)
171253
if err != nil {
172254
return err
173255
}
@@ -209,11 +291,11 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu
209291
logger.Error(err, "Error removing protection finalizer from PVC", "PVC", klog.KObj(pvc))
210292
return err
211293
}
212-
logger.V(3).Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc))
294+
logger.Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc))
213295
return nil
214296
}
215297

216-
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
298+
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
217299
// Look for a Pod using pvc in the Informer's cache. If one is found the
218300
// correct decision to keep pvc is taken without doing an expensive live
219301
// list.
@@ -229,7 +311,9 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl
229311
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
230312
// be 100% confident that it is safe to delete pvc make sure no Pod is using
231313
// it among those returned by a live list.
232-
return c.askAPIServer(ctx, pvc)
314+
315+
// Use lazy live pod list instead of directly calling API server
316+
return c.askAPIServer(ctx, pvc, lazyLivePodList)
233317
}
234318

235319
func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) {
@@ -258,16 +342,24 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla
258342
return false, nil
259343
}
260344

261-
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
345+
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
262346
logger := klog.FromContext(ctx)
263347
logger.V(4).Info("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc))
348+
if lazyLivePodList.getCache() == nil {
349+
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
264350

265-
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
266-
if err != nil {
267-
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
351+
if err != nil {
352+
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
353+
}
354+
355+
if podsList.Items == nil {
356+
lazyLivePodList.setCache(make([]v1.Pod, 0))
357+
} else {
358+
lazyLivePodList.setCache(podsList.Items)
359+
}
268360
}
269361

270-
for _, pod := range podsList.Items {
362+
for _, pod := range lazyLivePodList.getCache() {
271363
if c.podUsesPVC(logger, &pod, pvc) {
272364
return true, nil
273365
}

0 commit comments

Comments
 (0)