@@ -10,25 +10,26 @@ import (
10
10
v1 "k8s.io/api/core/v1"
11
11
"k8s.io/klog/v2"
12
12
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
13
+ "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
13
14
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
14
15
)
15
16
16
17
const byIdDir = "/dev/disk/by-id"
17
18
18
- func NewDeviceCacheForNode (ctx context.Context , period time.Duration , nodeName string ) (* DeviceCache , error ) {
19
+ func NewDeviceCacheForNode (ctx context.Context , period time.Duration , nodeName string , driverName string , deviceUtils deviceutils. DeviceUtils ) (* DeviceCache , error ) {
19
20
node , err := k8sclient .GetNodeWithRetry (ctx , nodeName )
20
21
if err != nil {
21
22
return nil , fmt .Errorf ("failed to get node %s: %w" , nodeName , err )
22
23
}
23
24
24
- return newDeviceCacheForNode (period , node ), nil
25
+ return newDeviceCacheForNode (period , node , driverName , deviceUtils ), nil
25
26
}
26
27
27
- func TestDeviceCache (period time.Duration , node * v1.Node ) * DeviceCache {
28
- return newDeviceCacheForNode (period , node )
28
+ func NewTestDeviceCache (period time.Duration , node * v1.Node ) * DeviceCache {
29
+ return newDeviceCacheForNode (period , node , "pd.csi.storage.gke.io" , deviceutils . NewDeviceUtils () )
29
30
}
30
31
31
- func TestNodeWithVolumes (volumes []string ) * v1.Node {
32
+ func NewTestNodeWithVolumes (volumes []string ) * v1.Node {
32
33
volumesInUse := make ([]v1.UniqueVolumeName , len (volumes ))
33
34
for i , volume := range volumes {
34
35
volumesInUse [i ] = v1 .UniqueVolumeName ("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume )
@@ -41,36 +42,37 @@ func TestNodeWithVolumes(volumes []string) *v1.Node {
41
42
}
42
43
}
43
44
44
- func newDeviceCacheForNode (period time.Duration , node * v1.Node ) * DeviceCache {
45
+ func newDeviceCacheForNode (period time.Duration , node * v1.Node , driverName string , deviceUtils deviceutils. DeviceUtils ) * DeviceCache {
45
46
deviceCache := & DeviceCache {
46
- volumes : make (map [string ]deviceMapping ),
47
- period : period ,
48
- dir : byIdDir ,
47
+ symlinks : make (map [string ]deviceMapping ),
48
+ period : period ,
49
+ deviceUtils : deviceUtils ,
50
+ dir : byIdDir ,
49
51
}
50
52
51
53
// Look at the status.volumesInUse field. For each, take the last section
52
54
// of the string (after the last "/") and call AddVolume for that
53
55
for _ , volume := range node .Status .VolumesInUse {
54
- klog .Infof ("Adding volume %s to cache" , string (volume ))
55
- vID , err := pvNameFromVolumeID (string (volume ))
56
- if err != nil {
57
- klog .Warningf ("failure to retrieve name, skipping volume %q: %v" , string (volume ), err )
56
+ volumeName := string (volume )
57
+ tokens := strings .Split (volumeName , "^" )
58
+ if len (tokens ) != 2 {
59
+ klog .V (5 ).Infof ("Skipping volume %q because splitting volumeName on `^` returns %d tokens, expected 2" , volumeName , len (tokens ))
60
+ continue
61
+ }
62
+
63
+ // The first token is of the form "kubernetes.io/csi/<driver-name>" or just "<driver-name>".
64
+ // We should check if it contains the driver name we are interested in.
65
+ if ! strings .Contains (tokens [0 ], driverName ) {
66
+ klog .V (5 ).Infof ("Skipping volume %q because it is not a %s volume." , volumeName , driverName )
58
67
continue
59
68
}
60
- deviceCache .AddVolume (vID )
69
+ klog .Infof ("Adding volume %s to cache" , string (volume ))
70
+ deviceCache .AddVolume (tokens [1 ])
61
71
}
62
72
63
73
return deviceCache
64
74
}
65
75
66
- func pvNameFromVolumeID (volumeID string ) (string , error ) {
67
- tokens := strings .Split (volumeID , "^" )
68
- if len (tokens ) != 2 {
69
- return "" , fmt .Errorf ("invalid volume ID, split on `^` returns %d tokens, expected 2" , len (tokens ))
70
- }
71
- return tokens [1 ], nil
72
- }
73
-
74
76
// Run since it needs an infinite loop to keep itself up to date
75
77
func (d * DeviceCache ) Run (ctx context.Context ) {
76
78
klog .Infof ("Starting device cache watcher for directory %s with period %s" , d .dir , d .period )
@@ -87,7 +89,7 @@ func (d *DeviceCache) Run(ctx context.Context) {
87
89
case <- ticker .C :
88
90
d .listAndUpdate ()
89
91
90
- klog .Infof ("Cache contents: %+v" , d .volumes )
92
+ klog .Infof ("Cache contents: %+v" , d .symlinks )
91
93
}
92
94
}
93
95
}
@@ -106,21 +108,32 @@ func (d *DeviceCache) AddVolume(volumeID string) error {
106
108
return fmt .Errorf ("error getting device name: %w" , err )
107
109
}
108
110
109
- // Look at the dir for a symlink that matches the pvName
110
- symlink := filepath .Join (d .dir , "google-" + deviceName )
111
- klog .Infof ("Looking for symlink %s" , symlink )
112
-
113
- realPath , err := filepath .EvalSymlinks (symlink )
114
- if err != nil {
115
- klog .Warningf ("Error evaluating symlink for volume %s: %v" , volumeID , err )
116
- return nil
111
+ symlinks := d .deviceUtils .GetDiskByIdPaths (deviceName , "" )
112
+ if len (symlinks ) == 0 {
113
+ return fmt .Errorf ("no symlink paths found for volume %s" , volumeID )
117
114
}
118
115
119
- klog .Infof ("Found real path %s for volume %s" , realPath , volumeID )
116
+ d .mutex .Lock ()
117
+ defer d .mutex .Unlock ()
120
118
121
- d .volumes [volumeID ] = deviceMapping {
122
- symlink : symlink ,
123
- realPath : realPath ,
119
+ // We may have multiple symlinks for a given device, we should add all of them.
120
+ for _ , symlink := range symlinks {
121
+ realPath , err := filepath .EvalSymlinks (symlink )
122
+ if err != nil {
123
+ // This is not an error, as the symlink may not have been created yet.
124
+ // Leave real_path empty; the periodic check will update it.
125
+ klog .V (5 ).Infof ("Could not evaluate symlink %s, will retry: %v" , symlink , err )
126
+ realPath = ""
127
+ } else {
128
+ klog .Infof ("Found real path %s for volume %s" , realPath , volumeID )
129
+ }
130
+ // The key is the symlink path. The value contains the evaluated
131
+ // real path and the original volumeID for better logging.
132
+ d .symlinks [symlink ] = deviceMapping {
133
+ volumeID : volumeID ,
134
+ realPath : realPath ,
135
+ }
136
+ klog .V (4 ).Infof ("Added volume %s to cache with symlink %s" , volumeID , symlink )
124
137
}
125
138
126
139
return nil
@@ -129,25 +142,31 @@ func (d *DeviceCache) AddVolume(volumeID string) error {
129
142
// Remove the volume from the cache.
130
143
func (d * DeviceCache ) RemoveVolume (volumeID string ) {
131
144
klog .Infof ("Removing volume %s from cache" , volumeID )
132
- delete (d .volumes , volumeID )
145
+ d .mutex .Lock ()
146
+ defer d .mutex .Unlock ()
147
+ for symlink , device := range d .symlinks {
148
+ if device .volumeID == volumeID {
149
+ delete (d .symlinks , symlink )
150
+ }
151
+ }
133
152
}
134
153
135
154
func (d * DeviceCache ) listAndUpdate () {
136
- for volumeID , device := range d .volumes {
155
+ for symlink , device := range d .symlinks {
137
156
// Evaluate the symlink
138
- realPath , err := filepath .EvalSymlinks (device . symlink )
157
+ realPath , err := filepath .EvalSymlinks (symlink )
139
158
if err != nil {
140
- klog .Warningf ("Error evaluating symlink for volume %s: %v" , volumeID , err )
159
+ klog .Warningf ("Error evaluating symlink for volume %s: %v" , device . volumeID , err )
141
160
continue
142
161
}
143
162
144
163
// Check if the realPath has changed
145
164
if realPath != device .realPath {
146
- klog .Warningf ("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s" , volumeID , device . symlink , device .realPath , realPath )
165
+ klog .Warningf ("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s" , device . volumeID , symlink , device .realPath , realPath )
147
166
148
167
// Update the cache with the new realPath
149
168
device .realPath = realPath
150
- d .volumes [ volumeID ] = device
169
+ d .symlinks [ symlink ] = device
151
170
}
152
171
}
153
172
}
0 commit comments