Skip to content

Commit 61aeffd

Browse files
committed
Add support for NVMe disk types by using deviceutils
Change the volumes map to use the symlink as its key, and add the volumeID and real path to the deviceMapping object, because there can be multiple symlinks per volume. Filter out caching and logging info on non-PD/Hyperdisk volumes. Failure to get the real path at add-volume time is no longer an error, since the real path may be unavailable at that time. Fix the device_windows.go NewDeviceCacheForNode parameters.
1 parent 3824cbb commit 61aeffd

File tree

6 files changed

+81
-52
lines changed

6 files changed

+81
-52
lines changed

cmd/gce-pd-csi-driver/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ var (
9898

9999
diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[disk-type] topology label when the StorageClass has the use-allowed-disk-topology parameter set to true. That topology label is included in the Topologies returned in CreateVolumeResponse. This flag is disabled by default.")
100100

101+
diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute, "Period for the disk cache to check the /dev/disk/by-id/ directory and evaluate the symlinks")
102+
101103
version string
102104
)
103105

@@ -276,7 +278,7 @@ func handle() {
276278
klog.Fatalf("Failed to get node info from API server: %v", err.Error())
277279
}
278280

279-
deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName)
281+
deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, *diskCacheSyncPeriod, *nodeName, driverName, deviceUtils)
280282
if err != nil {
281283
klog.Warningf("Failed to create device cache: %v", err.Error())
282284
} else {

pkg/gce-pd-csi-driver/node_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ const (
4646

4747
func getTestGCEDriver(t *testing.T) *GCEDriver {
4848
return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{
49-
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{defaultVolumeID})),
49+
DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{defaultVolumeID})),
5050
})
5151
}
5252

@@ -192,7 +192,7 @@ func TestNodeGetVolumeStats(t *testing.T) {
192192

193193
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
194194
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
195-
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{tc.volumeID})),
195+
DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{tc.volumeID})),
196196
})
197197
ns := gceDriver.ns
198198

@@ -1233,7 +1233,7 @@ func TestNodeStageVolume(t *testing.T) {
12331233
}
12341234
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true})
12351235
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
1236-
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{volumeID})),
1236+
DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{volumeID})),
12371237
})
12381238
ns := gceDriver.ns
12391239
ns.SysfsPath = tempDir + "/sys"

pkg/linkcache/devices_linux.go

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,26 @@ import (
1010
v1 "k8s.io/api/core/v1"
1111
"k8s.io/klog/v2"
1212
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
13+
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
1314
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
1415
)
1516

1617
const byIdDir = "/dev/disk/by-id"
1718

18-
func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) {
19+
func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string, driverName string, deviceUtils deviceutils.DeviceUtils) (*DeviceCache, error) {
1920
node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
2021
if err != nil {
2122
return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err)
2223
}
2324

24-
return newDeviceCacheForNode(period, node), nil
25+
return newDeviceCacheForNode(period, node, driverName, deviceUtils), nil
2526
}
2627

