Skip to content

Commit 1f8b1b8

Browse files
authored
Merge pull request kubernetes#102886 from gnufied/add-local-expansion
Add support for expanding local volumes
2 parents 81dca3d + 593eda4 commit 1f8b1b8

File tree

4 files changed

+288
-0
lines changed

4 files changed

+288
-0
lines changed

pkg/volume/local/local.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type localVolumePlugin struct {
5757
var _ volume.VolumePlugin = &localVolumePlugin{}
5858
var _ volume.PersistentVolumePlugin = &localVolumePlugin{}
5959
var _ volume.BlockVolumePlugin = &localVolumePlugin{}
60+
var _ volume.NodeExpandableVolumePlugin = &localVolumePlugin{}
6061

6162
const (
6263
localVolumePluginName = "kubernetes.io/local-volume"
@@ -376,6 +377,48 @@ func (dm *deviceMounter) MountDevice(spec *volume.Spec, devicePath string, devic
376377
}
377378
}
378379

380+
func (plugin *localVolumePlugin) RequiresFSResize() bool {
381+
return true
382+
}
383+
384+
func (plugin *localVolumePlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
385+
fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
386+
if err != nil {
387+
return false, fmt.Errorf("error checking VolumeMode: %v", err)
388+
}
389+
if !fsVolume {
390+
return true, nil
391+
}
392+
393+
localDevicePath := resizeOptions.VolumeSpec.PersistentVolume.Spec.Local.Path
394+
395+
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
396+
if !ok {
397+
return false, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
398+
}
399+
400+
fileType, err := kvh.GetHostUtil().GetFileType(localDevicePath)
401+
if err != nil {
402+
return false, err
403+
}
404+
405+
switch fileType {
406+
case hostutil.FileTypeBlockDev:
407+
_, err = util.GenericResizeFS(plugin.host, plugin.GetPluginName(), localDevicePath, resizeOptions.DeviceMountPath)
408+
if err != nil {
409+
return false, err
410+
}
411+
return true, nil
412+
case hostutil.FileTypeDirectory:
413+
// if the given local volume path is of already filesystem directory, return directly because
414+
// we do not want to prevent mount operation from succeeding.
415+
klog.InfoS("expansion of directory based local volumes is NO-OP", "local-volume-path", localDevicePath)
416+
return true, nil
417+
default:
418+
return false, fmt.Errorf("only directory and block device are supported")
419+
}
420+
}
421+
379422
func getVolumeSourceFSType(spec *volume.Spec) (string, error) {
380423
if spec.PersistentVolume != nil &&
381424
spec.PersistentVolume.Spec.Local != nil {

pkg/volume/local/local_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,37 @@ func getBlockPlugin(t *testing.T) (string, volume.BlockVolumePlugin) {
8383
return tmpDir, plug
8484
}
8585

86+
func getNodeExpandablePlugin(t *testing.T, isBlockDevice bool) (string, volume.NodeExpandableVolumePlugin) {
87+
tmpDir, err := utiltesting.MkTmpdir("localVolumeTest")
88+
if err != nil {
89+
t.Fatalf("can't make a temp dir: %v", err)
90+
}
91+
92+
plugMgr := volume.VolumePluginMgr{}
93+
var pathToFSType map[string]hostutil.FileType
94+
if isBlockDevice {
95+
pathToFSType = map[string]hostutil.FileType{
96+
tmpDir: hostutil.FileTypeBlockDev,
97+
}
98+
} else {
99+
pathToFSType = map[string]hostutil.FileType{
100+
tmpDir: hostutil.FileTypeDirectory,
101+
}
102+
}
103+
104+
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHostWithMounterFSType(t, tmpDir, nil, nil, pathToFSType))
105+
106+
plug, err := plugMgr.FindNodeExpandablePluginByName(localVolumePluginName)
107+
if err != nil {
108+
os.RemoveAll(tmpDir)
109+
t.Fatalf("Can't find the plugin by name")
110+
}
111+
if plug.GetPluginName() != localVolumePluginName {
112+
t.Errorf("Wrong name: %s", plug.GetPluginName())
113+
}
114+
return tmpDir, plug
115+
}
116+
86117
func getPersistentPlugin(t *testing.T) (string, volume.PersistentVolumePlugin) {
87118
tmpDir, err := utiltesting.MkTmpdir("localVolumeTest")
88119
if err != nil {
@@ -148,6 +179,9 @@ func getTestVolume(readOnly bool, path string, isBlock bool, mountOptions []stri
148179
if isBlock {
149180
blockMode := v1.PersistentVolumeBlock
150181
pv.Spec.VolumeMode = &blockMode
182+
} else {
183+
fsMode := v1.PersistentVolumeFilesystem
184+
pv.Spec.VolumeMode = &fsMode
151185
}
152186
return volume.NewSpecFromPersistentVolume(pv, readOnly)
153187
}
@@ -289,6 +323,28 @@ func TestFSGlobalPathAndMountDevice(t *testing.T) {
289323
}
290324
}
291325

326+
func TestNodeExpand(t *testing.T) {
327+
// FS global path testing
328+
tmpFSDir, plug := getNodeExpandablePlugin(t, false)
329+
defer os.RemoveAll(tmpFSDir)
330+
331+
pvSpec := getTestVolume(false, tmpFSDir, false, nil)
332+
333+
resizeOptions := volume.NodeResizeOptions{
334+
VolumeSpec: pvSpec,
335+
DevicePath: tmpFSDir,
336+
}
337+
338+
// Actually, we will do no volume expansion if volume is of type dir
339+
resizeDone, err := plug.NodeExpand(resizeOptions)
340+
if err != nil {
341+
t.Fatal(err)
342+
}
343+
if !resizeDone {
344+
t.Errorf("expected resize to be done")
345+
}
346+
}
347+
292348
func TestMountUnmount(t *testing.T) {
293349
tmpDir, plug := getPlugin(t)
294350
defer os.RemoveAll(tmpDir)
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package storage
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"path/filepath"
23+
"time"
24+
25+
v1 "k8s.io/api/core/v1"
26+
storagev1 "k8s.io/api/storage/v1"
27+
"k8s.io/apimachinery/pkg/api/resource"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
clientset "k8s.io/client-go/kubernetes"
30+
31+
"github.com/onsi/ginkgo"
32+
"github.com/onsi/gomega"
33+
"k8s.io/apimachinery/pkg/util/rand"
34+
"k8s.io/apimachinery/pkg/util/wait"
35+
"k8s.io/kubernetes/test/e2e/framework"
36+
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
37+
"k8s.io/kubernetes/test/e2e/storage/testsuites"
38+
"k8s.io/kubernetes/test/e2e/storage/utils"
39+
)
40+
41+
var _ = utils.SIGDescribe("PersistentVolumes-expansion ", func() {
42+
f := framework.NewDefaultFramework("persistent-local-volumes-expansion")
43+
ginkgo.Context("loopback local block volume", func() {
44+
var (
45+
config *localTestConfig
46+
scName string
47+
)
48+
49+
testVolType := BlockFsWithFormatLocalVolumeType
50+
var testVol *localTestVolume
51+
testMode := immediateMode
52+
ginkgo.BeforeEach(func() {
53+
nodes, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes)
54+
framework.ExpectNoError(err)
55+
56+
scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
57+
// Choose a random node
58+
randomNode := &nodes.Items[rand.Intn(len(nodes.Items))]
59+
60+
hostExec := utils.NewHostExec(f)
61+
ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase)
62+
config = &localTestConfig{
63+
ns: f.Namespace.Name,
64+
client: f.ClientSet,
65+
timeouts: f.Timeouts,
66+
nodes: nodes.Items,
67+
randomNode: randomNode,
68+
scName: scName,
69+
discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
70+
hostExec: hostExec,
71+
ltrMgr: ltrMgr,
72+
}
73+
74+
setupExpandableLocalStorageClass(config, &testMode)
75+
testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.randomNode, 1, testMode)
76+
testVol = testVols[0]
77+
})
78+
ginkgo.AfterEach(func() {
79+
cleanupLocalVolumes(config, []*localTestVolume{testVol})
80+
cleanupStorageClass(config)
81+
})
82+
83+
ginkgo.It("should support online expansion on node", func() {
84+
var (
85+
pod1 *v1.Pod
86+
pod1Err error
87+
)
88+
ginkgo.By("Creating pod1")
89+
pod1, pod1Err = createLocalPod(config, testVol, nil)
90+
framework.ExpectNoError(pod1Err)
91+
verifyLocalPod(config, testVol, pod1, config.randomNode.Name)
92+
93+
// We expand the PVC while l.pod is using it for online expansion.
94+
ginkgo.By("Expanding current pvc")
95+
currentPvcSize := testVol.pvc.Spec.Resources.Requests[v1.ResourceStorage]
96+
newSize := currentPvcSize.DeepCopy()
97+
newSize.Add(resource.MustParse("10Mi"))
98+
framework.Logf("currentPvcSize %s, newSize %s", currentPvcSize.String(), newSize.String())
99+
newPVC, err := testsuites.ExpandPVCSize(testVol.pvc, newSize, f.ClientSet)
100+
framework.ExpectNoError(err, "While updating pvc for more size")
101+
testVol.pvc = newPVC
102+
gomega.Expect(testVol.pvc).NotTo(gomega.BeNil())
103+
104+
pvcSize := testVol.pvc.Spec.Resources.Requests[v1.ResourceStorage]
105+
if pvcSize.Cmp(newSize) != 0 {
106+
framework.Failf("error updating pvc size %q", testVol.pvc.Name)
107+
}
108+
109+
// Now update the underlying volume manually
110+
err = config.ltrMgr.ExpandBlockDevice(testVol.ltr, 10 /*number of 1M blocks to add*/)
111+
framework.ExpectNoError(err, "while expanding loopback device")
112+
113+
// now update PV to matching size
114+
pv, err := UpdatePVSize(testVol.pv, newSize, f.ClientSet)
115+
framework.ExpectNoError(err, "while updating pv to more size")
116+
gomega.Expect(pv).NotTo(gomega.BeNil())
117+
testVol.pv = pv
118+
119+
ginkgo.By("Waiting for file system resize to finish")
120+
testVol.pvc, err = testsuites.WaitForFSResize(testVol.pvc, f.ClientSet)
121+
framework.ExpectNoError(err, "while waiting for fs resize to finish")
122+
123+
pvcConditions := testVol.pvc.Status.Conditions
124+
framework.ExpectEqual(len(pvcConditions), 0, "pvc should not have conditions")
125+
})
126+
127+
})
128+
129+
})
130+
131+
func UpdatePVSize(pv *v1.PersistentVolume, size resource.Quantity, c clientset.Interface) (*v1.PersistentVolume, error) {
132+
pvName := pv.Name
133+
pvToUpdate := pv.DeepCopy()
134+
135+
var lastError error
136+
waitErr := wait.PollImmediate(5*time.Second, csiResizeWaitPeriod, func() (bool, error) {
137+
var err error
138+
pvToUpdate, err = c.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
139+
if err != nil {
140+
return false, fmt.Errorf("error fetching pv %s: %v", pvName, err)
141+
}
142+
pvToUpdate.Spec.Capacity[v1.ResourceStorage] = size
143+
pvToUpdate, err = c.CoreV1().PersistentVolumes().Update(context.TODO(), pvToUpdate, metav1.UpdateOptions{})
144+
if err != nil {
145+
framework.Logf("error updating PV %s: %v", pvName, err)
146+
lastError = err
147+
return false, nil
148+
}
149+
return true, nil
150+
})
151+
if waitErr == wait.ErrWaitTimeout {
152+
return nil, fmt.Errorf("timed out attempting to update PV size. last update error: %v", lastError)
153+
}
154+
if waitErr != nil {
155+
return nil, fmt.Errorf("failed to expand PV size: %v", waitErr)
156+
}
157+
return pvToUpdate, nil
158+
}
159+
160+
func setupExpandableLocalStorageClass(config *localTestConfig, mode *storagev1.VolumeBindingMode) {
161+
enableExpansion := true
162+
sc := &storagev1.StorageClass{
163+
ObjectMeta: metav1.ObjectMeta{
164+
Name: config.scName,
165+
},
166+
Provisioner: "kubernetes.io/no-provisioner",
167+
VolumeBindingMode: mode,
168+
AllowVolumeExpansion: &enableExpansion,
169+
}
170+
171+
_, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
172+
framework.ExpectNoError(err)
173+
}

