Skip to content

Cache and print devices for debugging future outages #2097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5e80117
Cache devices and their symlinks in node driver, periodically noting
julianKatz May 21, 2025
0a5f453
Some doc comment updates
julianKatz May 21, 2025
6c5fb86
Add unit tests
julianKatz May 21, 2025
67c90d0
improve partition unit test
julianKatz May 21, 2025
f13477d
Log on removal as well
julianKatz May 21, 2025
682267c
Updated unit tests to be clearer, relying on asserting linkCache
julianKatz May 21, 2025
4237b80
Remove unused broken function
julianKatz May 21, 2025
ab44f98
Move partition checking into the inner linkcache type. This makes it
julianKatz May 23, 2025
2ad5fcd
Log when linkcache Run is triggered
julianKatz May 23, 2025
4fc1f45
New implementation that is hooked into nodestage/unstage. Just linux
julianKatz Jun 6, 2025
6c599dd
Revert stable-master/image.yaml to master version
julianKatz May 28, 2025
b8f2da0
Made a no-op windows implementation of the linkcache package
julianKatz May 28, 2025
b241423
Made test device caches in node_test.go
julianKatz Jun 6, 2025
f1d1be0
Fix sanity test
julianKatz Jun 6, 2025
d55470a
Only warn on failure to create cache
julianKatz Jul 2, 2025
2f4ba12
Only warn on windows instantiation
julianKatz Jul 2, 2025
a7b3bbb
Make non-implemented on windows an info
julianKatz Jul 2, 2025
a64ceed
Fix stable-master/image.yaml
julianKatz Jul 2, 2025
1f6ace0
Improved some error messages to provide better test failure feedback
julianKatz Jul 2, 2025
833ef6e
Always print helpful logs in failing area
julianKatz Jul 2, 2025
b6a601b
Remove now unnecessary corp-helper when running from cloudtop
julianKatz Jul 3, 2025
99d9ff3
Only run device cache if successfully created
julianKatz Jul 3, 2025
5832ea4
Replace verbosities
julianKatz Jul 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)
Expand Down Expand Up @@ -275,6 +276,13 @@ func handle() {
klog.Fatalf("Failed to get node info from API server: %v", err.Error())
}

deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName)
if err != nil {
klog.Warningf("Failed to create device cache: %v", err.Error())
} else {
go deviceCache.Run(ctx)
}

// TODO(2042): Move more of the constructor args into this struct
nsArgs := &driver.NodeServerArgs{
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
Expand All @@ -283,6 +291,7 @@ func handle() {
DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
SysfsPath: "/sys",
MetricsManager: metricsManager,
DeviceCache: deviceCache,
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)

Expand All @@ -297,9 +306,10 @@ func handle() {
if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
klog.Errorf("Data Cache setup failed: %v", err)
}
go driver.StartWatcher(*nodeName)
go driver.StartWatcher(ctx, *nodeName)
}
}

}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/base/controller/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ spec:
- "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
- "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
- --enable-data-cache
- --run-node-service=false
- --enable-multitenancy
command:
- /gce-pd-csi-driver
Expand Down
7 changes: 3 additions & 4 deletions deploy/kubernetes/images/stable-master/image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ metadata:
name: imagetag-gcepd-driver
imageTag:
name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
# Don't change stable image without changing pdImagePlaceholder in
# test/k8s-integration/main.go
newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver
newTag: "latest"
# pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
newTag: "v1.17.2"
---

4 changes: 2 additions & 2 deletions manifest_osversion.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -o xtrace

# The following is a workaround for issue https://github.com/moby/moby/issues/41417
# to manually insert os.version information into docker manifest file
# TODO: once docker manifest annotation for os.versions is availabler for the installed docker here,
# TODO: once docker manifest annotation for os.versions is available for the installed docker here,
# replace the following with annotation approach. https://github.com/docker/cli/pull/2578

export DOCKER_CLI_EXPERIMENTAL=enabled
Expand All @@ -14,7 +14,7 @@ IFS=', ' read -r -a imagetags <<< "$WINDOWS_IMAGE_TAGS"
IFS=', ' read -r -a baseimages <<< "$WINDOWS_BASE_IMAGES"
MANIFEST_TAG=${STAGINGIMAGE}:${STAGINGVERSION}

# translate from image tag to docker manifest foler format
# translate from image tag to docker manifest folder format
# e.g., gcr.io_k8s-staging-csi_gce-pd-windows-v2
manifest_folder=$(echo "${MANIFEST_TAG}" | sed "s|/|_|g" | sed "s/:/-/")
echo ${manifest_folder}
Expand Down
51 changes: 9 additions & 42 deletions pkg/gce-pd-csi-driver/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,14 @@ import (
"regexp"
"strconv"
"strings"
"time"

csi "github.com/container-storage-interface/spec/lib/go/csi"
fsnotify "github.com/fsnotify/fsnotify"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
)

const (
Expand Down Expand Up @@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con
}

func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return 0, err
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return 0, err
}
node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
if err != nil {
return 0, err
}

if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found {
dataCacheCount, err := strconv.Atoi(val)
if err != nil {
Expand All @@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int,
return 0, nil
}

func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
var nodeObj *v1.Node
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Steps: 5,
}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
return false, nil
}
nodeObj = node
klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
return true, nil
})

if err != nil {
klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
}
return nodeObj, err
}

