Skip to content

Commit 3be4b59

Browse files
committed
Add cluster ping to publish
If the IP address for a cluster is not reachable, it takes a very long time for the mount to time out, holding up K8S operations and preventing quick deletion of incorrect volumes. This change adds ping operation to ensure that the node is able to communicate with the expected cluster before attempting the mount operation. The result is cached for 30 seconds to prevent overwhelming the API if many volumes are being published in a short period of time.
1 parent 9dc546e commit 3be4b59

File tree

6 files changed

+146
-7
lines changed

6 files changed

+146
-7
lines changed

pkg/azurelustre/azurelustre.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
csicommon "sigs.k8s.io/azurelustre-csi-driver/pkg/csi-common"
4242
"sigs.k8s.io/azurelustre-csi-driver/pkg/util"
4343
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader"
44+
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
4445
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
4546
azureconfig "sigs.k8s.io/cloud-provider-azure/pkg/provider/config"
4647
)
@@ -151,6 +152,8 @@ type Driver struct {
151152
resourceGroup string
152153
location string
153154
dynamicProvisioner DynamicProvisionerInterface
155+
commandRunner util.CommandRunnerInterface
156+
pingCache azcache.Resource
154157

155158
removeNotReadyTaint bool
156159
kubeClient kubernetes.Interface
@@ -170,6 +173,7 @@ func NewDriver(options *DriverOptions) *Driver {
170173
enableAzureLustreMockDynProv: options.EnableAzureLustreMockDynProv,
171174
workingMountDir: options.WorkingMountDir,
172175
removeNotReadyTaint: options.RemoveNotReadyTaint,
176+
commandRunner: &util.DefaultCommandRunner{},
173177
}
174178
d.Name = options.DriverName
175179
d.Version = driverVersion
@@ -263,6 +267,10 @@ func NewDriver(options *DriverOptions) *Driver {
263267
}
264268
}
265269

270+
if d.pingCache, err = azcache.NewTimedCache(20*time.Second, d.pingCluster, false); err != nil {
271+
klog.Fatalf("%v", err)
272+
}
273+
266274
return &d
267275
}
268276

pkg/azurelustre/azurelustre_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4040
"k8s.io/apimachinery/pkg/util/wait"
4141
kubefake "k8s.io/client-go/kubernetes/fake"
42+
"sigs.k8s.io/azurelustre-csi-driver/pkg/util"
43+
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
4244
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
4345
)
4446

@@ -56,6 +58,7 @@ const (
5658
clusterRequestFailureName = "testShouldFail"
5759
driverDefaultLocation = "defaultFakeLocation"
5860
emptyZonesLocation = "emptyZonesLocation"
61+
fakeCacheTTL = 1 * time.Second
5962
)
6063

