Skip to content

Commit f91d448

Browse files
committed
Add tests for out of band attachments
1 parent 77e070f commit f91d448

File tree

3 files changed

+331
-0
lines changed

3 files changed

+331
-0
lines changed

test/e2e/storage/pd.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,37 @@ var _ = utils.SIGDescribe("Pod Disks", func() {
451451
ginkgo.By("delete a PD")
452452
framework.ExpectNoError(e2epv.DeletePDWithRetry("non-exist"))
453453
})
454+
455+
// This test is marked to run as serial so that device selection on AWS does not
456+
// conflict with other concurrent attach operations.
457+
ginkgo.It("[Serial] attach on previously attached volumes should work", func() {
458+
e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
459+
ginkgo.By("creating PD")
460+
diskName, err := e2epv.CreatePDWithRetry()
461+
framework.ExpectNoError(err, "Error creating PD")
462+
463+
// this should be safe to do because if attach fails then detach will be considered
464+
// successful and we will delete the volume.
465+
defer func() {
466+
detachAndDeletePDs(diskName, []types.NodeName{host0Name})
467+
}()
468+
469+
ginkgo.By("Attaching volume to a node")
470+
err = attachPD(host0Name, diskName)
471+
framework.ExpectNoError(err, "Error attaching PD")
472+
473+
pod := testPDPod([]string{diskName}, host0Name /*readOnly*/, false, 1)
474+
ginkgo.By("Creating test pod with same volume")
475+
_, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
476+
framework.ExpectNoError(err, "Failed to create pod")
477+
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespaceSlow(f.ClientSet, pod.Name, f.Namespace.Name))
478+
479+
ginkgo.By("deleting the pod")
480+
framework.ExpectNoError(podClient.Delete(context.TODO(), pod.Name, *metav1.NewDeleteOptions(0)), "Failed to delete pod")
481+
framework.Logf("deleted pod %q", pod.Name)
482+
ginkgo.By("waiting for PD to detach")
483+
framework.ExpectNoError(waitForPDDetach(diskName, host0Name))
484+
})
454485
})
455486

456487
func countReadyNodes(c clientset.Interface, hostName types.NodeName) int {
@@ -474,6 +505,7 @@ func verifyPDContentsViaContainer(namespace string, f *framework.Framework, podN
474505
}
475506
}
476507

508+
// TODO: move detachPD to standard cloudprovider functions so that these tests can run on other cloud providers too
477509
func detachPD(nodeName types.NodeName, pdName string) error {
478510
if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
479511
gceCloud, err := gce.GetGCECloud()
@@ -512,6 +544,38 @@ func detachPD(nodeName types.NodeName, pdName string) error {
512544
}
513545
}
514546

547+
// TODO: move attachPD to standard cloudprovider functions so as these tests can run on other cloudproviders too
548+
func attachPD(nodeName types.NodeName, pdName string) error {
549+
if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
550+
gceCloud, err := gce.GetGCECloud()
551+
if err != nil {
552+
return err
553+
}
554+
err = gceCloud.AttachDisk(pdName, nodeName, false /*readOnly*/, false /*regional*/)
555+
if err != nil {
556+
framework.Logf("Error attaching PD %q: %v", pdName, err)
557+
}
558+
return err
559+
560+
} else if framework.TestContext.Provider == "aws" {
561+
awsSession, err := session.NewSession()
562+
if err != nil {
563+
return fmt.Errorf("error creating session: %v", err)
564+
}
565+
client := ec2.New(awsSession)
566+
tokens := strings.Split(pdName, "/")
567+
awsVolumeID := tokens[len(tokens)-1]
568+
ebsUtil := utils.NewEBSUtil(client)
569+
err = ebsUtil.AttachDisk(awsVolumeID, string(nodeName))
570+
if err != nil {
571+
return fmt.Errorf("error attaching volume %s to node %s: %v", awsVolumeID, nodeName, err)
572+
}
573+
return nil
574+
} else {
575+
return fmt.Errorf("Provider does not support volume attaching")
576+
}
577+
}
578+
515579
// Returns pod spec suitable for api Create call. Handles gce, gke and aws providers only and
516580
// escapes if a different provider is supplied.
517581
// The first container name is hard-coded to "mycontainer". Subsequent containers are named:

test/e2e/storage/utils/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
srcs = [
88
"create.go",
99
"deployment.go",
10+
"ebs.go",
1011
"framework.go",
1112
"host_exec.go",
1213
"local.go",
@@ -36,9 +37,12 @@ go_library(
3637
"//test/e2e/framework/ssh:go_default_library",
3738
"//test/e2e/framework/testfiles:go_default_library",
3839
"//test/utils/image:go_default_library",
40+
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
41+
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
3942
"//vendor/github.com/onsi/ginkgo:go_default_library",
4043
"//vendor/github.com/onsi/gomega:go_default_library",
4144
"//vendor/github.com/pkg/errors:go_default_library",
45+
"//vendor/k8s.io/klog/v2:go_default_library",
4246
"//vendor/k8s.io/utils/exec:go_default_library",
4347
],
4448
)

