Skip to content

Commit 8c08b64

Browse files
Merge pull request #1121 from anirudhAgniRedhat/AWS_TAG_DAY2_reconcile
CFE-1129: Added AWS TAGS reconciliation
2 parents bf2a0d5 + b60d767 commit 8c08b64

File tree

4 files changed

+412
-6
lines changed

4 files changed

+412
-6
lines changed

pkg/defaults/defaults.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ const (
121121

122122
// CloudCABundleKey is the name of the CA bundle to use when interacting with the cloud API.
123123
CloudCABundleKey = "ca-bundle.pem"
124+
125+
// InfrastructureResourceName is the name of the infrastructure config resource
126+
InfrastructureResourceName = "cluster"
124127
)
125128

126129
var (

pkg/operator/awstagcontroller.go

Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
package operator
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"regexp"
8+
"strings"
9+
"time"
10+
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
k8sruntime "k8s.io/apimachinery/pkg/util/runtime"
13+
"k8s.io/apimachinery/pkg/util/wait"
14+
kubeinformers "k8s.io/client-go/informers"
15+
"k8s.io/client-go/tools/cache"
16+
"k8s.io/client-go/util/workqueue"
17+
"k8s.io/klog/v2"
18+
19+
configv1 "github.com/openshift/api/config/v1"
20+
configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
21+
configinformers "github.com/openshift/client-go/config/informers/externalversions"
22+
imageregistryv1client "github.com/openshift/client-go/imageregistry/clientset/versioned/typed/imageregistry/v1"
23+
imageregistryinformers "github.com/openshift/client-go/imageregistry/informers/externalversions"
24+
routeinformers "github.com/openshift/client-go/route/informers/externalversions"
25+
"github.com/openshift/library-go/pkg/operator/configobserver/featuregates"
26+
"github.com/openshift/library-go/pkg/operator/events"
27+
28+
regopclient "github.com/openshift/cluster-image-registry-operator/pkg/client"
29+
"github.com/openshift/cluster-image-registry-operator/pkg/defaults"
30+
"github.com/openshift/cluster-image-registry-operator/pkg/storage/s3"
31+
)
32+
33+
// AWSTagController is for storing internal data required for
34+
// performing AWS controller operations
35+
type AWSTagController struct {
36+
infraConfigClient configv1client.InfrastructureInterface
37+
imageRegistryConfigClient imageregistryv1client.ConfigInterface
38+
listers *regopclient.Listers
39+
featureGateAccessor featuregates.FeatureGateAccess
40+
41+
event events.Recorder
42+
cachesToSync []cache.InformerSynced
43+
queue workqueue.RateLimitingInterface
44+
}
45+
46+
// tagKeyRegex is used to check that the keys and values of a tag contain only valid characters.
47+
var tagKeyRegex = regexp.MustCompile(`^[0-9A-Za-z_.:/=+-@]{1,128}$`)
48+
49+
// tagValRegex is used to check that the keys and values of a tag contain only valid characters.
50+
var tagValRegex = regexp.MustCompile(`^[0-9A-Za-z_.:/=+-@]{0,256}$`)
51+
52+
// kubernetesNamespaceRegex is used to check that a tag key is not in the kubernetes.io namespace.
53+
var kubernetesNamespaceRegex = regexp.MustCompile(`^([^/]*\.)?kubernetes.io/`)
54+
55+
// openshiftNamespaceRegex is used to check that a tag key is not in the openshift.io namespace.
56+
var openshiftNamespaceRegex = regexp.MustCompile(`^([^/]*\.)?openshift.io/`)
57+
58+
// NewAWSTagController is for obtaining AWSTagController object
59+
// required for invoking AWS Tag controller methods.
60+
func NewAWSTagController(
61+
infraConfigClient configv1client.InfrastructureInterface,
62+
imageRegistryConfigClient imageregistryv1client.ConfigInterface,
63+
kubeInformerFactory kubeinformers.SharedInformerFactory,
64+
regopInformerFactory imageregistryinformers.SharedInformerFactory,
65+
routeInformerFactory routeinformers.SharedInformerFactory,
66+
configInformerFactory configinformers.SharedInformerFactory,
67+
openshiftConfigKubeInformerFactory kubeinformers.SharedInformerFactory,
68+
openshiftConfigManagedKubeInformerFactory kubeinformers.SharedInformerFactory,
69+
eventRecorder events.Recorder,
70+
featureGateAccessor featuregates.FeatureGateAccess,
71+
) (*AWSTagController, error) {
72+
c := &AWSTagController{
73+
infraConfigClient: infraConfigClient,
74+
imageRegistryConfigClient: imageRegistryConfigClient,
75+
featureGateAccessor: featureGateAccessor,
76+
event: eventRecorder,
77+
queue: workqueue.NewNamedRateLimitingQueue(
78+
workqueue.DefaultControllerRateLimiter(),
79+
"AWSTagController"),
80+
}
81+
82+
infraConfig := configInformerFactory.Config().V1().Infrastructures()
83+
// list of Listers requied by S3 package NewDriver method
84+
c.listers = &regopclient.Listers{
85+
Deployments: kubeInformerFactory.Apps().V1().Deployments().
86+
Lister().Deployments(defaults.ImageRegistryOperatorNamespace),
87+
Services: kubeInformerFactory.Core().V1().Services().
88+
Lister().Services(defaults.ImageRegistryOperatorNamespace),
89+
ConfigMaps: kubeInformerFactory.Core().V1().ConfigMaps().
90+
Lister().ConfigMaps(defaults.ImageRegistryOperatorNamespace),
91+
ServiceAccounts: kubeInformerFactory.Core().V1().ServiceAccounts().
92+
Lister().ServiceAccounts(defaults.ImageRegistryOperatorNamespace),
93+
PodDisruptionBudgets: kubeInformerFactory.Policy().V1().PodDisruptionBudgets().
94+
Lister().PodDisruptionBudgets(defaults.ImageRegistryOperatorNamespace),
95+
Routes: routeInformerFactory.Route().V1().Routes().
96+
Lister().Routes(defaults.ImageRegistryOperatorNamespace),
97+
ClusterRoles: kubeInformerFactory.Rbac().V1().ClusterRoles().Lister(),
98+
ClusterRoleBindings: kubeInformerFactory.Rbac().V1().ClusterRoleBindings().Lister(),
99+
ProxyConfigs: configInformerFactory.Config().V1().Proxies().Lister(),
100+
RegistryConfigs: regopInformerFactory.Imageregistry().V1().Configs().Lister(),
101+
StorageListers: regopclient.StorageListers{
102+
Secrets: kubeInformerFactory.Core().V1().Secrets().
103+
Lister().Secrets(defaults.ImageRegistryOperatorNamespace),
104+
OpenShiftConfig: openshiftConfigKubeInformerFactory.Core().V1().ConfigMaps().
105+
Lister().ConfigMaps(defaults.OpenShiftConfigNamespace),
106+
OpenShiftConfigManaged: openshiftConfigManagedKubeInformerFactory.Core().V1().ConfigMaps().
107+
Lister().ConfigMaps(defaults.OpenShiftConfigManagedNamespace),
108+
Infrastructures: infraConfig.Lister(),
109+
},
110+
}
111+
112+
_, err := infraConfig.Informer().AddEventHandler(c.eventHandler())
113+
if err != nil {
114+
return nil, err
115+
}
116+
c.cachesToSync = append(c.cachesToSync, infraConfig.Informer().HasSynced)
117+
return c, nil
118+
}
119+
120+
// eventHandler is the callback method for handling events from informer
121+
func (c *AWSTagController) eventHandler() cache.ResourceEventHandler {
122+
const workQueueKey = "aws"
123+
return cache.ResourceEventHandlerFuncs{
124+
AddFunc: func(obj interface{}) {
125+
infra, ok := obj.(*configv1.Infrastructure)
126+
if !ok || infra == nil {
127+
return
128+
}
129+
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.AWS != nil &&
130+
len(infra.Status.PlatformStatus.AWS.ResourceTags) != 0 {
131+
c.queue.Add(workQueueKey)
132+
return
133+
}
134+
},
135+
UpdateFunc: func(prev, cur interface{}) {
136+
oldInfra, ok := prev.(*configv1.Infrastructure)
137+
if !ok || oldInfra == nil {
138+
return
139+
}
140+
newInfra, ok := cur.(*configv1.Infrastructure)
141+
if !ok || newInfra == nil {
142+
return
143+
}
144+
if oldInfra.Status.PlatformStatus != nil && oldInfra.Status.PlatformStatus.AWS != nil &&
145+
newInfra.Status.PlatformStatus != nil && newInfra.Status.PlatformStatus.AWS != nil {
146+
if !reflect.DeepEqual(oldInfra.Status.PlatformStatus.AWS.ResourceTags, newInfra.Status.PlatformStatus.AWS.ResourceTags) {
147+
c.queue.Add(workQueueKey)
148+
return
149+
}
150+
}
151+
},
152+
}
153+
}
154+
155+
// Run is the main method for starting the AWS controller
156+
func (c *AWSTagController) Run(ctx context.Context) {
157+
defer k8sruntime.HandleCrash()
158+
defer c.queue.ShutDown()
159+
160+
klog.Infof("Starting AWS Tag Controller")
161+
if !cache.WaitForCacheSync(ctx.Done(), c.cachesToSync...) {
162+
return
163+
}
164+
165+
go wait.Until(c.runWorker, time.Second, ctx.Done())
166+
167+
klog.Infof("Started AWS Tag Controller")
168+
<-ctx.Done()
169+
klog.Infof("Shutting down AWS Tag Controller")
170+
}
171+
172+
func (c *AWSTagController) runWorker() {
173+
for c.processNextWorkItem() {
174+
}
175+
}
176+
177+
// processNextWorkItem is for prcessing the event received
178+
// which blocks until a new item is received
179+
func (c *AWSTagController) processNextWorkItem() bool {
180+
obj, shutdown := c.queue.Get()
181+
if shutdown {
182+
return false
183+
}
184+
defer c.queue.Done(obj)
185+
186+
klog.V(5).Infof("AWSTagController: got event from workqueue")
187+
if err := c.sync(); err != nil {
188+
c.queue.AddRateLimited(workqueueKey)
189+
klog.Errorf("AWSTagController: failed to process event: %s, requeuing", err)
190+
} else {
191+
c.queue.Forget(obj)
192+
klog.V(5).Infof("AWSTagController: event from workqueue successfully processed")
193+
}
194+
return true
195+
}
196+
197+
// sync method is defined for handling the operations required
198+
// on receiving a informer event.
199+
// Fetches image registry config data, required for obtaining
200+
// the S3 bucket configuration and creating a driver out of it
201+
func (c *AWSTagController) sync() error {
202+
return c.syncTags()
203+
}
204+
205+
// syncTags fetches user tags from Infrastructure resource, which
206+
// is then compared with the tags configured for the created S3 bucket
207+
// fetched using the driver object passed and updates if any new tags.
208+
func (c *AWSTagController) syncTags() error {
209+
cr, err := c.imageRegistryConfigClient.Get(
210+
context.Background(),
211+
defaults.ImageRegistryResourceName,
212+
metav1.GetOptions{},
213+
)
214+
if err != nil {
215+
return err
216+
}
217+
218+
// if s3 storage config is missing, must be
219+
// non-AWS platform, so not treating it as error
220+
if cr.Spec.Storage.S3 == nil {
221+
return nil
222+
}
223+
224+
// make a copy to avoid changing the cached data
225+
cr = cr.DeepCopy()
226+
227+
infra, err := c.infraConfigClient.Get(
228+
context.Background(),
229+
defaults.InfrastructureResourceName,
230+
metav1.GetOptions{},
231+
)
232+
if err != nil {
233+
klog.Errorf("failed to fetch Infrastructure resource: %v", err)
234+
return err
235+
}
236+
237+
// Filtering tags based on validation
238+
infraTagSet := filterPlatformStatusTags(infra)
239+
klog.V(5).Infof("tags read from Infrastructure resource: %v", infraTagSet)
240+
241+
// Create a driver with the current configuration
242+
ctx := context.Background()
243+
driver := s3.NewDriver(ctx, cr.Spec.Storage.S3, &c.listers.StorageListers, c.featureGateAccessor)
244+
245+
s3TagSet, err := driver.GetStorageTags()
246+
if err != nil {
247+
klog.Errorf("failed to fetch storage tags: %v", err)
248+
return err
249+
}
250+
klog.Infof("tags read from storage resource: %v", s3TagSet)
251+
252+
tagUpdatedCount := syncInfraTags(s3TagSet, infraTagSet)
253+
if tagUpdatedCount > 0 {
254+
if err := driver.PutStorageTags(s3TagSet); err != nil {
255+
klog.Errorf("failed to update/append tagset of %s s3 bucket: %v", driver.ID(), err)
256+
c.event.Warningf("UpdateAWSTags",
257+
"Failed to update/append tagset of %s s3 bucket", driver.ID())
258+
}
259+
klog.Infof("successfully updated/appended %d tags, tagset: %+v", tagUpdatedCount, s3TagSet)
260+
c.event.Eventf("UpdateAWSTags",
261+
"Successfully updated tagset of %s s3 bucket", driver.ID())
262+
}
263+
264+
return nil
265+
}
266+
267+
// filterPlatformStatusTags is for reading and filter user tags present in
268+
// Platform Status of Infrastructure config.
269+
func filterPlatformStatusTags(infra *configv1.Infrastructure) map[string]string {
270+
infraTagSet := map[string]string{}
271+
for _, statusTags := range infra.Status.PlatformStatus.AWS.ResourceTags {
272+
if err := validateUserTag(statusTags.Key, statusTags.Value); err != nil {
273+
klog.Warningf("validation failed for tag(%s:%s): %v", statusTags.Key, statusTags.Value, err)
274+
continue
275+
}
276+
infraTagSet[statusTags.Key] = statusTags.Value
277+
}
278+
return infraTagSet
279+
}
280+
281+
// syncInfraTags synchronizes the tags obtained from S3 bucket and Infrastructure CR.
282+
// this modifies the s3TagSet based on new tags which are added and update the value to a key if it has changed.
283+
func syncInfraTags(s3TagSet map[string]string, infraTagSet map[string]string) int {
284+
tagUpdatedCount := 0
285+
for key, value := range infraTagSet {
286+
val, ok := s3TagSet[key]
287+
if !ok || val != value {
288+
klog.V(5).Infof("%s tag will be added/updated with value %s", key, value)
289+
s3TagSet[key] = value
290+
tagUpdatedCount++
291+
}
292+
}
293+
return tagUpdatedCount
294+
}
295+
296+
// validateUserTag is for validating the user defined tags in Infrastructure CR
297+
func validateUserTag(key, value string) error {
298+
if !tagKeyRegex.MatchString(key) {
299+
return fmt.Errorf("key has invalid characters or length")
300+
}
301+
if strings.EqualFold(key, "Name") {
302+
return fmt.Errorf("name key is not allowed for user defined tags")
303+
}
304+
if !tagValRegex.MatchString(value) {
305+
return fmt.Errorf("value has invalid characters or length")
306+
}
307+
if kubernetesNamespaceRegex.MatchString(key) {
308+
return fmt.Errorf("key is in the kubernetes.io namespace")
309+
}
310+
if openshiftNamespaceRegex.MatchString(key) {
311+
return fmt.Errorf("key is in the openshift.io namespace")
312+
}
313+
return nil
314+
}

