@@ -7,7 +7,10 @@ import (
 	"strings"
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
-
+	fsnotify "github.com/fsnotify/fsnotify"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -42,7 +45,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
 		// Clean up Volume Group before adding the PD
 		reduceVolumeGroup(volumeGroupName, true)
 	} else {
-		err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath)
+		err := createVg(volumeGroupName, raidedLocalSsdPath)
 		if err != nil {
 			return mainDevicePath, err
 		}
@@ -241,7 +244,7 @@ func getLvName(suffix string, volumeId string) string {
 	return fmt.Sprintf("%s-%s", suffix, pvcName)
 }
 
-func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error {
+func createVg(volumeGroupName string, raidedLocalSsds string) error {
 	args := []string{
 		"--zero",
 		"y",
@@ -366,3 +369,112 @@ func isCachingSetup(mainLvName string) (error, bool) {
 	}
 	return nil, false
 }
+
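+// fetchChunkSizeKiB derives the cache chunk size for the requested cache size:
+// the cache is split across maxAllowedChunks, the per-chunk size is rounded to a
+// 32KiB multiple and clamped to [minChunkSize, maxChunkSize], and the result is
+// returned as a KiB string.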
+func fetchChunkSizeKiB(cacheSize string) (string, error) {
+	var chunkSize float64
+
+	cacheSizeInt, err := common.ConvertGiStringToInt64(cacheSize)
+	if err != nil {
+		return "0", err
+	}
+	// The chunk size must be a multiple of 32KiB, so round (cacheSize/maxAllowedChunks)
+	// to the nearest 32KiB boundary before clamping.
+	chunkSize = (float64(cacheSizeInt) * GiB) / float64(maxAllowedChunks)
+	chunkSize = math.Round(chunkSize/(32*KiB)) * (32 * KiB)
+	chunkSize = math.Min(math.Max(chunkSize, minChunkSize), maxChunkSize) / KiB
+	// The default chunk size unit is KiB.
+	return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil
+}
+
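+// InitializeDataCacheNode prepares the node for data caching: it locates the
+// RAIDed local SSD device and either creates the volume group on it or, if the
+// group already exists, reconciles the group and verifies the device is a member.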
+func InitializeDataCacheNode(nodeId string) error {
+	raidedLocalSsdPath, err := fetchRAIDedLocalSsdPath()
+	if err != nil {
+		return err
+	}
+	volumeGroupName := getVolumeGroupName(nodeId)
+
+	vgExists := checkVgExists(volumeGroupName)
+	// Check if the required volume group already exists
+	if vgExists {
+		// Clean up Volume Group before adding the PD
+		reduceVolumeGroup(volumeGroupName, true)
+
+		// Validate that the RAIDed LSSD is part of the VG
+		err = validateRaidedLSSDinVG(volumeGroupName, raidedLocalSsdPath)
+		if err != nil {
+			return fmt.Errorf("failed validate local ssd in vg %v: %v", volumeGroupName, err)
+		}
+	} else {
+		err := createVg(volumeGroupName, raidedLocalSsdPath)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
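+// StartWatcher watches /dev/ for device events on the node and blocks until the
+// event-handling goroutine reports an error.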
+func StartWatcher(nodeName string) {
+	dirToWatch := "/dev/"
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		klog.V(2).ErrorS(err, "errored while creating watcher")
+	}
+	klog.V(2).Infof("Watcher started for directory %v", dirToWatch)
+	defer watcher.Close()
+
+	// Out of the box, fsnotify can watch a single file or a single directory.
+	if err := watcher.Add(dirToWatch); err != nil {
+		klog.V(2).ErrorS(err, "errored while adding watcher directory")
+	}
+	errorCh := make(chan error, 1)
+	// Handle the error received from the watcher goroutine
+	go watchDiskDetaches(watcher, nodeName, errorCh)
+
+	select {
+	case err := <-errorCh:
+		klog.Errorf("watcher encountered an error: %v", err)
+	}
+}
+
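+// watchDiskDetaches reconciles the volume group metadata on every device
+// add/remove event and forwards watcher errors to errorCh.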
+func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
+	for {
+		select {
+		// watch for errors
+		case err := <-watcher.Errors:
+			errorCh <- fmt.Errorf("disk update event errored: %v", err)
+		// watch for events
+		case event := <-watcher.Events:
+			// In case of an event, i.e. creation or deletion of any new PV, we update the VG metadata.
+			// This might include some non-LVM changes; there is no harm in updating the metadata multiple times.
+			reduceVolumeGroup(getVolumeGroupName(nodeName), true)
+			klog.V(2).Infof("disk attach/detach event %#v\n", event)
+		}
+	}
+}
+
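+// validateRaidedLSSDinVG checks via pvs that the RAIDed local SSD is a physical
+// volume of the given volume group, and extends the group with it if it is not.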
+func validateRaidedLSSDinVG(vgName string, lssdPath string) error {
+	args := []string{
+		"--noheadings",
+		"-o",
+		"pv_name",
+		"--select",
+		"vg_name=" + vgName,
+	}
+	info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "pvs", args...)
+	if err != nil {
+		// On error, info contains the error message, which we cannot use for further steps.
+		return fmt.Errorf("errored while checking physical volume details %v: %s", err, info)
+	}
+
+	if !strings.Contains(string(info), lssdPath) {
+		return addRaidedLSSDToVg(vgName, lssdPath)
+	}
+	return nil
+}
+
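+// addRaidedLSSDToVg adds the RAIDed local SSD device to the volume group via vgextend.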
+func addRaidedLSSDToVg(vgName, lssdPath string) error {
+	info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgextend", []string{vgName, lssdPath}...)
+	if err != nil {
+		return fmt.Errorf("errored while extending VGs %v: %s", err, info)
+	}
+	return nil
+}