test/e2e/storage/utils/ebs.go

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package utils
18+
19+
import (
20+
"fmt"
21+
"strings"
22+
"time"
23+
24+
"github.com/aws/aws-sdk-go/aws"
25+
"github.com/aws/aws-sdk-go/service/ec2"
26+
"k8s.io/apimachinery/pkg/util/wait"
27+
"k8s.io/klog/v2"
28+
)
29+
30+
// Backoff parameters shared by the volume state polling loops in this file.
const (
	// volumeAttachmentStatusPollDelay is both the pause taken before the first
	// poll and the initial interval of the exponential backoff.
	volumeAttachmentStatusPollDelay = 2 * time.Second
	// volumeAttachmentStatusFactor is the multiplier applied to the interval
	// after every poll.
	volumeAttachmentStatusFactor = 2
	// volumeAttachmentStatusSteps is the maximum number of polls.
	volumeAttachmentStatusSteps = 6
)

// Attachment states used when polling a volume.
const (
	// volumeAttachedStatus is the attachment state expected after an attach.
	volumeAttachedStatus = "attached"
	// volumeDetachedStatus is the attachment state expected after a detach.
	volumeDetachedStatus = "detached"
)
41+
42+
// EBSUtil provides functions to interact with EBS volumes
43+
type EBSUtil struct {
44+
client *ec2.EC2
45+
validDevices []string
46+
}
47+
48+
// NewEBSUtil returns an instance of EBSUtil which can be used to
49+
// to interact with EBS volumes
50+
func NewEBSUtil(client *ec2.EC2) *EBSUtil {
51+
ebsUtil := &EBSUtil{client: client}
52+
validDevices := []string{}
53+
for _, firstChar := range []rune{'b', 'c'} {
54+
for i := 'a'; i <= 'z'; i++ {
55+
dev := string([]rune{firstChar, i})
56+
validDevices = append(validDevices, dev)
57+
}
58+
}
59+
ebsUtil.validDevices = validDevices
60+
return ebsUtil
61+
}
62+
63+
// AttachDisk attaches an EBS volume to a node.
64+
func (ebs *EBSUtil) AttachDisk(volumeID string, nodeName string) error {
65+
instance, err := findInstanceByNodeName(nodeName, ebs.client)
66+
if err != nil {
67+
return fmt.Errorf("error finding node %s: %v", nodeName, err)
68+
}
69+
err = ebs.waitForAvailable(volumeID)
70+
if err != nil {
71+
return fmt.Errorf("error waiting volume %s to be available: %v", volumeID, err)
72+
}
73+
74+
device, err := ebs.findFreeDevice(instance)
75+
if err != nil {
76+
return fmt.Errorf("error finding free device on node %s: %v", nodeName, err)
77+
}
78+
hostDevice := "/dev/xvd" + string(device)
79+
attachInput := &ec2.AttachVolumeInput{
80+
VolumeId: &volumeID,
81+
InstanceId: instance.InstanceId,
82+
Device: &hostDevice,
83+
}
84+
_, err = ebs.client.AttachVolume(attachInput)
85+
if err != nil {
86+
return fmt.Errorf("error attaching volume %s to node %s: %v", volumeID, nodeName, err)
87+
}
88+
return ebs.waitForAttach(volumeID)
89+
}
90+
91+
func (ebs *EBSUtil) findFreeDevice(instance *ec2.Instance) (string, error) {
92+
deviceMappings := map[string]string{}
93+
94+
for _, blockDevice := range instance.BlockDeviceMappings {
95+
name := aws.StringValue(blockDevice.DeviceName)
96+
name = strings.TrimPrefix(name, "/dev/sd")
97+
name = strings.TrimPrefix(name, "/dev/xvd")
98+
if len(name) < 1 || len(name) > 2 {
99+
klog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
100+
}
101+
102+
deviceMappings[name] = aws.StringValue(blockDevice.Ebs.VolumeId)
103+
}
104+
105+
for _, device := range ebs.validDevices {
106+
if _, found := deviceMappings[device]; !found {
107+
return device, nil
108+
}
109+
}
110+
return "", fmt.Errorf("no available device")
111+
}
112+
113+
func (ebs *EBSUtil) waitForAttach(volumeID string) error {
114+
backoff := wait.Backoff{
115+
Duration: volumeAttachmentStatusPollDelay,
116+
Factor: volumeAttachmentStatusFactor,
117+
Steps: volumeAttachmentStatusSteps,
118+
}
119+
time.Sleep(volumeAttachmentStatusPollDelay)
120+
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
121+
info, err := ebs.describeVolume(volumeID)
122+
if err != nil {
123+
return false, err
124+
}
125+
126+
if len(info.Attachments) > 1 {
127+
// Shouldn't happen; log so we know if it is
128+
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, info)
129+
}
130+
attachmentStatus := ""
131+
for _, a := range info.Attachments {
132+
if attachmentStatus != "" {
133+
// Shouldn't happen; log so we know if it is
134+
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, info)
135+
}
136+
if a.State != nil {
137+
attachmentStatus = *a.State
138+
} else {
139+
// Shouldn't happen; log so we know if it is
140+
klog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
141+
}
142+
}
143+
if attachmentStatus == "" {
144+
attachmentStatus = volumeDetachedStatus
145+
}
146+
if attachmentStatus == volumeAttachedStatus {
147+
// Attachment is in requested state, finish waiting
148+
return true, nil
149+
}
150+
return false, nil
151+
})
152+
return err
153+
}
154+
155+
func (ebs *EBSUtil) waitForAvailable(volumeID string) error {
156+
backoff := wait.Backoff{
157+
Duration: volumeAttachmentStatusPollDelay,
158+
Factor: volumeAttachmentStatusFactor,
159+
Steps: volumeAttachmentStatusSteps,
160+
}
161+
time.Sleep(volumeAttachmentStatusPollDelay)
162+
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
163+
info, err := ebs.describeVolume(volumeID)
164+
if err != nil {
165+
return false, err
166+
}
167+
volumeState := aws.StringValue(info.State)
168+
if volumeState != ec2.VolumeStateAvailable {
169+
return false, nil
170+
}
171+
return true, nil
172+
})
173+
return err
174+
}
175+
176+
// Gets the full information about this volume from the EC2 API
177+
func (ebs *EBSUtil) describeVolume(volumeID string) (*ec2.Volume, error) {
178+
request := &ec2.DescribeVolumesInput{
179+
VolumeIds: []*string{&volumeID},
180+
}
181+
182+
results := []*ec2.Volume{}
183+
var nextToken *string
184+
for {
185+
response, err := ebs.client.DescribeVolumes(request)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
results = append(results, response.Volumes...)
191+
192+
nextToken = response.NextToken
193+
if aws.StringValue(nextToken) == "" {
194+
break
195+
}
196+
request.NextToken = nextToken
197+
}
198+
199+
if len(results) == 0 {
200+
return nil, fmt.Errorf("no volumes found")
201+
}
202+
if len(results) > 1 {
203+
return nil, fmt.Errorf("multiple volumes found")
204+
}
205+
return results[0], nil
206+
}
207+
208+
func newEc2Filter(name string, value string) *ec2.Filter {
209+
filter := &ec2.Filter{
210+
Name: aws.String(name),
211+
Values: []*string{
212+
aws.String(value),
213+
},
214+
}
215+
return filter
216+
}
217+
218+
func findInstanceByNodeName(nodeName string, cloud *ec2.EC2) (*ec2.Instance, error) {
219+
filters := []*ec2.Filter{
220+
newEc2Filter("private-dns-name", nodeName),
221+
}
222+
223+
request := &ec2.DescribeInstancesInput{
224+
Filters: filters,
225+
}
226+
227+
instances, err := describeInstances(request, cloud)
228+
if err != nil {
229+
return nil, err
230+
}
231+
if len(instances) == 0 {
232+
return nil, nil
233+
}
234+
if len(instances) > 1 {
235+
return nil, fmt.Errorf("multiple instances found for name: %s", nodeName)
236+
}
237+
return instances[0], nil
238+
}
239+
240+
func describeInstances(request *ec2.DescribeInstancesInput, cloud *ec2.EC2) ([]*ec2.Instance, error) {
241+
// Instances are paged
242+
results := []*ec2.Instance{}
243+
var nextToken *string
244+
245+
for {
246+
response, err := cloud.DescribeInstances(request)
247+
if err != nil {
248+
return nil, fmt.Errorf("error listing AWS instances: %v", err)
249+
}
250+
251+
for _, reservation := range response.Reservations {
252+
results = append(results, reservation.Instances...)
253+
}
254+
255+
nextToken = response.NextToken
256+
if nextToken == nil || len(*nextToken) == 0 {
257+
break
258+
}
259+
request.NextToken = nextToken
260+
}
261+
262+
return results, nil
263+
}

0 commit comments

Comments
 (0)