@@ -19,6 +19,7 @@ package pvcprotection
19
19
import (
20
20
"context"
21
21
"fmt"
22
+ "sync"
22
23
"time"
23
24
24
25
v1 "k8s.io/api/core/v1"
@@ -41,6 +42,65 @@ import (
41
42
42
43
// Controller is controller that removes PVCProtectionFinalizer
43
44
// from PVCs that are used by no pods.
45
+
46
+ type LazyLivePodList struct {
47
+ cache []v1.Pod
48
+ controller * Controller
49
+ }
50
+
51
+ func (ll * LazyLivePodList ) getCache () []v1.Pod {
52
+ return ll .cache
53
+ }
54
+
55
+ func (ll * LazyLivePodList ) setCache (pods []v1.Pod ) {
56
+ ll .cache = pods
57
+ }
58
+
59
+ type pvcData struct {
60
+ pvcKey string
61
+ pvcName string
62
+ }
63
+
64
+ type pvcProcessingStore struct {
65
+ namespaceToPVCsMap map [string ][]pvcData
66
+ namespaceQueue workqueue.TypedInterface [string ]
67
+ mu sync.Mutex
68
+ }
69
+
70
+ func NewPVCProcessingStore () * pvcProcessingStore {
71
+ return & pvcProcessingStore {
72
+ namespaceToPVCsMap : make (map [string ][]pvcData ),
73
+ namespaceQueue : workqueue .NewTyped [string ](),
74
+ }
75
+ }
76
+
77
+ func (m * pvcProcessingStore ) addOrUpdate (namespace string , pvcKey , pvcName string ) {
78
+ m .mu .Lock ()
79
+ defer m .mu .Unlock ()
80
+ if _ , exists := m .namespaceToPVCsMap [namespace ]; ! exists {
81
+ m .namespaceToPVCsMap [namespace ] = make ([]pvcData , 0 )
82
+ m .namespaceQueue .Add (namespace )
83
+ }
84
+ m .namespaceToPVCsMap [namespace ] = append (m .namespaceToPVCsMap [namespace ], pvcData {pvcKey : pvcKey , pvcName : pvcName })
85
+ }
86
+
87
+ // Returns a list of pvcs and the associated namespace to be processed downstream
88
+ func (m * pvcProcessingStore ) flushNextPVCsByNamespace () ([]pvcData , string ) {
89
+
90
+ nextNamespace , quit := m .namespaceQueue .Get ()
91
+ if quit {
92
+ return nil , nextNamespace
93
+ }
94
+
95
+ m .mu .Lock ()
96
+ defer m .mu .Unlock ()
97
+ pvcs := m .namespaceToPVCsMap [nextNamespace ]
98
+
99
+ delete (m .namespaceToPVCsMap , nextNamespace )
100
+ m .namespaceQueue .Done (nextNamespace )
101
+ return pvcs , nextNamespace
102
+ }
103
+
44
104
type Controller struct {
45
105
client clientset.Interface
46
106
@@ -51,7 +111,8 @@ type Controller struct {
51
111
podListerSynced cache.InformerSynced
52
112
podIndexer cache.Indexer
53
113
54
- queue workqueue.TypedRateLimitingInterface [string ]
114
+ queue workqueue.TypedRateLimitingInterface [string ]
115
+ pvcProcessingStore * pvcProcessingStore
55
116
}
56
117
57
118
// NewPVCProtectionController returns a new instance of PVCProtectionController.
@@ -62,6 +123,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
62
123
workqueue .DefaultTypedControllerRateLimiter [string ](),
63
124
workqueue.TypedRateLimitingQueueConfig [string ]{Name : "pvcprotection" },
64
125
),
126
+ pvcProcessingStore : NewPVCProcessingStore (),
65
127
}
66
128
67
129
e .pvcLister = pvcInformer .Lister ()
@@ -100,6 +162,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
100
162
func (c * Controller ) Run (ctx context.Context , workers int ) {
101
163
defer utilruntime .HandleCrash ()
102
164
defer c .queue .ShutDown ()
165
+ defer c .pvcProcessingStore .namespaceQueue .ShutDown ()
103
166
104
167
logger := klog .FromContext (ctx )
105
168
logger .Info ("Starting PVC protection controller" )
@@ -109,45 +172,64 @@ func (c *Controller) Run(ctx context.Context, workers int) {
109
172
return
110
173
}
111
174
175
+ go wait .UntilWithContext (ctx , c .runMainWorker , time .Second )
112
176
for i := 0 ; i < workers ; i ++ {
113
- go wait .UntilWithContext (ctx , c .runWorker , time .Second )
177
+ go wait .UntilWithContext (ctx , c .runProcessNamespaceWorker , time .Second )
114
178
}
115
179
116
180
<- ctx .Done ()
117
181
}
118
182
119
- func (c * Controller ) runWorker (ctx context.Context ) {
120
- for c .processNextWorkItem (ctx ) {
183
+ // Main worker batch-pulls PVC items off informer's work queue and populates namespace queue and namespace-PVCs map
184
+ func (c * Controller ) runMainWorker (ctx context.Context ) {
185
+ for c .processNextWorkItem () {
186
+ }
187
+ }
188
+
189
+ // Consumer worker pulls items off namespace queue and processes associated PVCs
190
+ func (c * Controller ) runProcessNamespaceWorker (ctx context.Context ) {
191
+ for c .processPVCsByNamespace (ctx ) {
121
192
}
122
193
}
123
194
124
- // processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
125
- func (c * Controller ) processNextWorkItem (ctx context.Context ) bool {
195
+ func (c * Controller ) processNextWorkItem () bool {
126
196
pvcKey , quit := c .queue .Get ()
127
197
if quit {
128
198
return false
129
199
}
130
- defer c .queue .Done (pvcKey )
131
200
132
201
pvcNamespace , pvcName , err := cache .SplitMetaNamespaceKey (pvcKey )
133
202
if err != nil {
134
203
utilruntime .HandleError (fmt .Errorf ("error parsing PVC key %q: %w" , pvcKey , err ))
135
204
return true
136
205
}
137
206
138
- err = c .processPVC (ctx , pvcNamespace , pvcName )
139
- if err == nil {
140
- c .queue .Forget (pvcKey )
141
- return true
142
- }
207
+ c .pvcProcessingStore .addOrUpdate (pvcNamespace , pvcKey , pvcName )
208
+ return true
209
+ }
143
210
144
- utilruntime .HandleError (fmt .Errorf ("PVC %v failed with : %w" , pvcKey , err ))
145
- c .queue .AddRateLimited (pvcKey )
211
+ func (c * Controller ) processPVCsByNamespace (ctx context.Context ) bool {
212
+ pvcList , namespace := c .pvcProcessingStore .flushNextPVCsByNamespace ()
213
+ if pvcList == nil {
214
+ return false
215
+ }
146
216
217
+ lazyLivePodList := & LazyLivePodList {controller : c }
218
+ for _ , item := range pvcList {
219
+ pvcKey , pvcName := item .pvcKey , item .pvcName
220
+ err := c .processPVC (ctx , namespace , pvcName , lazyLivePodList )
221
+ if err == nil {
222
+ c .queue .Forget (pvcKey )
223
+ } else {
224
+ c .queue .AddRateLimited (pvcKey )
225
+ utilruntime .HandleError (fmt .Errorf ("PVC %v in namespace %v failed with: %w" , pvcName , namespace , err ))
226
+ }
227
+ c .queue .Done (pvcKey )
228
+ }
147
229
return true
148
230
}
149
231
150
- func (c * Controller ) processPVC (ctx context.Context , pvcNamespace , pvcName string ) error {
232
+ func (c * Controller ) processPVC (ctx context.Context , pvcNamespace , pvcName string , lazyLivePodList * LazyLivePodList ) error {
151
233
logger := klog .FromContext (ctx )
152
234
logger .V (4 ).Info ("Processing PVC" , "PVC" , klog .KRef (pvcNamespace , pvcName ))
153
235
startTime := time .Now ()
@@ -167,7 +249,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin
167
249
if protectionutil .IsDeletionCandidate (pvc , volumeutil .PVCProtectionFinalizer ) {
168
250
// PVC should be deleted. Check if it's used and remove finalizer if
169
251
// it's not.
170
- isUsed , err := c .isBeingUsed (ctx , pvc )
252
+ isUsed , err := c .isBeingUsed (ctx , pvc , lazyLivePodList )
171
253
if err != nil {
172
254
return err
173
255
}
@@ -209,11 +291,11 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu
209
291
logger .Error (err , "Error removing protection finalizer from PVC" , "PVC" , klog .KObj (pvc ))
210
292
return err
211
293
}
212
- logger .V ( 3 ). Info ("Removed protection finalizer from PVC" , "PVC" , klog .KObj (pvc ))
294
+ logger .Info ("Removed protection finalizer from PVC" , "PVC" , klog .KObj (pvc ))
213
295
return nil
214
296
}
215
297
216
- func (c * Controller ) isBeingUsed (ctx context.Context , pvc * v1.PersistentVolumeClaim ) (bool , error ) {
298
+ func (c * Controller ) isBeingUsed (ctx context.Context , pvc * v1.PersistentVolumeClaim , lazyLivePodList * LazyLivePodList ) (bool , error ) {
217
299
// Look for a Pod using pvc in the Informer's cache. If one is found the
218
300
// correct decision to keep pvc is taken without doing an expensive live
219
301
// list.
@@ -229,7 +311,9 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl
229
311
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
230
312
// be 100% confident that it is safe to delete pvc make sure no Pod is using
231
313
// it among those returned by a live list.
232
- return c .askAPIServer (ctx , pvc )
314
+
315
+ // Use lazy live pod list instead of directly calling API server
316
+ return c .askAPIServer (ctx , pvc , lazyLivePodList )
233
317
}
234
318
235
319
func (c * Controller ) askInformer (logger klog.Logger , pvc * v1.PersistentVolumeClaim ) (bool , error ) {
@@ -258,16 +342,24 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla
258
342
return false , nil
259
343
}
260
344
261
- func (c * Controller ) askAPIServer (ctx context.Context , pvc * v1.PersistentVolumeClaim ) (bool , error ) {
345
+ func (c * Controller ) askAPIServer (ctx context.Context , pvc * v1.PersistentVolumeClaim , lazyLivePodList * LazyLivePodList ) (bool , error ) {
262
346
logger := klog .FromContext (ctx )
263
- logger .V (4 ).Info ("Looking for Pods using PVC with a live list" , "PVC" , klog .KObj (pvc ))
347
+ logger .V (4 ).Info ("Looking for Pods using PVC" , "PVC" , klog .KObj (pvc ))
348
+ if lazyLivePodList .getCache () == nil {
349
+ logger .V (4 ).Info ("Live listing Pods in namespace" , "namespace" , pvc .Namespace )
350
+ podsList , err := c .client .CoreV1 ().Pods (pvc .Namespace ).List (ctx , metav1.ListOptions {})
351
+ if err != nil {
352
+ return false , fmt .Errorf ("live list of pods failed: %s" , err .Error ())
353
+ }
264
354
265
- podsList , err := c .client .CoreV1 ().Pods (pvc .Namespace ).List (ctx , metav1.ListOptions {})
266
- if err != nil {
267
- return false , fmt .Errorf ("live list of pods failed: %s" , err .Error ())
355
+ if podsList .Items == nil {
356
+ lazyLivePodList .setCache (make ([]v1.Pod , 0 ))
357
+ } else {
358
+ lazyLivePodList .setCache (podsList .Items )
359
+ }
268
360
}
269
361
270
- for _ , pod := range podsList . Items {
362
+ for _ , pod := range lazyLivePodList . getCache () {
271
363
if c .podUsesPVC (logger , & pod , pvc ) {
272
364
return true , nil
273
365
}
0 commit comments