diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 07e883d92..d72a2c51f 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -34,6 +34,7 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -97,6 +98,8 @@ var ( diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[disk-type] topology label when the StorageClass has the use-allowed-disk-topology parameter set to true. That topology label is included in the Topologies returned in CreateVolumeResponse. This flag is disabled by default.") + diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute, "Period for the disk cache to check the /dev/disk/by-id/ directory and evaluate the symlinks") + version string ) @@ -275,6 +278,13 @@ func handle() { klog.Fatalf("Failed to get node info from API server: %v", err.Error()) } + deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, *diskCacheSyncPeriod, *nodeName, driverName, deviceUtils) + if err != nil { + klog.Warningf("Failed to create device cache: %v", err.Error()) + } else { + go deviceCache.Run(ctx) + } + // TODO(2042): Move more of the constructor args into this struct nsArgs := &driver.NodeServerArgs{ EnableDeviceInUseCheck: *enableDeviceInUseCheck, @@ -283,6 +293,7 @@ func handle() { DataCacheEnabledNodePool: isDataCacheEnabledNodePool, SysfsPath: "/sys", MetricsManager: metricsManager, + DeviceCache: deviceCache, } nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs) @@ -297,9 +308,10 @@ func handle() { if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil { klog.Errorf("Data Cache setup failed: %v", err) } - go driver.StartWatcher(*nodeName) + go driver.StartWatcher(ctx, *nodeName) } } + } err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) diff --git a/deploy/kubernetes/base/controller/controller.yaml b/deploy/kubernetes/base/controller/controller.yaml index e15b8593a..302874f82 100644 --- a/deploy/kubernetes/base/controller/controller.yaml +++ b/deploy/kubernetes/base/controller/controller.yaml @@ -145,6 +145,7 @@ spec: - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme" - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml" - --enable-data-cache + - --run-node-service=false - --enable-multitenancy command: - /gce-pd-csi-driver diff --git a/deploy/kubernetes/images/stable-master/image.yaml b/deploy/kubernetes/images/stable-master/image.yaml index 0663677c3..b0da81d85 100644 --- a/deploy/kubernetes/images/stable-master/image.yaml +++ b/deploy/kubernetes/images/stable-master/image.yaml @@ -46,6 +46,6 @@ imageTag: name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver - newTag: "v1.17.3" + newTag: "v1.17.4" --- diff --git a/manifest_osversion.sh b/manifest_osversion.sh index 4feb4d5c6..37f61e3f2 100755 --- a/manifest_osversion.sh +++ b/manifest_osversion.sh @@ -4,7 +4,7 @@ set -o xtrace # The following is a workaround for issue https://github.com/moby/moby/issues/41417 # to manually inserver os.version information into docker manifest file -# TODO: once docker manifest annotation for os.versions is availabler for the installed docker here, +# TODO: once docker manifest annotation for os.versions is available for the installed docker here, # replace the following with annotation approach. https://github.com/docker/cli/pull/2578 export DOCKER_CLI_EXPERIMENTAL=enabled @@ -14,7 +14,7 @@ IFS=', ' read -r -a imagetags <<< "$WINDOWS_IMAGE_TAGS" IFS=', ' read -r -a baseimages <<< "$WINDOWS_BASE_IMAGES" MANIFEST_TAG=${STAGINGIMAGE}:${STAGINGVERSION} -# translate from image tag to docker manifest foler format +# translate from image tag to docker manifest folder format # e.g., gcr.io_k8s-staging-csi_gce-pd-windows-v2 manifest_folder=$(echo "${MANIFEST_TAG}" | sed "s|/|_|g" | sed "s/:/-/") echo ${manifest_folder} diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index b2ccd5e70..1b894d53c 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -7,19 +7,14 @@ import ( "regexp" "strconv" "strings" - "time" csi "github.com/container-storage-interface/spec/lib/go/csi" fsnotify "github.com/fsnotify/fsnotify" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" ) const ( @@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con } func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return 0, err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return 0, err - } - node, err := getNodeWithRetry(ctx, kubeClient, nodeName) + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return 0, err } + if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found { dataCacheCount, err := strconv.Atoi(val) if err != nil { @@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, return 0, nil } -func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { - var nodeObj *v1.Node - backoff := wait.Backoff{ - Duration: 1 * time.Second, - Factor: 2.0, - Steps: 5, - } - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) { - node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err) - return false, nil - } - nodeObj = node - klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName) - return true, nil - }) - - if err != nil { - klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err) - } - return nodeObj, err -} - func FetchRaidedLssdCountForDatacache() (int, error) { raidedPath, err := fetchRAIDedLocalSsdPath() if err != nil { @@ -615,7 +579,7 @@ func InitializeDataCacheNode(nodeId string) error { return nil } -func StartWatcher(nodeName string) { +func StartWatcher(ctx context.Context, nodeName string) { dirToWatch := "/dev/" watcher, err := fsnotify.NewWatcher() if err != nil { @@ -630,7 +594,7 @@ func StartWatcher(nodeName string) { } errorCh := make(chan error, 1) // Handle the error received from the watcher goroutine - go watchDiskDetaches(watcher, nodeName, errorCh) + go watchDiskDetaches(ctx, watcher, nodeName, errorCh) select { case err := <-errorCh: @@ -638,9 +602,12 @@ func StartWatcher(nodeName string) { } } -func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { +func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { for { select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return nil // watch for errors case err := <-watcher.Errors: errorCh <- fmt.Errorf("disk update event errored: %v", err) diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index 76aa38381..83ff9767a 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi DataCacheEnabledNodePool: args.DataCacheEnabledNodePool, SysfsPath: args.SysfsPath, metricsManager: args.MetricsManager, + DeviceCache: args.DeviceCache, } } @@ -184,7 +185,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT maxLogChar = grpcLogCharCap klog.V(4).Infof("Driver: %v", gceDriver.name) - //Start the nonblocking GRPC + // Start the nonblocking GRPC s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager) // TODO(#34): Only start specific servers based on a flag. // In the future have this only run specific combinations of servers depending on which version this is. diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 00f0b0859..f96bdac14 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -32,14 +32,14 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/mount-utils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs" @@ -80,6 +80,8 @@ type GCENodeServer struct { csi.UnimplementedNodeServer metricsManager *metrics.MetricsManager + // A cache of the device paths for the volumes that are attached to the node. + DeviceCache *linkcache.DeviceCache } type NodeServerArgs struct { @@ -97,6 +99,7 @@ type NodeServerArgs struct { SysfsPath string MetricsManager *metrics.MetricsManager + DeviceCache *linkcache.DeviceCache } var _ csi.NodeServer = &GCENodeServer{} @@ -509,6 +512,13 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage } } + if ns.DeviceCache != nil { + err = ns.DeviceCache.AddVolume(volumeID) + if err != nil { + klog.Warningf("Error adding volume %s to cache: %v", volumeID, err) + } + } + klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil } @@ -622,6 +632,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err) } } + + if ns.DeviceCache != nil { + ns.DeviceCache.RemoveVolume(volumeID) + } + klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil } @@ -899,15 +914,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) { } func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return 0, err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return 0, err - } - node, err := getNodeWithRetry(ctx, kubeClient, nodeName) + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return 0, err } diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index 2ac1b9e2e..77ef9c39c 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/mount-utils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -44,11 +45,13 @@ const ( ) func getTestGCEDriver(t *testing.T) *GCEDriver { - return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) + return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{ + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{defaultVolumeID})), + }) } -func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver { - return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) +func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount, args *NodeServerArgs) *GCEDriver { + return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), args) } func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver { @@ -188,7 +191,9 @@ func TestNodeGetVolumeStats(t *testing.T) { } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList}) - gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{tc.volumeID})), + }) ns := gceDriver.ns req := &csi.NodeGetVolumeStatsRequest{ @@ -1227,7 +1232,9 @@ func TestNodeStageVolume(t *testing.T) { )) } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true}) - gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{volumeID})), + }) ns := gceDriver.ns ns.SysfsPath = tempDir + "/sys" _, err := ns.NodeStageVolume(context.Background(), tc.req) diff --git a/pkg/k8sclient/node.go b/pkg/k8sclient/node.go new file mode 100644 index 000000000..1c4fa9b8b --- /dev/null +++ b/pkg/k8sclient/node.go @@ -0,0 +1,49 @@ +package k8sclient + +import ( + "context" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" +) + +func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) { + cfg, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + return getNodeWithRetry(ctx, kubeClient, nodeName) +} + +func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { + var nodeObj *v1.Node + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Steps: 5, + } + err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) { + node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err) + return false, nil + } + nodeObj = node + klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName) + return true, nil + }) + + if err != nil { + klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err) + } + return nodeObj, err +} diff --git a/pkg/linkcache/devices_linux.go b/pkg/linkcache/devices_linux.go new file mode 100644 index 000000000..71e733e35 --- /dev/null +++ b/pkg/linkcache/devices_linux.go @@ -0,0 +1,172 @@ +package linkcache + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" +) + +const byIdDir = "/dev/disk/by-id" + +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string, driverName string, deviceUtils deviceutils.DeviceUtils) (*DeviceCache, error) { + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err) + } + + return newDeviceCacheForNode(period, node, driverName, deviceUtils), nil +} + +func NewTestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache { + return newDeviceCacheForNode(period, node, "pd.csi.storage.gke.io", deviceutils.NewDeviceUtils()) +} + +func NewTestNodeWithVolumes(volumes []string) *v1.Node { + volumesInUse := make([]v1.UniqueVolumeName, len(volumes)) + for i, volume := range volumes { + volumesInUse[i] = v1.UniqueVolumeName("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume) + } + + return &v1.Node{ + Status: v1.NodeStatus{ + VolumesInUse: volumesInUse, + }, + } +} + +func newDeviceCacheForNode(period time.Duration, node *v1.Node, driverName string, deviceUtils deviceutils.DeviceUtils) *DeviceCache { + deviceCache := &DeviceCache{ + symlinks: make(map[string]deviceMapping), + period: period, + deviceUtils: deviceUtils, + dir: byIdDir, + } + + // Look at the status.volumesInUse field. For each, take the last section + // of the string (after the last "/") and call AddVolume for that + for _, volume := range node.Status.VolumesInUse { + volumeName := string(volume) + tokens := strings.Split(volumeName, "^") + if len(tokens) != 2 { + klog.V(5).Infof("Skipping volume %q because splitting volumeName on `^` returns %d tokens, expected 2", volumeName, len(tokens)) + continue + } + + // The first token is of the form "kubernetes.io/csi/" or just "". + // We should check if it contains the driver name we are interested in. + if !strings.Contains(tokens[0], driverName) { + klog.V(5).Infof("Skipping volume %q because it is not a %s volume.", volumeName, driverName) + continue + } + klog.Infof("Adding volume %s to cache", string(volume)) + deviceCache.AddVolume(tokens[1]) + } + + return deviceCache +} + +// Run since it needs an infinite loop to keep itself up to date +func (d *DeviceCache) Run(ctx context.Context) { + klog.Infof("Starting device cache watcher for directory %s with period %s", d.dir, d.period) + + // Start the loop that runs every minute + ticker := time.NewTicker(d.period) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return + case <-ticker.C: + d.listAndUpdate() + + klog.Infof("Cache contents: %+v", d.symlinks) + } + } +} + +// Add a volume. This will yield a call to the filesystem to find a +// /dev/disk/by-id symlink and an evaluation of that symlink. +func (d *DeviceCache) AddVolume(volumeID string) error { + klog.Infof("Adding volume %s to cache", volumeID) + + _, volumeKey, err := common.VolumeIDToKey(volumeID) + if err != nil { + return fmt.Errorf("error converting volume ID to key: %w", err) + } + deviceName, err := common.GetDeviceName(volumeKey) + if err != nil { + return fmt.Errorf("error getting device name: %w", err) + } + + symlinks := d.deviceUtils.GetDiskByIdPaths(deviceName, "") + if len(symlinks) == 0 { + return fmt.Errorf("no symlink paths found for volume %s", volumeID) + } + + d.mutex.Lock() + defer d.mutex.Unlock() + + // We may have multiple symlinks for a given device, we should add all of them. + for _, symlink := range symlinks { + realPath, err := filepath.EvalSymlinks(symlink) + if err != nil { + // This is not an error, as the symlink may not have been created yet. + // Leave real_path empty; the periodic check will update it. + klog.V(5).Infof("Could not evaluate symlink %s, will retry: %v", symlink, err) + realPath = "" + } else { + klog.Infof("Found real path %s for volume %s", realPath, volumeID) + } + // The key is the symlink path. The value contains the evaluated + // real path and the original volumeID for better logging. + d.symlinks[symlink] = deviceMapping{ + volumeID: volumeID, + realPath: realPath, + } + klog.V(4).Infof("Added volume %s to cache with symlink %s", volumeID, symlink) + } + + return nil +} + +// Remove the volume from the cache. +func (d *DeviceCache) RemoveVolume(volumeID string) { + klog.Infof("Removing volume %s from cache", volumeID) + d.mutex.Lock() + defer d.mutex.Unlock() + for symlink, device := range d.symlinks { + if device.volumeID == volumeID { + delete(d.symlinks, symlink) + } + } +} + +func (d *DeviceCache) listAndUpdate() { + for symlink, device := range d.symlinks { + // Evaluate the symlink + realPath, err := filepath.EvalSymlinks(symlink) + if err != nil { + klog.Warningf("Error evaluating symlink for volume %s: %v", device.volumeID, err) + continue + } + + // Check if the realPath has changed + if realPath != device.realPath { + klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", device.volumeID, symlink, device.realPath, realPath) + + // Update the cache with the new realPath + device.realPath = realPath + d.symlinks[symlink] = device + } + } +} diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go new file mode 100644 index 000000000..d7937bc0b --- /dev/null +++ b/pkg/linkcache/devices_windows.go @@ -0,0 +1,29 @@ +//go:build windows + +package linkcache + +import ( + "context" + "time" + + "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" +) + +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string, driverName string, deviceUtils deviceutils.DeviceUtils) (*DeviceCache, error) { + klog.Infof("NewDeviceCacheForNode is not implemented for Windows") + return nil, nil +} + +func (d *DeviceCache) Run(ctx context.Context) { + // Not implemented for Windows +} + +func (d *DeviceCache) AddVolume(volumeID string) error { + klog.Infof("AddVolume is not implemented for Windows") + return nil +} + +func (d *DeviceCache) RemoveVolume(volumeID string) { + // Not implemented for Windows +} diff --git a/pkg/linkcache/types.go b/pkg/linkcache/types.go new file mode 100644 index 000000000..04d4688eb --- /dev/null +++ b/pkg/linkcache/types.go @@ -0,0 +1,22 @@ +package linkcache + +import ( + "sync" + "time" + + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" +) + +type deviceMapping struct { + volumeID string + realPath string +} + +type DeviceCache struct { + mutex sync.Mutex + symlinks map[string]deviceMapping + period time.Duration + // dir is the directory to look for device symlinks + dir string + deviceUtils deviceutils.DeviceUtils +} diff --git a/test/remote/instance.go b/test/remote/instance.go index 554e7612e..84c56625f 100644 --- a/test/remote/instance.go +++ b/test/remote/instance.go @@ -235,7 +235,7 @@ func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { } if i.cfg.CloudtopHost { - output, err := exec.Command("gcloud", "compute", "ssh", i.cfg.Name, "--zone", i.cfg.Zone, "--project", i.cfg.Project, "--", "-o", "ProxyCommand=corp-ssh-helper %h %p", "--", "echo").CombinedOutput() + output, err := exec.Command("gcloud", "compute", "ssh", i.cfg.Name, "--zone", i.cfg.Zone, "--project", i.cfg.Project).CombinedOutput() if err != nil { klog.Errorf("Failed to bootstrap ssh (%v): %s", err, string(output)) return false, nil @@ -257,9 +257,8 @@ func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { return true, nil }) - // If instance didn't reach running state in time, return with error now. if err != nil { - return err + return fmt.Errorf("instance %v did not reach running state in time: %v", i.cfg.Name, err.Error()) } // Instance reached running state in time, make sure that cloud-init is complete diff --git a/test/remote/setup-teardown.go b/test/remote/setup-teardown.go index 3026b28b0..14cbbbf19 100644 --- a/test/remote/setup-teardown.go +++ b/test/remote/setup-teardown.go @@ -73,7 +73,7 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes archiveName := fmt.Sprintf("e2e_driver_binaries_%s.tar.gz", uuid.NewUUID()) archivePath, err := CreateDriverArchive(archiveName, instance.cfg.Architecture, config.PkgPath, config.BinPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create driver archive: %v", err.Error()) } defer func() { err = os.Remove(archivePath) @@ -92,7 +92,7 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes // Upload archive to instance and run binaries driverPID, err := instance.UploadAndRun(archivePath, config.WorkspaceDir, config.RunDriverCmd) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to upload and run driver: %v", err.Error()) } // Create an SSH tunnel from port to port diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 89192f150..5ce72493c 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -33,6 +33,7 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -77,6 +78,7 @@ func TestSanity(t *testing.T) { EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0, EnableDataCache: enableDataCache, + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{})), } // Initialize GCE Driver