Skip to content

Commit 11f3d56

Browse files
committed
CFE-846 : Add user defined tags to the GCP buckets created
1 parent 6310bd0 commit 11f3d56

File tree

4 files changed

+280
-4
lines changed

4 files changed

+280
-4
lines changed

manifests/01-registry-credentials-request-gcs.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ spec:
1818
kind: GCPProviderSpec
1919
predefinedRoles:
2020
- roles/storage.admin
21+
- roles/resourcemanager.tagUser
2122
skipServiceCheck: true
2223
serviceAccountNames:
2324
- cluster-image-registry-operator

pkg/storage/gcs/gcp_labels_tags.go

Lines changed: 256 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,67 @@
11
package gcs
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
7+
"net/http"
8+
"strings"
9+
"time"
510

6-
"k8s.io/klog/v2"
7-
11+
configv1 "github.com/openshift/api/config/v1"
12+
imageregistryv1 "github.com/openshift/api/imageregistry/v1"
13+
operatorapi "github.com/openshift/api/operator/v1"
814
configlisters "github.com/openshift/client-go/config/listers/config/v1"
15+
regopclient "github.com/openshift/cluster-image-registry-operator/pkg/client"
16+
"github.com/openshift/cluster-image-registry-operator/pkg/defaults"
917
"github.com/openshift/cluster-image-registry-operator/pkg/storage/util"
18+
19+
rscmgr "cloud.google.com/go/resourcemanager/apiv3"
20+
rscmgrpb "cloud.google.com/go/resourcemanager/apiv3/resourcemanagerpb"
21+
"github.com/googleapis/gax-go/v2"
22+
"github.com/googleapis/gax-go/v2/apierror"
23+
"golang.org/x/time/rate"
24+
"google.golang.org/api/iterator"
25+
"google.golang.org/api/option"
26+
27+
"k8s.io/klog/v2"
1028
)
1129

1230
const (
1331
// ocpDefaultLabelFmt is the format string for the default label
1432
// added to the OpenShift created GCP resources.
1533
ocpDefaultLabelFmt = "kubernetes-io-cluster-%s"
34+
35+
// gcpTagsSuccessStatusReason is the operator condition status reason
36+
// for successful tag operations.
37+
gcpTagsSuccessStatusReason = "SuccessTaggingBucket"
38+
39+
// gcpTagsFailedStatusReason is the operator condition status reason
40+
// for failed tag operations.
41+
gcpTagsFailedStatusReason = "ErrorTaggingBucket"
42+
43+
// gcpMaxTagsPerResource is the maximum number of tags that can
44+
// be attached to a resource.
45+
gcpMaxTagsPerResource = 50
46+
47+
// gcpTagsRequestRateLimit is the tag request rate limit per second.
48+
gcpTagsRequestRateLimit = 8
49+
50+
// gcpTagsRequestTokenBucketSize is the burst/token bucket size used
51+
// for limiting API requests.
52+
gcpTagsRequestTokenBucketSize = 8
53+
54+
// resourceManagerHostSubPath is the endpoint for tag requests.
55+
resourceManagerHostSubPath = "cloudresourcemanager.googleapis.com"
56+
57+
// bucketParentPathFmt is the string format for the parent path of a bucket resource
58+
bucketParentPathFmt = "//storage.googleapis.com/projects/_/buckets/%s"
1659
)
1760

