Skip to content

Commit 791d1c8

Browse files
committed
Add live list of pods to PVC protection controller
Add live list of pods to PVC protection controller, as opposed to doing only a cache-based list through the Informer. Both lists are performed while processing a PVC with deletionTimestamp set to check whether Pods using the PVC exist and remove the finalizer to enable deletion of the PVC if that's not the case. Prior to this commit only the cache-based list was done but that's unreliable because a pod using the PVC might exist but not be in the cache just yet. On the other hand, the live list is 100% reliable. Note that it would be enough to do only the live list. Instead, this commit adds it after the cache-based list and performs it only if the latter finds no Pod blocking deletion of the PVC being processed. The rationale is that live lists are expensive and it's desirable to minimize them. The drawback is that if at the time of the cache-based list the cache has not been notified yet of the deletion of a Pod using the PVC the PVC is kept. Correctness is not compromised because the finalizer will be removed when the Pod deletion notification is received, but this means PVC deletion is delayed. Reducing live lists was valued more than deleting PVCs slightly faster. Also, add a unit test that fails without the change introduced by this commit and revamp old unit tests. The latter is needed because expected behavior is described in terms of API calls the controller makes, and this commit introduces new API calls (the live lists).
1 parent 368ee4b commit 791d1c8

File tree

3 files changed

+132
-43
lines changed

3 files changed

+132
-43
lines changed

pkg/controller/volume/pvcprotection/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"//pkg/volume/util:go_default_library",
1414
"//staging/src/k8s.io/api/core/v1:go_default_library",
1515
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
16+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
1617
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1718
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
1819
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

pkg/controller/volume/pvcprotection/pvc_protection_controller.go

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
v1 "k8s.io/api/core/v1"
2424
apierrs "k8s.io/apimachinery/pkg/api/errors"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526
"k8s.io/apimachinery/pkg/labels"
2627
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2728
"k8s.io/apimachinery/pkg/util/wait"
@@ -168,6 +169,7 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
168169
if !isUsed {
169170
return c.removeFinalizer(pvc)
170171
}
172+
klog.V(2).Infof("Keeping PVC %s/%s because it is still being used", pvc.Namespace, pvc.Name)
171173
}
172174

173175
if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) {
@@ -209,42 +211,82 @@ func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
209211
}
210212

211213
func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
214+
// Look for a Pod using pvc in the Informer's cache. If one is found the
215+
// correct decision to keep pvc is taken without doing an expensive live
216+
// list.
217+
if inUse, err := c.askInformer(pvc); err != nil {
218+
// No need to return because a live list will follow.
219+
klog.Error(err)
220+
} else if inUse {
221+
return true, nil
222+
}
223+
224+
// Even if no Pod using pvc was found in the Informer's cache it doesn't
225+
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
226+
// be 100% confident that it is safe to delete pvc make sure no Pod is using
227+
// it among those returned by a live list.
228+
return c.askAPIServer(pvc)
229+
}
230+
231+
func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
232+
klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name)
233+
212234
pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
213235
if err != nil {
214-
return false, err
236+
return false, fmt.Errorf("Cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error())
215237
}
238+
216239
for _, pod := range pods {
217-
if pod.Spec.NodeName == "" {
218-
// This pod is not scheduled. We have a predicated in scheduler that
219-
// prevents scheduling pods with deletion timestamp, so we can be
220-
// pretty sure it won't be scheduled in parallel to this check.
221-
// Therefore this pod does not block the PVC from deletion.
222-
klog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name)
223-
continue
240+
if podUsesPVC(pod, pvc.Name) {
241+
return true, nil
224242
}
225-
for _, volume := range pod.Spec.Volumes {
226-
if volume.PersistentVolumeClaim == nil {
227-
continue
228-
}
229-
if volume.PersistentVolumeClaim.ClaimName == pvc.Name {
230-
klog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name)
231-
return true, nil
232-
}
243+
}
244+
245+
klog.V(4).Infof("No Pod using PVC %s/%s was found in the Informer's cache", pvc.Namespace, pvc.Name)
246+
return false, nil
247+
}
248+
249+
func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) {
250+
klog.V(4).Infof("Looking for Pods using PVC %s/%s with a live list", pvc.Namespace, pvc.Name)
251+
252+
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(metav1.ListOptions{})
253+
if err != nil {
254+
return false, fmt.Errorf("Live list of pods failed: %s", err.Error())
255+
}
256+
257+
for _, pod := range podsList.Items {
258+
if podUsesPVC(&pod, pvc.Name) {
259+
return true, nil
233260
}
234261
}
235262

236-
klog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name)
263+
klog.V(2).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name)
237264
return false, nil
238265
}
239266