func FetchRaidedLssdCountForDatacache() (int, error) {
raidedPath, err := fetchRAIDedLocalSsdPath()
if err != nil {
Expand Down Expand Up @@ -615,7 +579,7 @@ func InitializeDataCacheNode(nodeId string) error {
return nil
}

func StartWatcher(nodeName string) {
func StartWatcher(ctx context.Context, nodeName string) {
dirToWatch := "/dev/"
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -630,17 +594,20 @@ func StartWatcher(nodeName string) {
}
errorCh := make(chan error, 1)
// Handle the error received from the watcher goroutine
go watchDiskDetaches(watcher, nodeName, errorCh)
go watchDiskDetaches(ctx, watcher, nodeName, errorCh)

select {
case err := <-errorCh:
klog.Errorf("watcher encountered an error: %v", err)
}
}

func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
for {
select {
case <-ctx.Done():
klog.Infof("Context done, stopping watcher")
return nil
// watch for errors
case err := <-watcher.Errors:
errorCh <- fmt.Errorf("disk update event errored: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
SysfsPath: args.SysfsPath,
metricsManager: args.MetricsManager,
DeviceCache: args.DeviceCache,
}
}

Expand All @@ -184,7 +185,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT
maxLogChar = grpcLogCharCap

klog.V(4).Infof("Driver: %v", gceDriver.name)
//Start the nonblocking GRPC
// Start the nonblocking GRPC
s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
// TODO(#34): Only start specific servers based on a flag.
// In the future have this only run specific combinations of servers depending on which version this is.
Expand Down
25 changes: 14 additions & 11 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (

csi "github.com/container-storage-interface/spec/lib/go/csi"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/mount-utils"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs"
Expand Down Expand Up @@ -80,6 +80,8 @@ type GCENodeServer struct {
csi.UnimplementedNodeServer

metricsManager *metrics.MetricsManager
// A cache of the device paths for the volumes that are attached to the node.
DeviceCache *linkcache.DeviceCache
}

type NodeServerArgs struct {
Expand All @@ -97,6 +99,7 @@ type NodeServerArgs struct {
SysfsPath string

MetricsManager *metrics.MetricsManager
DeviceCache *linkcache.DeviceCache
}

var _ csi.NodeServer = &GCENodeServer{}
Expand Down Expand Up @@ -507,6 +510,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
}
}

err = ns.DeviceCache.AddVolume(volumeID)
if err != nil {
klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
}

klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
Expand Down Expand Up @@ -620,6 +628,9 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err)
}
}

ns.DeviceCache.RemoveVolume(volumeID)

klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil
}
Expand Down Expand Up @@ -875,15 +886,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
}

func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return 0, err
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return 0, err
}
node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
if err != nil {
return 0, err
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/gce-pd-csi-driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/mount-utils"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)

Expand All @@ -44,11 +45,13 @@ const (
)

func getTestGCEDriver(t *testing.T) *GCEDriver {
return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{})
return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{defaultVolumeID})),
})
}

func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver {
return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{})
func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount, args *NodeServerArgs) *GCEDriver {
return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), args)
}

func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver {
Expand Down Expand Up @@ -188,7 +191,9 @@ func TestNodeGetVolumeStats(t *testing.T) {
}

mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{tc.volumeID})),
})
ns := gceDriver.ns

req := &csi.NodeGetVolumeStatsRequest{
Expand Down Expand Up @@ -1147,7 +1152,9 @@ func TestNodeStageVolume(t *testing.T) {
))
}
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true})
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{volumeID})),
})
ns := gceDriver.ns
ns.SysfsPath = tempDir + "/sys"
_, err := ns.NodeStageVolume(context.Background(), tc.req)
Expand Down
49 changes: 49 additions & 0 deletions pkg/k8sclient/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package k8sclient

import (
"context"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

// GetNodeWithRetry builds an in-cluster Kubernetes client and fetches the
// named Node object, retrying transient API failures with exponential
// backoff. It returns an error if the in-cluster configuration or client
// cannot be created, or if all retrieval attempts fail.
func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) {
	restConfig, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		return nil, cfgErr
	}
	clientset, clientErr := kubernetes.NewForConfig(restConfig)
	if clientErr != nil {
		return nil, clientErr
	}
	return getNodeWithRetry(ctx, clientset, nodeName)
}

// getNodeWithRetry fetches the named Node object using the supplied client,
// retrying up to 5 times with exponential backoff (1s, 2s, 4s, ...) on
// transient API errors. On success the retrieved node is returned; if every
// attempt fails (or ctx is cancelled), the nil node and the backoff error
// are returned. Taking kubernetes.Interface (which *kubernetes.Clientset
// satisfies) keeps callers working and allows fake clients in tests.
func getNodeWithRetry(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) (*v1.Node, error) {
	var nodeObj *v1.Node
	backoff := wait.Backoff{
		Duration: 1 * time.Second,
		Factor:   2.0,
		Steps:    5,
	}
	// Use the context handed to the callback so cancellation propagates to
	// the API call; klog terminates log lines itself, so no trailing "\n".
	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			// Treat the failure as transient: log and let the backoff retry.
			klog.Warningf("Error getting node %s: %v, retrying...", nodeName, err)
			return false, nil
		}
		nodeObj = node
		klog.V(4).Infof("Successfully retrieved node info %s", nodeName)
		return true, nil
	})

	if err != nil {
		klog.Errorf("Failed to get node %s after retries: %v", nodeName, err)
	}
	return nodeObj, err
}
Loading