1861
func getUserLabels(infraLister configlisters.InfrastructureLister) (map[string]string, error) {
1962
infra, err := util.GetInfrastructure(infraLister)
2063
if err != nil {
21-
klog.Errorf("getUserLabels: failed to read infrastructure/cluster resource: %w", err)
22-
return nil, err
64+
return nil, fmt.Errorf("getUserLabels: failed to read infrastructure/cluster resource: %w", err)
2365
}
2466
// add OCP default label along with user-defined labels
2567
labels := map[string]string{
@@ -35,3 +77,213 @@ func getUserLabels(infraLister configlisters.InfrastructureLister) (map[string]s
3577
}
3678
return labels, nil
3779
}
80+
81+
// newLimiter returns token bucket based request rate limiter after initializing
82+
// the passed values for limit, burst(or token bucket) size. If opted for emptyBucket
83+
// all initial tokens are reserved for the first burst.
84+
func newLimiter(limit, burst int, emptyBucket bool) *rate.Limiter {
85+
limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(limit)), burst)
86+
87+
if emptyBucket {
88+
limiter.AllowN(time.Now(), burst)
89+
}
90+
91+
return limiter
92+
}
93+
94+
// toTagValueList converts the tags to an array containing tagValues
95+
// NamespacedNames.
96+
func toTagValueList(tags []configv1.GCPResourceTag) []string {
97+
if len(tags) <= 0 {
98+
return nil
99+
}
100+
101+
list := make([]string, 0, len(tags))
102+
for _, tag := range tags {
103+
t := fmt.Sprintf("%s/%s/%s", tag.ParentID, tag.Key, tag.Value)
104+
list = append(list, t)
105+
}
106+
return list
107+
}
108+
109+
// getInfraResourceTagsList returns the user-defined tags present in the
110+
// status sub-resource of Infrastructure.
111+
func getInfraResourceTagsList(platformStatus *configv1.PlatformStatus) []configv1.GCPResourceTag {
112+
if platformStatus != nil && platformStatus.GCP != nil && platformStatus.GCP.ResourceTags != nil {
113+
return platformStatus.GCP.ResourceTags
114+
}
115+
klog.V(1).Infof("getInfraResourceTagsList: user-defined tag list is not provided")
116+
return nil
117+
}
118+
119+
// getTagsList returns the list of tags to apply on the resources.
120+
func getTagsList(platformStatus *configv1.PlatformStatus) []string {
121+
return toTagValueList(getInfraResourceTagsList(platformStatus))
122+
}
123+
124+
// getFilteredTagList returns the list of tags to apply on the resources after
125+
// filtering the tags already existing on a given resource.
126+
func getFilteredTagList(ctx context.Context, platformStatus *configv1.PlatformStatus, client *rscmgr.TagBindingsClient, parent string) []string {
127+
return filterTagList(ctx, client, parent, getTagsList(platformStatus))
128+
}
129+
130+
// filterTagList returns the filtered list of tags to apply on the resources.
131+
func filterTagList(ctx context.Context, client *rscmgr.TagBindingsClient, parent string, tagList []string) []string {
132+
dupTags := make(map[string]bool, len(tagList))
133+
for _, k := range tagList {
134+
dupTags[k] = false
135+
}
136+
137+
listBindingsReq := &rscmgrpb.ListEffectiveTagsRequest{
138+
Parent: parent,
139+
}
140+
bindings := client.ListEffectiveTags(ctx, listBindingsReq)
141+
// a resource can have a maximum of {gcpMaxTagsPerResource} tags attached to it.
142+
// Will iterate for {gcpMaxTagsPerResource} times in the worst case scenario, if
143+
// none of the break conditions are met. Should the {gcpMaxTagsPerResource} be
144+
// increased in future, it should not create an issue, since this is an optimization
145+
// attempt to reduce the number the tag write calls by skipping already existing tags,
146+
// since it has a quota restriction.
147+
for i := 0; i < gcpMaxTagsPerResource; i++ {
148+
binding, err := bindings.Next()
149+
if errors.Is(err, iterator.Done) {
150+
break
151+
}
152+
if err != nil || binding == nil {
153+
klog.V(4).Infof("failed to list effective tags on the %s bucket: %v: %v", parent, binding, err)
154+
break
155+
}
156+
tag := binding.GetNamespacedTagValue()
157+
if _, exist := dupTags[tag]; exist {
158+
dupTags[tag] = true
159+
klog.V(4).Infof("filterTagList: skipping tag %s already exists on the %s bucket", tag, parent)
160+
}
161+
}
162+
163+
filteredTags := make([]string, 0, len(tagList))
164+
for tagValue, dup := range dupTags {
165+
if !dup {
166+
filteredTags = append(filteredTags, tagValue)
167+
}
168+
}
169+
170+
return filteredTags
171+
}
172+
173+
// getCreateCallOptions returns a list of additional call options to use for
174+
// the create operations.
175+
func getCreateCallOptions() []gax.CallOption {
176+
return []gax.CallOption{
177+
gax.WithRetry(func() gax.Retryer {
178+
return gax.OnHTTPCodes(gax.Backoff{
179+
Initial: 90 * time.Second,
180+
Max: 5 * time.Minute,
181+
Multiplier: 2,
182+
},
183+
http.StatusTooManyRequests)
184+
}),
185+
}
186+
}
187+
188+
// getTagBindingsClient returns the client to be used for creating tag bindings to
189+
// the resources.
190+
func getTagBindingsClient(ctx context.Context, listers *regopclient.StorageListers, location string) (*rscmgr.TagBindingsClient, error) {
191+
cfg, err := GetConfig(listers)
192+
if err != nil {
193+
return nil, fmt.Errorf("getTagBindingsClient: failed to read gcp config: %w", err)
194+
}
195+
196+
endpoint := fmt.Sprintf("https://%s-%s", location, resourceManagerHostSubPath)
197+
opts := []option.ClientOption{
198+
option.WithCredentialsJSON([]byte(cfg.KeyfileData)),
199+
option.WithEndpoint(endpoint),
200+
}
201+
return rscmgr.NewTagBindingsRESTClient(ctx, opts...)
202+
}
203+
204+
// addTagsToStorageBucket adds the user-defined tags in the Infrastructure resource
205+
// to the passed GCP bucket resource. It's wrapper around addUserTagsToStorageBucket()
206+
// additionally updates status condition.
207+
func addTagsToStorageBucket(ctx context.Context, cr *imageregistryv1.Config, listers *regopclient.StorageListers, bucketName, region string) error {
208+
if err := addUserTagsToStorageBucket(ctx, listers, bucketName, region); err != nil {
209+
util.UpdateCondition(cr, defaults.StorageTagged, operatorapi.ConditionFalse,
210+
gcpTagsFailedStatusReason, err.Error())
211+
return err
212+
}
213+
util.UpdateCondition(cr, defaults.StorageTagged, operatorapi.ConditionTrue,
214+
gcpTagsSuccessStatusReason,
215+
fmt.Sprintf("Successfully added user-defined tags to %s storage bucket", bucketName))
216+
return nil
217+
}
218+
219+
// addUserTagsToStorageBucket adds the user-defined tags in the Infrastructure resource
220+
// to the passed GCP bucket resource.
221+
func addUserTagsToStorageBucket(ctx context.Context, listers *regopclient.StorageListers, bucketName, region string) error {
222+
// Tags are not supported for buckets located in the us-east2 and us-east3 regions.
223+
// https://cloud.google.com/storage/docs/tags-and-labels#tags
224+
if strings.ToLower(region) == "us-east2" ||
225+
strings.ToLower(region) == "us-east3" {
226+
klog.Infof("addUserTagsToStorageBucket: skip tagging bucket %s created in tags unsupported region %s", bucketName, region)
227+
return nil
228+
}
229+
230+
infra, err := util.GetInfrastructure(listers.Infrastructures)
231+
if err != nil {
232+
return fmt.Errorf("addUserTagsToStorageBucket: failed to read infrastructure/cluster resource: %w", err)
233+
}
234+
235+
client, err := getTagBindingsClient(ctx, listers, region)
236+
if err != nil || client == nil {
237+
return fmt.Errorf("failed to create tag binding client for adding tags to %s bucket: %v",
238+
bucketName, err)
239+
}
240+
defer client.Close()
241+
242+
parent := fmt.Sprintf(bucketParentPathFmt, bucketName)
243+
tagValues := getFilteredTagList(ctx, infra.Status.PlatformStatus, client, parent)
244+
if len(tagValues) <= 0 {
245+
return nil
246+
}
247+
248+
// GCP has a rate limit of 600 requests per minute, restricting
249+
// here to 8 requests per second.
250+
limiter := newLimiter(gcpTagsRequestRateLimit, gcpTagsRequestTokenBucketSize, true)
251+
252+
tagBindingReq := &rscmgrpb.CreateTagBindingRequest{
253+
TagBinding: &rscmgrpb.TagBinding{
254+
Parent: parent,
255+
},
256+
}
257+
errFlag := false
258+
for _, value := range tagValues {
259+
if err := limiter.Wait(ctx); err != nil {
260+
errFlag = true
261+
klog.Errorf("rate limiting request to add %s tag to %s bucket failed: %v",
262+
value, bucketName, err)
263+
continue
264+
}
265+
266+
tagBindingReq.TagBinding.TagValueNamespacedName = value
267+
result, err := client.CreateTagBinding(ctx, tagBindingReq, getCreateCallOptions()...)
268+
if err != nil {
269+
e, ok := err.(*apierror.APIError)
270+
if ok && e.HTTPCode() == http.StatusConflict {
271+
klog.Infof("tag binding %s/%s already exists", bucketName, value)
272+
continue
273+
}
274+
errFlag = true
275+
klog.Errorf("request to add %s tag to %s bucket failed: %v", value, bucketName, err)
276+
continue
277+
}
278+
279+
if _, err = result.Wait(ctx); err != nil {
280+
errFlag = true
281+
klog.Errorf("failed to add %s tag to %s bucket: %v", value, bucketName, err)
282+
}
283+
klog.V(1).Infof("binding tag %s to %s bucket successful", value, bucketName)
284+
}
285+
if errFlag {
286+
return fmt.Errorf("failed to add tag(s) to %s bucket", bucketName)
287+
}
288+
return nil
289+
}

