Skip to content

Commit b8c0435

Browse files
committed
Handle volume-in-use error
1 parent d20c5ed commit b8c0435

File tree

9 files changed

+245
-41
lines changed

9 files changed

+245
-41
lines changed

pkg/kubelet/volumemanager/cache/actual_state_of_world.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ type attachedVolume struct {
265265
// deviceMountPath contains the path on the node where the device should
266266
// be mounted after it is attached.
267267
deviceMountPath string
268+
269+
// volumeInUseErrorForExpansion indicates volume driver has previously returned volume-in-use error
270+
// for this volume and volume expansion on this node should not be retried
271+
volumeInUseErrorForExpansion bool
268272
}
269273

270274
// The mountedPod object represents a pod for which the kubelet volume manager
@@ -381,6 +385,17 @@ func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeNam
381385
return volumeObj.deviceMountState
382386
}
383387

388+
func (asw *actualStateOfWorld) MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) {
389+
asw.Lock()
390+
defer asw.Unlock()
391+
392+
volumeObj, ok := asw.attachedVolumes[volumeName]
393+
if ok {
394+
volumeObj.volumeInUseErrorForExpansion = true
395+
asw.attachedVolumes[volumeName] = volumeObj
396+
}
397+
}
398+
384399
func (asw *actualStateOfWorld) GetVolumeMountState(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) operationexecutor.VolumeMountState {
385400
asw.RLock()
386401
defer asw.RUnlock()
@@ -672,6 +687,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
672687
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
673688
}
674689
if podObj.fsResizeRequired &&
690+
!volumeObj.volumeInUseErrorForExpansion &&
675691
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
676692
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
677693
}

pkg/kubelet/volumemanager/reconciler/reconciler_test.go

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,29 +1001,58 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
10011001
fsMode := v1.PersistentVolumeFilesystem
10021002

10031003
var tests = []struct {
1004-
name string
1005-
volumeMode *v1.PersistentVolumeMode
1004+
name string
1005+
volumeMode *v1.PersistentVolumeMode
1006+
expansionFailed bool
1007+
pvName string
1008+
pvcSize resource.Quantity
1009+
pvcStatusSize resource.Quantity
1010+
oldPVSize resource.Quantity
1011+
newPVSize resource.Quantity
10061012
}{
10071013
{
1008-
name: "expand-fs-volume",
1009-
volumeMode: &fsMode,
1014+
name: "expand-fs-volume",
1015+
volumeMode: &fsMode,
1016+
pvName: "pv",
1017+
pvcSize: resource.MustParse("10G"),
1018+
pvcStatusSize: resource.MustParse("10G"),
1019+
newPVSize: resource.MustParse("15G"),
1020+
oldPVSize: resource.MustParse("10G"),
10101021
},
10111022
{
1012-
name: "expand-raw-block",
1013-
volumeMode: &blockMode,
1023+
name: "expand-raw-block",
1024+
volumeMode: &blockMode,
1025+
pvName: "pv",
1026+
pvcSize: resource.MustParse("10G"),
1027+
pvcStatusSize: resource.MustParse("10G"),
1028+
newPVSize: resource.MustParse("15G"),
1029+
oldPVSize: resource.MustParse("10G"),
1030+
},
1031+
{
1032+
name: "expand-fs-volume with in-use error",
1033+
volumeMode: &fsMode,
1034+
expansionFailed: true,
1035+
pvName: volumetesting.FailWithInUseVolumeName,
1036+
pvcSize: resource.MustParse("10G"),
1037+
pvcStatusSize: resource.MustParse("10G"),
1038+
newPVSize: resource.MustParse("15G"),
1039+
oldPVSize: resource.MustParse("13G"),
10141040
},
10151041
}
10161042