6164
func NewFakeDriver() *Driver {
@@ -72,10 +75,39 @@ func NewFakeDriver() *Driver {
7275
driver.location = driverDefaultLocation
7376
driver.resourceGroup = "defaultFakeResourceGroup"
7477
driver.dynamicProvisioner = &FakeDynamicProvisioner{}
78+
driver.commandRunner = NewFakeCommandRunner(false)
79+
80+
getter := func(_ context.Context, _ string) (interface{}, error) {
81+
return true, nil
82+
}
83+
84+
driver.pingCache, _ = azcache.NewTimedCache(fakeCacheTTL, getter, false) //nolint:errcheck // TimedCache never returns error here
85+
driver.pingCache.Set("key", nil)
7586

7687
return driver
7788
}
7889

90+
type FakeCommandRunner struct {
91+
util.CommandRunnerInterface
92+
CalledCommands []string
93+
cmdShouldFail bool
94+
}
95+
96+
func NewFakeCommandRunner(shouldFail bool) *FakeCommandRunner {
97+
return &FakeCommandRunner{
98+
CalledCommands: []string{},
99+
cmdShouldFail: shouldFail,
100+
}
101+
}
102+
103+
func (f *FakeCommandRunner) RunWithTimeout(_ context.Context, _ time.Duration, cmd string, args ...string) (string, error) {
104+
f.CalledCommands = append(f.CalledCommands, fmt.Sprintf("%s %s", cmd, strings.Join(args, " ")))
105+
if f.cmdShouldFail {
106+
return "", errors.New("error occurred calling command: " + cmd)
107+
}
108+
return "fake command output", nil
109+
}
110+
79111
type FakeDynamicProvisioner struct {
80112
DynamicProvisionerInterface
81113
Filesystems []*AmlFilesystemProperties

pkg/azurelustre/nodeserver.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,41 @@ import (
3131
"k8s.io/kubernetes/pkg/volume"
3232
mount "k8s.io/mount-utils"
3333
volumehelper "sigs.k8s.io/azurelustre-csi-driver/pkg/util"
34+
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
3435
"sigs.k8s.io/cloud-provider-azure/pkg/metrics"
3536
)
3637

38+
// pingCluster used as the getter function for pingCache. Pings the MGS IP address to see if it's reachable.
39+
// Allows the ping to timeout after 5 seconds to avoid long delays
40+
func (d *Driver) pingCluster(ctx context.Context, mgsIPAddress string) (interface{}, error) {
41+
pingAddr := fmt.Sprintf("%s@tcp", mgsIPAddress)
42+
_, err := d.commandRunner.RunWithTimeout(ctx, 5*time.Second, "lnetctl", "ping", pingAddr)
43+
if err != nil {
44+
klog.Warningf("lnetctl ping to %s failed with error: %v", pingAddr, err)
45+
return false, nil
46+
}
47+
return true, nil
48+
}
49+
50+
// getCachedClusterPing checks whether the MGS IP address is reachable by looking it up in the pingCache.
51+
// If not found or expired, it will ping the MGS IP address and update the cache.
52+
func (d *Driver) getCachedClusterPing(ctx context.Context, mgsIPAddress string) error {
53+
cacheData, err := d.pingCache.Get(ctx, mgsIPAddress, azcache.CacheReadTypeDefault)
54+
if err != nil {
55+
return status.Errorf(codes.Internal, "error getting pingCache value for MGS IP %q: %v", mgsIPAddress, err)
56+
}
57+
if pinged, ok := cacheData.(bool); !ok {
58+
return status.Error(codes.Internal, "type assertion failed for pingCache value")
59+
} else if !pinged {
60+
return status.Errorf(codes.InvalidArgument,
61+
"MGS IP address %q didn't respond to lnetctl ping: Is the cluster IP address correct and the network configured correctly?", mgsIPAddress)
62+
}
63+
return nil
64+
}
65+
3766
// NodePublishVolume mount the volume from staging to target path
3867
func (d *Driver) NodePublishVolume(
39-
_ context.Context,
68+
ctx context.Context,
4069
req *csi.NodePublishVolumeRequest,
4170
) (*csi.NodePublishVolumeResponse, error) {
4271
mc := metrics.NewMetricContext(azureLustreCSIDriverName,
@@ -64,13 +93,13 @@ func (d *Driver) NodePublishVolume(
6493
"Target path not provided")
6594
}
6695

67-
context := req.GetVolumeContext()
68-
if context == nil {
96+
volumeContext := req.GetVolumeContext()
97+
if volumeContext == nil {
6998
return nil, status.Error(codes.InvalidArgument,
7099
"Volume context must be provided")
71100
}
72101

73-
vol, err := getVolume(volumeID, context)
102+
vol, err := getVolume(volumeID, volumeContext)
74103
if err != nil {
75104
return nil, err
76105
}
@@ -88,12 +117,20 @@ func (d *Driver) NodePublishVolume(
88117
mc.ObserveOperationWithResult(isOperationSucceeded)
89118
}()
90119

120+
if !d.enableAzureLustreMockMount {
121+
klog.V(2).Infof("NodePublishVolume: ensuring MGS IP address %s responds to ping before attempting mount", vol.mgsIPAddress)
122+
if err := d.getCachedClusterPing(ctx, vol.mgsIPAddress); err != nil {
123+
return nil, err
124+
}
125+
klog.V(2).Infof("NodePublishVolume: ping to MGS IP address %s successful", vol.mgsIPAddress)
126+
}
127+
91128
source := getSourceString(vol.mgsIPAddress, vol.azureLustreName)
92129

93130
mountOptions, readOnly := getMountOptions(req, userMountFlags)
94131

95132
if len(vol.subDir) > 0 && !d.enableAzureLustreMockMount {
96-
interpolatedSubDir := interpolateSubDirVariables(context, vol)
133+
interpolatedSubDir := interpolateSubDirVariables(volumeContext, vol)
97134

98135
if isSubpath := ensureStrictSubpath(interpolatedSubDir); !isSubpath {
99136
return nil, status.Error(

pkg/azurelustre/nodeserver_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,11 +521,25 @@ func TestNodePublishVolume(t *testing.T) {
521521
d.volumeLocks.Release(lockKey)
522522
},
523523
},
524+
{
525+
desc: "Error failed ping to MGS",
526+
setup: func(d *Driver) {
527+
d.pingCache.Set("1.1.1.1", false)
528+
},
529+
req: csi.NodePublishVolumeRequest{
530+
VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
531+
VolumeId: "vol_1#lustrefs#1.1.1.1#",
532+
TargetPath: targetTest,
533+
VolumeContext: map[string]string{"mgs-ip-address": "1.1.1.1", "fs-name": "lustrefs"},
534+
},
535+
expectedErr: status.Error(codes.InvalidArgument, "MGS IP address \"1.1.1.1\" didn't respond to lnetctl ping: Is the cluster IP address correct and the network configured correctly?"),
536+
expectedMountpoints: nil,
537+
expectedMountActions: []mount.FakeAction{},
538+
},
524539
}
525540

526-
d := NewFakeDriver()
527-
528541
for i := range tests {
542+
d := NewFakeDriver()
529543
test := &tests[i]
530544

531545
fakeMounter := &fakeMounter{}

pkg/util/util.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ limitations under the License.
1717
package util
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"os"
23+
"os/exec"
2224
"strings"
2325
"sync"
26+
"time"
2427
)
2528

2629
const (
@@ -87,6 +90,26 @@ func MakeDir(pathname string) error {
8790
return nil
8891
}
8992

93+
type CommandRunnerInterface interface {
94+
RunWithTimeout(ctx context.Context, timeout time.Duration, cmd string, args ...string) (string, error)
95+
}
96+
97+
type DefaultCommandRunner struct {
98+
CommandRunnerInterface
99+
}
100+
101+
func (r *DefaultCommandRunner) RunWithTimeout(ctx context.Context, timeout time.Duration, cmd string, args ...string) (string, error) {
102+
cmdTimeout, cmdCancel := context.WithTimeout(ctx, timeout)
103+
defer cmdCancel()
104+
105+
command := exec.CommandContext(cmdTimeout, cmd, args...)
106+
output, err := command.CombinedOutput()
107+
if err != nil {
108+
return string(output), err
109+
}
110+
return string(output), nil
111+
}
112+
90113
// LockMap used to lock on entries
91114
type LockMap struct {
92115
sync.Mutex

pkg/util/util_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package util
1818

1919
import (
20+
"context"
2021
"errors"
2122
"os"
2223
"reflect"
@@ -42,6 +43,30 @@ func TestRoundUpGiB(t *testing.T) {
4243
}
4344
}
4445

46+
func TestCommandRunnerSuccess(t *testing.T) {
47+
runner := &DefaultCommandRunner{}
48+
49+
output, err := runner.RunWithTimeout(context.Background(), 1*time.Second, "echo", "hello")
50+
require.NoError(t, err)
51+
require.Equal(t, "hello\n", output)
52+
}
53+
54+
func TestCommandRunnerTimeout(t *testing.T) {
55+
runner := &DefaultCommandRunner{}
56+
output, err := runner.RunWithTimeout(context.Background(), 1*time.Second, "sleep", "10")
57+
require.ErrorContains(t, err, "killed")
58+
require.Empty(t, output, "Expected no output on timeout")
59+
}
60+
61+
func TestCommandRunnerError(t *testing.T) {
62+
runner := &DefaultCommandRunner{}
63+
64+
nonexistentPath := "./non-existent-path"
65+
output, err := runner.RunWithTimeout(context.Background(), 1*time.Second, "ls", nonexistentPath)
66+
require.Error(t, err)
67+
require.Contains(t, output, nonexistentPath)
68+
}
69+
4570
func TestSimpleLockEntry(t *testing.T) {
4671
testLockMap := NewLockMap()
4772

0 commit comments

Comments
 (0)