From dba53e0becfc478f574334cfe0f25ed8a9578272 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Tue, 20 May 2025 17:28:24 -0700 Subject: [PATCH 01/23] Cache devices and their symlinks in node driver, periodically noting changes and printing the full list. example: periodic symlink cache read: /dev/disk/by-id/google-persistent-disk-0 -> /dev/sda; /dev/disk/by-id/google-pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67 -> /dev/sdb; /dev/disk/by-id/scsi-0Google_PersistentDisk_persistent-disk-0 -> /dev/sda; /dev/disk/by-id/scsi-0Google_PersistentDisk_pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67 -> /dev/sdb --- cmd/gce-pd-csi-driver/main.go | 5 +- pkg/gce-pd-csi-driver/cache.go | 9 +- pkg/linkcache/cache.go | 173 +++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 4 deletions(-) create mode 100644 pkg/linkcache/cache.go diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 07e883d92..3235436a9 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -34,6 +34,7 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -297,9 +298,11 @@ func handle() { if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil { klog.Errorf("Data Cache setup failed: %v", err) } - go driver.StartWatcher(*nodeName) + go driver.StartWatcher(ctx, *nodeName) } } + + go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx) } err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index b2ccd5e70..1dc486690 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -615,7 +615,7 @@ func InitializeDataCacheNode(nodeId string) error { return nil } -func StartWatcher(nodeName string) { +func StartWatcher(ctx context.Context, nodeName string) { dirToWatch := "/dev/" watcher, err := fsnotify.NewWatcher() if err != nil { @@ -630,7 +630,7 @@ func StartWatcher(nodeName string) { } errorCh := make(chan error, 1) // Handle the error received from the watcher goroutine - go watchDiskDetaches(watcher, nodeName, errorCh) + go watchDiskDetaches(ctx, watcher, nodeName, errorCh) select { case err := <-errorCh: @@ -638,9 +638,12 @@ func StartWatcher(nodeName string) { } } -func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { +func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { for { select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return nil // watch for errors case err := <-watcher.Errors: errorCh <- fmt.Errorf("disk update event errored: %v", err) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go new file mode 100644 index 000000000..ee8e70b73 --- /dev/null +++ b/pkg/linkcache/cache.go @@ -0,0 +1,173 @@ +package linkcache + +import ( + "context" + "fmt" + "os" + "path/filepath" + "regexp" + "strings" + "time" + + "k8s.io/klog/v2" +) + +var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) + +// ListingCache polls the filesystem at the specified directory once per +// periodand checks each non-directory entry for a symlink. The results are +// cached. Changes to the cache are logged, as well as the full contents of the +// cache. The cache's Run() method is expected to be called in a goroutine. +// Its cancellation is controlled via the context argument. +type ListingCache struct { + period time.Duration + dir string + links *linkCache +} + +func NewListingCache(period time.Duration, dir string) *ListingCache { + return &ListingCache{ + period: period, + dir: dir, + links: newLinkCache(), + } +} + +// Run starts the cache's background loop. The filesystem is listed and the +// cache updated according to the frequency specified by the period. It will run +// until the context is cancelled. +func (l *ListingCache) Run(ctx context.Context) { + // Start the loop that runs every minute + ticker := time.NewTicker(l.period) + defer ticker.Stop() + + // Initial list and update so we don't wait for the first tick. + err := l.listAndUpdate() + if err != nil { + klog.Warningf("Error listing and updating symlinks: %v", err) + } + + for { + select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return + case <-ticker.C: + err := l.listAndUpdate() + if err != nil { + klog.Warningf("Error listing and updating symlinks: %v", err) + continue + } + + klog.Infof("periodic symlink cache read: %s", l.links.String()) + } + } +} + +func (l *ListingCache) listAndUpdate() error { + visited := make(map[string]struct{}) + + entries, err := os.ReadDir(l.dir) + if err != nil { + return fmt.Errorf("failed to read directory %s: %w", l.dir, err) + } + + var errs []error + for _, entry := range entries { + if entry.IsDir() { + continue + } + + // TODO(juliankatz): To have certainty this works for all edge cases, we + // need to test this with a manually partitioned disk. + if partitionNameRegex.MatchString(entry.Name()) { + continue + } + + diskByIdPath := filepath.Join(l.dir, entry.Name()) + + // Add the device to the map regardless of successful symlink eval. + // Otherwise, a broken symlink will lead us to remove it from the cache. + visited[diskByIdPath] = struct{}{} + + realFSPath, err := filepath.EvalSymlinks(diskByIdPath) + if err != nil { + errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err)) + l.links.BrokenSymlink(diskByIdPath) + continue + } + + l.links.AddOrUpdateDevice(diskByIdPath, realFSPath) + } + + for _, id := range l.links.DeviceIDs() { + if _, found := visited[id]; !found { + l.links.RemoveDevice(id) + } + } + + if len(errs) > 0 { + return fmt.Errorf("failed to evaluate symlinks for %d devices: %v", len(errs), errs) + } + return nil +} + +// linkCache is a structure that maintains a cache of symlinks between +// /dev/disk/by-id and /dev/sd* paths. It provides methods to add/update, +// retrieve, and remove device symlinks from the cache. +type linkCache struct { + devices map[string]linkCacheEntry +} + +type linkCacheEntry struct { + path string + // If true, the symlink is known to be broken. + brokenSymlink bool +} + +func newLinkCache() *linkCache { + return &linkCache{ + devices: make(map[string]linkCacheEntry), + } +} + +func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { + prevEntry, exists := d.devices[symlink] + if !exists || prevEntry.path != realPath { + klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath) + } + d.devices[symlink] = linkCacheEntry{path: realPath, brokenSymlink: false} +} + +// BrokenSymlink marks a symlink as broken. If the symlink is not in the cache, +// it is ignored. +func (d *linkCache) BrokenSymlink(symlink string) { + if entry, ok := d.devices[symlink]; ok { + entry.brokenSymlink = true + d.devices[symlink] = entry + } +} + +func (d *linkCache) RemoveDevice(symlink string) { + delete(d.devices, symlink) +} + +func (d *linkCache) DeviceIDs() []string { + ids := make([]string, 0, len(d.devices)) + for id := range d.devices { + ids = append(ids, id) + } + return ids +} + +func (d *linkCache) String() string { + var sb strings.Builder + for symlink, entry := range d.devices { + if entry.brokenSymlink { + sb.WriteString(fmt.Sprintf("%s -> broken symlink... last known value: %s; ", symlink, entry.path)) + } else { + sb.WriteString(fmt.Sprintf("%s -> %s; ", symlink, entry.path)) + } + } + return strings.TrimSuffix(sb.String(), "; ") +} From d3a824ecb0a67d7b23c10abea0c0607c62b0c9f0 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 11:02:59 -0700 Subject: [PATCH 02/23] Some doc comment updates --- pkg/linkcache/cache.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index ee8e70b73..f62d7eb40 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -15,10 +15,10 @@ import ( var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) // ListingCache polls the filesystem at the specified directory once per -// periodand checks each non-directory entry for a symlink. The results are -// cached. Changes to the cache are logged, as well as the full contents of the -// cache. The cache's Run() method is expected to be called in a goroutine. -// Its cancellation is controlled via the context argument. +// period and checks each non-directory entry for a symlink. The results are +// cached. Changes to the cache are logged, as well as the full contents of the +// cache. The cache's Run() method is expected to be called in a goroutine. Its +// cancellation is controlled via the context argument. type ListingCache struct { period time.Duration dir string @@ -33,9 +33,9 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { } } -// Run starts the cache's background loop. The filesystem is listed and the -// cache updated according to the frequency specified by the period. It will run -// until the context is cancelled. +// Run starts the cache's background loop. The filesystem is listed and the cache +// updated according to the frequency specified by the period. It will run until +// the context is cancelled. func (l *ListingCache) Run(ctx context.Context) { // Start the loop that runs every minute ticker := time.NewTicker(l.period) From 485beaf8269ca0c959c094b5c239d8cefd77c428 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 15:33:41 -0700 Subject: [PATCH 03/23] Add unit tests --- pkg/linkcache/cache.go | 31 +++-- pkg/linkcache/cache_test.go | 238 ++++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 7 deletions(-) create mode 100644 pkg/linkcache/cache_test.go diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index f62d7eb40..7f3040725 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -3,9 +3,11 @@ package linkcache import ( "context" "fmt" + "maps" "os" "path/filepath" "regexp" + "slices" "strings" "time" @@ -14,6 +16,23 @@ import ( var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) +// fsInterface defines the filesystem operations needed by ListingCache +type fsInterface interface { + ReadDir(name string) ([]os.DirEntry, error) + EvalSymlinks(path string) (string, error) +} + +// realFS implements fsInterface using the real filesystem +type realFS struct{} + +func (f *realFS) ReadDir(name string) ([]os.DirEntry, error) { + return os.ReadDir(name) +} + +func (f *realFS) EvalSymlinks(path string) (string, error) { + return filepath.EvalSymlinks(path) +} + // ListingCache polls the filesystem at the specified directory once per // period and checks each non-directory entry for a symlink. The results are // cached. Changes to the cache are logged, as well as the full contents of the @@ -23,6 +42,7 @@ type ListingCache struct { period time.Duration dir string links *linkCache + fs fsInterface } func NewListingCache(period time.Duration, dir string) *ListingCache { @@ -30,6 +50,7 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { period: period, dir: dir, links: newLinkCache(), + fs: &realFS{}, } } @@ -67,7 +88,7 @@ func (l *ListingCache) Run(ctx context.Context) { func (l *ListingCache) listAndUpdate() error { visited := make(map[string]struct{}) - entries, err := os.ReadDir(l.dir) + entries, err := l.fs.ReadDir(l.dir) if err != nil { return fmt.Errorf("failed to read directory %s: %w", l.dir, err) } @@ -90,7 +111,7 @@ func (l *ListingCache) listAndUpdate() error { // Otherwise, a broken symlink will lead us to remove it from the cache. visited[diskByIdPath] = struct{}{} - realFSPath, err := filepath.EvalSymlinks(diskByIdPath) + realFSPath, err := l.fs.EvalSymlinks(diskByIdPath) if err != nil { errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err)) l.links.BrokenSymlink(diskByIdPath) @@ -153,11 +174,7 @@ func (d *linkCache) RemoveDevice(symlink string) { } func (d *linkCache) DeviceIDs() []string { - ids := make([]string, 0, len(d.devices)) - for id := range d.devices { - ids = append(ids, id) - } - return ids + return slices.Collect(maps.Keys(d.devices)) } func (d *linkCache) String() string { diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go new file mode 100644 index 000000000..65c86fcbd --- /dev/null +++ b/pkg/linkcache/cache_test.go @@ -0,0 +1,238 @@ +package linkcache + +import ( + "os" + "testing" + "testing/fstest" +) + +const ( + // Test disk names in /dev/disk/by-id format + gcpPersistentDiskID = "google-persistent-disk-0" + gcpPVCID = "google-pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67" + gcpPersistentDiskPartitionID = "google-persistent-disk-0-part1" + + // Test device paths in /dev format + devicePathSDA = "/dev/sda" + devicePathSDB = "/dev/sdb" +) + +// mockFS implements fsInterface for testing +type mockFS struct { + fstest.MapFS + symlinks map[string]string +} + +func newMockFS() *mockFS { + return &mockFS{ + MapFS: make(fstest.MapFS), + symlinks: make(map[string]string), + } +} + +func (m *mockFS) ReadDir(name string) ([]os.DirEntry, error) { + entries, err := m.MapFS.ReadDir(name) + if err != nil { + return nil, err + } + return entries, nil +} + +func (m *mockFS) EvalSymlinks(path string) (string, error) { + if target, ok := m.symlinks[path]; ok { + return target, nil + } + return "", os.ErrNotExist +} + +func TestListAndUpdate(t *testing.T) { + tests := []struct { + name string + setupFS func(*mockFS) + expectedLinks map[string]string + expectError bool + }{ + { + name: "valid symlinks", + setupFS: func(m *mockFS) { + // Create some device files + m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + m.MapFS[gcpPVCID] = &fstest.MapFile{} + // Create symlinks + m.symlinks[gcpPersistentDiskID] = devicePathSDA + m.symlinks[gcpPVCID] = devicePathSDB + }, + expectedLinks: map[string]string{ + gcpPersistentDiskID: devicePathSDA, + gcpPVCID: devicePathSDB, + }, + expectError: false, + }, + { + name: "broken symlink not added to cache", + setupFS: func(m *mockFS) { + m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + // No symlink target for gcpPersistentDiskID + }, + expectedLinks: map[string]string{}, + expectError: true, + }, + { + name: "partition files ignored", + setupFS: func(m *mockFS) { + m.MapFS[gcpPersistentDiskPartitionID] = &fstest.MapFile{} + m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + m.symlinks[gcpPersistentDiskID] = devicePathSDA + }, + expectedLinks: map[string]string{ + gcpPersistentDiskID: devicePathSDA, + }, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mock := newMockFS() + tt.setupFS(mock) + + cache := NewListingCache(0, ".") + cache.fs = mock // Inject our mock filesystem + err := cache.listAndUpdate() + + if tt.expectError { + if err == nil { + t.Error("expected error but got none") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + } + + // Verify the cache contents + for symlink, expectedTarget := range tt.expectedLinks { + entry, exists := cache.links.devices[symlink] + if !exists { + t.Errorf("symlink %s should exist in cache", symlink) + continue + } + if entry.path != expectedTarget { + t.Errorf("symlink %s should point to %s, got %s", symlink, expectedTarget, entry.path) + } + if entry.brokenSymlink { + t.Errorf("symlink %s should not be marked as broken", symlink) + } + } + }) + } +} + +func TestListAndUpdateWithChanges(t *testing.T) { + mock := newMockFS() + cache := NewListingCache(0, ".") + cache.fs = mock + + // Initial state: one disk with a valid symlink + mock.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + mock.symlinks[gcpPersistentDiskID] = devicePathSDA + + // First listAndUpdate should add the disk to cache + err := cache.listAndUpdate() + if err != nil { + t.Fatalf("unexpected error in first listAndUpdate: %v", err) + } + + // Verify initial state + entry, exists := cache.links.devices[gcpPersistentDiskID] + if !exists { + t.Fatal("gcpPersistentDiskID should exist in cache after first listAndUpdate") + } + if entry.path != devicePathSDA { + t.Errorf("gcpPersistentDiskID should point to %s, got %s", devicePathSDA, entry.path) + } + + // Add a new disk and update the symlink target + mock.MapFS[gcpPVCID] = &fstest.MapFile{} + mock.symlinks[gcpPVCID] = devicePathSDB + mock.symlinks[gcpPersistentDiskID] = devicePathSDB // Update existing disk's target + + // Second listAndUpdate should update the cache + err = cache.listAndUpdate() + if err != nil { + t.Fatalf("unexpected error in second listAndUpdate: %v", err) + } + + // Verify both disks are in cache with correct paths + entry, exists = cache.links.devices[gcpPersistentDiskID] + if !exists { + t.Fatal("gcpPersistentDiskID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPersistentDiskID should now point to %s, got %s", devicePathSDB, entry.path) + } + + entry, exists = cache.links.devices[gcpPVCID] + if !exists { + t.Fatal("gcpPVCID should exist in cache after second listAndUpdate") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPVCID should point to %s, got %s", devicePathSDB, entry.path) + } + + // Break the symlink for gcpPersistentDiskID but keep the file + delete(mock.symlinks, gcpPersistentDiskID) + + // Third listAndUpdate should mark the disk as broken but keep its last known value + err = cache.listAndUpdate() + if err == nil { + t.Error("expected error for broken symlink") + } + + // Verify gcpPersistentDiskID is marked as broken but maintains its last known value + entry, exists = cache.links.devices[gcpPersistentDiskID] + if !exists { + t.Fatal("gcpPersistentDiskID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPersistentDiskID should maintain its last known value %s, got %s", devicePathSDB, entry.path) + } + if !entry.brokenSymlink { + t.Error("gcpPersistentDiskID should be marked as broken") + } + + // Verify gcpPVCID is still valid + entry, exists = cache.links.devices[gcpPVCID] + if !exists { + t.Fatal("gcpPVCID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) + } + if entry.brokenSymlink { + t.Error("gcpPVCID should not be marked as broken") + } + + // Remove one disk + delete(mock.MapFS, gcpPersistentDiskID) + delete(mock.symlinks, gcpPersistentDiskID) + + // Fourth listAndUpdate should remove the deleted disk + err = cache.listAndUpdate() + if err != nil { + t.Fatalf("unexpected error in fourth listAndUpdate: %v", err) + } + + // Verify only gcpPVCID remains + if _, exists := cache.links.devices[gcpPersistentDiskID]; exists { + t.Error("gcpPersistentDiskID should be removed from cache") + } + + entry, exists = cache.links.devices[gcpPVCID] + if !exists { + t.Fatal("gcpPVCID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) + } +} From afb23934ea93e2c71798aa4cddef38403ff340ac Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 15:35:20 -0700 Subject: [PATCH 04/23] improve partition unit test --- pkg/linkcache/cache_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index 65c86fcbd..e43799bc7 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -83,6 +83,7 @@ func TestListAndUpdate(t *testing.T) { m.MapFS[gcpPersistentDiskPartitionID] = &fstest.MapFile{} m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} m.symlinks[gcpPersistentDiskID] = devicePathSDA + m.symlinks[gcpPersistentDiskPartitionID] = devicePathSDA + "1" }, expectedLinks: map[string]string{ gcpPersistentDiskID: devicePathSDA, From 643817f7c8c2c3eb4f81d994b498802cd45a45f7 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 15:37:48 -0700 Subject: [PATCH 05/23] Log on removal as well --- pkg/linkcache/cache.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index 7f3040725..c2ceb4ab1 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -170,7 +170,10 @@ func (d *linkCache) BrokenSymlink(symlink string) { } func (d *linkCache) RemoveDevice(symlink string) { - delete(d.devices, symlink) + if entry, ok := d.devices[symlink]; ok { + klog.Infof("Removing device %s with path %s from cache, brokenSymlink: %t", symlink, entry.path, entry.brokenSymlink) + delete(d.devices, symlink) + } } func (d *linkCache) DeviceIDs() []string { From fa2d2f9d3d44706d46db7b03e5796031703acb59 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 16:03:48 -0700 Subject: [PATCH 06/23] Updated unit tests to be clearer, relying on asserting linkCache --- pkg/linkcache/cache.go | 9 ++ pkg/linkcache/cache_test.go | 194 +++++++++++++----------------------- 2 files changed, 79 insertions(+), 124 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index c2ceb4ab1..006eed27d 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -54,6 +54,15 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { } } +func NewMockListingCache(period time.Duration, dir string) *ListingCache { + return &ListingCache{ + period: period, + dir: dir, + links: newLinkCache(), + fs: &mockFS{}, + } +} + // Run starts the cache's background loop. The filesystem is listed and the cache // updated according to the frequency specified by the period. It will run until // the context is cancelled. diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index e43799bc7..7683ed258 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -4,8 +4,12 @@ import ( "os" "testing" "testing/fstest" + + "github.com/google/go-cmp/cmp" ) +var allowUnexportedLinkCache = cmp.AllowUnexported(linkCache{}, linkCacheEntry{}) + const ( // Test disk names in /dev/disk/by-id format gcpPersistentDiskID = "google-persistent-disk-0" @@ -49,7 +53,7 @@ func TestListAndUpdate(t *testing.T) { tests := []struct { name string setupFS func(*mockFS) - expectedLinks map[string]string + expectedCache *linkCache expectError bool }{ { @@ -62,9 +66,11 @@ func TestListAndUpdate(t *testing.T) { m.symlinks[gcpPersistentDiskID] = devicePathSDA m.symlinks[gcpPVCID] = devicePathSDB }, - expectedLinks: map[string]string{ - gcpPersistentDiskID: devicePathSDA, - gcpPVCID: devicePathSDB, + expectedCache: &linkCache{ + devices: map[string]linkCacheEntry{ + gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, + gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, + }, }, expectError: false, }, @@ -74,8 +80,10 @@ func TestListAndUpdate(t *testing.T) { m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} // No symlink target for gcpPersistentDiskID }, - expectedLinks: map[string]string{}, - expectError: true, + expectedCache: &linkCache{ + devices: map[string]linkCacheEntry{}, + }, + expectError: true, }, { name: "partition files ignored", @@ -85,8 +93,10 @@ func TestListAndUpdate(t *testing.T) { m.symlinks[gcpPersistentDiskID] = devicePathSDA m.symlinks[gcpPersistentDiskPartitionID] = devicePathSDA + "1" }, - expectedLinks: map[string]string{ - gcpPersistentDiskID: devicePathSDA, + expectedCache: &linkCache{ + devices: map[string]linkCacheEntry{ + gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, + }, }, expectError: false, }, @@ -111,129 +121,65 @@ func TestListAndUpdate(t *testing.T) { } } - // Verify the cache contents - for symlink, expectedTarget := range tt.expectedLinks { - entry, exists := cache.links.devices[symlink] - if !exists { - t.Errorf("symlink %s should exist in cache", symlink) - continue - } - if entry.path != expectedTarget { - t.Errorf("symlink %s should point to %s, got %s", symlink, expectedTarget, entry.path) - } - if entry.brokenSymlink { - t.Errorf("symlink %s should not be marked as broken", symlink) - } + // Compare the entire cache state + if diff := cmp.Diff(tt.expectedCache, cache.links, allowUnexportedLinkCache); diff != "" { + t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) } }) } } -func TestListAndUpdateWithChanges(t *testing.T) { - mock := newMockFS() - cache := NewListingCache(0, ".") - cache.fs = mock - - // Initial state: one disk with a valid symlink - mock.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - mock.symlinks[gcpPersistentDiskID] = devicePathSDA - - // First listAndUpdate should add the disk to cache - err := cache.listAndUpdate() - if err != nil { - t.Fatalf("unexpected error in first listAndUpdate: %v", err) - } - - // Verify initial state - entry, exists := cache.links.devices[gcpPersistentDiskID] - if !exists { - t.Fatal("gcpPersistentDiskID should exist in cache after first listAndUpdate") - } - if entry.path != devicePathSDA { - t.Errorf("gcpPersistentDiskID should point to %s, got %s", devicePathSDA, entry.path) - } - - // Add a new disk and update the symlink target - mock.MapFS[gcpPVCID] = &fstest.MapFile{} - mock.symlinks[gcpPVCID] = devicePathSDB - mock.symlinks[gcpPersistentDiskID] = devicePathSDB // Update existing disk's target - - // Second listAndUpdate should update the cache - err = cache.listAndUpdate() - if err != nil { - t.Fatalf("unexpected error in second listAndUpdate: %v", err) - } - - // Verify both disks are in cache with correct paths - entry, exists = cache.links.devices[gcpPersistentDiskID] - if !exists { - t.Fatal("gcpPersistentDiskID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPersistentDiskID should now point to %s, got %s", devicePathSDB, entry.path) - } - - entry, exists = cache.links.devices[gcpPVCID] - if !exists { - t.Fatal("gcpPVCID should exist in cache after second listAndUpdate") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPVCID should point to %s, got %s", devicePathSDB, entry.path) - } - - // Break the symlink for gcpPersistentDiskID but keep the file - delete(mock.symlinks, gcpPersistentDiskID) - - // Third listAndUpdate should mark the disk as broken but keep its last known value - err = cache.listAndUpdate() - if err == nil { - t.Error("expected error for broken symlink") - } - - // Verify gcpPersistentDiskID is marked as broken but maintains its last known value - entry, exists = cache.links.devices[gcpPersistentDiskID] - if !exists { - t.Fatal("gcpPersistentDiskID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPersistentDiskID should maintain its last known value %s, got %s", devicePathSDB, entry.path) - } - if !entry.brokenSymlink { - t.Error("gcpPersistentDiskID should be marked as broken") - } - - // Verify gcpPVCID is still valid - entry, exists = cache.links.devices[gcpPVCID] - if !exists { - t.Fatal("gcpPVCID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) - } - if entry.brokenSymlink { - t.Error("gcpPVCID should not be marked as broken") - } - - // Remove one disk - delete(mock.MapFS, gcpPersistentDiskID) - delete(mock.symlinks, gcpPersistentDiskID) - - // Fourth listAndUpdate should remove the deleted disk - err = cache.listAndUpdate() - if err != nil { - t.Fatalf("unexpected error in fourth listAndUpdate: %v", err) +func TestLinkCache(t *testing.T) { + tests := []struct { + name string + setupCache func(*linkCache) + expected *linkCache + }{ + { + name: "AddOrUpdateDevice", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice("symlink1", "/dev/sda") + lc.AddOrUpdateDevice("symlink2", "/dev/sdb") + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{ + "symlink1": {path: "/dev/sda", brokenSymlink: false}, + "symlink2": {path: "/dev/sdb", brokenSymlink: false}, + }, + }, + }, + { + name: "BrokenSymlink", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice("symlink1", "/dev/sda") + lc.BrokenSymlink("symlink1") + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{ + "symlink1": {path: "/dev/sda", brokenSymlink: true}, + }, + }, + }, + { + name: "RemoveDevice", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice("symlink1", "/dev/sda") + lc.RemoveDevice("symlink1") + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{}, + }, + }, } - // Verify only gcpPVCID remains - if _, exists := cache.links.devices[gcpPersistentDiskID]; exists { - t.Error("gcpPersistentDiskID should be removed from cache") - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache := newLinkCache() + tt.setupCache(cache) - entry, exists = cache.links.devices[gcpPVCID] - if !exists { - t.Fatal("gcpPVCID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) + if diff := cmp.Diff(tt.expected, cache, allowUnexportedLinkCache); diff != "" { + t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) + } + }) } } From 95163b7be1877970521d3cecc7943ae595af243a Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 16:30:07 -0700 Subject: [PATCH 07/23] Remove unused broken function --- pkg/linkcache/cache.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index 006eed27d..c2ceb4ab1 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -54,15 +54,6 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { } } -func NewMockListingCache(period time.Duration, dir string) *ListingCache { - return &ListingCache{ - period: period, - dir: dir, - links: newLinkCache(), - fs: &mockFS{}, - } -} - // Run starts the cache's background loop. The filesystem is listed and the cache // updated according to the frequency specified by the period. It will run until // the context is cancelled. From 8d8d926792a4f4001723f3b3d7255b6605fb2deb Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 23 May 2025 14:04:02 -0700 Subject: [PATCH 08/23] Move partition checking into the inner linkcache type. This makes it easier to unit test. --- pkg/linkcache/cache.go | 14 ++++++++++---- pkg/linkcache/cache_test.go | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index c2ceb4ab1..034cba99d 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -99,14 +99,12 @@ func (l *ListingCache) listAndUpdate() error { continue } - // TODO(juliankatz): To have certainty this works for all edge cases, we - // need to test this with a manually partitioned disk. + diskByIdPath := filepath.Join(l.dir, entry.Name()) + if partitionNameRegex.MatchString(entry.Name()) { continue } - diskByIdPath := filepath.Join(l.dir, entry.Name()) - // Add the device to the map regardless of successful symlink eval. // Otherwise, a broken symlink will lead us to remove it from the cache. visited[diskByIdPath] = struct{}{} @@ -153,6 +151,14 @@ func newLinkCache() *linkCache { } func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { + // Ignore partitions, which are noise as far as our logging is concerned. + // Expression: -part[0-9]+$ + if partitionNameRegex.MatchString(symlink) { + // TODO(juliankatz): To have certainty this works for all edge cases, we + // need to test this with a manually partitioned disk. + return + } + prevEntry, exists := d.devices[symlink] if !exists || prevEntry.path != realPath { klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath) diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index 7683ed258..0b811e833 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -170,6 +170,20 @@ func TestLinkCache(t *testing.T) { devices: map[string]linkCacheEntry{}, }, }, + { + name: "PartitionIgnored", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice(gcpPersistentDiskPartitionID, devicePathSDA+"1") + lc.AddOrUpdateDevice(gcpPersistentDiskID, devicePathSDA) + lc.AddOrUpdateDevice(gcpPVCID, devicePathSDB) + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{ + gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, + gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, + }, + }, + }, } for _, tt := range tests { From 448b0bac666636e1d3d975b06e46431efbafcc0c Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 23 May 2025 14:14:36 -0700 Subject: [PATCH 09/23] Log when linkcache Run is triggered --- pkg/linkcache/cache.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index 034cba99d..d4ec426ba 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -58,6 +58,8 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { // updated according to the frequency specified by the period. It will run until // the context is cancelled. func (l *ListingCache) Run(ctx context.Context) { + klog.Infof("Starting symlink cache watcher for directory %s with period %s", l.dir, l.period) + // Start the loop that runs every minute ticker := time.NewTicker(l.period) defer ticker.Stop() @@ -150,6 +152,9 @@ func newLinkCache() *linkCache { } } +// AddOrUpdateDevice adds a new device or updates an existing device in the cache. +// It ignores partition symlinks as they are considered noise for logging purposes. +// If the symlink already exists and the real path has changed, it logs the update. func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { // Ignore partitions, which are noise as far as our logging is concerned. // Expression: -part[0-9]+$ From 2ef351ccc70c16cb110442224ff5b611b09a52cc Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 6 Jun 2025 14:38:56 -0700 Subject: [PATCH 10/23] New implementation that is hooked into nodestage/unstage. Just linux right now. --- cmd/gce-pd-csi-driver/main.go | 10 +- .../base/controller/controller.yaml | 1 + .../images/stable-master/image.yaml | 2 +- pkg/gce-pd-csi-driver/cache.go | 42 +--- pkg/gce-pd-csi-driver/gce-pd-driver.go | 3 +- pkg/gce-pd-csi-driver/node.go | 25 ++- pkg/k8sclient/node.go | 49 +++++ pkg/linkcache/cache.go | 204 ------------------ pkg/linkcache/cache_test.go | 198 ----------------- pkg/linkcache/devices.go | 136 ++++++++++++ 10 files changed, 215 insertions(+), 455 deletions(-) create mode 100644 pkg/k8sclient/node.go delete mode 100644 pkg/linkcache/cache.go create mode 100644 pkg/linkcache/devices.go diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 3235436a9..675fc9b7f 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -276,6 +276,12 @@ func handle() { klog.Fatalf("Failed to get node info from API server: %v", err.Error()) } + deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) + if err != nil { + klog.Fatalf("Failed to create device cache: %v", err.Error()) + } + go deviceCache.Run(ctx) + // TODO(2042): Move more of the constructor args into this struct nsArgs := &driver.NodeServerArgs{ EnableDeviceInUseCheck: *enableDeviceInUseCheck, @@ -284,6 +290,7 @@ func handle() { DataCacheEnabledNodePool: isDataCacheEnabledNodePool, SysfsPath: "/sys", MetricsManager: metricsManager, + DeviceCache: deviceCache, } nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs) @@ -302,9 +309,10 @@ func handle() { } } - go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx) } + klog.Infof("NOT BLOCKED") + err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) if err != nil { klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error()) diff --git a/deploy/kubernetes/base/controller/controller.yaml b/deploy/kubernetes/base/controller/controller.yaml index e15b8593a..302874f82 100644 --- a/deploy/kubernetes/base/controller/controller.yaml +++ b/deploy/kubernetes/base/controller/controller.yaml @@ -145,6 +145,7 @@ spec: - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme" - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml" - --enable-data-cache + - --run-node-service=false - --enable-multitenancy command: - /gce-pd-csi-driver diff --git a/deploy/kubernetes/images/stable-master/image.yaml b/deploy/kubernetes/images/stable-master/image.yaml index 0663677c3..b0da81d85 100644 --- a/deploy/kubernetes/images/stable-master/image.yaml +++ b/deploy/kubernetes/images/stable-master/image.yaml @@ -46,6 +46,6 @@ imageTag: name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver - newTag: "v1.17.3" + newTag: "v1.17.4" --- diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index 1dc486690..1b894d53c 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -7,19 +7,14 @@ import ( "regexp" "strconv" "strings" - "time" csi "github.com/container-storage-interface/spec/lib/go/csi" fsnotify "github.com/fsnotify/fsnotify" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" ) const ( @@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con } func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return 0, err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return 0, err - } - node, err := getNodeWithRetry(ctx, kubeClient, nodeName) + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return 0, err } + if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found { dataCacheCount, err := strconv.Atoi(val) if err != nil { @@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, return 0, nil } -func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { - var nodeObj *v1.Node - backoff := wait.Backoff{ - Duration: 1 * time.Second, - Factor: 2.0, - Steps: 5, - } - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) { - node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err) - return false, nil - } - nodeObj = node - klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName) - return true, nil - }) - - if err != nil { - klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err) - } - return nodeObj, err -} - func FetchRaidedLssdCountForDatacache() (int, error) { raidedPath, err := fetchRAIDedLocalSsdPath() if err != nil { diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index 76aa38381..83ff9767a 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi DataCacheEnabledNodePool: args.DataCacheEnabledNodePool, SysfsPath: args.SysfsPath, metricsManager: args.MetricsManager, + DeviceCache: args.DeviceCache, } } @@ -184,7 +185,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT maxLogChar = grpcLogCharCap klog.V(4).Infof("Driver: %v", gceDriver.name) - //Start the nonblocking GRPC + // Start the nonblocking GRPC s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager) // TODO(#34): Only start specific servers based on a flag. // In the future have this only run specific combinations of servers depending on which version this is. diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 00f0b0859..9c1960cc4 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -32,14 +32,14 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/mount-utils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs" @@ -80,6 +80,8 @@ type GCENodeServer struct { csi.UnimplementedNodeServer metricsManager *metrics.MetricsManager + // A cache of the device paths for the volumes that are attached to the node. + DeviceCache *linkcache.DeviceCache } type NodeServerArgs struct { @@ -97,6 +99,7 @@ type NodeServerArgs struct { SysfsPath string MetricsManager *metrics.MetricsManager + DeviceCache *linkcache.DeviceCache } var _ csi.NodeServer = &GCENodeServer{} @@ -509,6 +512,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage } } + err = ns.DeviceCache.AddVolume(volumeID) + if err != nil { + klog.Warningf("Error adding volume %s to cache: %v", volumeID, err) + } + klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil } @@ -622,6 +630,9 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err) } } + + ns.DeviceCache.RemoveVolume(volumeID) + klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil } @@ -899,15 +910,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) { } func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return 0, err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return 0, err - } - node, err := getNodeWithRetry(ctx, kubeClient, nodeName) + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return 0, err } diff --git a/pkg/k8sclient/node.go b/pkg/k8sclient/node.go new file mode 100644 index 000000000..1c4fa9b8b --- /dev/null +++ b/pkg/k8sclient/node.go @@ -0,0 +1,49 @@ +package k8sclient + +import ( + "context" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" +) + +func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) { + cfg, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + return getNodeWithRetry(ctx, kubeClient, nodeName) +} + +func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { + var nodeObj *v1.Node + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Steps: 5, + } + err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) { + node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err) + return false, nil + } + nodeObj = node + klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName) + return true, nil + }) + + if err != nil { + klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err) + } + return nodeObj, err +} diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go deleted file mode 100644 index d4ec426ba..000000000 --- a/pkg/linkcache/cache.go +++ /dev/null @@ -1,204 +0,0 @@ -package linkcache - -import ( - "context" - "fmt" - "maps" - "os" - "path/filepath" - "regexp" - "slices" - "strings" - "time" - - "k8s.io/klog/v2" -) - -var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) - -// fsInterface defines the filesystem operations needed by ListingCache -type fsInterface interface { - ReadDir(name string) ([]os.DirEntry, error) - EvalSymlinks(path string) (string, error) -} - -// realFS implements fsInterface using the real filesystem -type realFS struct{} - -func (f *realFS) ReadDir(name string) ([]os.DirEntry, error) { - return os.ReadDir(name) -} - -func (f *realFS) EvalSymlinks(path string) (string, error) { - return filepath.EvalSymlinks(path) -} - -// ListingCache polls the filesystem at the specified directory once per -// period and checks each non-directory entry for a symlink. The results are -// cached. Changes to the cache are logged, as well as the full contents of the -// cache. The cache's Run() method is expected to be called in a goroutine. Its -// cancellation is controlled via the context argument. -type ListingCache struct { - period time.Duration - dir string - links *linkCache - fs fsInterface -} - -func NewListingCache(period time.Duration, dir string) *ListingCache { - return &ListingCache{ - period: period, - dir: dir, - links: newLinkCache(), - fs: &realFS{}, - } -} - -// Run starts the cache's background loop. The filesystem is listed and the cache -// updated according to the frequency specified by the period. It will run until -// the context is cancelled. -func (l *ListingCache) Run(ctx context.Context) { - klog.Infof("Starting symlink cache watcher for directory %s with period %s", l.dir, l.period) - - // Start the loop that runs every minute - ticker := time.NewTicker(l.period) - defer ticker.Stop() - - // Initial list and update so we don't wait for the first tick. - err := l.listAndUpdate() - if err != nil { - klog.Warningf("Error listing and updating symlinks: %v", err) - } - - for { - select { - case <-ctx.Done(): - klog.Infof("Context done, stopping watcher") - return - case <-ticker.C: - err := l.listAndUpdate() - if err != nil { - klog.Warningf("Error listing and updating symlinks: %v", err) - continue - } - - klog.Infof("periodic symlink cache read: %s", l.links.String()) - } - } -} - -func (l *ListingCache) listAndUpdate() error { - visited := make(map[string]struct{}) - - entries, err := l.fs.ReadDir(l.dir) - if err != nil { - return fmt.Errorf("failed to read directory %s: %w", l.dir, err) - } - - var errs []error - for _, entry := range entries { - if entry.IsDir() { - continue - } - - diskByIdPath := filepath.Join(l.dir, entry.Name()) - - if partitionNameRegex.MatchString(entry.Name()) { - continue - } - - // Add the device to the map regardless of successful symlink eval. - // Otherwise, a broken symlink will lead us to remove it from the cache. - visited[diskByIdPath] = struct{}{} - - realFSPath, err := l.fs.EvalSymlinks(diskByIdPath) - if err != nil { - errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err)) - l.links.BrokenSymlink(diskByIdPath) - continue - } - - l.links.AddOrUpdateDevice(diskByIdPath, realFSPath) - } - - for _, id := range l.links.DeviceIDs() { - if _, found := visited[id]; !found { - l.links.RemoveDevice(id) - } - } - - if len(errs) > 0 { - return fmt.Errorf("failed to evaluate symlinks for %d devices: %v", len(errs), errs) - } - return nil -} - -// linkCache is a structure that maintains a cache of symlinks between -// /dev/disk/by-id and /dev/sd* paths. It provides methods to add/update, -// retrieve, and remove device symlinks from the cache. -type linkCache struct { - devices map[string]linkCacheEntry -} - -type linkCacheEntry struct { - path string - // If true, the symlink is known to be broken. - brokenSymlink bool -} - -func newLinkCache() *linkCache { - return &linkCache{ - devices: make(map[string]linkCacheEntry), - } -} - -// AddOrUpdateDevice adds a new device or updates an existing device in the cache. -// It ignores partition symlinks as they are considered noise for logging purposes. -// If the symlink already exists and the real path has changed, it logs the update. -func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { - // Ignore partitions, which are noise as far as our logging is concerned. - // Expression: -part[0-9]+$ - if partitionNameRegex.MatchString(symlink) { - // TODO(juliankatz): To have certainty this works for all edge cases, we - // need to test this with a manually partitioned disk. - return - } - - prevEntry, exists := d.devices[symlink] - if !exists || prevEntry.path != realPath { - klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath) - } - d.devices[symlink] = linkCacheEntry{path: realPath, brokenSymlink: false} -} - -// BrokenSymlink marks a symlink as broken. If the symlink is not in the cache, -// it is ignored. -func (d *linkCache) BrokenSymlink(symlink string) { - if entry, ok := d.devices[symlink]; ok { - entry.brokenSymlink = true - d.devices[symlink] = entry - } -} - -func (d *linkCache) RemoveDevice(symlink string) { - if entry, ok := d.devices[symlink]; ok { - klog.Infof("Removing device %s with path %s from cache, brokenSymlink: %t", symlink, entry.path, entry.brokenSymlink) - delete(d.devices, symlink) - } -} - -func (d *linkCache) DeviceIDs() []string { - return slices.Collect(maps.Keys(d.devices)) -} - -func (d *linkCache) String() string { - var sb strings.Builder - for symlink, entry := range d.devices { - if entry.brokenSymlink { - sb.WriteString(fmt.Sprintf("%s -> broken symlink... last known value: %s; ", symlink, entry.path)) - } else { - sb.WriteString(fmt.Sprintf("%s -> %s; ", symlink, entry.path)) - } - } - return strings.TrimSuffix(sb.String(), "; ") -} diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index 0b811e833..aeb2416bf 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -1,199 +1 @@ package linkcache - -import ( - "os" - "testing" - "testing/fstest" - - "github.com/google/go-cmp/cmp" -) - -var allowUnexportedLinkCache = cmp.AllowUnexported(linkCache{}, linkCacheEntry{}) - -const ( - // Test disk names in /dev/disk/by-id format - gcpPersistentDiskID = "google-persistent-disk-0" - gcpPVCID = "google-pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67" - gcpPersistentDiskPartitionID = "google-persistent-disk-0-part1" - - // Test device paths in /dev format - devicePathSDA = "/dev/sda" - devicePathSDB = "/dev/sdb" -) - -// mockFS implements fsInterface for testing -type mockFS struct { - fstest.MapFS - symlinks map[string]string -} - -func newMockFS() *mockFS { - return &mockFS{ - MapFS: make(fstest.MapFS), - symlinks: make(map[string]string), - } -} - -func (m *mockFS) ReadDir(name string) ([]os.DirEntry, error) { - entries, err := m.MapFS.ReadDir(name) - if err != nil { - return nil, err - } - return entries, nil -} - -func (m *mockFS) EvalSymlinks(path string) (string, error) { - if target, ok := m.symlinks[path]; ok { - return target, nil - } - return "", os.ErrNotExist -} - -func TestListAndUpdate(t *testing.T) { - tests := []struct { - name string - setupFS func(*mockFS) - expectedCache *linkCache - expectError bool - }{ - { - name: "valid symlinks", - setupFS: func(m *mockFS) { - // Create some device files - m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - m.MapFS[gcpPVCID] = &fstest.MapFile{} - // Create symlinks - m.symlinks[gcpPersistentDiskID] = devicePathSDA - m.symlinks[gcpPVCID] = devicePathSDB - }, - expectedCache: &linkCache{ - devices: map[string]linkCacheEntry{ - gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, - gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, - }, - }, - expectError: false, - }, - { - name: "broken symlink not added to cache", - setupFS: func(m *mockFS) { - m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - // No symlink target for gcpPersistentDiskID - }, - expectedCache: &linkCache{ - devices: map[string]linkCacheEntry{}, - }, - expectError: true, - }, - { - name: "partition files ignored", - setupFS: func(m *mockFS) { - m.MapFS[gcpPersistentDiskPartitionID] = &fstest.MapFile{} - m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - m.symlinks[gcpPersistentDiskID] = devicePathSDA - m.symlinks[gcpPersistentDiskPartitionID] = devicePathSDA + "1" - }, - expectedCache: &linkCache{ - devices: map[string]linkCacheEntry{ - gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, - }, - }, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mock := newMockFS() - tt.setupFS(mock) - - cache := NewListingCache(0, ".") - cache.fs = mock // Inject our mock filesystem - err := cache.listAndUpdate() - - if tt.expectError { - if err == nil { - t.Error("expected error but got none") - } - } else { - if err != nil { - t.Errorf("unexpected error: %v", err) - } - } - - // Compare the entire cache state - if diff := cmp.Diff(tt.expectedCache, cache.links, allowUnexportedLinkCache); diff != "" { - t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) - } - }) - } -} - -func TestLinkCache(t *testing.T) { - tests := []struct { - name string - setupCache func(*linkCache) - expected *linkCache - }{ - { - name: "AddOrUpdateDevice", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice("symlink1", "/dev/sda") - lc.AddOrUpdateDevice("symlink2", "/dev/sdb") - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{ - "symlink1": {path: "/dev/sda", brokenSymlink: false}, - "symlink2": {path: "/dev/sdb", brokenSymlink: false}, - }, - }, - }, - { - name: "BrokenSymlink", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice("symlink1", "/dev/sda") - lc.BrokenSymlink("symlink1") - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{ - "symlink1": {path: "/dev/sda", brokenSymlink: true}, - }, - }, - }, - { - name: "RemoveDevice", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice("symlink1", "/dev/sda") - lc.RemoveDevice("symlink1") - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{}, - }, - }, - { - name: "PartitionIgnored", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice(gcpPersistentDiskPartitionID, devicePathSDA+"1") - lc.AddOrUpdateDevice(gcpPersistentDiskID, devicePathSDA) - lc.AddOrUpdateDevice(gcpPVCID, devicePathSDB) - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{ - gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, - gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cache := newLinkCache() - tt.setupCache(cache) - - if diff := cmp.Diff(tt.expected, cache, allowUnexportedLinkCache); diff != "" { - t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) - } - }) - } -} diff --git a/pkg/linkcache/devices.go b/pkg/linkcache/devices.go new file mode 100644 index 000000000..963ba43a9 --- /dev/null +++ b/pkg/linkcache/devices.go @@ -0,0 +1,136 @@ +package linkcache + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" +) + +const byIdDir = "/dev/disk/by-id" + +type deviceMapping struct { + symlink string + realPath string +} + +type DeviceCache struct { + volumes map[string]deviceMapping + period time.Duration + // dir is the directory to look for device symlinks + dir string +} + +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err) + } + + return newDeviceCacheForNode(ctx, period, node) +} + +func newDeviceCacheForNode(ctx context.Context, period time.Duration, node *v1.Node) (*DeviceCache, error) { + deviceCache := &DeviceCache{ + volumes: make(map[string]deviceMapping), + period: period, + dir: byIdDir, + } + + // Look at the status.volumesInUse field. For each, take the last section + // of the string (after the last "/") and call AddVolume for that + for _, volume := range node.Status.VolumesInUse { + klog.Infof("Adding volume %s to cache", string(volume)) + volumeID := strings.Split(string(volume), "^")[1] + deviceCache.AddVolume(volumeID) + } + + return deviceCache, nil +} + +// Run since it needs an infinite loop to keep itself up to date +func (d *DeviceCache) Run(ctx context.Context) { + klog.Infof("Starting device cache watcher for directory %s with period %s", d.dir, d.period) + + // Start the loop that runs every minute + ticker := time.NewTicker(d.period) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return + case <-ticker.C: + d.listAndUpdate() + + klog.Infof("Cache contents: %+v", d.volumes) + } + } +} + +// Add a volume. This will yield a call to the filesystem to find a +// /dev/disk/by-id symlink and an evaluation of that symlink. +func (d *DeviceCache) AddVolume(volumeID string) error { + klog.Infof("Adding volume %s to cache", volumeID) + + _, volumeKey, err := common.VolumeIDToKey(volumeID) + if err != nil { + return fmt.Errorf("error converting volume ID to key: %w", err) + } + deviceName, err := common.GetDeviceName(volumeKey) + if err != nil { + return fmt.Errorf("error getting device name: %w", err) + } + + // Look at the dir for a symlink that matches the pvName + symlink := filepath.Join(d.dir, "google-"+deviceName) + klog.Infof("Looking for symlink %s", symlink) + + realPath, err := filepath.EvalSymlinks(symlink) + if err != nil { + klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err) + return nil + } + + klog.Infof("Found real path %s for volume %s", realPath, volumeID) + + d.volumes[volumeID] = deviceMapping{ + symlink: symlink, + realPath: realPath, + } + + return nil +} + +// Remove the volume from the cache. +func (d *DeviceCache) RemoveVolume(volumeID string) { + klog.Infof("Removing volume %s from cache", volumeID) + delete(d.volumes, volumeID) +} + +func (d *DeviceCache) listAndUpdate() { + for volumeID, device := range d.volumes { + // Evaluate the symlink + realPath, err := filepath.EvalSymlinks(device.symlink) + if err != nil { + klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err) + continue + } + + // Check if the realPath has changed + if realPath != device.realPath { + klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", volumeID, device.symlink, device.realPath, realPath) + + // Update the cache with the new realPath + device.realPath = realPath + d.volumes[volumeID] = device + } + } +} From 9ab0d2faca9fe5163aa2cf2030b6d11b0b0d10a3 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 28 May 2025 13:11:52 -0700 Subject: [PATCH 11/23] Made a no-op windows implementation of the linkcache package --- manifest_osversion.sh | 4 +-- pkg/linkcache/cache_test.go | 1 - .../{devices.go => devices_linux.go} | 12 --------- pkg/linkcache/devices_windows.go | 25 +++++++++++++++++++ pkg/linkcache/types.go | 15 +++++++++++ 5 files changed, 42 insertions(+), 15 deletions(-) delete mode 100644 pkg/linkcache/cache_test.go rename pkg/linkcache/{devices.go => devices_linux.go} (94%) create mode 100644 pkg/linkcache/devices_windows.go create mode 100644 pkg/linkcache/types.go diff --git a/manifest_osversion.sh b/manifest_osversion.sh index 4feb4d5c6..37f61e3f2 100755 --- a/manifest_osversion.sh +++ b/manifest_osversion.sh @@ -4,7 +4,7 @@ set -o xtrace # The following is a workaround for issue https://github.com/moby/moby/issues/41417 # to manually inserver os.version information into docker manifest file -# TODO: once docker manifest annotation for os.versions is availabler for the installed docker here, +# TODO: once docker manifest annotation for os.versions is available for the installed docker here, # replace the following with annotation approach. https://github.com/docker/cli/pull/2578 export DOCKER_CLI_EXPERIMENTAL=enabled @@ -14,7 +14,7 @@ IFS=', ' read -r -a imagetags <<< "$WINDOWS_IMAGE_TAGS" IFS=', ' read -r -a baseimages <<< "$WINDOWS_BASE_IMAGES" MANIFEST_TAG=${STAGINGIMAGE}:${STAGINGVERSION} -# translate from image tag to docker manifest foler format +# translate from image tag to docker manifest folder format # e.g., gcr.io_k8s-staging-csi_gce-pd-windows-v2 manifest_folder=$(echo "${MANIFEST_TAG}" | sed "s|/|_|g" | sed "s/:/-/") echo ${manifest_folder} diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go deleted file mode 100644 index aeb2416bf..000000000 --- a/pkg/linkcache/cache_test.go +++ /dev/null @@ -1 +0,0 @@ -package linkcache diff --git a/pkg/linkcache/devices.go b/pkg/linkcache/devices_linux.go similarity index 94% rename from pkg/linkcache/devices.go rename to pkg/linkcache/devices_linux.go index 963ba43a9..a6f61edff 100644 --- a/pkg/linkcache/devices.go +++ b/pkg/linkcache/devices_linux.go @@ -15,18 +15,6 @@ import ( const byIdDir = "/dev/disk/by-id" -type deviceMapping struct { - symlink string - realPath string -} - -type DeviceCache struct { - volumes map[string]deviceMapping - period time.Duration - // dir is the directory to look for device symlinks - dir string -} - func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go new file mode 100644 index 000000000..59e8eac73 --- /dev/null +++ b/pkg/linkcache/devices_windows.go @@ -0,0 +1,25 @@ +//go:build windows + +package linkcache + +import ( + "context" + "fmt" + "time" +) + +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { + return nil, fmt.Errorf("NewDeviceCacheForNode is not implemented for Windows") +} + +func (d *DeviceCache) Run(ctx context.Context) { + // Not implemented for Windows +} + +func (d *DeviceCache) AddVolume(volumeID string) error { + return fmt.Errorf("AddVolume is not implemented for Windows") +} + +func (d *DeviceCache) RemoveVolume(volumeID string) { + // Not implemented for Windows +} diff --git a/pkg/linkcache/types.go b/pkg/linkcache/types.go new file mode 100644 index 000000000..b8dcae3fb --- /dev/null +++ b/pkg/linkcache/types.go @@ -0,0 +1,15 @@ +package linkcache + +import "time" + +type deviceMapping struct { + symlink string + realPath string +} + +type DeviceCache struct { + volumes map[string]deviceMapping + period time.Duration + // dir is the directory to look for device symlinks + dir string +} From b88e5f4f2a1ee383546b7b59ebfd788fe03854be Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 6 Jun 2025 15:05:05 -0700 Subject: [PATCH 12/23] Made test device caches in node_test.go --- pkg/gce-pd-csi-driver/node_test.go | 17 +++++++++---- pkg/linkcache/devices_linux.go | 39 ++++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index 2ac1b9e2e..0ac7805d3 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/mount-utils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -44,11 +45,13 @@ const ( ) func getTestGCEDriver(t *testing.T) *GCEDriver { - return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) + return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{ + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{defaultVolumeID})), + }) } -func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver { - return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) +func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount, args *NodeServerArgs) *GCEDriver { + return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), args) } func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver { @@ -188,7 +191,9 @@ func TestNodeGetVolumeStats(t *testing.T) { } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList}) - gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{tc.volumeID})), + }) ns := gceDriver.ns req := &csi.NodeGetVolumeStatsRequest{ @@ -1227,7 +1232,9 @@ func TestNodeStageVolume(t *testing.T) { )) } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true}) - gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{volumeID})), + }) ns := gceDriver.ns ns.SysfsPath = tempDir + "/sys" _, err := ns.NodeStageVolume(context.Background(), tc.req) diff --git a/pkg/linkcache/devices_linux.go b/pkg/linkcache/devices_linux.go index a6f61edff..96e4732b8 100644 --- a/pkg/linkcache/devices_linux.go +++ b/pkg/linkcache/devices_linux.go @@ -21,10 +21,27 @@ func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName s return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err) } - return newDeviceCacheForNode(ctx, period, node) + return newDeviceCacheForNode(period, node), nil } -func newDeviceCacheForNode(ctx context.Context, period time.Duration, node *v1.Node) (*DeviceCache, error) { +func TestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache { + return newDeviceCacheForNode(period, node) +} + +func TestNodeWithVolumes(volumes []string) *v1.Node { + volumesInUse := make([]v1.UniqueVolumeName, len(volumes)) + for i, volume := range volumes { + volumesInUse[i] = v1.UniqueVolumeName("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume) + } + + return &v1.Node{ + Status: v1.NodeStatus{ + VolumesInUse: volumesInUse, + }, + } +} + +func newDeviceCacheForNode(period time.Duration, node *v1.Node) *DeviceCache { deviceCache := &DeviceCache{ volumes: make(map[string]deviceMapping), period: period, @@ -35,11 +52,23 @@ func newDeviceCacheForNode(ctx context.Context, period time.Duration, node *v1.N // of the string (after the last "/") and call AddVolume for that for _, volume := range node.Status.VolumesInUse { klog.Infof("Adding volume %s to cache", string(volume)) - volumeID := strings.Split(string(volume), "^")[1] - deviceCache.AddVolume(volumeID) + vID, err := pvNameFromVolumeID(string(volume)) + if err != nil { + klog.Warningf("failure to retrieve name, skipping volume %q: %v", string(volume), err) + continue + } + deviceCache.AddVolume(vID) } - return deviceCache, nil + return deviceCache +} + +func pvNameFromVolumeID(volumeID string) (string, error) { + tokens := strings.Split(volumeID, "^") + if len(tokens) != 2 { + return "", fmt.Errorf("invalid volume ID, split on `^` returns %d tokens, expected 2", len(tokens)) + } + return tokens[1], nil } // Run since it needs an infinite loop to keep itself up to date From d76c44cb5de3c7cc8fd3e9a1c182ef489600045f Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 6 Jun 2025 15:16:32 -0700 Subject: [PATCH 13/23] Fix sanity test --- test/sanity/sanity_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 89192f150..408dc047e 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -33,6 +33,7 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -77,6 +78,7 @@ func TestSanity(t *testing.T) { EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0, EnableDataCache: enableDataCache, + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{})), } // Initialize GCE Driver From 4abd5407c61d1a5762e2127f2b83d5b1d65d848c Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 15:35:40 -0700 Subject: [PATCH 14/23] Only warn on failure to create cache --- cmd/gce-pd-csi-driver/main.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 675fc9b7f..817502375 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -278,7 +278,7 @@ func handle() { deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) if err != nil { - klog.Fatalf("Failed to create device cache: %v", err.Error()) + klog.Warningf("Failed to create device cache: %v", err.Error()) } go deviceCache.Run(ctx) @@ -311,8 +311,6 @@ func handle() { } - klog.Infof("NOT BLOCKED") - err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) if err != nil { klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error()) From c4b69f3f6c47cba4b094fca2076d35a6d7d15bf4 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 15:37:45 -0700 Subject: [PATCH 15/23] Only warn on windows instantiation --- pkg/linkcache/devices_windows.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go index 59e8eac73..bfed9158d 100644 --- a/pkg/linkcache/devices_windows.go +++ b/pkg/linkcache/devices_windows.go @@ -6,10 +6,13 @@ import ( "context" "fmt" "time" + + "k8s.io/klog/v2" ) func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { - return nil, fmt.Errorf("NewDeviceCacheForNode is not implemented for Windows") + klog.Warningf("NewDeviceCacheForNode is not implemented for Windows") + return nil, nil } func (d *DeviceCache) Run(ctx context.Context) { From 042176ae273bc5f702ac92b23444650bd6ead332 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:02:19 -0700 Subject: [PATCH 16/23] Make non-implemented on windows an info --- pkg/linkcache/devices_windows.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go index bfed9158d..b767556ea 100644 --- a/pkg/linkcache/devices_windows.go +++ b/pkg/linkcache/devices_windows.go @@ -4,14 +4,13 @@ package linkcache import ( "context" - "fmt" "time" "k8s.io/klog/v2" ) func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { - klog.Warningf("NewDeviceCacheForNode is not implemented for Windows") + klog.Infof("NewDeviceCacheForNode is not implemented for Windows") return nil, nil } @@ -20,7 +19,8 @@ func (d *DeviceCache) Run(ctx context.Context) { } func (d *DeviceCache) AddVolume(volumeID string) error { - return fmt.Errorf("AddVolume is not implemented for Windows") + klog.Infof("AddVolume is not implemented for Windows") + return nil } func (d *DeviceCache) RemoveVolume(volumeID string) { From bc8defa91179aa393833fbaf9411bc3e0abcb401 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:44:19 -0700 Subject: [PATCH 17/23] Improved some error messages to provide better test failure feedback --- test/remote/setup-teardown.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/remote/setup-teardown.go b/test/remote/setup-teardown.go index 3026b28b0..14cbbbf19 100644 --- a/test/remote/setup-teardown.go +++ b/test/remote/setup-teardown.go @@ -73,7 +73,7 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes archiveName := fmt.Sprintf("e2e_driver_binaries_%s.tar.gz", uuid.NewUUID()) archivePath, err := CreateDriverArchive(archiveName, instance.cfg.Architecture, config.PkgPath, config.BinPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create driver archive: %v", err.Error()) } defer func() { err = os.Remove(archivePath) @@ -92,7 +92,7 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes // Upload archive to instance and run binaries driverPID, err := instance.UploadAndRun(archivePath, config.WorkspaceDir, config.RunDriverCmd) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to upload and run driver: %v", err.Error()) } // Create an SSH tunnel from port to port From 170e24b3784eb5839b357e542834dc157da61c41 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:47:18 -0700 Subject: [PATCH 18/23] Always print helpful logs in failing area --- test/remote/runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/remote/runner.go b/test/remote/runner.go index 860646e71..87543e433 100644 --- a/test/remote/runner.go +++ b/test/remote/runner.go @@ -29,7 +29,7 @@ import ( func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd string) (int, error) { // Create the temp staging directory - klog.V(4).Infof("Staging test binaries on %q", i.cfg.Name) + klog.Infof("Staging test binaries on %q", i.cfg.Name) // Do not sudo here, so that we can use scp to copy test archive to the directdory. if output, err := i.SSHNoSudo("mkdir", remoteWorkspace); err != nil { @@ -49,7 +49,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s fmt.Sprintf("cd %s", remoteWorkspace), fmt.Sprintf("tar -xzvf ./%s", archiveName), ) - klog.V(4).Infof("Extracting tar on %q", i.cfg.Name) + klog.Infof("Extracting tar on %q", i.cfg.Name) // Do not use sudo here, because `sudo tar -x` will recover the file ownership inside the tar ball, but // we want the extracted files to be owned by the current user. if output, err := i.SSHNoSudo("sh", "-c", cmd); err != nil { @@ -57,7 +57,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s return -1, fmt.Errorf("failed to extract test archive: %v, output: %q", err.Error(), output) } - klog.V(4).Infof("Starting driver on %q", i.cfg.Name) + klog.Infof("Starting driver on %q", i.cfg.Name) // When the process is killed the driver should close the TCP endpoint, then we want to download the logs output, err := i.SSH(driverRunCmd) if err != nil { From be4b0459031f6cf8cc8e030dccdd305b94024776 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 17:11:28 -0700 Subject: [PATCH 19/23] Remove now unnecessary corp-helper when running from cloudtop --- test/remote/instance.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/remote/instance.go b/test/remote/instance.go index 554e7612e..84c56625f 100644 --- a/test/remote/instance.go +++ b/test/remote/instance.go @@ -235,7 +235,7 @@ func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { } if i.cfg.CloudtopHost { - output, err := exec.Command("gcloud", "compute", "ssh", i.cfg.Name, "--zone", i.cfg.Zone, "--project", i.cfg.Project, "--", "-o", "ProxyCommand=corp-ssh-helper %h %p", "--", "echo").CombinedOutput() + output, err := exec.Command("gcloud", "compute", "ssh", i.cfg.Name, "--zone", i.cfg.Zone, "--project", i.cfg.Project).CombinedOutput() if err != nil { klog.Errorf("Failed to bootstrap ssh (%v): %s", err, string(output)) return false, nil @@ -257,9 +257,8 @@ func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { return true, nil }) - // If instance didn't reach running state in time, return with error now. if err != nil { - return err + return fmt.Errorf("instance %v did not reach running state in time: %v", i.cfg.Name, err.Error()) } // Instance reached running state in time, make sure that cloud-init is complete From 6ef07a2c6e6200a71d5c88f6da4f6fce34690394 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 17:24:41 -0700 Subject: [PATCH 20/23] Only run device cache if successfully created --- cmd/gce-pd-csi-driver/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 817502375..d24d0ac53 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -279,8 +279,9 @@ func handle() { deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) if err != nil { klog.Warningf("Failed to create device cache: %v", err.Error()) + } else { + go deviceCache.Run(ctx) } - go deviceCache.Run(ctx) // TODO(2042): Move more of the constructor args into this struct nsArgs := &driver.NodeServerArgs{ From 3437efdb6dcd1c0e8ca46ad51533dfa13f4ba385 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 17:27:39 -0700 Subject: [PATCH 21/23] Replace verbosities --- test/remote/runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/remote/runner.go b/test/remote/runner.go index 87543e433..860646e71 100644 --- a/test/remote/runner.go +++ b/test/remote/runner.go @@ -29,7 +29,7 @@ import ( func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd string) (int, error) { // Create the temp staging directory - klog.Infof("Staging test binaries on %q", i.cfg.Name) + klog.V(4).Infof("Staging test binaries on %q", i.cfg.Name) // Do not sudo here, so that we can use scp to copy test archive to the directdory. if output, err := i.SSHNoSudo("mkdir", remoteWorkspace); err != nil { @@ -49,7 +49,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s fmt.Sprintf("cd %s", remoteWorkspace), fmt.Sprintf("tar -xzvf ./%s", archiveName), ) - klog.Infof("Extracting tar on %q", i.cfg.Name) + klog.V(4).Infof("Extracting tar on %q", i.cfg.Name) // Do not use sudo here, because `sudo tar -x` will recover the file ownership inside the tar ball, but // we want the extracted files to be owned by the current user. if output, err := i.SSHNoSudo("sh", "-c", cmd); err != nil { @@ -57,7 +57,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s return -1, fmt.Errorf("failed to extract test archive: %v, output: %q", err.Error(), output) } - klog.Infof("Starting driver on %q", i.cfg.Name) + klog.V(4).Infof("Starting driver on %q", i.cfg.Name) // When the process is killed the driver should close the TCP endpoint, then we want to download the logs output, err := i.SSH(driverRunCmd) if err != nil { From 3824cbb806f1cab09c1285cad2e94b9b6c77498c Mon Sep 17 00:00:00 2001 From: Engin Akdemir Date: Tue, 5 Aug 2025 16:44:13 +0000 Subject: [PATCH 22/23] Add nil checks around the usage of the device cache --- pkg/gce-pd-csi-driver/node.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 9c1960cc4..f96bdac14 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -512,9 +512,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage } } - err = ns.DeviceCache.AddVolume(volumeID) - if err != nil { - klog.Warningf("Error adding volume %s to cache: %v", volumeID, err) + if ns.DeviceCache != nil { + err = ns.DeviceCache.AddVolume(volumeID) + if err != nil { + klog.Warningf("Error adding volume %s to cache: %v", volumeID, err) + } } klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath) @@ -631,7 +633,9 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns } } - ns.DeviceCache.RemoveVolume(volumeID) + if ns.DeviceCache != nil { + ns.DeviceCache.RemoveVolume(volumeID) + } klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil From 61aeffd663c1f26f5d49ccaf8f5edc4f3af32490 Mon Sep 17 00:00:00 2001 From: Engin Akdemir Date: Thu, 7 Aug 2025 20:37:26 +0000 Subject: [PATCH 23/23] Add support for NVMe disk types by using deviceutils Change volumes map to use symlink as key and add volumeID and real path to the deviceMappings object, this is because there can be multiple symlinks per volume Filter out caching and logging info on non pd/hyperdisk volumes Failure to get real path at add volume time is no longer an error the real path may be unavailable at this time Fix the device_windows.go NewDeviceCacheForNode parameters --- cmd/gce-pd-csi-driver/main.go | 4 +- pkg/gce-pd-csi-driver/node_test.go | 6 +- pkg/linkcache/devices_linux.go | 101 +++++++++++++++++------------ pkg/linkcache/devices_windows.go | 3 +- pkg/linkcache/types.go | 17 +++-- test/sanity/sanity_test.go | 2 +- 6 files changed, 81 insertions(+), 52 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index d24d0ac53..d72a2c51f 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -98,6 +98,8 @@ var ( diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[disk-type] topology label when the StorageClass has the use-allowed-disk-topology parameter set to true. That topology label is included in the Topologies returned in CreateVolumeResponse. This flag is disabled by default.") + diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute, "Period for the disk cache to check the /dev/disk/by-id/ directory and evaluate the symlinks") + version string ) @@ -276,7 +278,7 @@ func handle() { klog.Fatalf("Failed to get node info from API server: %v", err.Error()) } - deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) + deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, *diskCacheSyncPeriod, *nodeName, driverName, deviceUtils) if err != nil { klog.Warningf("Failed to create device cache: %v", err.Error()) } else { diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index 0ac7805d3..77ef9c39c 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -46,7 +46,7 @@ const ( func getTestGCEDriver(t *testing.T) *GCEDriver { return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{ - DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{defaultVolumeID})), + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{defaultVolumeID})), }) } @@ -192,7 +192,7 @@ func TestNodeGetVolumeStats(t *testing.T) { mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList}) gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ - DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{tc.volumeID})), + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{tc.volumeID})), }) ns := gceDriver.ns @@ -1233,7 +1233,7 @@ func TestNodeStageVolume(t *testing.T) { } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true}) gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ - DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{volumeID})), + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{volumeID})), }) ns := gceDriver.ns ns.SysfsPath = tempDir + "/sys" diff --git a/pkg/linkcache/devices_linux.go b/pkg/linkcache/devices_linux.go index 96e4732b8..71e733e35 100644 --- a/pkg/linkcache/devices_linux.go +++ b/pkg/linkcache/devices_linux.go @@ -10,25 +10,26 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" ) const byIdDir = "/dev/disk/by-id" -func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string, driverName string, deviceUtils deviceutils.DeviceUtils) (*DeviceCache, error) { node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err) } - return newDeviceCacheForNode(period, node), nil + return newDeviceCacheForNode(period, node, driverName, deviceUtils), nil } -func TestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache { - return newDeviceCacheForNode(period, node) +func NewTestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache { + return newDeviceCacheForNode(period, node, "pd.csi.storage.gke.io", deviceutils.NewDeviceUtils()) } -func TestNodeWithVolumes(volumes []string) *v1.Node { +func NewTestNodeWithVolumes(volumes []string) *v1.Node { volumesInUse := make([]v1.UniqueVolumeName, len(volumes)) for i, volume := range volumes { volumesInUse[i] = v1.UniqueVolumeName("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume) @@ -41,36 +42,37 @@ func TestNodeWithVolumes(volumes []string) *v1.Node { } } -func newDeviceCacheForNode(period time.Duration, node *v1.Node) *DeviceCache { +func newDeviceCacheForNode(period time.Duration, node *v1.Node, driverName string, deviceUtils deviceutils.DeviceUtils) *DeviceCache { deviceCache := &DeviceCache{ - volumes: make(map[string]deviceMapping), - period: period, - dir: byIdDir, + symlinks: make(map[string]deviceMapping), + period: period, + deviceUtils: deviceUtils, + dir: byIdDir, } // Look at the status.volumesInUse field. For each, take the last section // of the string (after the last "/") and call AddVolume for that for _, volume := range node.Status.VolumesInUse { - klog.Infof("Adding volume %s to cache", string(volume)) - vID, err := pvNameFromVolumeID(string(volume)) - if err != nil { - klog.Warningf("failure to retrieve name, skipping volume %q: %v", string(volume), err) + volumeName := string(volume) + tokens := strings.Split(volumeName, "^") + if len(tokens) != 2 { + klog.V(5).Infof("Skipping volume %q because splitting volumeName on `^` returns %d tokens, expected 2", volumeName, len(tokens)) + continue + } + + // The first token is of the form "kubernetes.io/csi/" or just "". + // We should check if it contains the driver name we are interested in. + if !strings.Contains(tokens[0], driverName) { + klog.V(5).Infof("Skipping volume %q because it is not a %s volume.", volumeName, driverName) continue } - deviceCache.AddVolume(vID) + klog.Infof("Adding volume %s to cache", string(volume)) + deviceCache.AddVolume(tokens[1]) } return deviceCache } -func pvNameFromVolumeID(volumeID string) (string, error) { - tokens := strings.Split(volumeID, "^") - if len(tokens) != 2 { - return "", fmt.Errorf("invalid volume ID, split on `^` returns %d tokens, expected 2", len(tokens)) - } - return tokens[1], nil -} - // Run since it needs an infinite loop to keep itself up to date func (d *DeviceCache) Run(ctx context.Context) { klog.Infof("Starting device cache watcher for directory %s with period %s", d.dir, d.period) @@ -87,7 +89,7 @@ func (d *DeviceCache) Run(ctx context.Context) { case <-ticker.C: d.listAndUpdate() - klog.Infof("Cache contents: %+v", d.volumes) + klog.Infof("Cache contents: %+v", d.symlinks) } } } @@ -106,21 +108,32 @@ func (d *DeviceCache) AddVolume(volumeID string) error { return fmt.Errorf("error getting device name: %w", err) } - // Look at the dir for a symlink that matches the pvName - symlink := filepath.Join(d.dir, "google-"+deviceName) - klog.Infof("Looking for symlink %s", symlink) - - realPath, err := filepath.EvalSymlinks(symlink) - if err != nil { - klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err) - return nil + symlinks := d.deviceUtils.GetDiskByIdPaths(deviceName, "") + if len(symlinks) == 0 { + return fmt.Errorf("no symlink paths found for volume %s", volumeID) } - klog.Infof("Found real path %s for volume %s", realPath, volumeID) + d.mutex.Lock() + defer d.mutex.Unlock() - d.volumes[volumeID] = deviceMapping{ - symlink: symlink, - realPath: realPath, + // We may have multiple symlinks for a given device, we should add all of them. + for _, symlink := range symlinks { + realPath, err := filepath.EvalSymlinks(symlink) + if err != nil { + // This is not an error, as the symlink may not have been created yet. + // Leave real_path empty; the periodic check will update it. + klog.V(5).Infof("Could not evaluate symlink %s, will retry: %v", symlink, err) + realPath = "" + } else { + klog.Infof("Found real path %s for volume %s", realPath, volumeID) + } + // The key is the symlink path. The value contains the evaluated + // real path and the original volumeID for better logging. + d.symlinks[symlink] = deviceMapping{ + volumeID: volumeID, + realPath: realPath, + } + klog.V(4).Infof("Added volume %s to cache with symlink %s", volumeID, symlink) } return nil @@ -129,25 +142,31 @@ func (d *DeviceCache) AddVolume(volumeID string) error { // Remove the volume from the cache. func (d *DeviceCache) RemoveVolume(volumeID string) { klog.Infof("Removing volume %s from cache", volumeID) - delete(d.volumes, volumeID) + d.mutex.Lock() + defer d.mutex.Unlock() + for symlink, device := range d.symlinks { + if device.volumeID == volumeID { + delete(d.symlinks, symlink) + } + } } func (d *DeviceCache) listAndUpdate() { - for volumeID, device := range d.volumes { + for symlink, device := range d.symlinks { // Evaluate the symlink - realPath, err := filepath.EvalSymlinks(device.symlink) + realPath, err := filepath.EvalSymlinks(symlink) if err != nil { - klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err) + klog.Warningf("Error evaluating symlink for volume %s: %v", device.volumeID, err) continue } // Check if the realPath has changed if realPath != device.realPath { - klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", volumeID, device.symlink, device.realPath, realPath) + klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", device.volumeID, symlink, device.realPath, realPath) // Update the cache with the new realPath device.realPath = realPath - d.volumes[volumeID] = device + d.symlinks[symlink] = device } } } diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go index b767556ea..d7937bc0b 100644 --- a/pkg/linkcache/devices_windows.go +++ b/pkg/linkcache/devices_windows.go @@ -7,9 +7,10 @@ import ( "time" "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" ) -func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string, driverName string, deviceUtils deviceutils.DeviceUtils) (*DeviceCache, error) { klog.Infof("NewDeviceCacheForNode is not implemented for Windows") return nil, nil } diff --git a/pkg/linkcache/types.go b/pkg/linkcache/types.go index b8dcae3fb..04d4688eb 100644 --- a/pkg/linkcache/types.go +++ b/pkg/linkcache/types.go @@ -1,15 +1,22 @@ package linkcache -import "time" +import ( + "sync" + "time" + + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" +) type deviceMapping struct { - symlink string + volumeID string realPath string } type DeviceCache struct { - volumes map[string]deviceMapping - period time.Duration + mutex sync.Mutex + symlinks map[string]deviceMapping + period time.Duration // dir is the directory to look for device symlinks - dir string + dir string + deviceUtils deviceutils.DeviceUtils } diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 408dc047e..5ce72493c 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -78,7 +78,7 @@ func TestSanity(t *testing.T) { EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0, EnableDataCache: enableDataCache, - DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{})), + DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{})), } // Initialize GCE Driver