10171043
for _, tc := range tests {
10181044
t.Run(tc.name, func(t *testing.T) {
10191045
pv := &v1.PersistentVolume{
10201046
ObjectMeta: metav1.ObjectMeta{
1021-
Name: "pv",
1047+
Name: tc.pvName,
10221048
UID: "pvuid",
10231049
},
10241050
Spec: v1.PersistentVolumeSpec{
10251051
ClaimRef: &v1.ObjectReference{Name: "pvc"},
10261052
VolumeMode: tc.volumeMode,
1053+
Capacity: v1.ResourceList{
1054+
v1.ResourceStorage: tc.oldPVSize,
1055+
},
10271056
},
10281057
}
10291058
pvc := &v1.PersistentVolumeClaim{
@@ -1032,9 +1061,19 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
10321061
UID: "pvcuid",
10331062
},
10341063
Spec: v1.PersistentVolumeClaimSpec{
1064+
Resources: v1.ResourceRequirements{
1065+
Requests: v1.ResourceList{
1066+
v1.ResourceStorage: tc.pvcSize,
1067+
},
1068+
},
10351069
VolumeName: "pv",
10361070
VolumeMode: tc.volumeMode,
10371071
},
1072+
Status: v1.PersistentVolumeClaimStatus{
1073+
Capacity: v1.ResourceList{
1074+
v1.ResourceStorage: tc.pvcStatusSize,
1075+
},
1076+
},
10381077
}
10391078
pod := &v1.Pod{
10401079
ObjectMeta: metav1.ObjectMeta{
@@ -1058,7 +1097,10 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
10581097
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
10591098
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
10601099
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
1061-
kubeClient := createtestClientWithPVPVC(pv, pvc)
1100+
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
1101+
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
1102+
DevicePath: "fake/path",
1103+
})
10621104
fakeRecorder := &record.FakeRecorder{}
10631105
fakeHandler := volumetesting.NewBlockVolumePathHandler()
10641106
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
@@ -1104,24 +1146,36 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
11041146
close(stopChan)
11051147
<-stoppedChan
11061148

1107-
// Mark volume as fsResizeRequired.
1149+
// Simulate what DSOWP does
1150+
pv.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
1151+
volumeSpec = &volume.Spec{PersistentVolume: pv}
1152+
dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
1153+
// mark volume as resize required
11081154
asw.MarkFSResizeRequired(volumeName, podName)
1155+
11091156
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName)
1110-
if !cache.IsFSResizeRequiredError(podExistErr) {
1111-
t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
1112-
}
1157+
if tc.expansionFailed {
1158+
if cache.IsFSResizeRequiredError(podExistErr) {
1159+
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
1160+
}
1161+
} else {
1162+
if !cache.IsFSResizeRequiredError(podExistErr) {
1163+
t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
1164+
}
11131165

1114-
// Start the reconciler again, we hope reconciler will perform the
1115-
// resize operation and clear the fsResizeRequired flag for volume.
1116-
go reconciler.Run(wait.NeverStop)
1166+
// Start the reconciler again, we hope reconciler will perform the
1167+
// resize operation and clear the fsResizeRequired flag for volume.
1168+
go reconciler.Run(wait.NeverStop)
11171169

1118-
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
1119-
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
1120-
return mounted && err == nil, nil
1121-
})
1122-
if waitErr != nil {
1123-
t.Fatal("Volume resize should succeeded")
1170+
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
1171+
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
1172+
return mounted && err == nil, nil
1173+
})
1174+
if waitErr != nil {
1175+
t.Fatal("Volume resize should succeeded")
1176+
}
11241177
}
1178+
11251179
})
11261180
}
11271181
}
@@ -1653,6 +1707,12 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume
16531707
fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
16541708
return true, pv, nil
16551709
})
1710+
fakeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
1711+
if action.GetSubresource() == "status" {
1712+
return true, pvc, nil
1713+
}
1714+
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
1715+
})
16561716
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
16571717
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
16581718
})

pkg/volume/csi/expander.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ import (
2121
"errors"
2222
"fmt"
2323

24+
"google.golang.org/grpc/codes"
25+
"google.golang.org/grpc/status"
2426
api "k8s.io/api/core/v1"
2527
utilfeature "k8s.io/apiserver/pkg/util/feature"
2628
"k8s.io/klog/v2"
2729
"k8s.io/kubernetes/pkg/features"
2830
"k8s.io/kubernetes/pkg/volume"
2931
"k8s.io/kubernetes/pkg/volume/util"
32+
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
3033
)
3134

3235
var _ volume.NodeExpandableVolumePlugin = &csiPlugin{}
@@ -123,7 +126,26 @@ func (c *csiPlugin) nodeExpandWithClient(
123126

124127
_, err = csClient.NodeExpandVolume(ctx, opts)
125128
if err != nil {
126-
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err)
129+
if inUseError(err) {
130+
failedConditionErr := fmt.Errorf("Expander.NodeExpand failed to expand the volume : %w", volumetypes.NewFailedPreconditionError(err.Error()))
131+
return false, failedConditionErr
132+
}
133+
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %w", err)
127134
}
128135
return true, nil
129136
}
137+
138+
func inUseError(err error) bool {
139+
st, ok := status.FromError(err)
140+
if !ok {
141+
// not a grpc error
142+
return false
143+
}
144+
// if this is a failed precondition error then that means driver does not support expansion
145+
// of in-use volumes
146+
// More info - https://github.com/container-storage-interface/spec/blob/master/spec.md#controllerexpandvolume-errors
147+
if st.Code() == codes.FailedPrecondition {
148+
return true
149+
}
150+
return false
151+
}