pkg/operator/starter.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package operator
22

33
import (
44
"context"
5-
"fmt"
5+
"errors"
66
"time"
77

88
kubeinformers "k8s.io/client-go/informers"
@@ -88,7 +88,7 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
8888
klog.Infof("FeatureGates initialized: knownFeatureGates=%v", featureGates.KnownFeatures())
8989
case <-time.After(1 * time.Minute):
9090
klog.Errorf("timed out waiting for FeatureGate detection")
91-
return fmt.Errorf("timed out waiting for FeatureGate detection")
91+
return errors.New("timed out waiting for FeatureGate detection")
9292
}
9393

9494
controller, err := NewController(
@@ -217,6 +217,22 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
217217
return err
218218
}
219219

220+
awsTagController, err := NewAWSTagController(
221+
configClient.ConfigV1().Infrastructures(),
222+
imageregistryClient.ImageregistryV1().Configs(),
223+
kubeInformers,
224+
imageregistryInformers,
225+
routeInformers,
226+
configInformers,
227+
kubeInformersForOpenShiftConfig,
228+
kubeInformersForOpenShiftConfigManaged,
229+
eventRecorder,
230+
featureGateAccessor,
231+
)
232+
if err != nil {
233+
return err
234+
}
235+
220236
metricsController := NewMetricsController(imageInformers.Image().V1().ImageStreams())
221237

222238
kubeInformers.Start(ctx.Done())
@@ -237,6 +253,7 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
237253
go loggingController.Run(ctx, 1)
238254
go azureStackCloudController.Run(ctx)
239255
go azurePathFixController.Run(ctx.Done())
256+
go awsTagController.Run(ctx)
240257
go metricsController.Run(ctx)
241258

242259
<-ctx.Done()

0 commit comments

Comments
 (0)