Commit 4fc1f45

New implementation that is hooked into NodeStage/NodeUnstage. Linux only for
now.
1 parent 2ad5fcd commit 4fc1f45

10 files changed: +219 additions, −454 deletions

cmd/gce-pd-csi-driver/main.go

Lines changed: 9 additions & 1 deletion
@@ -276,6 +276,12 @@ func handle() {
 			klog.Fatalf("Failed to get node info from API server: %v", err.Error())
 		}
 
+		deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName)
+		if err != nil {
+			klog.Fatalf("Failed to create device cache: %v", err.Error())
+		}
+		go deviceCache.Run(ctx)
+
 		// TODO(2042): Move more of the constructor args into this struct
 		nsArgs := &driver.NodeServerArgs{
 			EnableDeviceInUseCheck: *enableDeviceInUseCheck,
@@ -284,6 +290,7 @@ func handle() {
 			DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
 			SysfsPath:                "/sys",
 			MetricsManager:           metricsManager,
+			DeviceCache:              deviceCache,
 		}
 		nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
 
@@ -302,9 +309,10 @@ func handle() {
 			}
 		}
 
-		go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx)
 	}
 
+	klog.Infof("NOT BLOCKED")
+
 	err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
 	if err != nil {
 		klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())

deploy/kubernetes/base/controller/controller.yaml

Lines changed: 1 addition & 0 deletions
@@ -145,6 +145,7 @@ spec:
             - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
             - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
             - --enable-data-cache
+            - --run-node-service=false
             - --enable-multitenancy
           command:
             - /gce-pd-csi-driver

deploy/kubernetes/images/stable-master/image.yaml

Lines changed: 5 additions & 0 deletions
@@ -44,9 +44,14 @@ metadata:
   name: imagetag-gcepd-driver
 imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
+<<<<<<< HEAD
   # Don't change stable image without changing pdImagePlaceholder in
   # test/k8s-integration/main.go
   newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver
+=======
+  # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
+  newName: us-central1-docker.pkg.dev/juliankatz-joonix/csi-dev/gcp-compute-persistent-disk-csi-driver
+>>>>>>> 264b82ed (New implementation that is hooked into nodestage/unstage. Just linux)
   newTag: "latest"
 ---
 

pkg/gce-pd-csi-driver/cache.go

Lines changed: 3 additions & 39 deletions
@@ -7,19 +7,14 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
-	"time"
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	fsnotify "github.com/fsnotify/fsnotify"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
 )
 
 const (
@@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con
 }
 
 func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) {
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return 0, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		return 0, err
-	}
-	node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
+	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
 	if err != nil {
 		return 0, err
 	}
+
 	if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found {
 		dataCacheCount, err := strconv.Atoi(val)
 		if err != nil {
@@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int,
 	return 0, nil
 }
 
-func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
-	var nodeObj *v1.Node
-	backoff := wait.Backoff{
-		Duration: 1 * time.Second,
-		Factor:   2.0,
-		Steps:    5,
-	}
-	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
-		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
-		if err != nil {
-			klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
-			return false, nil
-		}
-		nodeObj = node
-		klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
-		return true, nil
-	})
-
-	if err != nil {
-		klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
-	}
-	return nodeObj, err
-}
-
 func FetchRaidedLssdCountForDatacache() (int, error) {
 	raidedPath, err := fetchRAIDedLocalSsdPath()
 	if err != nil {

pkg/gce-pd-csi-driver/gce-pd-driver.go

Lines changed: 2 additions & 1 deletion
@@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
 		DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
 		SysfsPath:                args.SysfsPath,
 		metricsManager:           args.MetricsManager,
+		DeviceCache:              args.DeviceCache,
 	}
 }
 
@@ -184,7 +185,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT
 	maxLogChar = grpcLogCharCap
 
 	klog.V(4).Infof("Driver: %v", gceDriver.name)
-	//Start the nonblocking GRPC
+	// Start the nonblocking GRPC
 	s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
 	// TODO(#34): Only start specific servers based on a flag.
 	// In the future have this only run specific combinations of servers depending on which version this is.

pkg/gce-pd-csi-driver/node.go

Lines changed: 14 additions & 11 deletions
@@ -32,14 +32,14 @@ import (
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/mount-utils"
 
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs"
@@ -80,6 +80,8 @@ type GCENodeServer struct {
 	csi.UnimplementedNodeServer
 
 	metricsManager *metrics.MetricsManager
+	// A cache of the device paths for the volumes that are attached to the node.
+	DeviceCache *linkcache.DeviceCache
 }
 
 type NodeServerArgs struct {
@@ -97,6 +99,7 @@ type NodeServerArgs struct {
 	SysfsPath string
 
 	MetricsManager *metrics.MetricsManager
+	DeviceCache    *linkcache.DeviceCache
 }
 
 var _ csi.NodeServer = &GCENodeServer{}
@@ -507,6 +510,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
 		}
 	}
 
+	err = ns.DeviceCache.AddVolume(volumeID)
+	if err != nil {
+		klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
+	}
+
 	klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
 	return &csi.NodeStageVolumeResponse{}, nil
 }
@@ -620,6 +628,9 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
 			return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err)
 		}
 	}
+
+	ns.DeviceCache.RemoveVolume(volumeID)
+
 	klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }
@@ -875,15 +886,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
 }
 
 func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) {
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return 0, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		return 0, err
-	}
-	node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
+	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
 	if err != nil {
 		return 0, err
 	}
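
Combined with the main.go wiring earlier, the intended lifecycle is: AddVolume when a volume is staged, periodic refresh from the background Run loop, and RemoveVolume when the volume is unstaged. A small illustrative driver of that sequence is below; the volume ID and node name are made up, and it assumes the sketched API above rather than the real pkg/linkcache code.

package main

import (
	"context"
	"time"

	"k8s.io/klog/v2"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Same construction as in cmd/gce-pd-csi-driver/main.go above.
	cache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, "example-node")
	if err != nil {
		klog.Fatalf("Failed to create device cache: %v", err)
	}
	go cache.Run(ctx) // background refresh of tracked device links

	volumeID := "projects/example-project/zones/us-central1-a/disks/example-disk"

	// NodeStageVolume path: start tracking the staged volume.
	if err := cache.AddVolume(volumeID); err != nil {
		klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
	}

	// ... volume is staged, published, and used ...

	// NodeUnstageVolume path: stop tracking the volume.
	cache.RemoveVolume(volumeID)
}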

pkg/k8sclient/node.go

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+package k8sclient
+
+import (
+	"context"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+)
+
+func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) {
+	cfg, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+	kubeClient, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	return getNodeWithRetry(ctx, kubeClient, nodeName)
+}
+
+func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
+	var nodeObj *v1.Node
+	backoff := wait.Backoff{
+		Duration: 1 * time.Second,
+		Factor:   2.0,
+		Steps:    5,
+	}
+	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
+		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
+			return false, nil
+		}
+		nodeObj = node
+		klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
+		return true, nil
+	})
+
+	if err != nil {
+		klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
+	}
+	return nodeObj, err
+}
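
For reference, the wait.Backoff above allows up to five Get attempts, sleeping 1s, 2s, 4s, and 8s between them (roughly 15 seconds of backoff in total) before giving up. A hedged usage sketch follows; it only works in-cluster because the helper relies on rest.InClusterConfig, and the node name and label key are illustrative, not taken from this commit.

package main

import (
	"context"
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
)

// nodeLabel fetches a node via the retrying helper and returns one of its
// labels, if present.
func nodeLabel(ctx context.Context, nodeName, key string) (string, bool, error) {
	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
	if err != nil {
		return "", false, err
	}
	val, found := node.GetLabels()[key]
	return val, found, nil
}

func main() {
	val, found, err := nodeLabel(context.Background(), "example-node", "example.com/some-label")
	if err != nil {
		fmt.Printf("lookup failed: %v\n", err)
		return
	}
	if found {
		fmt.Printf("label value: %s\n", val)
	}
}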
