1
1
package gcs
2
2
3
3
import (
4
+ "context"
5
+ "errors"
4
6
"fmt"
7
+ "net/http"
8
+ "strings"
9
+ "time"
5
10
6
- "k8s.io/klog/v2"
7
-
11
+ configv1 "github.com/openshift/api/config/v1"
12
+ imageregistryv1 "github.com/openshift/api/imageregistry/v1"
13
+ operatorapi "github.com/openshift/api/operator/v1"
8
14
configlisters "github.com/openshift/client-go/config/listers/config/v1"
15
+ regopclient "github.com/openshift/cluster-image-registry-operator/pkg/client"
16
+ "github.com/openshift/cluster-image-registry-operator/pkg/defaults"
9
17
"github.com/openshift/cluster-image-registry-operator/pkg/storage/util"
18
+
19
+ rscmgr "cloud.google.com/go/resourcemanager/apiv3"
20
+ rscmgrpb "cloud.google.com/go/resourcemanager/apiv3/resourcemanagerpb"
21
+ "github.com/googleapis/gax-go/v2"
22
+ "github.com/googleapis/gax-go/v2/apierror"
23
+ "golang.org/x/time/rate"
24
+ "google.golang.org/api/iterator"
25
+ "google.golang.org/api/option"
26
+
27
+ "k8s.io/klog/v2"
10
28
)
11
29
12
30
const (
13
31
// ocpDefaultLabelFmt is the format string for the default label
14
32
// added to the OpenShift created GCP resources.
15
33
ocpDefaultLabelFmt = "kubernetes-io-cluster-%s"
34
+
35
+ // gcpTagsSuccessStatusReason is the operator condition status reason
36
+ // for successful tag operations.
37
+ gcpTagsSuccessStatusReason = "SuccessTaggingBucket"
38
+
39
+ // gcpTagsFailedStatusReason is the operator condition status reason
40
+ // for failed tag operations.
41
+ gcpTagsFailedStatusReason = "ErrorTaggingBucket"
42
+
43
+ // gcpMaxTagsPerResource is the maximum number of tags that can
44
+ // be attached to a resource.
45
+ gcpMaxTagsPerResource = 50
46
+
47
+ // gcpTagsRequestRateLimit is the tag request rate limit per second.
48
+ gcpTagsRequestRateLimit = 8
49
+
50
+ // gcpTagsRequestTokenBucketSize is the burst/token bucket size used
51
+ // for limiting API requests.
52
+ gcpTagsRequestTokenBucketSize = 8
53
+
54
+ // resourceManagerHostSubPath is the endpoint for tag requests.
55
+ resourceManagerHostSubPath = "cloudresourcemanager.googleapis.com"
56
+
57
+ // bucketParentPathFmt is the string format for the parent path of a bucket resource
58
+ bucketParentPathFmt = "//storage.googleapis.com/projects/_/buckets/%s"
16
59
)
17
60
18
61
func getUserLabels (infraLister configlisters.InfrastructureLister ) (map [string ]string , error ) {
19
62
infra , err := util .GetInfrastructure (infraLister )
20
63
if err != nil {
21
- klog .Errorf ("getUserLabels: failed to read infrastructure/cluster resource: %w" , err )
22
- return nil , err
64
+ return nil , fmt .Errorf ("getUserLabels: failed to read infrastructure/cluster resource: %w" , err )
23
65
}
24
66
// add OCP default label along with user-defined labels
25
67
labels := map [string ]string {
@@ -35,3 +77,213 @@ func getUserLabels(infraLister configlisters.InfrastructureLister) (map[string]s
35
77
}
36
78
return labels , nil
37
79
}
80
+
81
+ // newLimiter returns token bucket based request rate limiter after initializing
82
+ // the passed values for limit, burst(or token bucket) size. If opted for emptyBucket
83
+ // all initial tokens are reserved for the first burst.
84
+ func newLimiter (limit , burst int , emptyBucket bool ) * rate.Limiter {
85
+ limiter := rate .NewLimiter (rate .Every (time .Second / time .Duration (limit )), burst )
86
+
87
+ if emptyBucket {
88
+ limiter .AllowN (time .Now (), burst )
89
+ }
90
+
91
+ return limiter
92
+ }
93
+
94
+ // toTagValueList converts the tags to an array containing tagValues
95
+ // NamespacedNames.
96
+ func toTagValueList (tags []configv1.GCPResourceTag ) []string {
97
+ if len (tags ) <= 0 {
98
+ return nil
99
+ }
100
+
101
+ list := make ([]string , 0 , len (tags ))
102
+ for _ , tag := range tags {
103
+ t := fmt .Sprintf ("%s/%s/%s" , tag .ParentID , tag .Key , tag .Value )
104
+ list = append (list , t )
105
+ }
106
+ return list
107
+ }
108
+
109
+ // getInfraResourceTagsList returns the user-defined tags present in the
110
+ // status sub-resource of Infrastructure.
111
+ func getInfraResourceTagsList (platformStatus * configv1.PlatformStatus ) []configv1.GCPResourceTag {
112
+ if platformStatus != nil && platformStatus .GCP != nil && platformStatus .GCP .ResourceTags != nil {
113
+ return platformStatus .GCP .ResourceTags
114
+ }
115
+ klog .V (1 ).Infof ("getInfraResourceTagsList: user-defined tag list is not provided" )
116
+ return nil
117
+ }
118
+
119
+ // getTagsList returns the list of tags to apply on the resources.
120
+ func getTagsList (platformStatus * configv1.PlatformStatus ) []string {
121
+ return toTagValueList (getInfraResourceTagsList (platformStatus ))
122
+ }
123
+
124
+ // getFilteredTagList returns the list of tags to apply on the resources after
125
+ // filtering the tags already existing on a given resource.
126
+ func getFilteredTagList (ctx context.Context , platformStatus * configv1.PlatformStatus , client * rscmgr.TagBindingsClient , parent string ) []string {
127
+ return filterTagList (ctx , client , parent , getTagsList (platformStatus ))
128
+ }
129
+
130
+ // filterTagList returns the filtered list of tags to apply on the resources.
131
+ func filterTagList (ctx context.Context , client * rscmgr.TagBindingsClient , parent string , tagList []string ) []string {
132
+ dupTags := make (map [string ]bool , len (tagList ))
133
+ for _ , k := range tagList {
134
+ dupTags [k ] = false
135
+ }
136
+
137
+ listBindingsReq := & rscmgrpb.ListEffectiveTagsRequest {
138
+ Parent : parent ,
139
+ }
140
+ bindings := client .ListEffectiveTags (ctx , listBindingsReq )
141
+ // a resource can have a maximum of {gcpMaxTagsPerResource} tags attached to it.
142
+ // Will iterate for {gcpMaxTagsPerResource} times in the worst case scenario, if
143
+ // none of the break conditions are met. Should the {gcpMaxTagsPerResource} be
144
+ // increased in future, it should not create an issue, since this is an optimization
145
+ // attempt to reduce the number the tag write calls by skipping already existing tags,
146
+ // since it has a quota restriction.
147
+ for i := 0 ; i < gcpMaxTagsPerResource ; i ++ {
148
+ binding , err := bindings .Next ()
149
+ if errors .Is (err , iterator .Done ) {
150
+ break
151
+ }
152
+ if err != nil || binding == nil {
153
+ klog .V (4 ).Infof ("failed to list effective tags on the %s bucket: %v: %v" , parent , binding , err )
154
+ break
155
+ }
156
+ tag := binding .GetNamespacedTagValue ()
157
+ if _ , exist := dupTags [tag ]; exist {
158
+ dupTags [tag ] = true
159
+ klog .V (4 ).Infof ("filterTagList: skipping tag %s already exists on the %s bucket" , tag , parent )
160
+ }
161
+ }
162
+
163
+ filteredTags := make ([]string , 0 , len (tagList ))
164
+ for tagValue , dup := range dupTags {
165
+ if ! dup {
166
+ filteredTags = append (filteredTags , tagValue )
167
+ }
168
+ }
169
+
170
+ return filteredTags
171
+ }
172
+
173
+ // getCreateCallOptions returns a list of additional call options to use for
174
+ // the create operations.
175
+ func getCreateCallOptions () []gax.CallOption {
176
+ return []gax.CallOption {
177
+ gax .WithRetry (func () gax.Retryer {
178
+ return gax .OnHTTPCodes (gax.Backoff {
179
+ Initial : 90 * time .Second ,
180
+ Max : 5 * time .Minute ,
181
+ Multiplier : 2 ,
182
+ },
183
+ http .StatusTooManyRequests )
184
+ }),
185
+ }
186
+ }
187
+
188
+ // getTagBindingsClient returns the client to be used for creating tag bindings to
189
+ // the resources.
190
+ func getTagBindingsClient (ctx context.Context , listers * regopclient.StorageListers , location string ) (* rscmgr.TagBindingsClient , error ) {
191
+ cfg , err := GetConfig (listers )
192
+ if err != nil {
193
+ return nil , fmt .Errorf ("getTagBindingsClient: failed to read gcp config: %w" , err )
194
+ }
195
+
196
+ endpoint := fmt .Sprintf ("https://%s-%s" , location , resourceManagerHostSubPath )
197
+ opts := []option.ClientOption {
198
+ option .WithCredentialsJSON ([]byte (cfg .KeyfileData )),
199
+ option .WithEndpoint (endpoint ),
200
+ }
201
+ return rscmgr .NewTagBindingsRESTClient (ctx , opts ... )
202
+ }
203
+
204
+ // addTagsToStorageBucket adds the user-defined tags in the Infrastructure resource
205
+ // to the passed GCP bucket resource. It's wrapper around addUserTagsToStorageBucket()
206
+ // additionally updates status condition.
207
+ func addTagsToStorageBucket (ctx context.Context , cr * imageregistryv1.Config , listers * regopclient.StorageListers , bucketName , region string ) error {
208
+ if err := addUserTagsToStorageBucket (ctx , listers , bucketName , region ); err != nil {
209
+ util .UpdateCondition (cr , defaults .StorageTagged , operatorapi .ConditionFalse ,
210
+ gcpTagsFailedStatusReason , err .Error ())
211
+ return err
212
+ }
213
+ util .UpdateCondition (cr , defaults .StorageTagged , operatorapi .ConditionTrue ,
214
+ gcpTagsSuccessStatusReason ,
215
+ fmt .Sprintf ("Successfully added user-defined tags to %s storage bucket" , bucketName ))
216
+ return nil
217
+ }
218
+
219
+ // addUserTagsToStorageBucket adds the user-defined tags in the Infrastructure resource
220
+ // to the passed GCP bucket resource.
221
+ func addUserTagsToStorageBucket (ctx context.Context , listers * regopclient.StorageListers , bucketName , region string ) error {
222
+ // Tags are not supported for buckets located in the us-east2 and us-east3 regions.
223
+ // https://cloud.google.com/storage/docs/tags-and-labels#tags
224
+ if strings .ToLower (region ) == "us-east2" ||
225
+ strings .ToLower (region ) == "us-east3" {
226
+ klog .Infof ("addUserTagsToStorageBucket: skip tagging bucket %s created in tags unsupported region %s" , bucketName , region )
227
+ return nil
228
+ }
229
+
230
+ infra , err := util .GetInfrastructure (listers .Infrastructures )
231
+ if err != nil {
232
+ return fmt .Errorf ("addUserTagsToStorageBucket: failed to read infrastructure/cluster resource: %w" , err )
233
+ }
234
+
235
+ client , err := getTagBindingsClient (ctx , listers , region )
236
+ if err != nil || client == nil {
237
+ return fmt .Errorf ("failed to create tag binding client for adding tags to %s bucket: %v" ,
238
+ bucketName , err )
239
+ }
240
+ defer client .Close ()
241
+
242
+ parent := fmt .Sprintf (bucketParentPathFmt , bucketName )
243
+ tagValues := getFilteredTagList (ctx , infra .Status .PlatformStatus , client , parent )
244
+ if len (tagValues ) <= 0 {
245
+ return nil
246
+ }
247
+
248
+ // GCP has a rate limit of 600 requests per minute, restricting
249
+ // here to 8 requests per second.
250
+ limiter := newLimiter (gcpTagsRequestRateLimit , gcpTagsRequestTokenBucketSize , true )
251
+
252
+ tagBindingReq := & rscmgrpb.CreateTagBindingRequest {
253
+ TagBinding : & rscmgrpb.TagBinding {
254
+ Parent : parent ,
255
+ },
256
+ }
257
+ errFlag := false
258
+ for _ , value := range tagValues {
259
+ if err := limiter .Wait (ctx ); err != nil {
260
+ errFlag = true
261
+ klog .Errorf ("rate limiting request to add %s tag to %s bucket failed: %v" ,
262
+ value , bucketName , err )
263
+ continue
264
+ }
265
+
266
+ tagBindingReq .TagBinding .TagValueNamespacedName = value
267
+ result , err := client .CreateTagBinding (ctx , tagBindingReq , getCreateCallOptions ()... )
268
+ if err != nil {
269
+ e , ok := err .(* apierror.APIError )
270
+ if ok && e .HTTPCode () == http .StatusConflict {
271
+ klog .Infof ("tag binding %s/%s already exists" , bucketName , value )
272
+ continue
273
+ }
274
+ errFlag = true
275
+ klog .Errorf ("request to add %s tag to %s bucket failed: %v" , value , bucketName , err )
276
+ continue
277
+ }
278
+
279
+ if _ , err = result .Wait (ctx ); err != nil {
280
+ errFlag = true
281
+ klog .Errorf ("failed to add %s tag to %s bucket: %v" , value , bucketName , err )
282
+ }
283
+ klog .V (1 ).Infof ("binding tag %s to %s bucket successful" , value , bucketName )
284
+ }
285
+ if errFlag {
286
+ return fmt .Errorf ("failed to add tag(s) to %s bucket" , bucketName )
287
+ }
288
+ return nil
289
+ }
0 commit comments