Skip to content

Replace Get operations with polling #2144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ var (
attachDiskBackoffJitter = flag.Float64("attach-disk-backoff-jitter", 0.0, "Jitter for attachDisk backoff")
attachDiskBackoffSteps = flag.Int("attach-disk-backoff-steps", 24, "Steps for attachDisk backoff")
attachDiskBackoffCap = flag.Duration("attach-disk-backoff-cap", 0, "Cap for attachDisk backoff")
waitForOpBackoffDuration = flag.Duration("wait-op-backoff-duration", 3*time.Second, "Duration for wait for operation backoff")
waitForOpBackoffDuration = flag.Duration("wait-op-backoff-duration", 2*time.Minute, "Duration for wait for operation backoff")
waitForOpBackoffFactor = flag.Float64("wait-op-backoff-factor", 0.0, "Factor for wait for operation backoff")
waitForOpBackoffJitter = flag.Float64("wait-op-backoff-jitter", 0.0, "Jitter for wait for operation backoff")
waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff")
waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 3, "Steps for wait for operation backoff")
waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff")

enableDeviceInUseCheck = flag.Bool("enable-device-in-use-check-on-node-unstage", true, "If set to true, block NodeUnstageVolume requests until the specified device is not in use")
Expand Down
41 changes: 27 additions & 14 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ var AttachDiskBackoff = wait.Backoff{
Cap: 0}

// WaitForOpBackoff is backoff used to wait for Global, Regional or Zonal operation to complete.
// Default values are similar to Poll every 3 seconds with 5 minute timeout.
// Default values are similar to Poll every 2 minutes with 6 minute timeout.
var WaitForOpBackoff = wait.Backoff{
Duration: 3 * time.Second,
Duration: 2 * time.Minute,
Factor: 0.0,
Jitter: 0.0,
Steps: 100,
Steps: 3,
Cap: 0}

// Custom error type to propagate error messages up to clients.
Expand Down Expand Up @@ -1064,39 +1064,52 @@ func (cloud *CloudProvider) getRegionalDiskTypeURI(project string, region, diskT
}

func (cloud *CloudProvider) waitForZonalOp(ctx context.Context, project, opName string, zone string) error {
// The v1 API can query for v1, alpha, or beta operations.
return wait.ExponentialBackoff(WaitForOpBackoff, func() (bool, error) {
pollOp, err := cloud.service.ZoneOperations.Get(project, zone, opName).Context(ctx).Do()
waitOp, err := cloud.service.ZoneOperations.Wait(project, zone, opName).Context(ctx).Do()
// In case of service unavailable do not propogate the error so ExponentialBackoff will retry
if err != nil && waitOp.HttpErrorStatusCode == 503 {
klog.Errorf("WaitForZonalOp(op: %s, zone: %#v, err: %v) failed to poll the operation", opName, zone, err)
return false, nil
}
if err != nil {
klog.Errorf("WaitForOp(op: %s, zone: %#v) failed to poll the operation", opName, zone)
klog.Errorf("WaitForZonalOp(op: %s, zone: %#v, err: %v) failed to poll the operation", opName, zone, err)
return false, err
}
done, err := opIsDone(pollOp)
done, err := opIsDone(waitOp)
return done, err
})
}

func (cloud *CloudProvider) waitForRegionalOp(ctx context.Context, project, opName string, region string) error {
// The v1 API can query for v1, alpha, or beta operations.
return wait.ExponentialBackoff(WaitForOpBackoff, func() (bool, error) {
pollOp, err := cloud.service.RegionOperations.Get(project, region, opName).Context(ctx).Do()
waitOp, err := cloud.service.RegionOperations.Wait(project, region, opName).Context(ctx).Do()
// In case of service unavailable do not propogate the error so ExponentialBackoff will retry
if err != nil && waitOp.HttpErrorStatusCode == 503 {
klog.Errorf("WaitForRegionalOp(op: %s, region: %#v, err: %v) failed to poll the operation", opName, region, err)
return false, nil
}
if err != nil {
klog.Errorf("WaitForOp(op: %s, region: %#v) failed to poll the operation", opName, region)
klog.Errorf("WaitForRegionalOp(op: %s, region: %#v, err: %v) failed to poll the operation", opName, region, err)
return false, err
}
done, err := opIsDone(pollOp)
done, err := opIsDone(waitOp)
return done, err
})
}

func (cloud *CloudProvider) waitForGlobalOp(ctx context.Context, project, opName string) error {
return wait.ExponentialBackoff(WaitForOpBackoff, func() (bool, error) {
pollOp, err := cloud.service.GlobalOperations.Get(project, opName).Context(ctx).Do()
waitOp, err := cloud.service.GlobalOperations.Wait(project, opName).Context(ctx).Do()
// In case of service unavailable do not propogate the error so ExponentialBackoff will retry
if err != nil && waitOp.HttpErrorStatusCode == 503 {
klog.Errorf("WaitForGlobalOp(op: %s, err: %v) failed to poll the operation", opName, err)
return false, nil
}
if err != nil {
klog.Errorf("waitForGlobalOp(op: %s) failed to poll the operation", opName)
klog.Errorf("waitForGlobalOp(op: %s, err: %v) failed to poll the operation", opName, err)
return false, err
}
done, err := opIsDone(pollOp)
done, err := opIsDone(waitOp)
return done, err
})
}
Expand Down
4 changes: 4 additions & 0 deletions test/k8s-integration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,8 @@ func generateGCETestSkip(testParams *testParameters) string {
skipString := "\\[Disruptive\\]|\\[Serial\\]"
// Skip mount options test until we fix the invalid mount options for xfs.
skipString = skipString + "|csi-gcepd-sc-xfs.*provisioning.should.provision.storage.with.mount.options"
// Skip VolumeAttributesClass tests while it's a beta feature.
skipString = skipString + "|\\[Feature:VolumeAttributesClass\\]"

v := apimachineryversion.MustParseSemantic(testParams.clusterVersion)

Expand Down Expand Up @@ -745,6 +747,8 @@ func generateGKETestSkip(testParams *testParameters) string {

// Skip mount options test until we fix the invalid mount options for xfs.
skipString = skipString + "|csi-gcepd-sc-xfs.*provisioning.should.provision.storage.with.mount.options"
// Skip VolumeAttributesClass tests while it's a beta feature.
skipString = skipString + "|\\[Feature:VolumeAttributesClass\\]"

// Skip rwop test when node version is less than 1.32. Test was added only
// in 1.32 and above, see tags in
Expand Down