Skip to content

Commit 33d1650

Browse files
Added AWS TAGS reconcilation
1 parent f6587e5 commit 33d1650

File tree

4 files changed

+446
-7
lines changed

4 files changed

+446
-7
lines changed

pkg/defaults/defaults.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ const (
121121

122122
// CloudCABundleKey is the name of the CA bundle to use when interacting with the cloud API.
123123
CloudCABundleKey = "ca-bundle.pem"
124+
125+
// InfrastructureResourceName is the name of the infrastructure config resource
126+
InfrastructureResourceName = "cluster"
124127
)
125128

126129
var (

pkg/operator/aws.go

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
package operator
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"regexp"
8+
"strings"
9+
"time"
10+
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
k8sruntime "k8s.io/apimachinery/pkg/util/runtime"
13+
"k8s.io/apimachinery/pkg/util/wait"
14+
kubeinformers "k8s.io/client-go/informers"
15+
"k8s.io/client-go/tools/cache"
16+
"k8s.io/client-go/util/workqueue"
17+
"k8s.io/klog/v2"
18+
19+
configv1 "github.com/openshift/api/config/v1"
20+
configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
21+
configinformers "github.com/openshift/client-go/config/informers/externalversions"
22+
imageregistryv1client "github.com/openshift/client-go/imageregistry/clientset/versioned/typed/imageregistry/v1"
23+
imageregistryinformers "github.com/openshift/client-go/imageregistry/informers/externalversions"
24+
routeinformers "github.com/openshift/client-go/route/informers/externalversions"
25+
"github.com/openshift/library-go/pkg/operator/configobserver/featuregates"
26+
"github.com/openshift/library-go/pkg/operator/events"
27+
28+
regopclient "github.com/openshift/cluster-image-registry-operator/pkg/client"
29+
"github.com/openshift/cluster-image-registry-operator/pkg/defaults"
30+
"github.com/openshift/cluster-image-registry-operator/pkg/storage/s3"
31+
)
32+
33+
// AWSController is for storing internal data required for
34+
// performing AWS controller operations
35+
type AWSController struct {
36+
infraConfigClient configv1client.InfrastructureInterface
37+
imageRegistryConfigClient imageregistryv1client.ConfigInterface
38+
listers *regopclient.Listers
39+
featureGateAccessor featuregates.FeatureGateAccess
40+
41+
event events.Recorder
42+
cachesToSync []cache.InformerSynced
43+
queue workqueue.RateLimitingInterface
44+
}
45+
46+
// tagKeyRegex is used to check that the keys and values of a tag contain only valid characters.
47+
var tagKeyRegex = regexp.MustCompile(`^[0-9A-Za-z_.:/=+-@]{1,128}$`)
48+
49+
// tagValRegex is used to check that the keys and values of a tag contain only valid characters.
50+
var tagValRegex = regexp.MustCompile(`^[0-9A-Za-z_.:/=+-@]{0,256}$`)
51+
52+
// kubernetesNamespaceRegex is used to check that a tag key is not in the kubernetes.io namespace.
53+
var kubernetesNamespaceRegex = regexp.MustCompile(`^([^/]*\.)?kubernetes.io/`)
54+
55+
// openshiftNamespaceRegex is used to check that a tag key is not in the openshift.io namespace.
56+
var openshiftNamespaceRegex = regexp.MustCompile(`^([^/]*\.)?openshift.io/`)
57+
58+
// NewAWSController is for obtaining AWSController object
59+
// required for invoking AWS controller methods.
60+
func NewAWSController(
61+
infraConfigClient configv1client.InfrastructureInterface,
62+
imageRegistryConfigClient imageregistryv1client.ConfigInterface,
63+
kubeInformerFactory kubeinformers.SharedInformerFactory,
64+
regopInformerFactory imageregistryinformers.SharedInformerFactory,
65+
routeInformerFactory routeinformers.SharedInformerFactory,
66+
configInformerFactory configinformers.SharedInformerFactory,
67+
openshiftConfigKubeInformerFactory kubeinformers.SharedInformerFactory,
68+
openshiftConfigManagedKubeInformerFactory kubeinformers.SharedInformerFactory,
69+
eventRecorder events.Recorder,
70+
featureGateAccessor featuregates.FeatureGateAccess,
71+
) (*AWSController, error) {
72+
c := &AWSController{
73+
infraConfigClient: infraConfigClient,
74+
imageRegistryConfigClient: imageRegistryConfigClient,
75+
featureGateAccessor: featureGateAccessor,
76+
event: eventRecorder,
77+
queue: workqueue.NewNamedRateLimitingQueue(
78+
workqueue.DefaultControllerRateLimiter(),
79+
"AWSController"),
80+
}
81+
82+
infraConfig := configInformerFactory.Config().V1().Infrastructures()
83+
// list of Listers requied by S3 package NewDriver method
84+
c.listers = &regopclient.Listers{
85+
Deployments: kubeInformerFactory.Apps().V1().Deployments().
86+
Lister().Deployments(defaults.ImageRegistryOperatorNamespace),
87+
Services: kubeInformerFactory.Core().V1().Services().
88+
Lister().Services(defaults.ImageRegistryOperatorNamespace),
89+
ConfigMaps: kubeInformerFactory.Core().V1().ConfigMaps().
90+
Lister().ConfigMaps(defaults.ImageRegistryOperatorNamespace),
91+
ServiceAccounts: kubeInformerFactory.Core().V1().ServiceAccounts().
92+
Lister().ServiceAccounts(defaults.ImageRegistryOperatorNamespace),
93+
PodDisruptionBudgets: kubeInformerFactory.Policy().V1().PodDisruptionBudgets().
94+
Lister().PodDisruptionBudgets(defaults.ImageRegistryOperatorNamespace),
95+
Routes: routeInformerFactory.Route().V1().Routes().
96+
Lister().Routes(defaults.ImageRegistryOperatorNamespace),
97+
ClusterRoles: kubeInformerFactory.Rbac().V1().ClusterRoles().Lister(),
98+
ClusterRoleBindings: kubeInformerFactory.Rbac().V1().ClusterRoleBindings().Lister(),
99+
ProxyConfigs: configInformerFactory.Config().V1().Proxies().Lister(),
100+
RegistryConfigs: regopInformerFactory.Imageregistry().V1().Configs().Lister(),
101+
StorageListers: regopclient.StorageListers{
102+
Secrets: kubeInformerFactory.Core().V1().Secrets().
103+
Lister().Secrets(defaults.ImageRegistryOperatorNamespace),
104+
OpenShiftConfig: openshiftConfigKubeInformerFactory.Core().V1().ConfigMaps().
105+
Lister().ConfigMaps(defaults.OpenShiftConfigNamespace),
106+
OpenShiftConfigManaged: openshiftConfigManagedKubeInformerFactory.Core().V1().ConfigMaps().
107+
Lister().ConfigMaps(defaults.OpenShiftConfigManagedNamespace),
108+
Infrastructures: infraConfig.Lister(),
109+
},
110+
}
111+
112+
_, err := infraConfig.Informer().AddEventHandler(c.eventHandler())
113+
if err != nil {
114+
return nil, err
115+
}
116+
c.cachesToSync = append(c.cachesToSync, infraConfig.Informer().HasSynced)
117+
return c, nil
118+
}
119+
120+
// eventHandler is the callback method for handling events from informer
121+
func (c *AWSController) eventHandler() cache.ResourceEventHandler {
122+
const workQueueKey = "aws"
123+
return cache.ResourceEventHandlerFuncs{
124+
AddFunc: func(obj interface{}) {
125+
infra, ok := obj.(*configv1.Infrastructure)
126+
if !ok || infra == nil {
127+
return
128+
}
129+
if infra.Status.PlatformStatus.AWS != nil && len(infra.Status.PlatformStatus.AWS.ResourceTags) != 0 {
130+
c.queue.Add(workQueueKey)
131+
return
132+
}
133+
},
134+
UpdateFunc: func(prev, cur interface{}) {
135+
oldInfra, ok := prev.(*configv1.Infrastructure)
136+
if !ok || oldInfra == nil {
137+
return
138+
}
139+
newInfra, ok := cur.(*configv1.Infrastructure)
140+
if !ok || newInfra == nil {
141+
return
142+
}
143+
if oldInfra.Status.PlatformStatus.AWS != nil && newInfra.Status.PlatformStatus.AWS != nil {
144+
if !reflect.DeepEqual(oldInfra.Status.PlatformStatus.AWS.ResourceTags, newInfra.Status.PlatformStatus.AWS.ResourceTags) {
145+
c.queue.Add(workQueueKey)
146+
return
147+
}
148+
}
149+
},
150+
}
151+
}
152+
153+
// Run is the main method for starting the AWS controller
154+
func (c *AWSController) Run(ctx context.Context) {
155+
defer k8sruntime.HandleCrash()
156+
defer c.queue.ShutDown()
157+
158+
klog.Infof("Starting AWS Controller")
159+
if !cache.WaitForCacheSync(ctx.Done(), c.cachesToSync...) {
160+
return
161+
}
162+
163+
go wait.Until(c.runWorker, time.Second, ctx.Done())
164+
165+
klog.Infof("Started AWS Controller")
166+
<-ctx.Done()
167+
klog.Infof("Shutting down AWS Controller")
168+
}
169+
170+
func (c *AWSController) runWorker() {
171+
for c.processNextWorkItem() {
172+
}
173+
}
174+
175+
// processNextWorkItem is for prcessing the event received
176+
// which blocks until a new item is received
177+
func (c *AWSController) processNextWorkItem() bool {
178+
obj, shutdown := c.queue.Get()
179+
if shutdown {
180+
return false
181+
}
182+
defer c.queue.Done(obj)
183+
184+
klog.V(5).Infof("AWSController: got event from workqueue")
185+
if err := c.sync(); err != nil {
186+
c.queue.AddRateLimited(workqueueKey)
187+
klog.Errorf("AWSController: failed to process event: %s, requeuing", err)
188+
} else {
189+
c.queue.Forget(obj)
190+
klog.V(5).Infof("AWSController: event from workqueue successfully processed")
191+
}
192+
return true
193+
}
194+
195+
// sync method is defined for handling the operations required
196+
// on receiving a informer event.
197+
// Fetches image registry config data, required for obtaining
198+
// the S3 bucket configuration and creating a driver out of it
199+
func (c *AWSController) sync() error {
200+
return c.syncTags()
201+
}
202+
203+
// syncTags fetches user tags from Infrastructure resource, which
204+
// is then compared with the tags configured for the created S3 bucket
205+
// fetched using the driver object passed and updates if any new tags.
206+
func (c *AWSController) syncTags() error {
207+
cr, err := c.imageRegistryConfigClient.Get(
208+
context.Background(),
209+
defaults.ImageRegistryResourceName,
210+
metav1.GetOptions{},
211+
)
212+
if err != nil {
213+
return err
214+
}
215+
216+
// if s3 storage config is missing, must be
217+
// non-AWS platform, so not treating it as error
218+
if cr.Spec.Storage.S3 == nil {
219+
return nil
220+
}
221+
222+
// make a copy to avoid changing the cached data
223+
cr = cr.DeepCopy()
224+
225+
infra, err := c.infraConfigClient.Get(
226+
context.Background(),
227+
defaults.InfrastructureResourceName,
228+
metav1.GetOptions{},
229+
)
230+
if err != nil {
231+
klog.Errorf("failed to fetch Infrastructure resource: %v", err)
232+
return err
233+
}
234+
235+
// tags deletion is not supported. Should the user remove it from
236+
// PlatformStatus will be looked up for retaining the tag
237+
infraTagSet := make(map[string]string)
238+
mergePlatformStatusTags(infra, infraTagSet)
239+
klog.V(5).Infof("tags read from Infrastructure resource: %v", infraTagSet)
240+
241+
// Create a driver with the current configuration
242+
ctx := context.Background()
243+
driver := s3.NewDriver(ctx, cr.Spec.Storage.S3, &c.listers.StorageListers, c.featureGateAccessor)
244+
245+
s3TagSet, err := driver.GetStorageTags()
246+
if err != nil {
247+
klog.Errorf("failed to fetch storage tags: %v", err)
248+
return err
249+
}
250+
klog.V(5).Infof("tags read from storage resource: %v", s3TagSet)
251+
252+
tagUpdatedCount := compareS3InfraTagSet(s3TagSet, infraTagSet)
253+
if tagUpdatedCount > 0 {
254+
if err := driver.PutStorageTags(s3TagSet); err != nil {
255+
klog.Errorf("failed to update/delete tagset of %s s3 bucket: %v", driver.ID(), err)
256+
c.event.Warningf("UpdateAWSTags",
257+
"Failed to update/delete tagset of %s s3 bucket", driver.ID())
258+
}
259+
klog.Infof("successfully updated/deleted %d tags, tagset: %+v", tagUpdatedCount, s3TagSet)
260+
c.event.Eventf("UpdateAWSTags",
261+
"Successfully updated/deleted tagset of %s s3 bucket", driver.ID())
262+
}
263+
264+
return nil
265+
}
266+
267+
// mergePlatformStatusTags is for reading and merging user tags present in both
268+
// Platform Status of Infrastructure config.
269+
// There could be scenarios(upgrade, user deletes) where user tags could be missing
270+
// from the Platform Status, hence using Status too to avoid said scenarios.
271+
// If a tag exists in Status.
272+
func mergePlatformStatusTags(infra *configv1.Infrastructure, infraTagSet map[string]string) {
273+
for _, statusTags := range infra.Status.PlatformStatus.AWS.ResourceTags {
274+
if err := validateUserTag(statusTags.Key, statusTags.Value); err != nil {
275+
klog.Warningf("validation failed for tag(%s:%s): %v", statusTags.Key, statusTags.Value, err)
276+
continue
277+
}
278+
infraTagSet[statusTags.Key] = statusTags.Value
279+
}
280+
}
281+
282+
// compareS3InfraTagSet is for comparing the tags obtained from S3 bucket and Infrastructure CR
283+
// to find if any new tags have been deleted, added or existing tags modified.
284+
func compareS3InfraTagSet(s3TagSet map[string]string, infraTagSet map[string]string) (tagUpdatedCount int) {
285+
for key, value := range infraTagSet {
286+
// If a tag is value is empty, it's marked for deletion
287+
// and is deleted from the list obtained from S3 bucket
288+
if value == "" {
289+
klog.V(5).Infof("%s tag will be deleted", key)
290+
delete(s3TagSet, key)
291+
tagUpdatedCount++
292+
continue
293+
}
294+
val, ok := s3TagSet[key]
295+
if !ok || val != value {
296+
klog.V(5).Infof("%s tag will be added/updated with value %s", key, value)
297+
s3TagSet[key] = value
298+
tagUpdatedCount++
299+
}
300+
}
301+
return
302+
}
303+
304+
// validateUserTag is for validating the user defined tags in Infrastructure CR
305+
func validateUserTag(key, value string) error {
306+
if !tagKeyRegex.MatchString(key) {
307+
return fmt.Errorf("key has invalid characters or length")
308+
}
309+
if strings.EqualFold(key, "Name") {
310+
return fmt.Errorf("key cannot be customized by user")
311+
}
312+
if !tagValRegex.MatchString(value) {
313+
return fmt.Errorf("value has invalid characters or length")
314+
}
315+
if kubernetesNamespaceRegex.MatchString(key) {
316+
return fmt.Errorf("key is in the kubernetes.io namespace")
317+
}
318+
if openshiftNamespaceRegex.MatchString(key) {
319+
return fmt.Errorf("key is in the openshift.io namespace")
320+
}
321+
return nil
322+
}

0 commit comments

Comments
 (0)