pkg/storage/gcs/gcs.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ func (d *driver) StorageChanged(cr *imageregistryv1.Config) bool {
208208
return true
209209
}
210210

211+
// Previous status condition was set to failed because, of tag operation failure,
212+
// means bucket creation was successful and will retry adding tags.
213+
if cond := util.FetchCondition(cr, defaults.StorageTagged); cond.Status != operatorapi.ConditionTrue {
214+
return true
215+
}
216+
211217
return false
212218
}
213219

@@ -314,6 +320,7 @@ func (d *driver) CreateStorage(cr *imageregistryv1.Config) error {
314320
cr.Spec.Storage.GCS = d.Config.DeepCopy()
315321
}
316322
}
323+
return addTagsToStorageBucket(d.Context, cr, d.Listers, d.Config.Bucket, d.Config.Region)
317324
} else {
318325
if !reflect.DeepEqual(cr.Status.Storage.GCS, d.Config) {
319326
cr.Status.Storage = imageregistryv1.ImageRegistryConfigStorage{
@@ -322,6 +329,12 @@ func (d *driver) CreateStorage(cr *imageregistryv1.Config) error {
322329
}
323330
}
324331

332+
// Previous status condition was set to failed because, of tag operation failure,
333+
// means bucket creation was successful and hence will only try adding tags.
334+
if cond := util.FetchCondition(cr, defaults.StorageTagged); cond.Status != operatorapi.ConditionTrue {
335+
return addTagsToStorageBucket(d.Context, cr, d.Listers, d.Config.Bucket, d.Config.Region)
336+
}
337+
325338
return nil
326339
}
327340

pkg/storage/util/util.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ func UpdateCondition(cr *imageregistryv1.Config, conditionType string, status op
5959
cr.Status.Conditions = conditions
6060
}
6161

62+
// FetchCondition will return the provided condition.
63+
func FetchCondition(cr *imageregistryv1.Config, conditionType string) (c operatorapi.OperatorCondition) {
64+
for _, c = range cr.Status.Conditions {
65+
if conditionType == c.Type {
66+
return c
67+
}
68+
}
69+
return
70+
}
71+
6272
// GetInfrastructure gets information about the cloud platform that the cluster is
6373
// installed on including the Type, Region, and other platform specific information.
6474
func GetInfrastructure(lister configlisters.InfrastructureLister) (*configv1.Infrastructure, error) {

0 commit comments

Comments
 (0)