Cache and print devices for debugging future outages #2141

Open · wants to merge 23 commits into base: master

Commits (changes shown from 22 of 23 commits)
dba53e0
Cache devices and their symlinks in node driver, periodically noting
julianKatz May 21, 2025
d3a824e
Some doc comment updates
julianKatz May 21, 2025
485beaf
Add unit tests
julianKatz May 21, 2025
afb2393
improve partition unit test
julianKatz May 21, 2025
643817f
Log on removal as well
julianKatz May 21, 2025
fa2d2f9
Updated unit tests to be clearer, relying on asserting linkCache
julianKatz May 21, 2025
95163b7
Remove unused broken function
julianKatz May 21, 2025
8d8d926
Move partition checking into the inner linkcache type. This makes it
julianKatz May 23, 2025
448b0ba
Log when linkcache Run is triggered
julianKatz May 23, 2025
2ef351c
New implementation that is hooked into nodestage/unstage. Just linux
julianKatz Jun 6, 2025
9ab0d2f
Made a no-op windows implementation of the linkcache package
julianKatz May 28, 2025
b88e5f4
Made test device caches in node_test.go
julianKatz Jun 6, 2025
d76c44c
Fix sanity test
julianKatz Jun 6, 2025
4abd540
Only warn on failure to create cache
julianKatz Jul 2, 2025
c4b69f3
Only warn on windows instantiation
julianKatz Jul 2, 2025
042176a
Make non-implemented on windows an info
julianKatz Jul 2, 2025
bc8defa
Improved some error messages to provide better test failure feedback
julianKatz Jul 2, 2025
170e24b
Always print helpful logs in failing area
julianKatz Jul 2, 2025
be4b045
Remove now unnecessary corp-helper when running from cloudtop
julianKatz Jul 3, 2025
6ef07a2
Only run device cache if successfully created
julianKatz Jul 3, 2025
3437efd
Replace verbosities
julianKatz Jul 3, 2025
3824cbb
Add nil checks around the usage of the device cache
cemakd Aug 5, 2025
61aeffd
Add support for NVMe disk types by using deviceutils
cemakd Aug 7, 2025
12 changes: 11 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
@@ -34,6 +34,7 @@ import (
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)
@@ -275,6 +276,13 @@ func handle() {
klog.Fatalf("Failed to get node info from API server: %v", err.Error())
}

deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName)
Contributor:
Can we make the time period configurable so that we can adjust it if it ends up spamming too many logs? In fact I would probably start the logging at 10 mins initially.

In the future, if we want to turn this into a metric, I would be more comfortable reducing the polling period but removing logs to avoid log spam.

Contributor Author:
Done, added a flag disk-cache-sync-period with a default value of 10 minutes when unspecified.
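
For illustration only (this snippet is not part of the PR's diff): a minimal sketch of how a disk-cache-sync-period flag could feed into the device cache setup, using the flag name and 10-minute default from this thread. The helper function and its placement are assumptions; only NewDeviceCacheForNode, Run, and the klog warning mirror what the PR actually does.

```go
package main

import (
	"context"
	"flag"
	"time"

	"k8s.io/klog/v2"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
)

// Hypothetical flag wiring for the sync period discussed in this thread.
var diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute,
	"How often the node driver re-checks the /dev symlinks of cached volumes.")

// startDeviceCache is an illustrative helper, not code from this PR.
func startDeviceCache(ctx context.Context, nodeName string) *linkcache.DeviceCache {
	deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, *diskCacheSyncPeriod, nodeName)
	if err != nil {
		// Cache creation is best-effort: the driver keeps serving, it only loses the periodic device logging.
		klog.Warningf("Failed to create device cache: %v", err)
		return nil
	}
	go deviceCache.Run(ctx)
	return deviceCache
}
```

If the periodic check later becomes a metric, as suggested above, the same flag could keep governing the collection interval while the log lines are dropped.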

if err != nil {
klog.Warningf("Failed to create device cache: %v", err.Error())
} else {
go deviceCache.Run(ctx)
}

// TODO(2042): Move more of the constructor args into this struct
nsArgs := &driver.NodeServerArgs{
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
@@ -283,6 +291,7 @@
DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
SysfsPath: "/sys",
MetricsManager: metricsManager,
DeviceCache: deviceCache,
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)

@@ -297,9 +306,10 @@
if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
klog.Errorf("Data Cache setup failed: %v", err)
}
go driver.StartWatcher(*nodeName)
go driver.StartWatcher(ctx, *nodeName)
}
}

}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
1 change: 1 addition & 0 deletions deploy/kubernetes/base/controller/controller.yaml
@@ -145,6 +145,7 @@ spec:
- "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
- "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
- --enable-data-cache
- --run-node-service=false
- --enable-multitenancy
command:
- /gce-pd-csi-driver
2 changes: 1 addition & 1 deletion deploy/kubernetes/images/stable-master/image.yaml
@@ -46,6 +46,6 @@ imageTag:
name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
# pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
newTag: "v1.17.3"
newTag: "v1.17.4"
---

4 changes: 2 additions & 2 deletions manifest_osversion.sh
@@ -4,7 +4,7 @@ set -o xtrace

# The following is a workaround for issue https://github.com/moby/moby/issues/41417
# to manually inserver os.version information into docker manifest file
# TODO: once docker manifest annotation for os.versions is availabler for the installed docker here,
# TODO: once docker manifest annotation for os.versions is available for the installed docker here,
# replace the following with annotation approach. https://github.com/docker/cli/pull/2578

export DOCKER_CLI_EXPERIMENTAL=enabled
@@ -14,7 +14,7 @@ IFS=', ' read -r -a imagetags <<< "$WINDOWS_IMAGE_TAGS"
IFS=', ' read -r -a baseimages <<< "$WINDOWS_BASE_IMAGES"
MANIFEST_TAG=${STAGINGIMAGE}:${STAGINGVERSION}

# translate from image tag to docker manifest foler format
# translate from image tag to docker manifest folder format
# e.g., gcr.io_k8s-staging-csi_gce-pd-windows-v2
manifest_folder=$(echo "${MANIFEST_TAG}" | sed "s|/|_|g" | sed "s/:/-/")
echo ${manifest_folder}
51 changes: 9 additions & 42 deletions pkg/gce-pd-csi-driver/cache.go
@@ -7,19 +7,14 @@ import (
"regexp"
"strconv"
"strings"
"time"

csi "github.com/container-storage-interface/spec/lib/go/csi"
fsnotify "github.com/fsnotify/fsnotify"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
)

const (
@@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con
}

func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return 0, err
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return 0, err
}
node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
if err != nil {
return 0, err
}

if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found {
dataCacheCount, err := strconv.Atoi(val)
if err != nil {
@@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int,
return 0, nil
}

func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
var nodeObj *v1.Node
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Steps: 5,
}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
return false, nil
}
nodeObj = node
klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
return true, nil
})

if err != nil {
klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
}
return nodeObj, err
}

func FetchRaidedLssdCountForDatacache() (int, error) {
raidedPath, err := fetchRAIDedLocalSsdPath()
if err != nil {
@@ -615,7 +579,7 @@ func InitializeDataCacheNode(nodeId string) error {
return nil
}

func StartWatcher(nodeName string) {
func StartWatcher(ctx context.Context, nodeName string) {
dirToWatch := "/dev/"
watcher, err := fsnotify.NewWatcher()
if err != nil {
@@ -630,17 +594,20 @@ func StartWatcher(nodeName string) {
}
errorCh := make(chan error, 1)
// Handle the error received from the watcher goroutine
go watchDiskDetaches(watcher, nodeName, errorCh)
go watchDiskDetaches(ctx, watcher, nodeName, errorCh)

select {
case err := <-errorCh:
klog.Errorf("watcher encountered an error: %v", err)
}
}

func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
for {
select {
case <-ctx.Done():
klog.Infof("Context done, stopping watcher")
return nil
// watch for errors
case err := <-watcher.Errors:
errorCh <- fmt.Errorf("disk update event errored: %v", err)
3 changes: 2 additions & 1 deletion pkg/gce-pd-csi-driver/gce-pd-driver.go
@@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
SysfsPath: args.SysfsPath,
metricsManager: args.MetricsManager,
DeviceCache: args.DeviceCache,
}
}

@@ -184,7 +185,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT
maxLogChar = grpcLogCharCap

klog.V(4).Infof("Driver: %v", gceDriver.name)
//Start the nonblocking GRPC
// Start the nonblocking GRPC
s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
// TODO(#34): Only start specific servers based on a flag.
// In the future have this only run specific combinations of servers depending on which version this is.
29 changes: 18 additions & 11 deletions pkg/gce-pd-csi-driver/node.go
@@ -32,14 +32,14 @@ import (

csi "github.com/container-storage-interface/spec/lib/go/csi"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/mount-utils"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs"
@@ -80,6 +80,8 @@ type GCENodeServer struct {
csi.UnimplementedNodeServer

metricsManager *metrics.MetricsManager
// A cache of the device paths for the volumes that are attached to the node.
DeviceCache *linkcache.DeviceCache
}

type NodeServerArgs struct {
@@ -97,6 +99,7 @@ type NodeServerArgs struct {
SysfsPath string

MetricsManager *metrics.MetricsManager
DeviceCache *linkcache.DeviceCache
}

var _ csi.NodeServer = &GCENodeServer{}
@@ -509,6 +512,13 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
}
}

if ns.DeviceCache != nil {
err = ns.DeviceCache.AddVolume(volumeID)
if err != nil {
klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
}
}

klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
@@ -622,6 +632,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err)
}
}

if ns.DeviceCache != nil {
ns.DeviceCache.RemoveVolume(volumeID)
}

klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil
}
@@ -899,15 +914,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
}

func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return 0, err
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return 0, err
}
node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
if err != nil {
return 0, err
}
17 changes: 12 additions & 5 deletions pkg/gce-pd-csi-driver/node_test.go
@@ -34,6 +34,7 @@ import (
"k8s.io/mount-utils"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)

@@ -44,11 +45,13 @@
)

func getTestGCEDriver(t *testing.T) *GCEDriver {
return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{})
return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{defaultVolumeID})),
})
}

func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver {
return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{})
func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount, args *NodeServerArgs) *GCEDriver {
return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), args)
}

func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver {
@@ -188,7 +191,9 @@ func TestNodeGetVolumeStats(t *testing.T) {
}

mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{tc.volumeID})),
})
ns := gceDriver.ns

req := &csi.NodeGetVolumeStatsRequest{
@@ -1227,7 +1232,9 @@ func TestNodeStageVolume(t *testing.T) {
))
}
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true})
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{
DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{volumeID})),
})
ns := gceDriver.ns
ns.SysfsPath = tempDir + "/sys"
_, err := ns.NodeStageVolume(context.Background(), tc.req)
49 changes: 49 additions & 0 deletions pkg/k8sclient/node.go
@@ -0,0 +1,49 @@
package k8sclient

import (
"context"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
return getNodeWithRetry(ctx, kubeClient, nodeName)
}

func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
var nodeObj *v1.Node
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Steps: 5,
}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
return false, nil
}
nodeObj = node
klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
return true, nil
})

if err != nil {
klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
}
return nodeObj, err
}