Skip to content

✨ Migrate Node packages to AWS SDK v2 #5584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bootstrap/eks/internal/userdata/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package userdata
import (
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go-v2/aws"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/format"
"k8s.io/utils/ptr"
Expand Down
12 changes: 12 additions & 0 deletions pkg/cloud/awserrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func Code(err error) (string, bool) {
if awserr, ok := err.(awserr.Error); ok {
return awserr.Code(), true
}

// Handle smithy errors from AWS SDK v2
if smithyErr := ParseSmithyError(err); smithyErr != nil && smithyErr.ErrorCode() != "" {
return smithyErr.ErrorCode(), true
}

return "", false
}

Expand All @@ -77,6 +83,12 @@ func Message(err error) string {
if awserr, ok := err.(awserr.Error); ok {
return awserr.Message()
}

// Handle smithy errors from AWS SDK v2
if smithyErr := ParseSmithyError(err); smithyErr != nil {
return smithyErr.ErrorMessage()
}

return ""
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/cloud/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

awsv2 "github.com/aws/aws-sdk-go-v2/aws"
awsclient "github.com/aws/aws-sdk-go/aws/client"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -34,8 +33,7 @@ import (

// Session represents an AWS session.
type Session interface {
Session() awsclient.ConfigProvider
SessionV2() awsv2.Config
Session() awsv2.Config
ServiceLimiter(service string) *throttle.ServiceLimiter
}

Expand Down
80 changes: 31 additions & 49 deletions pkg/cloud/scope/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,24 @@ import (
"github.com/aws/aws-sdk-go-v2/service/iam"
rgapi "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/ssm"
stsv2 "github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/secretsmanager"
"github.com/aws/aws-sdk-go/service/secretsmanager/secretsmanageriface"
"k8s.io/apimachinery/pkg/runtime"

"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/endpointsv2"
awslogs "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/logs"
awsmetrics "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/metrics"
awsmetricsv2 "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/metricsv2"
stsservice "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/sts"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/throttle"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger"
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/record"
"sigs.k8s.io/cluster-api-provider-aws/v2/version"
)

// NewASGClient creates a new ASG API client for a given session.
func NewASGClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *autoscaling.Client {
cfg := session.SessionV2()
cfg := session.Session()

autoscalingOpts := []func(*autoscaling.Options){
func(o *autoscaling.Options) {
Expand All @@ -68,7 +61,7 @@ func NewASGClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewEC2Client creates a new EC2 API client for a given session.
func NewEC2Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *ec2.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
ec2EndpointResolver := &endpointsv2.EC2EndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -92,7 +85,7 @@ func NewEC2Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewELBClient creates a new ELB API client for a given session.
func NewELBClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *elb.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
endpointResolver := &endpointsv2.ELBEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -116,7 +109,7 @@ func NewELBClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewELBv2Client creates a new ELB v2 API client for a given session.
func NewELBv2Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *elbv2.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
endpointResolver := &endpointsv2.ELBV2EndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -140,7 +133,7 @@ func NewELBv2Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger lo

// NewEventBridgeClient creates a new EventBridge API client for a given session.
func NewEventBridgeClient(scopeUser cloud.ScopeUsage, session cloud.Session, target runtime.Object) *eventbridge.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
endpointResolver := &endpointsv2.EventBridgeEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -161,7 +154,7 @@ func NewEventBridgeClient(scopeUser cloud.ScopeUsage, session cloud.Session, tar

// NewSQSClient creates a new SQS API client for a given session.
func NewSQSClient(scopeUser cloud.ScopeUsage, session cloud.Session, target runtime.Object) *sqs.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
endpointResolver := &endpointsv2.SQSEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -182,7 +175,7 @@ func NewSQSClient(scopeUser cloud.ScopeUsage, session cloud.Session, target runt

// NewGlobalSQSClient for creating a new SQS API client that isn't tied to a cluster.
func NewGlobalSQSClient(scopeUser cloud.ScopeUsage, session cloud.Session) *sqs.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
endpointResolver := &endpointsv2.SQSEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -203,7 +196,7 @@ func NewGlobalSQSClient(scopeUser cloud.ScopeUsage, session cloud.Session) *sqs.

// NewResourgeTaggingClient creates a new Resource Tagging API client for a given session.
func NewResourgeTaggingClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *rgapi.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
endpointResolver := &endpointsv2.RGAPIEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -222,20 +215,27 @@ func NewResourgeTaggingClient(scopeUser cloud.ScopeUsage, session cloud.Session,
}

// NewSecretsManagerClient creates a new Secrets API client for a given session..
func NewSecretsManagerClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) secretsmanageriface.SecretsManagerAPI {
secretsClient := secretsmanager.New(session.Session(), aws.NewConfig().WithLogLevel(awslogs.GetAWSLogLevel(logger.GetLogger())).WithLogger(awslogs.NewWrapLogr(logger.GetLogger())))
secretsClient.Handlers.Build.PushFrontNamed(getUserAgentHandler())
secretsClient.Handlers.Sign.PushFront(session.ServiceLimiter(secretsClient.ServiceID).LimitRequest)
secretsClient.Handlers.CompleteAttempt.PushFront(awsmetrics.CaptureRequestMetrics(scopeUser.ControllerName()))
secretsClient.Handlers.CompleteAttempt.PushFront(session.ServiceLimiter(secretsClient.ServiceID).ReviewResponse)
secretsClient.Handlers.Complete.PushBack(recordAWSPermissionsIssue(target))

return secretsClient
func NewSecretsManagerClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *secretsmanager.Client {
cfg := session.Session()

secretsOpts := []func(*secretsmanager.Options){
func(o *secretsmanager.Options) {
o.Logger = logger.GetAWSLogger()
o.ClientLogMode = awslogs.GetAWSLogLevelV2(logger.GetLogger())
},
secretsmanager.WithAPIOptions(
awsmetricsv2.WithMiddlewares(scopeUser.ControllerName(), target),
awsmetricsv2.WithCAPAUserAgentMiddleware(),
throttle.WithServiceLimiterMiddleware(session.ServiceLimiter(secretsmanager.ServiceID)),
),
}

return secretsmanager.NewFromConfig(cfg, secretsOpts...)
}

// NewEKSClient creates a new EKS API client for a given session.
func NewEKSClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *eks.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
eksEndpointResolver := &endpointsv2.EKSEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -253,7 +253,7 @@ func NewEKSClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewIAMClient creates a new IAM API client for a given session.
func NewIAMClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *iam.Client {
cfg := session.SessionV2()
cfg := session.Session()

iamOpts := []func(*iam.Options){
func(o *iam.Options) {
Expand All @@ -271,7 +271,7 @@ func NewIAMClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewSTSClient creates a new STS API client for a given session.
func NewSTSClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) stsservice.STSClient {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
stsEndpointResolver := &endpointsv2.STSEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -294,7 +294,7 @@ func NewSTSClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewSSMClient creates a new Secrets API client for a given session.
func NewSSMClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *ssm.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
ssmEndpointResolver := &endpointsv2.SSMEndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -316,7 +316,7 @@ func NewSSMClient(scopeUser cloud.ScopeUsage, session cloud.Session, logger logg

// NewS3Client creates a new S3 API client for a given session.
func NewS3Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logger.Wrapper, target runtime.Object) *s3.Client {
cfg := session.SessionV2()
cfg := session.Session()
multiSvcEndpointResolver := endpointsv2.NewMultiServiceEndpointResolver()
s3EndpointResolver := &endpointsv2.S3EndpointResolver{
MultiServiceEndpointResolver: multiSvcEndpointResolver,
Expand All @@ -332,28 +332,10 @@ func NewS3Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logge
return s3.NewFromConfig(cfg, s3Opts...)
}

func recordAWSPermissionsIssue(target runtime.Object) func(r *request.Request) {
return func(r *request.Request) {
if awsErr, ok := r.Error.(awserr.Error); ok {
switch awsErr.Code() {
case "AuthFailure", "UnauthorizedOperation", "NoCredentialProviders":
record.Warnf(target, awsErr.Code(), "Operation %s failed with a credentials or permission issue", r.Operation.Name)
}
}
}
}

func getUserAgentHandler() request.NamedHandler {
return request.NamedHandler{
Name: "capa/user-agent",
Fn: request.MakeAddToUserAgentHandler("aws.cluster.x-k8s.io", version.Get().String()),
}
}

// AWSClients contains all the aws clients used by the scopes.
type AWSClients struct {
ELB *elb.Client
SecretsManager secretsmanageriface.SecretsManagerAPI
SecretsManager *secretsmanager.Client
ResourceTagging *rgapi.Client
ASG *autoscaling.Client
EC2 *ec2.Client
Expand Down
28 changes: 7 additions & 21 deletions pkg/cloud/scope/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ func NewClusterScope(params ClusterScopeParams) (*ClusterScope, error) {
maxWaitActiveUpdateDelete: params.MaxWaitActiveUpdateDelete,
}

session, serviceLimiters, err := sessionForClusterWithRegion(params.Client, clusterScope, params.AWSCluster.Spec.Region, params.Endpoints, params.Logger)
if err != nil {
return nil, errors.Errorf("failed to create aws session: %v", err)
}

sessionv2, serviceLimitersv2, err := sessionForClusterWithRegionV2(params.Client, clusterScope, params.AWSCluster.Spec.Region, params.Endpoints, params.Logger)
if err != nil {
return nil, errors.Errorf("failed to create aws V2 session: %v", err)
Expand All @@ -93,10 +88,8 @@ func NewClusterScope(params ClusterScopeParams) (*ClusterScope, error) {
}

clusterScope.patchHelper = helper
clusterScope.session = session
clusterScope.sessionV2 = *sessionv2
clusterScope.serviceLimiters = serviceLimiters
clusterScope.serviceLimitersV2 = serviceLimitersv2
clusterScope.session = *sessionv2
clusterScope.serviceLimiters = serviceLimitersv2

return clusterScope, nil
}
Expand All @@ -110,11 +103,9 @@ type ClusterScope struct {
Cluster *clusterv1.Cluster
AWSCluster *infrav1.AWSCluster

session awsclient.ConfigProvider
sessionV2 awsv2.Config
serviceLimiters throttle.ServiceLimiters
serviceLimitersV2 throttle.ServiceLimiters
controllerName string
session awsv2.Config
serviceLimiters throttle.ServiceLimiters
controllerName string

tagUnmanagedNetworkResources bool
maxWaitActiveUpdateDelete time.Duration
Expand Down Expand Up @@ -361,16 +352,11 @@ func (s *ClusterScope) ClusterObj() cloud.ClusterObject {
return s.Cluster
}

// Session returns the AWS SDK session. Used for creating clients.
func (s *ClusterScope) Session() awsclient.ConfigProvider {
// Session returns the AWS SDK V2 session. Used for creating clients.
func (s *ClusterScope) Session() awsv2.Config {
return s.session
}

// SessionV2 returns the AWS SDK V2 session. Used for creating clients.
func (s *ClusterScope) SessionV2() awsv2.Config {
return s.sessionV2
}

// ServiceLimiter returns the AWS SDK session. Used for creating clients.
func (s *ClusterScope) ServiceLimiter(service string) *throttle.ServiceLimiter {
if sl, ok := s.serviceLimiters[service]; ok {
Expand Down
44 changes: 15 additions & 29 deletions pkg/cloud/scope/fargate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func NewFargateProfileScope(params FargateProfileScopeParams) (*FargateProfileSc
controllerName: params.ControllerName,
}

session, serviceLimiters, err := sessionForClusterWithRegion(params.Client, managedScope, params.ControlPlane.Spec.Region, params.Endpoints, params.Logger)
if err != nil {
return nil, errors.Errorf("failed to create aws session: %v", err)
}

sessionv2, serviceLimitersv2, err := sessionForClusterWithRegionV2(params.Client, managedScope, params.ControlPlane.Spec.Region, params.Endpoints, params.Logger)
if err != nil {
return nil, errors.Errorf("failed to create aws v2 session: %v", err)
Expand All @@ -86,18 +81,16 @@ func NewFargateProfileScope(params FargateProfileScopeParams) (*FargateProfileSc
}

return &FargateProfileScope{
Logger: *params.Logger,
Client: params.Client,
Cluster: params.Cluster,
ControlPlane: params.ControlPlane,
FargateProfile: params.FargateProfile,
patchHelper: helper,
session: session,
sessionV2: *sessionv2,
serviceLimiters: serviceLimiters,
serviceLimitersV2: serviceLimitersv2,
controllerName: params.ControllerName,
enableIAM: params.EnableIAM,
Logger: *params.Logger,
Client: params.Client,
Cluster: params.Cluster,
ControlPlane: params.ControlPlane,
FargateProfile: params.FargateProfile,
patchHelper: helper,
session: *sessionv2,
serviceLimiters: serviceLimitersv2,
controllerName: params.ControllerName,
enableIAM: params.EnableIAM,
}, nil
}

Expand All @@ -111,11 +104,9 @@ type FargateProfileScope struct {
ControlPlane *ekscontrolplanev1.AWSManagedControlPlane
FargateProfile *expinfrav1.AWSFargateProfile

session awsclient.ConfigProvider
sessionV2 awsv2.Config
serviceLimiters throttle.ServiceLimiters
serviceLimitersV2 throttle.ServiceLimiters
controllerName string
session awsv2.Config
serviceLimiters throttle.ServiceLimiters
controllerName string

enableIAM bool
}
Expand Down Expand Up @@ -225,16 +216,11 @@ func (s *FargateProfileScope) ClusterObj() cloud.ClusterObject {
return s.Cluster
}

// Session returns the AWS SDK session. Used for creating clients.
func (s *FargateProfileScope) Session() awsclient.ConfigProvider {
// Session returns the AWS SDK V2 session. Used for creating clients.
func (s *FargateProfileScope) Session() awsv2.Config {
return s.session
}

// SessionV2 returns the AWS SDK session. Used for creating clients.
func (s *FargateProfileScope) SessionV2() awsv2.Config {
return s.sessionV2
}

// ControllerName returns the name of the controller that
// created the FargateProfile.
func (s *FargateProfileScope) ControllerName() string {
Expand Down
Loading