|
| 1 | +/* |
| 2 | +Copyright 2025 The Kubernetes Authors. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | +package e2e |
| 17 | + |
| 18 | +import ( |
| 19 | + "context" |
| 20 | + "fmt" |
| 21 | + "strings" |
| 22 | + "time" |
| 23 | + |
| 24 | + "github.com/aws/aws-sdk-go-v2/aws" |
| 25 | + "github.com/aws/aws-sdk-go-v2/config" |
| 26 | + "github.com/aws/aws-sdk-go-v2/service/ec2" |
| 27 | + |
| 28 | + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" |
| 29 | + elb "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancing" |
| 30 | + elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" |
| 31 | + |
| 32 | + v1 "k8s.io/api/core/v1" |
| 33 | + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 34 | + "k8s.io/apimachinery/pkg/util/wait" |
| 35 | + clientset "k8s.io/client-go/kubernetes" |
| 36 | + "k8s.io/kubernetes/test/e2e/framework" |
| 37 | +) |
| 38 | + |
| 39 | +// awsHelper provides AWS API operations for e2e tests |
| 40 | +type awsHelper struct { |
| 41 | + ctx context.Context |
| 42 | + ec2Client *ec2.Client |
| 43 | + elbClient *elb.Client |
| 44 | + elbv2Client *elbv2.Client |
| 45 | + |
| 46 | + // Cluster information |
| 47 | + clusterName string |
| 48 | + clusterTag string |
| 49 | + clusterTagValue string |
| 50 | + vpcID string |
| 51 | + awsRegion string |
| 52 | +} |
| 53 | + |
| 54 | +// newAWSHelper creates a new AWS helper with configured clients |
| 55 | +func newAWSHelper(ctx context.Context, cs clientset.Interface) (*awsHelper, error) { |
| 56 | + cfg, err := config.LoadDefaultConfig(ctx) |
| 57 | + framework.ExpectNoError(err, "unable to load AWS config") |
| 58 | + |
| 59 | + h := &awsHelper{ |
| 60 | + ctx: ctx, |
| 61 | + ec2Client: ec2.NewFromConfig(cfg), |
| 62 | + elbClient: elb.NewFromConfig(cfg), |
| 63 | + elbv2Client: elbv2.NewFromConfig(cfg), |
| 64 | + } |
| 65 | + |
| 66 | + framework.Logf("Discovering cluster tag") |
| 67 | + framework.ExpectNoError(h.discoverClusterTag(cs), "unable to find cluster tag") |
| 68 | + framework.Logf("Cluster tag discovered: %s", h.clusterTag) |
| 69 | + |
| 70 | + return h, nil |
| 71 | +} |
| 72 | + |
| 73 | +// discoverClusterTag discovers the cluster tag from a cluster. |
| 74 | +// The discover is done by looking up the EC2 instance tags with tag:Name prefix kubernetes.io/cluster. |
| 75 | +// The EC2 Instance ID is discovered from a cluster node object. |
| 76 | +// The cluster ID, VPC ID and cluster tag are discovered from the EC2 instance tags. |
| 77 | +// If is any error is found, the function returns an error. |
| 78 | +func (h *awsHelper) discoverClusterTag(cs clientset.Interface) error { |
| 79 | + nodes, err := cs.CoreV1().Nodes().List(h.ctx, metav1.ListOptions{}) |
| 80 | + if err != nil { |
| 81 | + return fmt.Errorf("failed to list nodes: %v", err) |
| 82 | + } |
| 83 | + |
| 84 | + var instanceID string |
| 85 | + for _, node := range nodes.Items { |
| 86 | + providerID := node.Spec.ProviderID |
| 87 | + if providerID == "" { |
| 88 | + continue |
| 89 | + } |
| 90 | + providerID = strings.Replace(providerID, "aws:///", "", 1) |
| 91 | + if len(strings.Split(providerID, "/")) < 2 { |
| 92 | + continue |
| 93 | + } |
| 94 | + h.awsRegion = strings.Split(providerID, "/")[0] |
| 95 | + instanceID = strings.Split(providerID, "/")[1] |
| 96 | + if !strings.HasPrefix(instanceID, "i-") { |
| 97 | + continue |
| 98 | + } |
| 99 | + break |
| 100 | + } |
| 101 | + |
| 102 | + instance, err := h.ec2Client.DescribeInstances(h.ctx, &ec2.DescribeInstancesInput{ |
| 103 | + InstanceIds: []string{instanceID}, |
| 104 | + }) |
| 105 | + if err != nil { |
| 106 | + return fmt.Errorf("failed to describe instances: %v", err) |
| 107 | + } |
| 108 | + |
| 109 | + clusterTagFound := false |
| 110 | + for _, reservation := range instance.Reservations { |
| 111 | + for _, tag := range reservation.Instances[0].Tags { |
| 112 | + if strings.HasPrefix(aws.ToString(tag.Key), "kubernetes.io/cluster") { |
| 113 | + h.clusterTag = aws.ToString(tag.Key) |
| 114 | + h.clusterTagValue = aws.ToString(tag.Value) |
| 115 | + clusterTagFound = true |
| 116 | + break |
| 117 | + } |
| 118 | + } |
| 119 | + if clusterTagFound { |
| 120 | + break |
| 121 | + } |
| 122 | + } |
| 123 | + |
| 124 | + if !clusterTagFound { |
| 125 | + return fmt.Errorf("cluster tag not found in the instance %s", instanceID) |
| 126 | + } |
| 127 | + |
| 128 | + h.clusterName = strings.Split(h.clusterTag, "/")[2] |
| 129 | + if h.clusterName == "" { |
| 130 | + return fmt.Errorf("cluster name not found in the cluster tag %s", h.clusterTag) |
| 131 | + } |
| 132 | + |
| 133 | + // extract VPC ID from the Instance |
| 134 | + for _, networkInterface := range instance.Reservations[0].Instances[0].NetworkInterfaces { |
| 135 | + h.vpcID = aws.ToString(networkInterface.VpcId) |
| 136 | + break |
| 137 | + } |
| 138 | + |
| 139 | + if h.vpcID == "" { |
| 140 | + return fmt.Errorf("VPC ID not found in the instance %s", instanceID) |
| 141 | + } |
| 142 | + |
| 143 | + return nil |
| 144 | +} |
| 145 | + |
| 146 | +// getLoadBalancerSecurityGroups gets security groups attached to a load balancer |
| 147 | +func (h *awsHelper) getLoadBalancerSecurityGroups(isNLB bool, lbDNSName string) ([]string, error) { |
| 148 | + if isNLB { |
| 149 | + if h.elbv2Client == nil { |
| 150 | + return nil, fmt.Errorf("elbv2Client is not initialized") |
| 151 | + } |
| 152 | + describeNLBs, err := h.elbv2Client.DescribeLoadBalancers(h.ctx, &elbv2.DescribeLoadBalancersInput{}) |
| 153 | + framework.ExpectNoError(err, "failed to describe load balancers to retrieve security groups") |
| 154 | + |
| 155 | + for _, lb := range describeNLBs.LoadBalancers { |
| 156 | + if strings.EqualFold(aws.ToString(lb.DNSName), lbDNSName) { |
| 157 | + return lb.SecurityGroups, nil |
| 158 | + } |
| 159 | + } |
| 160 | + return nil, fmt.Errorf("load balancer with DNS %s not found", lbDNSName) |
| 161 | + } |
| 162 | + |
| 163 | + // Get CLB ARN from DNS name |
| 164 | + if h.elbClient == nil { |
| 165 | + return nil, fmt.Errorf("elbClient is not initialized") |
| 166 | + } |
| 167 | + describeCLBs, err := h.elbClient.DescribeLoadBalancers(h.ctx, &elb.DescribeLoadBalancersInput{}) |
| 168 | + framework.ExpectNoError(err, "failed to describe load balancers to retrieve security groups") |
| 169 | + |
| 170 | + for _, lb := range describeCLBs.LoadBalancerDescriptions { |
| 171 | + if strings.EqualFold(aws.ToString(lb.DNSName), lbDNSName) { |
| 172 | + return lb.SecurityGroups, nil |
| 173 | + } |
| 174 | + } |
| 175 | + return nil, fmt.Errorf("load balancer with DNS %s not found", lbDNSName) |
| 176 | +} |
| 177 | + |
| 178 | +// isSecurityGroupManaged checks if a security group is managed by the controller |
| 179 | +// It checks for the cluster ownership tag to determine if the controller owns this security group |
| 180 | +func (h *awsHelper) isSecurityGroupManaged(sgID string) (bool, error) { |
| 181 | + sg, err := h.getSecurityGroup(sgID) |
| 182 | + if err != nil { |
| 183 | + return false, err |
| 184 | + } |
| 185 | + |
| 186 | + // Check for cluster ownership tag - security groups owned by the controller |
| 187 | + // have the cluster tag with "owned" value |
| 188 | + clusterTagKey := fmt.Sprintf("kubernetes.io/cluster/%s", h.clusterName) |
| 189 | + for _, tag := range sg.Tags { |
| 190 | + if aws.ToString(tag.Key) == clusterTagKey && |
| 191 | + aws.ToString(tag.Value) == "owned" { |
| 192 | + return true, nil |
| 193 | + } |
| 194 | + } |
| 195 | + return false, nil |
| 196 | +} |
| 197 | + |
| 198 | +// getSecurityGroup gets a security group by ID |
| 199 | +func (h *awsHelper) getSecurityGroup(sgID string) (*ec2types.SecurityGroup, error) { |
| 200 | + if h.ec2Client == nil { |
| 201 | + return nil, fmt.Errorf("ec2Client is not initialized") |
| 202 | + } |
| 203 | + result, err := h.ec2Client.DescribeSecurityGroups(h.ctx, &ec2.DescribeSecurityGroupsInput{ |
| 204 | + GroupIds: []string{sgID}, |
| 205 | + }) |
| 206 | + if err != nil { |
| 207 | + return nil, fmt.Errorf("unable to describe security group %q: %v", sgID, err) |
| 208 | + } |
| 209 | + if len(result.SecurityGroups) == 0 { |
| 210 | + return nil, fmt.Errorf("security group %s not found", sgID) |
| 211 | + } |
| 212 | + return &result.SecurityGroups[0], nil |
| 213 | +} |
| 214 | + |
| 215 | +// createSecurityGroup creates a new security group for testing purposes |
| 216 | +func (h *awsHelper) createSecurityGroup(name, description string) (string, error) { |
| 217 | + result, err := h.ec2Client.CreateSecurityGroup(h.ctx, &ec2.CreateSecurityGroupInput{ |
| 218 | + GroupName: aws.String(name), |
| 219 | + Description: aws.String(description), |
| 220 | + TagSpecifications: []ec2types.TagSpecification{ |
| 221 | + { |
| 222 | + ResourceType: ec2types.ResourceTypeSecurityGroup, |
| 223 | + Tags: []ec2types.Tag{ |
| 224 | + { |
| 225 | + Key: aws.String("Name"), |
| 226 | + Value: aws.String(name), |
| 227 | + }, |
| 228 | + { |
| 229 | + Key: aws.String(fmt.Sprintf("kubernetes.io/cluster/%s", h.clusterName)), |
| 230 | + Value: aws.String("shared"), |
| 231 | + }, |
| 232 | + }, |
| 233 | + }, |
| 234 | + }, |
| 235 | + VpcId: aws.String(h.vpcID), |
| 236 | + }) |
| 237 | + if err != nil { |
| 238 | + return "", fmt.Errorf("failed to create security group: %v", err) |
| 239 | + } |
| 240 | + |
| 241 | + return aws.ToString(result.GroupId), nil |
| 242 | +} |
| 243 | + |
| 244 | +// cleanup cleans up the e2e resources. |
| 245 | +func (e *e2eTestConfig) cleanup() { |
| 246 | + framework.Logf("Cleaning up e2e resources") |
| 247 | + |
| 248 | + // Cleanup security group |
| 249 | + if e.awsHelper != nil && e.byoSecurityGroupID != "" { |
| 250 | + err := e.awsHelper.waitForSecurityGroupDeletion(e.byoSecurityGroupID, 5*time.Minute) |
| 251 | + if err != nil { |
| 252 | + framework.Logf("Failed to delete security group %s during cleanup: %v", e.byoSecurityGroupID, err) |
| 253 | + } |
| 254 | + } |
| 255 | +} |
| 256 | + |
| 257 | +// authorizeSecurityGroupToPorts authorizes a security group to allow traffic to the service ports |
| 258 | +func (h *awsHelper) authorizeSecurityGroupToPorts(sgID string, ports []v1.ServicePort) error { |
| 259 | + if h.ec2Client == nil { |
| 260 | + return fmt.Errorf("ec2Client is not initialized") |
| 261 | + } |
| 262 | + |
| 263 | + if len(ports) == 0 { |
| 264 | + return nil |
| 265 | + } |
| 266 | + |
| 267 | + ingressRules := make([]ec2types.IpPermission, 0, len(ports)) |
| 268 | + for _, port := range ports { |
| 269 | + protocol := strings.ToLower(string(port.Protocol)) |
| 270 | + rule := ec2types.IpPermission{ |
| 271 | + FromPort: aws.Int32(int32(port.Port)), |
| 272 | + ToPort: aws.Int32(int32(port.Port)), |
| 273 | + IpProtocol: aws.String(protocol), |
| 274 | + IpRanges: []ec2types.IpRange{ |
| 275 | + { |
| 276 | + CidrIp: aws.String("0.0.0.0/0"), |
| 277 | + Description: aws.String(fmt.Sprintf("E2E test access for port %d", port.Port)), |
| 278 | + }, |
| 279 | + }, |
| 280 | + } |
| 281 | + ingressRules = append(ingressRules, rule) |
| 282 | + } |
| 283 | + _, err := h.ec2Client.AuthorizeSecurityGroupIngress(h.ctx, &ec2.AuthorizeSecurityGroupIngressInput{ |
| 284 | + GroupId: aws.String(sgID), |
| 285 | + IpPermissions: ingressRules, |
| 286 | + }) |
| 287 | + if err != nil { |
| 288 | + // Check if error is due to duplicate rules (which is actually okay) |
| 289 | + if strings.Contains(err.Error(), "InvalidPermission.Duplicate") { |
| 290 | + framework.Logf("Some rules already exist in security group %s (this is okay): %v", sgID, err) |
| 291 | + return nil |
| 292 | + } |
| 293 | + return fmt.Errorf("failed to authorize security group %s to ports %v: %v", sgID, ports, err) |
| 294 | + } |
| 295 | + |
| 296 | + return nil |
| 297 | +} |
| 298 | + |
| 299 | +// verifySecurityGroupRules verifies that the expected rules exist in the security group |
| 300 | +// This is helpful for debugging when rules don't appear to be created |
| 301 | +func (h *awsHelper) verifySecurityGroupRules(sgID string, expectedPorts []v1.ServicePort) error { |
| 302 | + if h.ec2Client == nil { |
| 303 | + return fmt.Errorf("ec2Client is not initialized") |
| 304 | + } |
| 305 | + |
| 306 | + sg, err := h.getSecurityGroup(sgID) |
| 307 | + if err != nil { |
| 308 | + return fmt.Errorf("failed to get security group %s: %v", sgID, err) |
| 309 | + } |
| 310 | + |
| 311 | + // Check if expected ports are covered |
| 312 | + for _, expectedPort := range expectedPorts { |
| 313 | + expectedProtocol := strings.ToLower(string(expectedPort.Protocol)) |
| 314 | + expectedPortNum := int32(expectedPort.Port) |
| 315 | + |
| 316 | + found := false |
| 317 | + for _, rule := range sg.IpPermissions { |
| 318 | + ruleProtocol := aws.ToString(rule.IpProtocol) |
| 319 | + fromPort := aws.ToInt32(rule.FromPort) |
| 320 | + toPort := aws.ToInt32(rule.ToPort) |
| 321 | + |
| 322 | + if ruleProtocol == expectedProtocol && fromPort <= expectedPortNum && expectedPortNum <= toPort { |
| 323 | + found = true |
| 324 | + break |
| 325 | + } |
| 326 | + } |
| 327 | + |
| 328 | + if !found { |
| 329 | + framework.Logf("WARNING: Expected rule for protocol=%s port=%d not found in security group %s", expectedProtocol, expectedPortNum, sgID) |
| 330 | + } |
| 331 | + } |
| 332 | + |
| 333 | + return nil |
| 334 | +} |
| 335 | + |
| 336 | +// waitForSecurityGroupDeletion attempts to delete a security group and waits for it to be deleted |
| 337 | +// It handles dependency violations when the SG is still attached to resources like load balancers |
| 338 | +func (h *awsHelper) waitForSecurityGroupDeletion(sgID string, timeout time.Duration) error { |
| 339 | + return wait.PollImmediate(1*time.Second, timeout, func() (bool, error) { |
| 340 | + _, err := h.getSecurityGroup(sgID) |
| 341 | + if err != nil { |
| 342 | + return true, nil |
| 343 | + } |
| 344 | + |
| 345 | + err = h.deleteSecurityGroup(sgID) |
| 346 | + if err != nil { |
| 347 | + // Check for dependency violation errors |
| 348 | + if strings.Contains(err.Error(), "DependencyViolation") || |
| 349 | + strings.Contains(err.Error(), "InvalidGroup.InUse") || |
| 350 | + strings.Contains(err.Error(), "resource has a dependent object") { |
| 351 | + return false, nil // Keep waiting |
| 352 | + } |
| 353 | + |
| 354 | + // Check if it's already deleted |
| 355 | + if strings.Contains(err.Error(), "InvalidGroup.NotFound") || |
| 356 | + strings.Contains(err.Error(), "InvalidGroupId.NotFound") { |
| 357 | + return true, nil |
| 358 | + } |
| 359 | + |
| 360 | + // For other errors, return the error |
| 361 | + return false, err |
| 362 | + } |
| 363 | + |
| 364 | + framework.Logf("Successfully deleted security group %s", sgID) |
| 365 | + return true, nil |
| 366 | + }) |
| 367 | +} |
| 368 | + |
| 369 | +// deleteSecurityGroup deletes a security group |
| 370 | +func (h *awsHelper) deleteSecurityGroup(sgID string) error { |
| 371 | + if _, err := h.ec2Client.DeleteSecurityGroup(h.ctx, &ec2.DeleteSecurityGroupInput{ |
| 372 | + GroupId: aws.String(sgID), |
| 373 | + }); err != nil { |
| 374 | + return fmt.Errorf("failed to delete security group %s: %v", sgID, err) |
| 375 | + } |
| 376 | + |
| 377 | + return nil |
| 378 | +} |
0 commit comments