test/e2e/storage/utils/local.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type LocalTestResource struct {
7070
// LocalTestResourceManager represents interface to create/destroy local test resources on node
7171
type LocalTestResourceManager interface {
7272
Create(node *v1.Node, volumeType LocalVolumeType, parameters map[string]string) *LocalTestResource
73+
ExpandBlockDevice(ltr *LocalTestResource, mbToAdd int) error
7374
Remove(ltr *LocalTestResource)
7475
}
7576

@@ -289,6 +290,21 @@ func (l *ltrMgr) cleanupLocalVolumeGCELocalSSD(ltr *LocalTestResource) {
289290
framework.ExpectNoError(err)
290291
}
291292

293+
func (l *ltrMgr) expandLocalVolumeBlockFS(ltr *LocalTestResource, mbToAdd int) error {
294+
ddCmd := fmt.Sprintf("dd if=/dev/zero of=%s/file conv=notrunc oflag=append bs=1M count=%d", ltr.loopDir, mbToAdd)
295+
loopDev := l.findLoopDevice(ltr.loopDir, ltr.Node)
296+
losetupCmd := fmt.Sprintf("losetup -c %s", loopDev)
297+
return l.hostExec.IssueCommand(fmt.Sprintf("%s && %s", ddCmd, losetupCmd), ltr.Node)
298+
}
299+
300+
func (l *ltrMgr) ExpandBlockDevice(ltr *LocalTestResource, mbtoAdd int) error {
301+
switch ltr.VolumeType {
302+
case LocalVolumeBlockFS:
303+
return l.expandLocalVolumeBlockFS(ltr, mbtoAdd)
304+
}
305+
return fmt.Errorf("Failed to expand local test resource, unsupported volume type: %s", ltr.VolumeType)
306+
}
307+
292308
func (l *ltrMgr) Create(node *v1.Node, volumeType LocalVolumeType, parameters map[string]string) *LocalTestResource {
293309
var ltr *LocalTestResource
294310
switch volumeType {

0 commit comments

Comments
 (0)