
Commit 4212f84

hungnguyen243 authored and cemakd committed
update RAIDing and validation steps
1 parent 8f2a701 · commit 4212f84

File tree: 8 files changed, +284 -117 lines changed

cmd/gce-pd-csi-driver/main.go

Lines changed: 56 additions & 25 deletions
@@ -27,12 +27,8 @@ import (
 	"strings"
 	"time"
 
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/strings/slices"
-
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -95,9 +91,7 @@ var (
 )
 
 const (
-	driverName          = "pd.csi.storage.gke.io"
-	dataCacheLabel      = "datacache-storage-gke-io"
-	dataCacheLabelValue = "enabled"
+	driverName = "pd.csi.storage.gke.io"
 )
 
 func init() {
@@ -331,37 +325,74 @@ func urlFlag(target **url.URL, name string, usage string) {
 	})
 }
 
-func setupDataCache(ctx context.Context, nodeName string, nodeId string) error {
+func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
+	allLssds, err := driver.FetchAllLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing all LSSDs %v", err)
+	}
+
+	raidedLssds, err := driver.FetchRaidedLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
+	}
+
+	unRaidedLssds := []string{}
+	for _, l := range allLssds {
+		if !slices.Contains(raidedLssds, l) {
+			unRaidedLssds = append(unRaidedLssds, l)
+		}
+		if len(unRaidedLssds) == lssdCount {
+			break
+		}
+	}
+
+	LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
+	}
+
+	// We need to ensure the disks to be used for Datacache are both unRAIDed & not containing mountpoints for ephemeral storage already
+	availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
+		return slices.Contains(LSSDsWithEmptyMountPoint, e)
+	})
+
+	if len(availableLssds) == 0 {
+		return nil, fmt.Errorf("No LSSDs available to set up caching")
+	}
+
+	if len(availableLssds) < lssdCount {
+		return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
+	}
+	return availableLssds, nil
+}
+
+func setupDataCache(ctx context.Context, nodeName string) error {
 	isAlreadyRaided, err := driver.IsRaided()
 	if err != nil {
-		klog.V(4).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
+		klog.V(2).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
 	} else if isAlreadyRaided {
-		klog.V(4).Infof("Local SSDs are already RAIDed. Skipping Data Cache setup.")
+		klog.V(2).Infof("Local SSDs are already RAIDed. Skipping Datacache setup.")
 		return nil
 	}
 
 	lssdCount := common.LocalSSDCountForDataCache
 	if nodeName != common.TestNode {
-		cfg, err := rest.InClusterConfig()
-		if err != nil {
-			return err
-		}
-		kubeClient, err := kubernetes.NewForConfig(cfg)
-		if err != nil {
-			return err
+		var err error
+		lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
+		if lssdCount == 0 {
+			klog.Infof("Datacache is not enabled on node %v", nodeName)
+			return nil
 		}
-		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
 		if err != nil {
-			// We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
 			return err
 		}
-		if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue {
-			klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue)
-			return nil
-		}
 	}
-	klog.V(2).Info("Raiding local ssds to setup data cache")
-	if err := driver.RaidLocalSsds(); err != nil {
+	lssdNames, err := fetchLssdsForRaiding(lssdCount)
+	if err != nil {
+		klog.Fatalf("Failed to get sufficient SSDs for Datacache's caching setup: %v", err)
+	}
+	klog.V(2).Infof("Raiding local ssds to setup data cache: %v", lssdNames)
+	if err := driver.RaidLocalSsds(lssdNames); err != nil {
 		return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err)
 	}
 
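The new fetchLssdsForRaiding helper picks local SSDs that are neither already part of a RAID array nor holding an ephemeral-storage mountpoint, and setupDataCache now takes the desired LSSD count from a node label via driver.GetDataCacheCountFromNodeLabel instead of querying the API server through client-go, which is why the kubernetes/rest imports and the datacache-storage-gke-io label constants are removed above. Below is a minimal, self-contained sketch of the same selection filter using k8s.io/utils/strings/slices; the device names and counts are invented for illustration and are not part of this commit.

package main

import (
	"fmt"

	"k8s.io/utils/strings/slices"
)

// Hypothetical inputs standing in for driver.FetchAllLssds, driver.FetchRaidedLssds,
// and driver.FetchLSSDsWihtEmptyMountPoint; the values are illustrative only.
func main() {
	allLssds := []string{"/dev/nvme0n1", "/dev/nvme0n2", "/dev/nvme0n3"}
	raidedLssds := []string{"/dev/nvme0n3"}
	lssdsWithEmptyMountPoint := []string{"/dev/nvme0n1", "/dev/nvme0n2"}
	lssdCount := 2

	// Step 1: keep disks that are not already RAIDed, stopping once
	// lssdCount candidates have been collected.
	unRaidedLssds := []string{}
	for _, l := range allLssds {
		if !slices.Contains(raidedLssds, l) {
			unRaidedLssds = append(unRaidedLssds, l)
		}
		if len(unRaidedLssds) == lssdCount {
			break
		}
	}

	// Step 2: intersect with the disks that have no mountpoint, so ephemeral
	// storage already in use is never pulled into the Datacache RAID array.
	availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
		return slices.Contains(lssdsWithEmptyMountPoint, e)
	})

	fmt.Println(availableLssds) // [/dev/nvme0n1 /dev/nvme0n2]
}

slices.Filter(nil, unRaidedLssds, ...) appends each element that passes the predicate to a fresh slice, so the result is effectively the intersection of the un-RAIDed disks and the disks with an empty mountpoint, exactly the set fetchLssdsForRaiding validates against lssdCount.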

pkg/common/constants.go

Lines changed: 7 additions & 0 deletions
@@ -44,6 +44,13 @@ const (
 	ContexLocalSsdCacheSize = "local-ssd-cache-size"
 	// Node name for E2E tests
 	TestNode = "test-node-csi-e2e"
+
+	// Default LSSD count for datacache E2E tests
+	LocalSSDCountForDataCache = 2
+
+	// Node label for datacache
+	NodeLabelPrefix         = "cloud.google.com/%s"
+	DataCacheLssdCountLabel = "gke-data-cache-disk"
 )
 
 // doc https://cloud.google.com/compute/docs/disks/hyperdisks#max-total-disks-per-vm
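NodeLabelPrefix is a format string, so the full Datacache node label key is presumably assembled by substituting DataCacheLssdCountLabel into it. A small sketch of that assumed composition follows; only the constants come from this commit, the Sprintf usage is an assumption about how GetDataCacheCountFromNodeLabel builds the key.

package main

import "fmt"

const (
	// Copied from the pkg/common/constants.go change above.
	NodeLabelPrefix         = "cloud.google.com/%s"
	DataCacheLssdCountLabel = "gke-data-cache-disk"
)

func main() {
	// Assumed usage: build the full label key, whose value on the node would
	// carry the LSSD count consumed by setupDataCache.
	labelKey := fmt.Sprintf(NodeLabelPrefix, DataCacheLssdCountLabel)
	fmt.Println(labelKey) // cloud.google.com/gke-data-cache-disk
}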

pkg/common/runcmd.go

Lines changed: 8 additions & 4 deletions
@@ -16,7 +16,7 @@ const (
 // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
 // On error, the output is included so callers don't need to echo it again.
 
-func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
+func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
 	execCmd1 := exec.Command(cmd1, execCmdArgs...)
 
 	if pipeCmd != "" {
@@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error {
 	}
 	return err
 }
-func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
+func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) {
 
-	execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
+	execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...)
 	stdoutPipe, err := execCmd1.StdoutPipe()
 	if err != nil {
 		klog.Errorf("failed command %v: got error:%v", execCmd1, err)
@@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]b
 	execPipeCmd.Stdin = stdoutPipe
 	output, err := execPipeCmd.CombinedOutput()
 	if err != nil {
+		// Some commands (such as grep) will return an error with exit status of 1
+		if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 {
+			return output, nil
+		}
 		err = checkError(err, *execPipeCmd)
-		return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
+		return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output))
 	}
 
 	return output, nil
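With pipeCmdArg now a []string, the piped command can receive several arguments, and the added exit-status check means a grep that matches nothing (exit code 1, empty output) no longer surfaces as an error. A hypothetical caller is sketched below; the concrete lsblk | grep pipeline is illustrative and not taken from this commit.

package main

import (
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// Hypothetical pipeline: lsblk -o NAME,MOUNTPOINT | grep nvme.
	out, err := common.RunCommand(
		"grep",                  // pipeCmd: second command in the pipeline
		[]string{"nvme"},        // pipeCmdArg: now a slice, so multiple flags are possible
		"lsblk",                 // cmd1: first command in the pipeline
		"-o", "NAME,MOUNTPOINT", // execCmdArgs
	)
	if err != nil {
		fmt.Printf("pipeline failed: %v\n", err)
		return
	}
	// With the new exit-status handling, grep finding no matches returns
	// empty output and a nil error instead of failing the whole call.
	fmt.Printf("matched devices:\n%s", out)
}

The error message now formats execPipeCmd rather than pipeCmd, so a failure reports the full piped command line instead of just the command name.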
