Skip to content

Commit ce73264

Browse files
committed
fix metrics middlewares
Signed-off-by: Pankaj Walke <[email protected]>
1 parent 46f4cdb commit ce73264

File tree

6 files changed

+130
-78
lines changed

6 files changed

+130
-78
lines changed

pkg/cloud/identityv2/identity.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
66
You may obtain a copy of the License at
77
8-
http://www.apache.org/licenses/LICENSE-2.0
8+
http://www.apache.org/licenses/LICENSE-2.0
99
1010
Unless required by applicable law or agreed to in writing, software
1111
distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,6 +30,7 @@ import (
3030
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
3131
"github.com/aws/aws-sdk-go-v2/service/sts"
3232
corev1 "k8s.io/api/core/v1"
33+
awsmetricsv2 "sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/metricsv2"
3334

3435
infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
3536
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger"
@@ -68,7 +69,10 @@ func GetAssumeRoleCredentialsCache(roleIdentityProvider *AWSRolePrincipalTypePro
6869
return nil, err
6970
}
7071

71-
stsClient := sts.NewFromConfig(cfg)
72+
stsOpts := sts.WithAPIOptions(
73+
awsmetricsv2.WithMiddlewares("identity provider", roleIdentityProvider.Principal),
74+
awsmetricsv2.WithCAPAUserAgentMiddleware())
75+
stsClient := sts.NewFromConfig(cfg, stsOpts)
7276
credsProvider := stscreds.NewAssumeRoleProvider(stsClient, roleIdentityProvider.Principal.Spec.RoleArn, func(o *stscreds.AssumeRoleOptions) {
7377
if roleIdentityProvider.Principal.Spec.ExternalID != "" {
7478
o.ExternalID = aws.String(roleIdentityProvider.Principal.Spec.ExternalID)

pkg/cloud/metrics/metrics.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ func init() {
6969
metrics.Registry.MustRegister(awsCallRetries)
7070
}
7171

72-
// TODO: mjlshen rewrite this for aws-sdk-go-v2
73-
// https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/sdk-timing.html
74-
7572
// CaptureRequestMetrics will monitor and capture request metrics.
7673
func CaptureRequestMetrics(controller string) func(r *request.Request) {
7774
return func(r *request.Request) {

pkg/cloud/metricsv2/metrics.go

Lines changed: 119 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
66
You may obtain a copy of the License at
77
8-
http://www.apache.org/licenses/LICENSE-2.0
8+
http://www.apache.org/licenses/LICENSE-2.0
99
1010
Unless required by applicable law or agreed to in writing, software
1111
distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24-
"net/http"
2524
"strconv"
2625
"time"
2726

@@ -56,25 +55,19 @@ var (
5655
Name: metricRequestCountKey,
5756
Help: "Total number of AWS requests",
5857
}, []string{metricControllerLabel, metricServiceLabel, metricRegionLabel, metricOperationLabel, metricStatusCodeLabel, metricErrorCodeLabel})
58+
5959
awsRequestDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
6060
Subsystem: metricAWSSubsystem,
6161
Name: metricRequestDurationKey,
6262
Help: "Latency of HTTP requests to AWS",
6363
}, []string{metricControllerLabel, metricServiceLabel, metricRegionLabel, metricOperationLabel})
64+
6465
awsCallRetries = prometheus.NewHistogramVec(prometheus.HistogramOpts{
6566
Subsystem: metricAWSSubsystem,
6667
Name: metricAPICallRetries,
6768
Help: "Number of retries made against an AWS API",
6869
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
6970
}, []string{metricControllerLabel, metricServiceLabel, metricRegionLabel, metricOperationLabel})
70-
getRawResponse = func(metadata middleware.Metadata) *http.Response {
71-
switch res := awsmiddleware.GetRawResponse(metadata).(type) {
72-
case *http.Response:
73-
return res
74-
default:
75-
return nil
76-
}
77-
}
7871
)
7972

8073
func init() {
@@ -91,127 +84,124 @@ type RequestData struct {
9184
RequestEndTime time.Time
9285
StatusCode int
9386
ErrorCode string
94-
RequestCount int
9587
Service string
9688
OperationName string
9789
Region string
98-
UserAgent string
9990
Controller string
10091
Target runtime.Object
10192
Attempts int
10293
}
10394

10495
// WithMiddlewares adds instrumentation middleware stacks to AWS GO SDK V2 service clients.
105-
// Inspired by https://github.com/jonathan-innis/aws-sdk-go-prometheus/v2.
10696
func WithMiddlewares(controller string, target runtime.Object) func(stack *middleware.Stack) error {
10797
return func(stack *middleware.Stack) error {
10898
if err := stack.Initialize.Add(getMetricCollectionMiddleware(controller, target), middleware.Before); err != nil {
109-
return err
110-
}
111-
if err := stack.Build.Add(getAddToUserAgentMiddleware(), middleware.Before); err != nil {
112-
return err
99+
return fmt.Errorf("failed to add metric collection middleware: %w", err)
113100
}
114101
if err := stack.Finalize.Add(getRequestMetricContextMiddleware(), middleware.Before); err != nil {
115-
return err
102+
return fmt.Errorf("failed to add request metric context middleware: %w", err)
116103
}
117104
if err := stack.Finalize.Insert(getAttemptContextMiddleware(), "Retry", middleware.After); err != nil {
118-
return err
105+
return fmt.Errorf("failed to add attempt context middleware: %w", err)
119106
}
120-
return stack.Deserialize.Add(getRecordAWSPermissionsIssueMiddleware(target), middleware.After)
107+
return stack.Finalize.Add(getRecordAWSPermissionsIssueMiddleware(target), middleware.After)
121108
}
122109
}
123110

111+
// WithCAPAUserAgentMiddleware returns User Agent middleware stack for AWS GO SDK V2 sessions.
112+
func WithCAPAUserAgentMiddleware() func(*middleware.Stack) error {
113+
return awsmiddleware.AddUserAgentKeyValue("aws.cluster.x-k8s.io", version.Get().String())
114+
}
115+
124116
func getMetricCollectionMiddleware(controller string, target runtime.Object) middleware.InitializeMiddleware {
125117
return middleware.InitializeMiddlewareFunc("capa/MetricCollectionMiddleware", func(ctx context.Context, input middleware.InitializeInput, handler middleware.InitializeHandler) (middleware.InitializeOutput, middleware.Metadata, error) {
126118
ctx = initRequestContext(ctx, controller, target)
127119
request := getContext(ctx)
128120

129121
request.RequestStartTime = time.Now().UTC()
130122
out, metadata, err := handler.HandleInitialize(ctx, input)
131-
request.RequestEndTime = time.Now().UTC()
132123

124+
if responseAt, ok := awsmiddleware.GetResponseAt(metadata); ok {
125+
request.RequestEndTime = responseAt
126+
} else {
127+
request.RequestEndTime = time.Now().UTC()
128+
}
133129
request.CaptureRequestMetrics()
134-
135130
return out, metadata, err
136131
})
137132
}
138133

139134
func getRequestMetricContextMiddleware() middleware.FinalizeMiddleware {
140135
return middleware.FinalizeMiddlewareFunc("capa/RequestMetricContextMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
141136
request := getContext(ctx)
142-
request.Service = awsmiddleware.GetServiceID(ctx)
143-
request.OperationName = awsmiddleware.GetOperationName(ctx)
144-
request.Region = awsmiddleware.GetRegion(ctx)
145137

138+
if request != nil {
139+
request.Service = awsmiddleware.GetServiceID(ctx)
140+
request.OperationName = awsmiddleware.GetOperationName(ctx)
141+
request.Region = awsmiddleware.GetRegion(ctx)
142+
}
146143
return handler.HandleFinalize(ctx, input)
147144
})
148145
}
149146

150-
// For capturing retry count and status codes.
147+
// getAttemptContextMiddleware will capture StatusCode and ErrorCode from API call attempt.
148+
// This will result in the StatusCode and ErrorCode captured for last attempt when publishing to metrics.
151149
func getAttemptContextMiddleware() middleware.FinalizeMiddleware {
152150
return middleware.FinalizeMiddlewareFunc("capa/AttemptMetricContextMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
153151
request := getContext(ctx)
154-
request.Attempts++
152+
if request != nil {
153+
request.Attempts++
154+
}
155+
155156
out, metadata, err := handler.HandleFinalize(ctx, input)
156-
response := getRawResponse(metadata)
157157

158-
if response.Body != nil {
159-
defer response.Body.Close()
160-
}
158+
if request != nil {
159+
if rawResp := awsmiddleware.GetRawResponse(metadata); rawResp != nil {
160+
if httpResp, ok := rawResp.(*smithyhttp.Response); ok {
161+
request.StatusCode = httpResp.StatusCode
162+
}
163+
} else {
164+
request.StatusCode = -1
165+
}
161166

162-
// This will record only last attempts status code.
163-
// Can be further extended to capture status codes of all attempts
164-
if response != nil {
165-
request.StatusCode = response.StatusCode
166-
} else {
167-
request.StatusCode = -1
167+
if err != nil {
168+
request.ErrorCode, _, request.StatusCode = parseSmithyError(err)
169+
}
168170
}
169171

170172
return out, metadata, err
171173
})
172174
}
173-
func getRecordAWSPermissionsIssueMiddleware(target runtime.Object) middleware.DeserializeMiddleware {
174-
return middleware.DeserializeMiddlewareFunc("capa/RecordAWSPermissionsIssueMiddleware", func(ctx context.Context, input middleware.DeserializeInput, handler middleware.DeserializeHandler) (middleware.DeserializeOutput, middleware.Metadata, error) {
175-
output, metadata, err := handler.HandleDeserialize(ctx, input)
175+
176+
func getRecordAWSPermissionsIssueMiddleware(target runtime.Object) middleware.FinalizeMiddleware {
177+
return middleware.FinalizeMiddlewareFunc("capa/RecordAWSPermissionsIssueMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
178+
output, metadata, err := handler.HandleFinalize(ctx, input)
176179
if err != nil {
177-
var re *smithyhttp.ResponseError
178-
if errors.As(err, &re) {
179-
var ae smithy.APIError
180-
if errors.As(re.Err, &ae) {
181-
switch ae.ErrorCode() {
182-
case "AuthFailure", "UnauthorizedOperation", "NoCredentialProviders":
183-
record.Warnf(target, ae.ErrorCode(), "Operation %s failed with a credentials or permission issue", awsmiddleware.GetOperationName(ctx))
184-
}
180+
request := getContext(ctx)
181+
if request != nil {
182+
var errMessage string
183+
request.ErrorCode, errMessage, _ = parseSmithyError(err)
184+
switch request.ErrorCode {
185+
case "AccessDenied", "AuthFailure", "UnauthorizedOperation", "NoCredentialProviders",
186+
"ExpiredToken", "InvalidClientTokenId", "SignatureDoesNotMatch", "ValidationError":
187+
record.Warnf(target, request.ErrorCode,
188+
"Operation %s failed with a credentials or permission issue: %s",
189+
request.OperationName,
190+
errMessage,
191+
)
185192
}
186193
}
187194
}
188195
return output, metadata, err
189196
})
190197
}
191198

192-
func getAddToUserAgentMiddleware() middleware.BuildMiddleware {
193-
return middleware.BuildMiddlewareFunc("capa/AddUserAgentMiddleware", func(ctx context.Context, input middleware.BuildInput, handler middleware.BuildHandler) (middleware.BuildOutput, middleware.Metadata, error) {
194-
request := getContext(ctx)
195-
r, ok := input.Request.(*smithyhttp.Request)
196-
if !ok {
197-
return middleware.BuildOutput{}, middleware.Metadata{}, fmt.Errorf("unknown transport type %T", input.Request)
198-
}
199-
200-
if curUA := r.Header.Get("User-Agent"); curUA != "" {
201-
request.UserAgent = curUA + " " + request.UserAgent
202-
}
203-
r.Header.Set("User-Agent", request.UserAgent)
204-
205-
return handler.HandleBuild(ctx, input)
206-
})
207-
}
208-
209199
func initRequestContext(ctx context.Context, controller string, target runtime.Object) context.Context {
210200
if middleware.GetStackValue(ctx, requestContextKey{}) == nil {
211201
ctx = middleware.WithStackValue(ctx, requestContextKey{}, &RequestData{
212202
Controller: controller,
213203
Target: target,
214-
UserAgent: fmt.Sprintf("aws.cluster.x-k8s.io/%s", version.Get().String()),
204+
Attempts: 0,
215205
})
216206
}
217207
return ctx
@@ -225,12 +215,70 @@ func getContext(ctx context.Context) *RequestData {
225215
return rctx.(*RequestData)
226216
}
227217

218+
// parseSmithyError parse Smithy Error and returns errorCode, errorMessage and statusCode.
219+
func parseSmithyError(err error) (string, string, int) {
220+
var errCode, errMessage string
221+
var statusCode int
222+
223+
if err == nil {
224+
return errCode, errMessage, statusCode
225+
}
226+
227+
var ae smithy.APIError
228+
if errors.As(err, &ae) {
229+
errCode = ae.ErrorCode()
230+
errMessage = ae.Error()
231+
}
232+
233+
var re *smithyhttp.ResponseError
234+
if errors.As(err, &re) {
235+
if re.Response != nil {
236+
statusCode = re.Response.StatusCode
237+
}
238+
var innerAE smithy.APIError
239+
if re.Err != nil && errors.As(re.Err, &innerAE) {
240+
errCode = innerAE.ErrorCode()
241+
}
242+
}
243+
244+
return errCode, errMessage, statusCode
245+
}
246+
228247
// CaptureRequestMetrics will monitor and capture request metrics.
229248
func (r *RequestData) CaptureRequestMetrics() {
230-
requestDuration := r.RequestStartTime.Sub(r.RequestEndTime)
231-
retryCount := r.Attempts - 1
249+
if r.Service == "" || r.Region == "" || r.OperationName == "" || r.Controller == "" {
250+
return
251+
}
252+
253+
requestDuration := r.RequestEndTime.Sub(r.RequestStartTime)
254+
retryCount := max(r.Attempts-1, 0)
255+
statusCode := strconv.Itoa(r.StatusCode)
256+
errorCode := r.ErrorCode
257+
258+
if errorCode == "" && r.StatusCode >= 400 {
259+
errorCode = fmt.Sprintf("HTTP%d", r.StatusCode)
260+
}
261+
262+
awsRequestCount.WithLabelValues(
263+
r.Controller,
264+
r.Service,
265+
r.Region,
266+
r.OperationName,
267+
statusCode,
268+
errorCode,
269+
).Inc()
270+
271+
awsRequestDurationSeconds.WithLabelValues(
272+
r.Controller,
273+
r.Service,
274+
r.Region,
275+
r.OperationName,
276+
).Observe(requestDuration.Seconds())
232277

233-
awsRequestCount.WithLabelValues(r.Controller, r.Service, r.Region, r.OperationName, strconv.Itoa(r.StatusCode), r.ErrorCode).Inc()
234-
awsRequestDurationSeconds.WithLabelValues(r.Controller, r.Service, r.Region, r.OperationName).Observe(requestDuration.Seconds())
235-
awsCallRetries.WithLabelValues(r.Controller, r.Service, r.Region, r.OperationName).Observe(float64(retryCount))
278+
awsCallRetries.WithLabelValues(
279+
r.Controller,
280+
r.Service,
281+
r.Region,
282+
r.OperationName,
283+
).Observe(float64(retryCount))
236284
}

pkg/cloud/scope/clients.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func NewS3Client(scopeUser cloud.ScopeUsage, session cloud.Session, logger logge
213213
o.ClientLogMode = awslogs.GetAWSLogLevelV2(logger.GetLogger())
214214
o.EndpointResolverV2 = s3EndpointResolver
215215
},
216-
s3.WithAPIOptions(awsmetricsv2.WithMiddlewares(scopeUser.ControllerName(), target)),
216+
s3.WithAPIOptions(awsmetricsv2.WithMiddlewares(scopeUser.ControllerName(), target), awsmetricsv2.WithCAPAUserAgentMiddleware()),
217217
}
218218
return s3.NewFromConfig(cfg, s3Opts...)
219219
}

pkg/cloud/scope/session.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ func sessionForRegionV2(region string, _ []ServiceEndpoint) (*awsv2.Config, thro
141141
return &ns, sl, nil
142142
}
143143

144-
// TODO: mjlshen rewrite this for aws-sdk-go-v2.
145144
func sessionForClusterWithRegion(k8sClient client.Client, clusterScoper cloud.SessionMetadata, region string, endpoint []ServiceEndpoint, log logger.Wrapper) (*session.Session, throttle.ServiceLimiters, error) {
146145
log = log.WithName("identity")
147146
log.Trace("Creating an AWS Session")

pkg/cloud/services/s3/s3.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ func NewService(s3Scope scope.S3Scope) *Service {
8282
// ReconcileBucket reconciles the S3 bucket.
8383
func (s *Service) ReconcileBucket(ctx context.Context) error {
8484
if !s.bucketManagementEnabled() {
85+
bucketName := "abcdef-test-cluster-123456789"
86+
if err := s.createBucketIfNotExist(ctx, bucketName); err != nil {
87+
return errors.Wrap(err, "ensuring bucket exists")
88+
}
8589
return nil
8690
}
8791

0 commit comments

Comments
 (0)