240-
// pvcAddedUpdated reacts to pvc added/updated/deleted events
267+
func podUsesPVC(pod *v1.Pod, pvc string) bool {
268+
// Check whether pvc is used by pod only if pod is scheduled, because
269+
// kubelet sees pods after they have been scheduled and it won't allow
270+
// starting a pod referencing a PVC with a non-nil deletionTimestamp.
271+
if pod.Spec.NodeName != "" {
272+
for _, volume := range pod.Spec.Volumes {
273+
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc {
274+
klog.V(2).Infof("Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, pvc)
275+
return true
276+
}
277+
}
278+
}
279+
return false
280+
}
281+
282+
// pvcAddedUpdated reacts to pvc added/updated events
241283
func (c *Controller) pvcAddedUpdated(obj interface{}) {
242284
pvc, ok := obj.(*v1.PersistentVolumeClaim)
243285
if !ok {
244286
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj))
245287
return
246288
}
247-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
289+
key, err := cache.MetaNamespaceKeyFunc(pvc)
248290
if err != nil {
249291
utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err))
250292
return

pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,30 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF
147147
}
148148

149149
func TestPVCProtectionController(t *testing.T) {
150-
pvcVer := schema.GroupVersionResource{
150+
pvcGVR := schema.GroupVersionResource{
151151
Group: v1.GroupName,
152152
Version: "v1",
153153
Resource: "persistentvolumeclaims",
154154
}
155+
podGVR := schema.GroupVersionResource{
156+
Group: v1.GroupName,
157+
Version: "v1",
158+
Resource: "pods",
159+
}
160+
podGVK := schema.GroupVersionKind{
161+
Group: v1.GroupName,
162+
Version: "v1",
163+
Kind: "Pod",
164+
}
155165

156166
tests := []struct {
157167
name string
158168
// Object to insert into fake kubeclient before the test starts.
159169
initialObjects []runtime.Object
170+
// Whether not to insert the content of initialObjects into the
171+
// informers before the test starts. Set it to true to simulate the case
172+
// where informers have not been notified yet of certain API objects.
173+
informersAreLate bool
160174
// Optional client reactors.
161175
reactors []reaction
162176
// PVC event to simulate. This PVC will be automatically added to
@@ -180,7 +194,7 @@ func TestPVCProtectionController(t *testing.T) {
180194
name: "StorageObjectInUseProtection Enabled, PVC without finalizer -> finalizer is added",
181195
updatedPVC: pvc(),
182196
expectedActions: []clienttesting.Action{
183-
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
197+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
184198
},
185199
storageObjectInUseProtectionEnabled: true,
186200
},
@@ -208,27 +222,29 @@ func TestPVCProtectionController(t *testing.T) {
208222
},
209223
expectedActions: []clienttesting.Action{
210224
// This fails
211-
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
225+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
212226
// This fails too
213-
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
227+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
214228
// This succeeds
215-
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
229+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
216230
},
217231
storageObjectInUseProtectionEnabled: true,
218232
},
219233
{
220234
name: "StorageObjectInUseProtection Enabled, deleted PVC with finalizer -> finalizer is removed",
221235
updatedPVC: deleted(withProtectionFinalizer(pvc())),
222236
expectedActions: []clienttesting.Action{
223-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
237+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
238+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
224239
},
225240
storageObjectInUseProtectionEnabled: true,
226241
},
227242
{
228243
name: "StorageObjectInUseProtection Disabled, deleted PVC with finalizer -> finalizer is removed",
229244
updatedPVC: deleted(withProtectionFinalizer(pvc())),
230245
expectedActions: []clienttesting.Action{
231-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
246+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
247+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
232248
},
233249
storageObjectInUseProtectionEnabled: false,
234250
},
@@ -243,43 +259,59 @@ func TestPVCProtectionController(t *testing.T) {
243259
},
244260
},
245261
expectedActions: []clienttesting.Action{
262+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
246263
// Fails
247-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
264+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
265+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
248266
// Fails too
249-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
267+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
268+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
250269
// Succeeds
251-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
270+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
252271
},
253272
storageObjectInUseProtectionEnabled: true,
254273
},
255274
{
256-
name: "deleted PVC with finalizer + pods with the PVC exists -> finalizer is not removed",
275+
name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed",
257276
initialObjects: []runtime.Object{
258277
withPVC(defaultPVCName, pod()),
259278
},
260279
updatedPVC: deleted(withProtectionFinalizer(pvc())),
261280
expectedActions: []clienttesting.Action{},
262281
},
263282
{
264-
name: "deleted PVC with finalizer + pods with unrelated PVC and EmptyDir exists -> finalizer is removed",
283+
name: "deleted PVC with finalizer + pod with unrelated PVC and EmptyDir exists -> finalizer is removed",
265284
initialObjects: []runtime.Object{
266285
withEmptyDir(withPVC("unrelatedPVC", pod())),
267286
},
268287
updatedPVC: deleted(withProtectionFinalizer(pvc())),
269288
expectedActions: []clienttesting.Action{
270-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
289+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
290+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
271291
},
272292
storageObjectInUseProtectionEnabled: true,
273293
},
274294
{
275-
name: "deleted PVC with finalizer + pods with the PVC finished but is not deleted -> finalizer is not removed",
295+
name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed",
276296
initialObjects: []runtime.Object{
277297
withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())),
278298
},
279299
updatedPVC: deleted(withProtectionFinalizer(pvc())),
280300
expectedActions: []clienttesting.Action{},
281301
storageObjectInUseProtectionEnabled: true,
282302
},
303+
{
304+
name: "deleted PVC with finalizer + pod with the PVC exists but is not in the Informer's cache yet -> finalizer is not removed",
305+
initialObjects: []runtime.Object{
306+
withPVC(defaultPVCName, pod()),
307+
},
308+
informersAreLate: true,
309+
updatedPVC: deleted(withProtectionFinalizer(pvc())),
310+
expectedActions: []clienttesting.Action{
311+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
312+
},
313+
storageObjectInUseProtectionEnabled: true,
314+
},
283315
//
284316
// Pod events
285317
//
@@ -308,7 +340,8 @@ func TestPVCProtectionController(t *testing.T) {
308340
},
309341
updatedPod: unscheduled(withPVC(defaultPVCName, pod())),
310342
expectedActions: []clienttesting.Action{
311-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
343+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
344+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
312345
},
313346
storageObjectInUseProtectionEnabled: true,
314347
},
@@ -319,7 +352,8 @@ func TestPVCProtectionController(t *testing.T) {
319352
},
320353
deletedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())),
321354
expectedActions: []clienttesting.Action{
322-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
355+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
356+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
323357
},
324358
storageObjectInUseProtectionEnabled: true,
325359
},
@@ -331,7 +365,8 @@ func TestPVCProtectionController(t *testing.T) {
331365
deletedPod: withPVC(defaultPVCName, pod()),
332366
updatedPod: withUID("uid2", pod()),
333367
expectedActions: []clienttesting.Action{
334-
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
368+
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
369+
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
335370
},
336371
storageObjectInUseProtectionEnabled: true,
337372
},
@@ -368,15 +403,26 @@ func TestPVCProtectionController(t *testing.T) {
368403
}
369404