pkg/volume/csi/expander_test.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,24 @@ import (
2020
"os"
2121
"testing"
2222

23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
2325
"k8s.io/apimachinery/pkg/api/resource"
2426
"k8s.io/kubernetes/pkg/volume"
27+
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
2528
)
2629

2730
func TestNodeExpand(t *testing.T) {
2831
tests := []struct {
29-
name string
30-
nodeExpansion bool
31-
nodeStageSet bool
32-
volumePhase volume.CSIVolumePhaseType
33-
success bool
34-
fsVolume bool
35-
deviceStagePath string
32+
name string
33+
nodeExpansion bool
34+
nodeStageSet bool
35+
volumePhase volume.CSIVolumePhaseType
36+
success bool
37+
fsVolume bool
38+
grpcError error
39+
hasVolumeInUseError bool
40+
deviceStagePath string
3641
}{
3742
{
3843
name: "when node expansion is not set",
@@ -76,6 +81,26 @@ func TestNodeExpand(t *testing.T) {
7681
success: true,
7782
fsVolume: false,
7883
},
84+
{
85+
name: "when nodeExpansion=on, nodeStage=on, volumePhase=published has grpc volume-in-use error",
86+
nodeExpansion: true,
87+
nodeStageSet: true,
88+
volumePhase: volume.CSIVolumePublished,
89+
success: false,
90+
fsVolume: true,
91+
grpcError: status.Error(codes.FailedPrecondition, "volume-in-use"),
92+
hasVolumeInUseError: true,
93+
},
94+
{
95+
name: "when nodeExpansion=on, nodeStage=on, volumePhase=published has other grpc error",
96+
nodeExpansion: true,
97+
nodeStageSet: true,
98+
volumePhase: volume.CSIVolumePublished,
99+
success: false,
100+
fsVolume: true,
101+
grpcError: status.Error(codes.InvalidArgument, "invalid-argument"),
102+
hasVolumeInUseError: false,
103+
},
79104
}
80105
for _, tc := range tests {
81106
t.Run(tc.name, func(t *testing.T) {
@@ -99,8 +124,19 @@ func TestNodeExpand(t *testing.T) {
99124

100125
fakeCSIClient, _ := csClient.(*fakeCsiDriverClient)
101126
fakeNodeClient := fakeCSIClient.nodeClient
127+
128+
if tc.grpcError != nil {
129+
fakeNodeClient.SetNextError(tc.grpcError)
130+
}
131+
102132
ok, err := plug.nodeExpandWithClient(resizeOptions, csiSource, csClient, tc.fsVolume)
103133

134+
if tc.hasVolumeInUseError {
135+
if !volumetypes.IsFailedPreconditionError(err) {
136+
t.Errorf("expected failed precondition error got: %v", err)
137+
}
138+
}
139+
104140
// verify device staging targer path
105141
stagingTargetPath := fakeNodeClient.FakeNodeExpansionRequest.GetStagingTargetPath()
106142
if tc.deviceStagePath != "" && tc.deviceStagePath != stagingTargetPath {

pkg/volume/testing/testing.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ const (
8989
// SuccessAndFailOnMountDeviceName will cause first mount operation to succeed but subsequent attempts to fail
9090
SuccessAndFailOnMountDeviceName = "success-and-failed-mount-device-name"
9191

92+
// FailWithInUseVolumeName will cause NodeExpandVolume to result in FailedPrecondition error
93+
FailWithInUseVolumeName = "fail-expansion-in-use"
94+
9295
deviceNotMounted = "deviceNotMounted"
9396
deviceMountUncertain = "deviceMountUncertain"
9497
deviceMounted = "deviceMounted"
@@ -658,6 +661,9 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool {
658661
}
659662

660663
func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) {
664+
if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName {
665+
return false, volumetypes.NewFailedPreconditionError("volume-in-use")
666+
}
661667
return true, nil
662668
}
663669

pkg/volume/util/operationexecutor/operation_executor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ type ActualStateOfWorldMounterUpdater interface {
202202

203203
// GetVolumeMountState returns mount state of the volume for the Pod
204204
GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState
205+
206+
// MarkForInUseExpansionError marks the volume to have in-use error during expansion.
207+
// volume expansion must not be retried for this volume
208+
MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
205209
}
206210

207211
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the

0 commit comments

Comments
 (0)