27-
func TestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache {
28-
return newDeviceCacheForNode(period, node)
28+
func NewTestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache {
29+
return newDeviceCacheForNode(period, node, "pd.csi.storage.gke.io", deviceutils.NewDeviceUtils())
2930
}
3031

31-
func TestNodeWithVolumes(volumes []string) *v1.Node {
32+
func NewTestNodeWithVolumes(volumes []string) *v1.Node {
3233
volumesInUse := make([]v1.UniqueVolumeName, len(volumes))
3334
for i, volume := range volumes {
3435
volumesInUse[i] = v1.UniqueVolumeName("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume)
@@ -41,36 +42,37 @@ func TestNodeWithVolumes(volumes []string) *v1.Node {
4142
}
4243
}
4344

44-
func newDeviceCacheForNode(period time.Duration, node *v1.Node) *DeviceCache {
45+
func newDeviceCacheForNode(period time.Duration, node *v1.Node, driverName string, deviceUtils deviceutils.DeviceUtils) *DeviceCache {
4546
deviceCache := &DeviceCache{
46-
volumes: make(map[string]deviceMapping),
47-
period: period,
48-
dir: byIdDir,
47+
symlinks: make(map[string]deviceMapping),
48+
period: period,
49+
deviceUtils: deviceUtils,
50+
dir: byIdDir,
4951
}
5052

5153
// Look at the status.volumesInUse field. For each, take the last section
5254
// of the string (after the last "/") and call AddVolume for that
5355
for _, volume := range node.Status.VolumesInUse {
54-
klog.Infof("Adding volume %s to cache", string(volume))
55-
vID, err := pvNameFromVolumeID(string(volume))
56-
if err != nil {
57-
klog.Warningf("failure to retrieve name, skipping volume %q: %v", string(volume), err)
56+
volumeName := string(volume)
57+
tokens := strings.Split(volumeName, "^")
58+
if len(tokens) != 2 {
59+
klog.V(5).Infof("Skipping volume %q because splitting volumeName on `^` returns %d tokens, expected 2", volumeName, len(tokens))
60+
continue
61+
}
62+
63+
// The first token is of the form "kubernetes.io/csi/<driver-name>" or just "<driver-name>".
64+
// We should check if it contains the driver name we are interested in.
65+
if !strings.Contains(tokens[0], driverName) {
66+
klog.V(5).Infof("Skipping volume %q because it is not a %s volume.", volumeName, driverName)
5867
continue
5968
}
60-
deviceCache.AddVolume(vID)
69+
klog.Infof("Adding volume %s to cache", string(volume))
70+
deviceCache.AddVolume(tokens[1])
6171
}
6272

6373
return deviceCache
6474
}
6575

66-
func pvNameFromVolumeID(volumeID string) (string, error) {
67-
tokens := strings.Split(volumeID, "^")
68-
if len(tokens) != 2 {
69-
return "", fmt.Errorf("invalid volume ID, split on `^` returns %d tokens, expected 2", len(tokens))
70-
}
71-
return tokens[1], nil
72-
}
73-
7476
// Run since it needs an infinite loop to keep itself up to date
7577
func (d *DeviceCache) Run(ctx context.Context) {
7678
klog.Infof("Starting device cache watcher for directory %s with period %s", d.dir, d.period)
@@ -87,7 +89,7 @@ func (d *DeviceCache) Run(ctx context.Context) {
8789
case <-ticker.C:
8890
d.listAndUpdate()
8991

90-
klog.Infof("Cache contents: %+v", d.volumes)
92+
klog.Infof("Cache contents: %+v", d.symlinks)
9193
}
9294
}
9395
}
@@ -106,21 +108,32 @@ func (d *DeviceCache) AddVolume(volumeID string) error {
106108
return fmt.Errorf("error getting device name: %w", err)
107109
}
108110

109-
// Look at the dir for a symlink that matches the pvName
110-
symlink := filepath.Join(d.dir, "google-"+deviceName)
111-
klog.Infof("Looking for symlink %s", symlink)
112-
113-
realPath, err := filepath.EvalSymlinks(symlink)
114-
if err != nil {
115-
klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err)
116-
return nil
111+
symlinks := d.deviceUtils.GetDiskByIdPaths(deviceName, "")
112+
if len(symlinks) == 0 {
113+
return fmt.Errorf("no symlink paths found for volume %s", volumeID)
117114
}
118115

119-
klog.Infof("Found real path %s for volume %s", realPath, volumeID)
116+
d.mutex.Lock()
117+
defer d.mutex.Unlock()
120118

121-
d.volumes[volumeID] = deviceMapping{
122-
symlink: symlink,
123-
realPath: realPath,
119+
// We may have multiple symlinks for a given device, we should add all of them.
120+
for _, symlink := range symlinks {
121+
realPath, err := filepath.EvalSymlinks(symlink)
122+
if err != nil {
123+
// This is not an error, as the symlink may not have been created yet.
124+
// Leave real_path empty; the periodic check will update it.
125+
klog.V(5).Infof("Could not evaluate symlink %s, will retry: %v", symlink, err)
126+
realPath = ""
127+
} else {
128+
klog.Infof("Found real path %s for volume %s", realPath, volumeID)
129+
}
130+
// The key is the symlink path. The value contains the evaluated
131+
// real path and the original volumeID for better logging.
132+
d.symlinks[symlink] = deviceMapping{
133+
volumeID: volumeID,
134+
realPath: realPath,
135+
}
136+
klog.V(4).Infof("Added volume %s to cache with symlink %s", volumeID, symlink)
124137
}
125138

126139
return nil
@@ -129,25 +142,31 @@ func (d *DeviceCache) AddVolume(volumeID string) error {
129142
// Remove the volume from the cache.
130143
func (d *DeviceCache) RemoveVolume(volumeID string) {
131144
klog.Infof("Removing volume %s from cache", volumeID)
132-
delete(d.volumes, volumeID)
145+
d.mutex.Lock()
146+
defer d.mutex.Unlock()
147+
for symlink, device := range d.symlinks {
148+
if device.volumeID == volumeID {
149+
delete(d.symlinks, symlink)
150+
}
151+
}
133152
}
134153

135154
func (d *DeviceCache) listAndUpdate() {
136-
for volumeID, device := range d.volumes {
155+
for symlink, device := range d.symlinks {
137156
// Evaluate the symlink
138-
realPath, err := filepath.EvalSymlinks(device.symlink)
157+
realPath, err := filepath.EvalSymlinks(symlink)
139158
if err != nil {
140-
klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err)
159+
klog.Warningf("Error evaluating symlink for volume %s: %v", device.volumeID, err)
141160
continue
142161
}
143162

144163
// Check if the realPath has changed
145164
if realPath != device.realPath {
146-
klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", volumeID, device.symlink, device.realPath, realPath)
165+
klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", device.volumeID, symlink, device.realPath, realPath)
147166

148167
// Update the cache with the new realPath
149168
device.realPath = realPath
150-
d.volumes[volumeID] = device
169+
d.symlinks[symlink] = device
151170
}
152171
}
153172
}

pkg/linkcache/devices_windows.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import (
77
"time"
88

99
"k8s.io/klog/v2"
10+
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
1011
)
1112

12-
func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) {
13+
func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string, driverName string, deviceUtils deviceutils.DeviceUtils) (*DeviceCache, error) {
1314
klog.Infof("NewDeviceCacheForNode is not implemented for Windows")
1415
return nil, nil
1516
}

pkg/linkcache/types.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
package linkcache
22

3-
import "time"
3+
import (
4+
"sync"
5+
"time"
6+
7+
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
8+
)
49

510
type deviceMapping struct {
6-
symlink string
11+
volumeID string
712
realPath string
813
}
914

1015
type DeviceCache struct {
11-
volumes map[string]deviceMapping
12-
period time.Duration
16+
mutex sync.Mutex
17+
symlinks map[string]deviceMapping
18+
period time.Duration
1319
// dir is the directory to look for device symlinks
14-
dir string
20+
dir string
21+
deviceUtils deviceutils.DeviceUtils
1522
}

test/sanity/sanity_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestSanity(t *testing.T) {
7878
EnableDeviceInUseCheck: true,
7979
DeviceInUseTimeout: 0,
8080
EnableDataCache: enableDataCache,
81-
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{})),
81+
DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{})),
8282
}
8383

8484
// Initialize GCE Driver

0 commit comments

Comments
 (0)