Commit a42d3d3

Update caching logic and add E2E tests

1 parent f61a8ab

16 files changed (+350, -156 lines)

Dockerfile

Lines changed: 1 addition & 0 deletions
@@ -81,6 +81,7 @@ COPY --from=debian /sbin/blkdeactivate /sbin/blkdeactivate
 COPY --from=debian /sbin/dmsetup /sbin/dmsetup
 COPY --from=debian /sbin/dmstats /sbin/dmstats
 # End of dependencies for LVM
+COPY --from=debian /sbin/mdadm /sbin/mdadm
 COPY --from=debian /sbin/mke2fs /sbin/mke2fs
 COPY --from=debian /sbin/mkfs* /sbin/
 COPY --from=debian /sbin/resize2fs /sbin/resize2fs
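
Note: mdadm is added to the image because the node plugin now assembles local SSDs into a RAID 0 array before serving volumes (see pkg/gce-pd-csi-driver/cache.go below). As a hypothetical illustration, not part of this commit, a startup check like the following would surface a missing binary at boot rather than at the first NodeStageVolume call:

package main

import (
	"os/exec"

	"k8s.io/klog/v2"
)

// verifyDataCacheTools fails fast if any binary the data-cache path shells out
// to is missing from the container image. Hypothetical helper, for illustration only.
func verifyDataCacheTools() {
	for _, bin := range []string{"mdadm", "nvme", "vgcreate", "lvcreate", "lvconvert"} {
		if _, err := exec.LookPath(bin); err != nil {
			klog.Fatalf("data cache requires %q in the image: %v", bin, err)
		}
	}
}

func main() {
	verifyDataCacheTools()
	klog.Info("all data-cache binaries present")
}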

cmd/gce-pd-csi-driver/main.go

Lines changed: 4 additions & 1 deletion
@@ -234,6 +234,10 @@ func handle() {
 		}
 	}
 
+	if *enableDataCacheFlag {
+		klog.V(2).Info("Raiding local ssd")
+		driver.RaidLocalSsds()
+	}
 	err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
 	if err != nil {
 		klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())
@@ -251,7 +255,6 @@ func handle() {
 	gce.WaitForOpBackoff.Steps = *waitForOpBackoffSteps
 	gce.WaitForOpBackoff.Cap = *waitForOpBackoffCap
 
-	driver.RaidLocalSsds()
 	gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing)
 }
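
Note: RAID assembly is now gated on the data-cache flag and happens before SetupGCEDriver rather than just before gceDriver.Run, so the /dev/md device exists before the node server can receive a NodeStageVolume call. A minimal sketch of the gating pattern; the flag name and default are assumptions, since this hunk only shows the *enableDataCacheFlag dereference:

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

// Assumed flag definition; the real name and default live outside this hunk.
var enableDataCacheFlag = flag.Bool("enable-data-cache", false, "set up local-SSD data caching on this node")

func main() {
	flag.Parse()
	if *enableDataCacheFlag {
		klog.V(2).Info("Raiding local ssd")
		// driver.RaidLocalSsds() runs here, before the driver is set up and the
		// gRPC endpoint starts serving, so staging requests never race the RAID assembly.
	}
}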

deploy/kubernetes/images/stable-master/image.yaml

Lines changed: 2 additions & 2 deletions
@@ -51,6 +51,6 @@ imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
   # Don't change stable image without changing pdImagePlaceholder in
   # test/k8s-integration/main.go
-  newName: gcr.io/snehaaradhey-joonix/gcp-compute-persistent-disk-csi-driver
-  newTag: "datacache"
+  newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
+  newTag: "v1.13.2"
 ---

pkg/common/constants.go

Lines changed: 6 additions & 0 deletions
@@ -36,4 +36,10 @@ const (
 	// Data cache mode
 	DataCacheModeWriteBack = "writeback"
 	DataCacheModeWriteThrough = "writethrough"
+
+	ContextDataCacheSize = "data-cache-size"
+	ContextDataCacheMode = "data-cache-mode"
+
+	// Keys in the publish context
+	ContexLocalSsdCacheSize = "local-ssd-cache-size"
 )
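
Note: these exported keys replace package-private copies in pkg/gce-pd-csi-driver/controller.go (see below), so the controller and node servers now share one definition. A minimal sketch of the node-side read that setupCaching performs with them; the helper name and the ok convention are illustrative, not part of the commit:

package example

import (
	"github.com/container-storage-interface/spec/lib/go/csi"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

// dataCacheParams pulls the cache size and mode that the controller placed in
// the publish context; ok is false when no cache was configured for the volume.
func dataCacheParams(req *csi.NodeStageVolumeRequest) (size, mode string, ok bool) {
	pc := req.GetPublishContext()
	size = pc[common.ContexLocalSsdCacheSize]
	mode = pc[common.ContextDataCacheMode]
	return size, mode, size != ""
}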

pkg/common/runcmd.go

Lines changed: 0 additions & 4 deletions
@@ -4,8 +4,6 @@ import (
 	"fmt"
 	"os/exec"
 	"strings"
-
-	"k8s.io/klog/v2"
 )
 
 const (
@@ -16,9 +14,7 @@ const (
 // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
 // On error, the output is included so callers don't need to echo it again.
 func RunCommand(cmd string, args ...string) ([]byte, error) {
-	klog.V(2).Infof("====== Start RunCommand ======")
 	execCmd := exec.Command(cmd, args...)
-	klog.V(2).Infof("=======Running command %v==========", execCmd)
 	output, err := execCmd.CombinedOutput()
 	if err != nil {
 		if err.Error() == errNoChildProcesses {
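
Note: with the debug logging gone, RunCommand stays a thin wrapper over exec.CombinedOutput, and it is the single choke point through which the caching code shells out to LVM, mdadm, and nvme-cli. A usage sketch; the lvs arguments here are illustrative, not taken from the driver:

package main

import (
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// stdout and stderr come back together, and on failure the returned error
	// already carries that output, so callers do not need to log it separately.
	out, err := common.RunCommand("lvs", "--select", "vg_name=my-vg")
	if err != nil {
		fmt.Printf("lvs failed: %v\n", err)
		return
	}
	fmt.Printf("lvs output:\n%s", out)
}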

pkg/gce-pd-csi-driver/cache.go

Lines changed: 42 additions & 56 deletions
@@ -12,11 +12,14 @@ import (
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 )
 
-const cacheSuffix = "csi-fast"
-const mainLvSuffix = "csi-main"
-const raidedLocalSsdName = "csi-driver-data-cache"
-const raidMode = "0"
-const raidedLssdPrefix = "/dev/md/"
+const (
+	cacheSuffix        = "csi-fast"
+	mainLvSuffix       = "csi-main"
+	raidedLocalSsdName = "csi-driver-data-cache"
+	raidMode           = "0"
+	raidedLssdPrefix   = "/dev/md/"
+	raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName
+)
 
 func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
 	volumeId := req.GetVolumeId()
@@ -32,7 +35,6 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 	if err != nil {
 		klog.Errorf("vgscan error %v: %s", err, info)
 	}
-	klog.V(2).Infof("====== vgscan info %v ======", string(info))
 	klog.V(2).Infof("====== vgscan info contains volumeGroupName or not %v ======", strings.Contains(string(info), volumeGroupName))
 	// Check if the required volume group already exists
 	if strings.Contains(string(info), volumeGroupName) {
@@ -41,28 +43,12 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 		// Clean up Volume Group before adding the PD
 		reduceVolumeGroup(volumeGroupName, true)
 	} else {
-		err := createVg(volumeGroupName, devicePath)
+		err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath)
 		if err != nil {
 			return mainDevicePath, err
 		}
-		// VG doesn't exist so throw error
-		klog.Errorf("No VG on Node, ensure Volume group on node")
 	}
 
-	// Verify if the PD is already in the required volume group
-	// args = []string{
-	// 	"--select",
-	// 	"vg_name",
-	// 	volumeGroupName
-	// }
-	// vgPvs, err := common.RunCommand("pvs", args...)
-	// if err != nil {
-	// 	klog.Errorf("Errored while checking Physical Volume(PV)s for the volume group %v: %s", err, vgPvs)
-	// }
-	// if strings.Contains(string(vgPvs), devicePath) {
-	// 	klog.V(2).Infof("====Physical Volume(PV) already exists in the Volume Group=====")
-	// } else {
-
 	// Check if the Physical Volume(PV) is part of some other volume group
 	args = []string{
 		"--select",
@@ -82,7 +68,6 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 	infoString = strings.ReplaceAll(infoString, ".", "")
 	infoString = strings.ReplaceAll(infoString, "\"", "")
 	infoSlice := strings.Split(strings.TrimSpace(infoString), " ")
-	klog.V(2).Info("============ infoSlice %s =============", infoSlice[(len(infoSlice)-1)], infoSlice)
 	vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)])
 	klog.V(2).Infof("============ Physical volume is part of Volume group: %v=======", vgNameForPv)
 	if vgNameForPv == volumeGroupName {
@@ -94,10 +79,13 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 		if err != nil {
 			klog.Errorf("Errored while deactivating VG %v: err: %v: %s", vgNameForPv, err, info)
 		}
+
+		reduceVolumeGroup(vgNameForPv, false)
 		// Uncache LV
 		args = []string{
 			"--uncache",
 			vgNameForPv + "/" + mainLvName,
+			"--force",
 		}
 		info, err = common.RunCommand("lvconvert", args...)
 		if err != nil {
@@ -112,12 +100,6 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 			klog.Errorf("Errored while merging Volume group %s into %s %v: %s", vgNameForPv, volumeGroupName, err, info)
 		}
 
-		klog.V(2).Infof("==========Remove VG from node %v ==========", vgNameForPv)
-		info, err = common.RunCommand("vgremove", []string{vgNameForPv, "-y"}...)
-		if err != nil {
-			klog.Errorf("Errored while removing Volume group %s: info:%s, error:%v", vgNameForPv, err, info)
-		}
-
 	} else {
 		klog.V(2).Infof("==========Extend Node VG %v for PV %v==========", volumeGroupName, devicePath)
 		info, err := common.RunCommand("vgextend", []string{volumeGroupName, devicePath}...)
@@ -135,7 +117,6 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 	}
 	lvList, err := common.RunCommand("lvs", args...)
 	if err != nil {
-		klog.V(2).Infof("====== lvs error %v: %s ======", err, info)
 		return mainDevicePath, fmt.Errorf("lv list error %w: %s", err, info)
 	}
 	klog.Info("=============== Got LVs %s on Volume group %s ============", string(lvList), volumeGroupName)
153134
}
154135
info, err = common.RunCommand("lvcreate", args...)
155136
if err != nil {
156-
klog.V(2).Infof("====== lvcreate error %v: %s ======", err, info)
157137
return mainDevicePath, fmt.Errorf("lvcreate error %w: %s", err, info)
158138
}
159139

160140
}
161-
// Replace this with RAIDed Local SSD
162-
cachePoolName := "dev/nvme0n1"
141+
cachePoolName := raidedLocalSsdPath
163142
// Verify caching is setup for PD
164143
args = []string{
165144
"--select",
@@ -169,17 +148,16 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
169148
}
170149
poolName, err := common.RunCommand("lvs", args...)
171150
if err != nil {
172-
klog.V(2).Infof("====== lvcreate error %v: %s ======", err, info)
173151
return mainDevicePath, fmt.Errorf("lvcreate error %w: %s", err, info)
174152
}
175153
cacheLvName := getLvName(cacheSuffix, volumeId)
176154
if strings.Contains(string(poolName), "csi-fast") {
177155
// Validate that cache is setup for required size
178156
klog.V(2).Infof("================Validate Cache is setup for correct size and mode===============")
179157
} else {
180-
fastCacheSize := req.GetPublishContext()[contexLocalSsdCacheSize] + "Gi"
158+
fastCacheSize := req.GetPublishContext()[common.ContexLocalSsdCacheSize]
159+
chunkSize := "960" // Cannot use default chunk size(64KiB) as it errors on maxChunksAllowed. Unit - KiB
181160
klog.V(2).Infof("====== fastCacheSize is %v ======", fastCacheSize)
182-
// lvcreate -n fast -L 50G cachegroup /dev/nvme0n1
183161
klog.V(2).Infof("====== lvcreate fast cache layer again with the VolumeGroup %v======", volumeGroupName)
184162
args = []string{
185163
"--yes",
@@ -197,7 +175,6 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
197175
}
198176

199177
// Once caching is setup, link the PD to cache
200-
// lvconvert --type cache --cachevol fast --zero y --cachemode writethrough cachegroup/main --force -y
201178
klog.V(2).Infof("====== lvconvert fast and main to cache ======")
202179
args = []string{
203180
"--type",
@@ -207,8 +184,10 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
207184
"--zero",
208185
"y",
209186
"--cachemode",
210-
req.GetPublishContext()[contextDataCacheMode],
187+
req.GetPublishContext()[common.ContextDataCacheMode],
211188
volumeGroupName + "/" + mainLvName,
189+
"--chunksize",
190+
string(chunkSize),
212191
"--force",
213192
"-y",
214193
}
@@ -247,7 +226,6 @@ func cleanupCache(volumeId string, nodeId string) error {
247226
}
248227
info, err = common.RunCommand("lvconvert", args...)
249228
if err != nil {
250-
klog.Errorf("Errored while uncaching the disk %v: %s", err, info)
251229
return fmt.Errorf("errored while uncaching the disk %w: %s", err, info)
252230
}
253231
return nil
@@ -265,14 +243,14 @@ func getLvName(suffix string, volumeId string) string {
265243
return fmt.Sprintf("%s-%s", suffix, pvcName)
266244
}
267245

268-
func createVg(volumeGroupName string, devicePath string) error {
246+
func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
269247
// No existing volume group
270248
klog.V(2).Infof("====== vgcreate ======")
271249
args := []string{
272250
"--zero",
273251
"y",
274252
volumeGroupName,
275-
"/dev/nvme0n1",
253+
raidedLocalSsds,
276254
}
277255
info, err := common.RunCommand("vgcreate", args...)
278256
if err != nil {
@@ -307,48 +285,56 @@ func reduceVolumeGroup(volumeGroupName string, force bool) {
307285
}
308286

309287
func RaidLocalSsds() error {
310-
isRaided, err := isRaided()
288+
isAlreadyRaided, err := isRaided()
311289
if err != nil {
312290
klog.V(2).Info("======Errored while scanning for available LocalSSDs err:%v; continuing Raiding=======", err)
313-
} else if isRaided {
291+
} else if isAlreadyRaided {
314292
klog.V(2).Infof("===============Local SSDs are already RAIDed==============")
315293
return nil
316294
}
317-
info, err := common.RunCommand("nvme", []string{"list"}...)
295+
info, err := common.RunCommand("nvme", []string{"list", "-o", "json"}...)
318296
if err != nil {
319-
klog.Errorf("nvme list error %v: %s", err, info)
320297
return fmt.Errorf("errored while scanning available NVME disks info: %v; err:%v", info, err)
321298
}
322-
klog.V(2).Infof("==========NVME list %v========", string(info))
323299
infoString := strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " "))
300+
klog.V(2).Infof("=============NVME list %v ============", infoString)
324301
infoString = strings.ReplaceAll(infoString, "\"", "")
325-
infoSlice := strings.Split(strings.TrimSpace(infoString), " ")
302+
infoString = strings.ReplaceAll(infoString, " :", ":")
303+
infoString = strings.ReplaceAll(infoString, ": ", ":")
304+
infoString = strings.ReplaceAll(infoString, ",", " ")
305+
infoSlice := strings.Split(infoString, " ")
326306
diskList := []string{}
327307
for _, diskInfo := range infoSlice {
328308
diskName := strings.TrimSpace(diskInfo)
329-
if strings.HasPrefix(diskName, "/dev/n") {
309+
if strings.Contains(diskName, "DevicePath") {
310+
diskName := strings.TrimSpace(strings.Split(diskName, ":")[1])
330311
diskList = append(diskList, diskName)
331312
}
332313
}
333314
nvmeDiskCount := len(diskList)
334315
nvmeDiskList := strings.Join(diskList, " ")
335-
klog.V(2).Infof("========= nvmeDiskCount %v; nvmeDislList: %v ================", nvmeDiskCount, nvmeDiskList)
316+
klog.V(2).Infof("========= nvmeDiskCount %v; nvmeDiskList: %v; diskList %v================", nvmeDiskCount, nvmeDiskList, diskList)
336317
args := []string{
337318
"--create",
338319
raidedLssdPrefix + raidedLocalSsdName,
339-
"-l=" + raidMode,
320+
"-l" + raidMode,
340321
// Force RAIDing as sometime it might fail for caution if there is just 1 LSSD present as 1 LSSD need not be RAIDed
341322
"--force",
342323
"-n",
343324
strconv.Itoa(nvmeDiskCount),
344-
nvmeDiskList,
345325
}
326+
args = append(args, diskList...)
346327
info, err = common.RunCommand("mdadm", args...)
347328
if err != nil {
348-
klog.Errorf("Errored while RAIDing LSSDs %v: %s", err, info)
349329
return fmt.Errorf("errored while RAIDing LSSDs info: %v; err:%v", info, err)
350-
} else {
351-
klog.V(2).Infof("========RAIDed Local SSDs in mode 0===========")
330+
}
331+
// Validate if Raided successfully
332+
isAlreadyRaided, err = isRaided()
333+
if err != nil {
334+
klog.V(2).Info("======Errored while scanning for available raided LocalSSDs err:%v=======", err)
335+
}
336+
if !isAlreadyRaided {
337+
return fmt.Errorf("failed raiding, raided device not found on scanning")
352338
}
353339
return nil
354340
}
@@ -363,7 +349,7 @@ func isRaided() (bool, error) {
363349
return false, fmt.Errorf("errored while scanning for raided LSSD %v: %s", err, info)
364350
}
365351
klog.V(2).Infof("=========== Got LSSDs %v===========", string(info))
366-
if strings.Contains(string(info), raidedLssdPrefix+raidedLocalSsdName) {
352+
if info != nil && strings.Contains(string(info), raidedLocalSsdName) {
367353
return true, nil
368354
}
369355
return false, nil

pkg/gce-pd-csi-driver/controller.go

Lines changed: 10 additions & 13 deletions
@@ -43,6 +43,11 @@ import (
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
 )
 
+var (
+	// Keys in the volume context.
+	contextForceAttach = "force-attach"
+)
+
 type GCEControllerServer struct {
 	Driver *GCEDriver
 	CloudProvider gce.GCECompute
@@ -190,14 +195,6 @@
 	// See https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h#L503)
 	maxListVolumesResponseEntries = 500
 
-	// Keys in the volume context.
-	contextForceAttach = "force-attach"
-	contextDataCacheSize = "data-cache-size"
-	contextDataCacheMode = "data-cache-mode"
-
-	// Keys in the publish context
-	contexLocalSsdCacheSize = "local-ssd-cache-size"
-
 	resourceApiScheme = "https"
 	resourceApiService = "compute"
 	resourceProject = "projects"
@@ -690,10 +687,10 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
 	klog.V(2).Infof("====== ControllerPublishVolume VolumeContext is %v ======", req.GetVolumeContext())
 	// Set data cache publish context
 	if gceCS.enableDataCache && req.GetVolumeContext() != nil {
-		if req.GetVolumeContext()[contextDataCacheSize] != "" {
+		if req.GetVolumeContext()[common.ContextDataCacheSize] != "" {
 			pubVolResp.PublishContext = map[string]string{}
-			pubVolResp.PublishContext[contexLocalSsdCacheSize] = req.GetVolumeContext()[contextDataCacheSize]
-			pubVolResp.PublishContext[contextDataCacheMode] = req.GetVolumeContext()[contextDataCacheMode]
+			pubVolResp.PublishContext[common.ContexLocalSsdCacheSize] = req.GetVolumeContext()[common.ContextDataCacheSize]
+			pubVolResp.PublishContext[common.ContextDataCacheMode] = req.GetVolumeContext()[common.ContextDataCacheMode]
 		}
 	}
 
@@ -2000,8 +1997,8 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params co
 		if createResp.Volume.VolumeContext == nil {
 			createResp.Volume.VolumeContext = map[string]string{}
 		}
-		createResp.Volume.VolumeContext[contextDataCacheMode] = dataCacheParams.DataCacheMode
-		createResp.Volume.VolumeContext[contextDataCacheSize] = dataCacheParams.DataCacheSize
+		createResp.Volume.VolumeContext[common.ContextDataCacheMode] = dataCacheParams.DataCacheMode
+		createResp.Volume.VolumeContext[common.ContextDataCacheSize] = dataCacheParams.DataCacheSize
 	}
 	snapshotID := disk.GetSnapshotId()
 	imageID := disk.GetImageId()
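
Note: together with the constants.go change, the data-cache parameters now flow end to end as: CreateVolume writes data-cache-size and data-cache-mode into the volume context, ControllerPublishVolume mirrors them into the publish context (renaming the size key to local-ssd-cache-size), and NodeStageVolume reads them in setupCaching. A small illustration of that key mapping; the size value is a placeholder:

package main

import (
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// What generateCreateVolumeResponse puts into the volume context.
	volumeContext := map[string]string{
		common.ContextDataCacheSize: "375", // placeholder value
		common.ContextDataCacheMode: common.DataCacheModeWriteThrough,
	}
	// What executeControllerPublishVolume copies into the publish context:
	// the size moves to a differently named key, the mode keeps its key.
	publishContext := map[string]string{
		common.ContexLocalSsdCacheSize: volumeContext[common.ContextDataCacheSize],
		common.ContextDataCacheMode:    volumeContext[common.ContextDataCacheMode],
	}
	fmt.Println(publishContext)
}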