370405
for _, test := range tests {
371-
// Create client with initial data
372-
objs := test.initialObjects
406+
// Create initial data for client and informers.
407+
var (
408+
clientObjs []runtime.Object
409+
informersObjs []runtime.Object
410+
)
373411
if test.updatedPVC != nil {
374-
objs = append(objs, test.updatedPVC)
412+
clientObjs = append(clientObjs, test.updatedPVC)
413+
informersObjs = append(informersObjs, test.updatedPVC)
375414
}
376415
if test.updatedPod != nil {
377-
objs = append(objs, test.updatedPod)
416+
clientObjs = append(clientObjs, test.updatedPod)
417+
informersObjs = append(informersObjs, test.updatedPod)
418+
}
419+
clientObjs = append(clientObjs, test.initialObjects...)
420+
if !test.informersAreLate {
421+
informersObjs = append(informersObjs, test.initialObjects...)
378422
}
379-
client := fake.NewSimpleClientset(objs...)
423+
424+
// Create client with initial data
425+
client := fake.NewSimpleClientset(clientObjs...)
380426

381427
// Create informers
382428
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
@@ -385,7 +431,7 @@ func TestPVCProtectionController(t *testing.T) {
385431

386432
// Populate the informers with initial objects so the controller can
387433
// Get() and List() it.
388-
for _, obj := range objs {
434+
for _, obj := range informersObjs {
389435
switch obj.(type) {
390436
case *v1.PersistentVolumeClaim:
391437
pvcInformer.Informer().GetStore().Add(obj)
@@ -435,7 +481,7 @@ func TestPVCProtectionController(t *testing.T) {
435481
}
436482
currentActionCount := len(client.Actions())
437483
if currentActionCount < len(test.expectedActions) {
438-
// Do not log evey wait, only when the action count changes.
484+
// Do not log every wait, only when the action count changes.
439485
if lastReportedActionCount < currentActionCount {
440486
klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions))
441487
lastReportedActionCount = currentActionCount

0 commit comments

Comments
 (0)