Commit e6c9004

Merge pull request #2185 from k8s-infra-cherrypick-robot/cherry-pick-2141-to-release-1.21
[release-1.21] Cache and print devices for debugging future outages
2 parents f753762 + b281364 commit e6c9004

16 files changed: +337 −69 lines

cmd/gce-pd-csi-driver/main.go (13 additions, 1 deletion)

@@ -34,6 +34,7 @@ import (
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 	metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
 	driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
 )
@@ -97,6 +98,8 @@ var (
 
 	diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[disk-type] topology label when the StorageClass has the use-allowed-disk-topology parameter set to true. That topology label is included in the Topologies returned in CreateVolumeResponse. This flag is disabled by default.")
 
+	diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute, "Period for the disk cache to check the /dev/disk/by-id/ directory and evaluate the symlinks")
+
 	version string
 )
 
@@ -275,6 +278,13 @@ func handle() {
 		klog.Fatalf("Failed to get node info from API server: %v", err.Error())
 	}
 
+	deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, *diskCacheSyncPeriod, *nodeName, driverName, deviceUtils)
+	if err != nil {
+		klog.Warningf("Failed to create device cache: %v", err.Error())
+	} else {
+		go deviceCache.Run(ctx)
+	}
+
 	// TODO(2042): Move more of the constructor args into this struct
 	nsArgs := &driver.NodeServerArgs{
 		EnableDeviceInUseCheck: *enableDeviceInUseCheck,
@@ -283,6 +293,7 @@ func handle() {
 		DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
 		SysfsPath: "/sys",
 		MetricsManager: metricsManager,
+		DeviceCache: deviceCache,
 	}
 	nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
 
@@ -297,9 +308,10 @@ func handle() {
 				if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
 					klog.Errorf("Data Cache setup failed: %v", err)
 				}
-				go driver.StartWatcher(*nodeName)
+				go driver.StartWatcher(ctx, *nodeName)
 			}
 		}
+
 	}
 
 	err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
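
The new pkg/linkcache package is among the 16 changed files but its source is not shown in this excerpt. The following is a minimal sketch of what its surface might look like, inferred only from the call sites above (NewDeviceCacheForNode, Run, AddVolume, RemoveVolume); the field names and internals are assumptions for illustration, not the actual implementation.

// Sketch of the inferred pkg/linkcache surface; not the real implementation.
package linkcache

import (
	"context"
	"sync"
	"time"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
)

// DeviceCache tracks the volumes staged on this node and periodically
// re-evaluates their /dev/disk/by-id/ symlinks so they can be printed when
// debugging outages.
type DeviceCache struct {
	mu          sync.Mutex
	syncPeriod  time.Duration
	deviceUtils deviceutils.DeviceUtils
	volumes     map[string]string // volumeID -> last resolved device path (assumed shape)
}

// NewDeviceCacheForNode mirrors the constructor called from main(); nodeName
// and driverName would be used to discover the disks attached to this node.
func NewDeviceCacheForNode(ctx context.Context, syncPeriod time.Duration, nodeName, driverName string, du deviceutils.DeviceUtils) (*DeviceCache, error) {
	return &DeviceCache{
		syncPeriod:  syncPeriod,
		deviceUtils: du,
		volumes:     make(map[string]string),
	}, nil
}

// AddVolume starts tracking a volume after NodeStageVolume succeeds.
func (d *DeviceCache) AddVolume(volumeID string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.volumes[volumeID] = "" // resolved on the next sync
	return nil
}

// RemoveVolume stops tracking a volume once it is unstaged.
func (d *DeviceCache) RemoveVolume(volumeID string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.volumes, volumeID)
}

// Run re-checks the /dev/disk/by-id/ directory every syncPeriod until the
// context passed down from main() is cancelled.
func (d *DeviceCache) Run(ctx context.Context) {
	ticker := time.NewTicker(d.syncPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// The real implementation would evaluate each tracked symlink
			// (e.g. via deviceUtils) and log the volumeID -> device mapping.
		}
	}
}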

deploy/kubernetes/base/controller/controller.yaml (1 addition, 0 deletions)

@@ -145,6 +145,7 @@ spec:
             - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
             - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
             - --enable-data-cache
+            - --run-node-service=false
             - --enable-multitenancy
          command:
            - /gce-pd-csi-driver

deploy/kubernetes/images/stable-master/image.yaml (1 addition, 1 deletion)

@@ -46,6 +46,6 @@ imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
   # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
   newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
-  newTag: "v1.17.3"
+  newTag: "v1.17.4"
 ---

manifest_osversion.sh (2 additions, 2 deletions)

@@ -4,7 +4,7 @@ set -o xtrace
 
 # The following is a workaround for issue https://github.com/moby/moby/issues/41417
 # to manually inserver os.version information into docker manifest file
-# TODO: once docker manifest annotation for os.versions is availabler for the installed docker here,
+# TODO: once docker manifest annotation for os.versions is available for the installed docker here,
 # replace the following with annotation approach. https://github.com/docker/cli/pull/2578
 
 export DOCKER_CLI_EXPERIMENTAL=enabled
@@ -14,7 +14,7 @@ IFS=', ' read -r -a imagetags <<< "$WINDOWS_IMAGE_TAGS"
 IFS=', ' read -r -a baseimages <<< "$WINDOWS_BASE_IMAGES"
 MANIFEST_TAG=${STAGINGIMAGE}:${STAGINGVERSION}
 
-# translate from image tag to docker manifest foler format
+# translate from image tag to docker manifest folder format
 # e.g., gcr.io_k8s-staging-csi_gce-pd-windows-v2
 manifest_folder=$(echo "${MANIFEST_TAG}" | sed "s|/|_|g" | sed "s/:/-/")
 echo ${manifest_folder}

pkg/gce-pd-csi-driver/cache.go (9 additions, 42 deletions)

@@ -7,19 +7,14 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
-	"time"
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	fsnotify "github.com/fsnotify/fsnotify"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
 )
 
 const (
@@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con
 }
 
 func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) {
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return 0, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		return 0, err
-	}
-	node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
+	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
 	if err != nil {
 		return 0, err
 	}
+
 	if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found {
 		dataCacheCount, err := strconv.Atoi(val)
 		if err != nil {
@@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int,
 	return 0, nil
 }
 
-func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
-	var nodeObj *v1.Node
-	backoff := wait.Backoff{
-		Duration: 1 * time.Second,
-		Factor: 2.0,
-		Steps: 5,
-	}
-	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
-		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
-		if err != nil {
-			klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
-			return false, nil
-		}
-		nodeObj = node
-		klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
-		return true, nil
-	})
-
-	if err != nil {
-		klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
-	}
-	return nodeObj, err
-}
-
 func FetchRaidedLssdCountForDatacache() (int, error) {
 	raidedPath, err := fetchRAIDedLocalSsdPath()
 	if err != nil {
@@ -617,7 +581,7 @@ func InitializeDataCacheNode(nodeId string) error {
 	return nil
 }
 
-func StartWatcher(nodeName string) {
+func StartWatcher(ctx context.Context, nodeName string) {
 	dirToWatch := "/dev/"
 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {
@@ -632,17 +596,20 @@ func StartWatcher(nodeName string) {
 	}
 	errorCh := make(chan error, 1)
 	// Handle the error received from the watcher goroutine
-	go watchDiskDetaches(watcher, nodeName, errorCh)
+	go watchDiskDetaches(ctx, watcher, nodeName, errorCh)
 
 	select {
 	case err := <-errorCh:
 		klog.Errorf("watcher encountered an error: %v", err)
 	}
 }
 
-func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
+func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
	for {
		select {
+		case <-ctx.Done():
+			klog.Infof("Context done, stopping watcher")
+			return nil
		// watch for errors
		case err := <-watcher.Errors:
			errorCh <- fmt.Errorf("disk update event errored: %v", err)
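
The net effect of the StartWatcher changes is that the /dev/ watcher is now tied to the process-wide context created in main(): cancelling that context stops the watcher goroutine instead of leaking it. Below is a compact, self-contained sketch of the same pattern; it is not the driver's code, and the function name and log messages are illustrative only.

package example

import (
	"context"
	"fmt"

	fsnotify "github.com/fsnotify/fsnotify"
	"k8s.io/klog/v2"
)

// watchUntilCancelled shows the shape of the new watchDiskDetaches loop: a
// ctx.Done() case alongside the fsnotify error and event channels, so the
// goroutine exits cleanly when the caller's context is cancelled.
func watchUntilCancelled(ctx context.Context, dir string) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to create watcher: %w", err)
	}
	defer watcher.Close()

	if err := watcher.Add(dir); err != nil {
		return fmt.Errorf("failed to watch %s: %w", dir, err)
	}

	for {
		select {
		case <-ctx.Done():
			klog.Infof("Context done, stopping watcher")
			return nil
		case err := <-watcher.Errors:
			return fmt.Errorf("disk update event errored: %w", err)
		case event := <-watcher.Events:
			klog.V(4).Infof("Received event on %s: %v", dir, event)
		}
	}
}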

pkg/gce-pd-csi-driver/gce-pd-driver.go (2 additions, 1 deletion)

@@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
 		DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
 		SysfsPath: args.SysfsPath,
 		metricsManager: args.MetricsManager,
+		DeviceCache: args.DeviceCache,
 	}
 }
 
@@ -184,7 +185,7 @@
 	maxLogChar = grpcLogCharCap
 
 	klog.V(4).Infof("Driver: %v", gceDriver.name)
-	//Start the nonblocking GRPC
+	// Start the nonblocking GRPC
 	s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
 	// TODO(#34): Only start specific servers based on a flag.
 	// In the future have this only run specific combinations of servers depending on which version this is.

pkg/gce-pd-csi-driver/node.go (18 additions, 11 deletions)

@@ -32,14 +32,14 @@ import (
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/mount-utils"
 
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs"
@@ -80,6 +80,8 @@ type GCENodeServer struct {
 	csi.UnimplementedNodeServer
 
 	metricsManager *metrics.MetricsManager
+	// A cache of the device paths for the volumes that are attached to the node.
+	DeviceCache *linkcache.DeviceCache
 }
 
 type NodeServerArgs struct {
@@ -97,6 +99,7 @@ type NodeServerArgs struct {
 	SysfsPath string
 
 	MetricsManager *metrics.MetricsManager
+	DeviceCache *linkcache.DeviceCache
 }
 
 var _ csi.NodeServer = &GCENodeServer{}
@@ -509,6 +512,13 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
 		}
 	}
 
+	if ns.DeviceCache != nil {
+		err = ns.DeviceCache.AddVolume(volumeID)
+		if err != nil {
+			klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
+		}
+	}
+
 	klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
 	return &csi.NodeStageVolumeResponse{}, nil
 }
@@ -622,6 +632,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
 			return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err)
 		}
 	}
+
+	if ns.DeviceCache != nil {
+		ns.DeviceCache.RemoveVolume(volumeID)
+	}
+
 	klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }
@@ -899,15 +914,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
 }
 
 func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) {
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return 0, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		return 0, err
-	}
-	node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
+	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
 	if err != nil {
 		return 0, err
 	}

pkg/gce-pd-csi-driver/node_test.go (12 additions, 5 deletions)

@@ -34,6 +34,7 @@ import (
 	"k8s.io/mount-utils"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
 )
 
@@ -44,11 +45,13 @@ const (
 )
 
 func getTestGCEDriver(t *testing.T) *GCEDriver {
-	return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{})
+	return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{
+		DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{defaultVolumeID})),
+	})
 }
 
-func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver {
-	return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{})
+func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount, args *NodeServerArgs) *GCEDriver {
+	return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), args)
 }
 
 func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver {
@@ -188,7 +191,9 @@ func TestNodeGetVolumeStats(t *testing.T) {
 		}
 
 		mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
-		gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
+		gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
+			DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{tc.volumeID})),
+		})
 		ns := gceDriver.ns
 
 		req := &csi.NodeGetVolumeStatsRequest{
@@ -1227,7 +1232,9 @@ func TestNodeStageVolume(t *testing.T) {
 			))
 		}
 		mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true})
-		gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
+		gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
+			DeviceCache: linkcache.NewTestDeviceCache(1*time.Minute, linkcache.NewTestNodeWithVolumes([]string{volumeID})),
+		})
 		ns := gceDriver.ns
 		ns.SysfsPath = tempDir + "/sys"
 		_, err := ns.NodeStageVolume(context.Background(), tc.req)

pkg/k8sclient/node.go (new file, 49 additions)

@@ -0,0 +1,49 @@
+package k8sclient
+
+import (
+	"context"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+)
+
+func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) {
+	cfg, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+	kubeClient, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	return getNodeWithRetry(ctx, kubeClient, nodeName)
+}
+
+func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
+	var nodeObj *v1.Node
+	backoff := wait.Backoff{
+		Duration: 1 * time.Second,
+		Factor: 2.0,
+		Steps: 5,
+	}
+	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
+		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
+			return false, nil
+		}
+		nodeObj = node
+		klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
+		return true, nil
+	})
+
+	if err != nil {
+		klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
+	}
+	return nodeObj, err
+}
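
For reference, the backoff above makes up to 5 attempts with waits of roughly 1 s, 2 s, 4 s, and 8 s between them, so a caller can block for about 15 seconds (plus API latency) before the helper gives up. The sketch below shows a hypothetical caller in the style of GetDataCacheCountFromNodeLabel and GetAttachLimitsOverrideFromNodeLabel; the deadline and label printing are illustrative assumptions, not part of this commit.

package example

import (
	"context"
	"fmt"
	"time"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
)

// printNodeLabels fetches the node with retries, then reads its labels.
// Bounding the call with a deadline slightly above the worst-case backoff is
// an assumption made for this sketch.
func printNodeLabels(nodeName string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
	if err != nil {
		return err
	}
	for key, value := range node.GetLabels() {
		fmt.Printf("%s=%s\n", key, value)
	}
	return nil
}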
