
Commit a27526b

Fix LSSD raiding issue
1 parent 8b9242f commit a27526b

File tree

6 files changed: +109 -61 lines


Dockerfile

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ COPY --from=debian /lib/udev/rules.d/95-dm-notify.rules /lib/udev/rules.d/95-dm-
 COPY --from=debian /sbin/blkdeactivate /sbin/blkdeactivate
 COPY --from=debian /sbin/dmsetup /sbin/dmsetup
 COPY --from=debian /sbin/dmstats /sbin/dmstats
+COPY --from=debian /bin/ls /bin/ls
 # End of dependencies for LVM
 COPY --from=debian /sbin/mdadm /sbin/mdadm
 COPY --from=debian /sbin/mke2fs /sbin/mke2fs
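The /bin/ls binary is copied in because the data-cache setup in pkg/gce-pd-csi-driver/cache.go (later in this commit) now resolves the RAIDed local-SSD device name by piping ls over /dev/md/ through grep via the reworked common.RunCommand. A minimal sketch of that lookup, reusing the constants and helpers shown in cache.go below:

    // Shell equivalent: ls /dev/md/ | grep csi-driver-data-cache
    info, err := common.RunCommand("grep" /* pipeCmd */, raidedLocalSsdName /* pipeCmdArg */, "ls", raidedLssdPrefix)
    if err != nil {
        klog.Errorf("failed while listing raided devices, err: %v, output: %v", err, info)
    }
    raidedLocalSsdPath = raidedLssdPrefix + strings.TrimSpace(string(info))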

pkg/common/parameters.go

Lines changed: 2 additions & 2 deletions
@@ -242,7 +242,7 @@ func ExtractAndDefaultParameters(parameters map[string]string, driverName string
             p.StoragePools = storagePools
         case ParameterKeyDataCacheSize:
             if !enableDataCache {
-                return p, d, fmt.Errorf("parameters contains invalid option %q", ParameterKeyDataCacheSize)
+                return p, d, fmt.Errorf("data caching enabled: %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
             }
             // TODO: need to parse or validate the string
 
@@ -254,7 +254,7 @@ func ExtractAndDefaultParameters(parameters map[string]string, driverName string
             klog.V(2).Infof("====== Data cache size is %v ======", v)
         case ParameterKeyDataCacheMode:
             if !enableDataCache {
-                return p, d, fmt.Errorf("parameters contains invalid option %q", ParameterKeyDataCacheSize)
+                return p, d, fmt.Errorf("data caching enabled %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
             }
             if err := ValidateDataCacheMode(v); err != nil {
                 return p, d, fmt.Errorf("parameters contains invalid option: %w", err)

pkg/common/runcmd.go

Lines changed: 51 additions & 11 deletions
@@ -4,6 +4,8 @@ import (
     "fmt"
     "os/exec"
     "strings"
+
+    "k8s.io/klog/v2"
 )
 
 const (
@@ -13,19 +15,57 @@ const (
 
 // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
 // On error, the output is included so callers don't need to echo it again.
-func RunCommand(cmd string, args ...string) ([]byte, error) {
-    execCmd := exec.Command(cmd, args...)
-    output, err := execCmd.CombinedOutput()
+
+func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
+    execCmd1 := exec.Command(cmd1, execCmdArgs...)
+
+    if pipeCmd != "" {
+        output, err := execPipeCommand(pipeCmd, pipeCmdArg, execCmd1)
+        if err != nil {
+            return nil, fmt.Errorf("%s %s failed here: %w; output: %s", pipeCmd, pipeCmdArg, err, string(output))
+        }
+        return output, nil
+    }
+    output, err := execCmd1.CombinedOutput()
     if err != nil {
-        if err.Error() == errNoChildProcesses {
-            if execCmd.ProcessState.Success() {
-                // If the process succeeded, this can be ignored, see k/k issue #103753
-                return output, nil
-            }
-            // Get actual error
-            err = &exec.ExitError{ProcessState: execCmd.ProcessState}
+        err = checkError(err, *execCmd1)
+        return nil, fmt.Errorf("%s %s failed here 2: %w; output: %s", cmd1, strings.Join(execCmdArgs, " "), err, string(output))
+    }
+
+    return output, nil
+}
+
+func checkError(err error, execCmd exec.Cmd) error {
+    if err.Error() == errNoChildProcesses {
+        if execCmd.ProcessState.Success() {
+            // If the process succeeded, this can be ignored, see k/k issue #103753
+            return nil
         }
-        return output, fmt.Errorf("%s %s failed: %w; output: %s", cmd, strings.Join(args, " "), err, string(output))
+        // Get actual error
+        klog.Infof("Errored here")
+        err = &exec.ExitError{ProcessState: execCmd.ProcessState}
     }
+    return err
+}
+func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
+
+    execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
+    stdoutPipe, err := execCmd1.StdoutPipe()
+    if err != nil {
+        klog.Errorf("failed command %v: got error:%v", execCmd1, err)
+    }
+    err = execCmd1.Start()
+    if err != nil {
+        klog.Infof("errored running command %v; error %v; ", execCmd1, err)
+    }
+    defer stdoutPipe.Close()
+
+    execPipeCmd.Stdin = stdoutPipe
+    output, err := execPipeCmd.CombinedOutput()
+    if err != nil {
+        err = checkError(err, *execPipeCmd)
+        return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
+    }
+
     return output, nil
 }
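RunCommand now takes an optional pipe command and pipe argument ahead of the primary command; callers that need no pipe pass empty strings. A minimal usage sketch with both call shapes used by cache.go in this commit (out is a placeholder variable name):

    // No pipe: run `vgscan` by itself.
    out, err := common.RunCommand("" /* pipeCmd */, "" /* pipeCmdArg */, "vgscan")
    if err != nil {
        klog.Errorf("vgscan error %v: %s", err, out)
    }

    // With pipe: `nvme list -o json | grep DevicePath`.
    out, err = common.RunCommand("grep" /* pipeCmd */, "DevicePath" /* pipeCmdArg */, "nvme", "list", "-o", "json")
    if err != nil {
        klog.Errorf("nvme list error %v: %s", err, out)
    }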

pkg/gce-pd-csi-driver/cache.go

Lines changed: 45 additions & 38 deletions
@@ -18,9 +18,10 @@ const (
     raidedLocalSsdName = "csi-driver-data-cache"
     raidMode = "0"
     raidedLssdPrefix = "/dev/md/"
-    raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName
 )
 
+var raidedLocalSsdPath = raidedLssdPrefix + raidedLocalSsdName
+
 func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
     volumeId := req.GetVolumeId()
     volumeGroupName := getVolumeGroupName(nodeId)
@@ -29,9 +30,17 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
     klog.V(2).Infof("============================== Start LVM PoC NodeStageVolume Steps ==============================")
     klog.V(2).Infof("============================== volumeGroupName is %v ==============================", volumeGroupName)
 
+    info, err := common.RunCommand("grep", raidedLocalSsdName, "ls", raidedLssdPrefix)
+    if err != nil {
+        klog.Errorf("================== failed while listing raided devices, err: %v, output:%v ===============", err, info)
+    }
+    infoString := strings.TrimSpace(string(info))
+    klog.V(2).Infof("=================== Got Raided LSSD name %v ===================", infoString)
+    raidedLocalSsdPath = raidedLssdPrefix + infoString
+
     klog.V(2).Infof("============================== vgscan before vgcreate ==============================")
     args := []string{}
-    info, err := common.RunCommand("vgscan", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgscan", args...)
     if err != nil {
         klog.Errorf("vgscan error %v: %s", err, info)
     }
@@ -56,14 +65,14 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
         "-o",
         "vg_name",
     }
-    info, err = common.RunCommand("pvs", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "pvs", args...)
     if err != nil {
         klog.Errorf("errored while checking physical volume details %v: %s", err, info)
         // On error info contains the error message which we cannot use for further steps
         info = nil
     }
 
-    infoString := strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " "))
+    infoString = strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " "))
     infoString = strings.ReplaceAll(infoString, ".", "")
     infoString = strings.ReplaceAll(infoString, "\"", "")
     infoSlice := strings.Split(strings.TrimSpace(infoString), " ")
@@ -74,38 +83,40 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
     } else if vgNameForPv != "VG" && vgNameForPv != "" {
 
         klog.V(2).Infof("============================== Deactivate VG %s ==============================", vgNameForPv)
-        info, err = common.RunCommand("vgchange", []string{"-an", vgNameForPv}...)
+        info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...)
         if err != nil {
             klog.Errorf("Errored while deactivating VG %v: err: %v: %s", vgNameForPv, err, info)
         }
 
         reduceVolumeGroup(vgNameForPv, false)
-        _, isCached := isCachingSetup(raidedLocalSsdPath, mainLvName)
+        _, isCached := isCachingSetup(mainLvName)
         // We will continue to uncache even if it errors to check caching as it is not a terminal issue.
 
-        if !isCached {
+        if isCached {
             klog.Infof("============================== Uncaching the LV %v==============================", mainLvName)
             // Uncache LV
             args = []string{
                 "--uncache",
                 vgNameForPv + "/" + mainLvName,
+                "--force",
+                "-y", // force remove cache without flushing data
             }
-            info, err = common.RunCommand("lvconvert", args...)
+            info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...)
             if err != nil {
                 klog.Errorf("errored while uncaching main LV. %v: %s", err, info)
             }
 
             reduceVolumeGroup(vgNameForPv, false)
         }
         klog.V(2).Infof("============================== Merge VG %v to Node VG %v ==============================", vgNameForPv, volumeGroupName)
-        info, err = common.RunCommand("vgmerge", []string{volumeGroupName, vgNameForPv}...)
+        info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgmerge", []string{volumeGroupName, vgNameForPv}...)
         if err != nil {
             klog.Errorf("Errored while merging Volume group %s into %s %v: %s", vgNameForPv, volumeGroupName, err, info)
         }
 
     } else {
         klog.V(2).Infof("============================== Extend Node VG %v for PV %v ==============================", volumeGroupName, devicePath)
-        info, err := common.RunCommand("vgextend", []string{volumeGroupName, devicePath}...)
+        info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgextend", []string{volumeGroupName, devicePath}...)
         if err != nil {
             klog.Errorf("Errored while extending VGs %v: %s", err, info)
         }
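Read together, the merge branch above now behaves as sketched here: a cached LV is force-uncached (without flushing) before its volume group is merged into the node's data-cache VG. This is a condensed restatement of the diff, not additional behavior; out is a placeholder name:

    _, isCached := isCachingSetup(mainLvName)
    if isCached {
        // Detach the cache pool without flushing so the VG can be merged.
        args := []string{"--uncache", vgNameForPv + "/" + mainLvName, "--force", "-y"}
        if out, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...); err != nil {
            klog.Errorf("errored while uncaching main LV %v: %s", err, out)
        }
        reduceVolumeGroup(vgNameForPv, false)
    }
    // Fold the old VG into the node-scoped VG used for data caching.
    if out, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgmerge", volumeGroupName, vgNameForPv); err != nil {
        klog.Errorf("errored while merging VG %s into %s: %v: %s", vgNameForPv, volumeGroupName, err, out)
    }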
@@ -118,7 +129,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
         "-o",
         "lv_name",
     }
-    lvList, err := common.RunCommand("lvs", args...)
+    lvList, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvs", args...)
     if err != nil {
         return mainDevicePath, fmt.Errorf("lv list error %w: %s", err, info)
     }
@@ -135,13 +146,13 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
             volumeGroupName,
             devicePath,
         }
-        info, err = common.RunCommand("lvcreate", args...)
+        info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvcreate", args...)
         if err != nil {
             return mainDevicePath, fmt.Errorf("lvcreate error %w: %s", err, info)
         }
 
     }
-    err, isCached := isCachingSetup(raidedLocalSsdPath, mainLvName)
+    err, isCached := isCachingSetup(mainLvName)
     if err != nil {
         klog.Errorf("faild to check if caching ius setup for LV. Continuing to setup caching.")
     }
@@ -163,7 +174,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
         volumeGroupName,
         raidedLocalSsdPath,
     }
-    info, err = common.RunCommand("lvcreate", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvcreate", args...)
     if err != nil {
         klog.V(2).Infof("============================== lvcreate error %v: %s ==============================", err, info)
         return mainDevicePath, fmt.Errorf("lvcreate error %w: %s", err, info)
@@ -186,7 +197,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
         "--force",
         "-y",
     }
-    info, err = common.RunCommand("lvconvert", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...)
     if err != nil {
         klog.V(2).Infof("============================== lvconvert error %v: %s ==============================", err, info)
         return mainDevicePath, fmt.Errorf("lvconvert error %w: %s", err, info)
@@ -195,7 +206,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 
     // activate all the LVs in the Volume group
     klog.V(2).Infof("============================== Activate Volume group %s ==============================", volumeGroupName)
-    info, err = common.RunCommand("vgchange", []string{"-ay", volumeGroupName}...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgchange", []string{"-ay", volumeGroupName}...)
     if err != nil {
         klog.Errorf("Failed to activate VG %v %v:%s", volumeGroupName, err, info)
     }
@@ -212,15 +223,15 @@ func cleanupCache(volumeId string, nodeId string) error {
         "-an",
         "/dev/" + volumeGroupName + "/" + mainLvName,
     }
-    info, err := common.RunCommand("lvchange", args...)
+    info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvchange", args...)
     if err != nil {
         klog.Errorf("Errored while deactivating the disk %v: %s", err, info)
     }
     args = []string{
         "--uncache",
         volumeGroupName + "/" + mainLvName,
     }
-    info, err = common.RunCommand("lvconvert", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "lvconvert", args...)
     if err != nil {
         return fmt.Errorf("errored while uncaching the disk %w: %s", err, info)
     }
@@ -247,15 +258,15 @@ func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string)
         volumeGroupName,
         raidedLocalSsds,
     }
-    info, err := common.RunCommand("vgcreate", args...)
+    info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgcreate", args...)
     if err != nil {
         klog.Errorf("vgcreate error %v: %s", err, info)
         return fmt.Errorf("vgcreate error %w: %s", err, info)
     }
 
     klog.V(2).Infof("============================== vgscan after vgcreate ==============================")
     args = []string{}
-    info, err = common.RunCommand("vgscan", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgscan", args...)
     if err != nil {
         klog.Errorf("vgscan error %v: %s", err, info)
     } else {
@@ -273,7 +284,7 @@ func reduceVolumeGroup(volumeGroupName string, force bool) {
     if force {
         args = append(args, "--force")
     }
-    info, err := common.RunCommand("vgreduce", args...)
+    info, err := common.RunCommand("" /* pipedCmd */, "" /* pipedCmdArg */, "vgreduce", args...)
     if err != nil {
         klog.Errorf("Errored while cleaning up volume group %v: %s", err, info)
     }
@@ -287,24 +298,19 @@ func RaidLocalSsds() error {
         klog.V(2).Infof("============================== Local SSDs are already RAIDed ==============================")
         return nil
     }
-    info, err := common.RunCommand("nvme", []string{"list", "-o", "json"}...)
+    info, err := common.RunCommand("grep" /* pipedCmd */, "DevicePath" /* pipeCmdArg */, "nvme", []string{"list", "-o", "json"}...)
     if err != nil {
         return fmt.Errorf("errored while scanning available NVME disks info: %v; err:%v", info, err)
     }
-    infoString := strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " "))
-    klog.V(2).Infof("============================== NVME list %v ==============================", infoString)
-    infoString = strings.ReplaceAll(infoString, "\"", "")
-    infoString = strings.ReplaceAll(infoString, " :", ":")
-    infoString = strings.ReplaceAll(infoString, ": ", ":")
-    infoString = strings.ReplaceAll(infoString, ",", " ")
-    infoSlice := strings.Split(infoString, " ")
+    infoString := strings.ReplaceAll(string(info), "\"", "")
+    infoString = strings.TrimSpace(strings.ReplaceAll(infoString, ",", " "))
+    infoSlice := strings.Split(infoString, "\n")
+    klog.V(2).Infof("============================== NVME list %v ==============================", infoSlice)
     diskList := []string{}
     for _, diskInfo := range infoSlice {
         diskName := strings.TrimSpace(diskInfo)
-        if strings.Contains(diskName, "DevicePath") {
-            diskName := strings.TrimSpace(strings.Split(diskName, ":")[1])
-            diskList = append(diskList, diskName)
-        }
+        diskName = strings.TrimSpace(strings.Split(diskName, ":")[1])
+        diskList = append(diskList, diskName)
     }
     nvmeDiskCount := len(diskList)
     nvmeDiskList := strings.Join(diskList, " ")
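Because the grep pipe already filters nvme list -o json down to its "DevicePath" : "/dev/nvmeXnY", lines, the Go side only has to strip quotes and commas and split on the colon. A self-contained sketch of that parsing (parseDevicePaths is a hypothetical name; the length guard is an extra safety check not present in the diff):

    package main

    import (
        "fmt"
        "strings"
    )

    // parseDevicePaths mirrors the parsing in RaidLocalSsds: it takes the output
    // of `nvme list -o json | grep DevicePath` and returns the device paths.
    func parseDevicePaths(grepOutput string) []string {
        s := strings.ReplaceAll(grepOutput, "\"", "")
        s = strings.TrimSpace(strings.ReplaceAll(s, ",", " "))
        diskList := []string{}
        for _, line := range strings.Split(s, "\n") {
            parts := strings.Split(strings.TrimSpace(line), ":")
            if len(parts) < 2 {
                continue
            }
            diskList = append(diskList, strings.TrimSpace(parts[1]))
        }
        return diskList
    }

    func main() {
        sample := "    \"DevicePath\" : \"/dev/nvme0n1\",\n    \"DevicePath\" : \"/dev/nvme0n2\","
        fmt.Println(parseDevicePaths(sample)) // [/dev/nvme0n1 /dev/nvme0n2]
    }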
@@ -323,7 +329,7 @@ func RaidLocalSsds() error {
         strconv.Itoa(nvmeDiskCount),
     }
     args = append(args, diskList...)
-    info, err = common.RunCommand("mdadm", args...)
+    info, err = common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "mdadm", args...)
     if err != nil {
         return fmt.Errorf("errored while RAIDing LSSDs info: %v; err:%v", info, err)
     }
@@ -343,7 +349,7 @@ func isRaided() (bool, error) {
         "--detail",
         "--scan",
     }
-    info, err := common.RunCommand("mdadm", args...)
+    info, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "mdadm", args...)
     if err != nil {
         return false, fmt.Errorf("errored while scanning for raided LSSD %v: %s", err, info)
     }
@@ -354,17 +360,18 @@ func isRaided() (bool, error) {
     return false, nil
 }
 
-func isCachingSetup(cachePoolName, mainLvName string) (error, bool) {
+func isCachingSetup(mainLvName string) (error, bool) {
     // Verify caching is setup for PD
+    klog.V(2).Infof("============================== Verifying if caching is setup for %v ==============================", mainLvName)
     args := []string{
         "--select",
         "lv_name=" + mainLvName,
         "-o",
         "pool_lv",
     }
-    poolName, err := common.RunCommand("lvs", args...)
+    poolName, err := common.RunCommand("" /* pipedCmd */, "" /* pipeCmdArg */, "lvs", args...)
     if err != nil {
-        return fmt.Errorf("lvcreate error %w", err), false
+        return fmt.Errorf("lvs error %w", err), false
     }
     if strings.Contains(string(poolName), "csi-fast") {
         return nil, true
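isCachingSetup now identifies the cached LV purely by name: it asks lvs for the pool_lv column of the selected LV and looks for the csi-fast cache-pool naming used elsewhere in this file. An equivalent standalone check, assuming common.RunCommand from this commit (checkLvCached is a hypothetical name):

    // checkLvCached reports whether the named LV has a cache pool attached,
    // based on the pool_lv column printed by lvs.
    func checkLvCached(mainLvName string) (bool, error) {
        out, err := common.RunCommand("" /* pipeCmd */, "" /* pipeCmdArg */, "lvs", "--select", "lv_name="+mainLvName, "-o", "pool_lv")
        if err != nil {
            return false, fmt.Errorf("lvs error %w", err)
        }
        return strings.Contains(string(out), "csi-fast"), nil
    }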

test/e2e/tests/setup_e2e_test.go

Lines changed: 4 additions & 6 deletions
@@ -160,12 +160,10 @@ func NewTestContext(zone string, instanceNumber string) *remote.TestContext {
     if err != nil {
         klog.Fatalf("Failed to copy google_nvme_id to containerized directory: %v", err)
     }
-    pkgs := []string{"lvm2", "mdadm"}
-    for _, pkg := range pkgs {
-        err = testutils.InstallDependencies(i, pkg)
-        if err != nil {
-            klog.Errorf("Failed to install dependency package %v to node %v", pkg, i.GetNodeID())
-        }
+    pkgs := []string{"lvm2", "mdadm", "grep", "coreutils"}
+    err = testutils.InstallDependencies(i, pkgs)
+    if err != nil {
+        klog.Errorf("Failed to install dependency package on node %v: error : %v", i.GetNodeID(), err)
     }
 
     err = testutils.SetupDataCachingConfig(i)
