@@ -7,10 +7,15 @@ import (
7
7
"regexp"
8
8
"strconv"
9
9
"strings"
10
+ "time"
10
11
11
12
csi "github.com/container-storage-interface/spec/lib/go/csi"
12
13
fsnotify "github.com/fsnotify/fsnotify"
14
+ "google.golang.org/grpc/codes"
15
+ "google.golang.org/grpc/status"
16
+ v1 "k8s.io/api/core/v1"
13
17
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
+ "k8s.io/apimachinery/pkg/util/wait"
14
19
"k8s.io/client-go/kubernetes"
15
20
"k8s.io/client-go/rest"
16
21
"k8s.io/klog/v2"
@@ -42,7 +47,7 @@ func fetchRAIDedLocalSsdPath() (string, error) {
42
47
return "" , fmt .Errorf ("Error getting RAIDed device path for Data Cache %v, output:%v" , err , string (info ))
43
48
}
44
49
infoString := strings .TrimSpace (string (info ))
45
- infoSlice := strings .Split (infoString , " " )
50
+ infoSlice := strings .Fields (infoString )
46
51
47
52
// We want to get the second element in the array (sample: ARRAY /dev/md126 metadata=1.2 name=csi-driver-data-cache UUID=*),
48
53
// which is the path to the RAIDed device
@@ -162,7 +167,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
162
167
}
163
168
err , isCached := isCachingSetup (mainLvName )
164
169
if err != nil {
165
- klog .Errorf ("faild to check if caching ius setup for LV, continuing to setup caching." )
170
+ klog .Errorf ("failed to check if caching is setup for LV, continuing to setup caching." )
166
171
}
167
172
cacheLvName := getLvName (cacheSuffix , volumeId )
168
173
if isCached {
@@ -199,6 +204,9 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
199
204
}
200
205
info , err = common .RunCommand ("" /* pipedCmd */ , nil /* pipedCmdArg */ , "lvcreate" , args ... )
201
206
if err != nil {
207
+ if strings .Contains (err .Error (), "insufficient free space" ) {
208
+ return mainDevicePath , status .Error (codes .InvalidArgument , fmt .Sprintf ("Error setting up cache: %v" , err .Error ()))
209
+ }
202
210
return mainDevicePath , fmt .Errorf ("Errored while creating cache %w: %s" , err , info )
203
211
}
204
212
}
@@ -215,7 +223,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
215
223
req .GetPublishContext ()[common .ContextDataCacheMode ],
216
224
volumeGroupName + "/" + mainLvName ,
217
225
"--chunksize" ,
218
- chunkSize , // default unit is KiB
226
+ chunkSize ,
219
227
"--force" ,
220
228
"-y" ,
221
229
}
@@ -250,18 +258,15 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con
250
258
251
259
func GetDataCacheCountFromNodeLabel (ctx context.Context , nodeName string ) (int , error ) {
252
260
cfg , err := rest .InClusterConfig ()
253
- // We want to capture API errors with node label fetching, so return -1
254
- // in those cases instead of 0.
255
261
if err != nil {
256
262
return 0 , err
257
263
}
258
264
kubeClient , err := kubernetes .NewForConfig (cfg )
259
265
if err != nil {
260
266
return 0 , err
261
267
}
262
- node , err := kubeClient . CoreV1 (). Nodes (). Get ( ctx , nodeName , metav1. GetOptions {} )
268
+ node , err := getNodeWithRetry ( ctx , kubeClient , nodeName )
263
269
if err != nil {
264
- // We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
265
270
return 0 , err
266
271
}
267
272
if val , found := node .GetLabels ()[fmt .Sprintf (common .NodeLabelPrefix , common .DataCacheLssdCountLabel )]; found {
@@ -272,10 +277,33 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int,
272
277
klog .V (4 ).Infof ("Number of local SSDs requested for Data Cache: %v" , dataCacheCount )
273
278
return dataCacheCount , nil
274
279
}
275
- // This will be returned for a non-Data-Cache node pool
276
280
return 0 , nil
277
281
}
278
282
283
+ func getNodeWithRetry (ctx context.Context , kubeClient * kubernetes.Clientset , nodeName string ) (* v1.Node , error ) {
284
+ var nodeObj * v1.Node
285
+ backoff := wait.Backoff {
286
+ Duration : 1 * time .Second ,
287
+ Factor : 2.0 ,
288
+ Steps : 5 ,
289
+ }
290
+ err := wait .ExponentialBackoffWithContext (ctx , backoff , func () (bool , error ) {
291
+ node , err := kubeClient .CoreV1 ().Nodes ().Get (ctx , nodeName , metav1.GetOptions {})
292
+ if err != nil {
293
+ klog .Warningf ("Error getting node %s: %v, retrying...\n " , nodeName , err )
294
+ return false , nil
295
+ }
296
+ nodeObj = node
297
+ klog .V (4 ).Infof ("Successfully retrieved node info %s\n " , nodeName )
298
+ return true , nil
299
+ })
300
+
301
+ if err != nil {
302
+ klog .Errorf ("Failed to get node %s after retries: %v\n " , nodeName , err )
303
+ }
304
+ return nodeObj , err
305
+ }
306
+
279
307
func FetchRaidedLssdCountForDatacache () (int , error ) {
280
308
raidedPath , err := fetchRAIDedLocalSsdPath ()
281
309
if err != nil {
@@ -342,7 +370,7 @@ func FetchAllLssds() ([]string, error) {
342
370
for _ , ssd := range infoList {
343
371
ssd = strings .TrimSpace (ssd )
344
372
if strings .HasPrefix (ssd , "/dev/nvme" ) {
345
- ssdDetails := strings .Split (ssd , " " )
373
+ ssdDetails := strings .Fields (ssd )
346
374
lssd := re .MatchString (ssdDetails [1 ])
347
375
if lssd {
348
376
diskList = append (diskList , strings .TrimSpace (ssdDetails [0 ]))
@@ -355,7 +383,7 @@ func FetchAllLssds() ([]string, error) {
355
383
return diskList , nil
356
384
}
357
385
358
- func FetchLSSDsWihtEmptyMountPoint () ([]string , error ) {
386
+ func FetchLSSDsWithEmptyMountPoint () ([]string , error ) {
359
387
info , err := common .RunCommand ("grep" , []string {"-E" , `^\S+\s*$` } /* pipeCmdArg */ , "lsblk" , []string {"-o" , "NAME,MOUNTPOINT" , "-pdn" }... )
360
388
if err != nil {
361
389
return nil , fmt .Errorf ("Error while fetching disks with no mount point: %v; err:%v" , info , err )
@@ -376,6 +404,7 @@ func checkVgExists(volumeGroupName string) bool {
376
404
return false
377
405
}
378
406
// Check if the required volume group already exists
407
+ klog .Infof ("check vg exists output: %v, volumeGroupName: %v" , string (info ), volumeGroupName )
379
408
return strings .Contains (string (info ), volumeGroupName )
380
409
}
381
410
@@ -462,7 +491,6 @@ func createVg(volumeGroupName string, raidedLocalSsds string) error {
462
491
463
492
func reduceVolumeGroup (volumeGroupName string , force bool ) {
464
493
if ! checkVgExists (volumeGroupName ) {
465
- klog .V (2 ).Infof ("Volume group %v not found, no further action needed" , volumeGroupName )
466
494
return
467
495
}
468
496
args := []string {
@@ -618,23 +646,17 @@ func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan
618
646
errorCh <- fmt .Errorf ("disk update event errored: %v" , err )
619
647
// watch for events
620
648
case <- watcher .Events :
621
- vgName := getVolumeGroupName (nodeName )
622
- if ! checkVgExists (vgName ) {
623
- // If the volume group doesn't exist, there's nothing to update.
624
- // Continue to the next event.
625
- continue
626
- }
627
649
// In case of an event i.e. creation or deletion of any new PV, we update the VG metadata.
628
650
// This might include some non-LVM changes, no harm in updating metadata multiple times.
629
651
args := []string {
630
652
"--updatemetadata" ,
631
- vgName ,
653
+ getVolumeGroupName ( nodeName ) ,
632
654
}
633
655
_ , err := common .RunCommand ("" /* pipedCmd */ , nil /* pipedCmdArg */ , "vgck" , args ... )
634
656
if err != nil {
635
657
klog .Errorf ("Error updating volume group's metadata: %v" , err )
636
658
}
637
- reduceVolumeGroup (vgName , true )
659
+ reduceVolumeGroup (getVolumeGroupName ( nodeName ) , true )
638
660
}
639
661
}
640
662
}
0 commit comments