Skip to content
This repository was archived by the owner on Nov 7, 2019. It is now read-only.

Commit 0610ab8

Browse files
Adding code generated code
Signed-off-by: Christopher Hein <[email protected]>
1 parent 049f785 commit 0610ab8

File tree

9 files changed

+116
-69
lines changed

9 files changed

+116
-69
lines changed

configs/aws-service-operator.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,15 @@ items:
150150
- pods
151151
- configmaps
152152
- services
153+
- events
153154
verbs:
154155
- get
155156
- list
156157
- watch
157158
- create
158159
- delete
159160
- update
161+
- patch
160162
- apiGroups:
161163
- apiextensions.k8s.io
162164
resources:
@@ -203,7 +205,7 @@ items:
203205
template:
204206
metadata:
205207
annotations:
206-
iam.amazonaws.com/role: arn:aws:iam::<ACCOUNT_ID>:role/aws-service-operator
208+
iam.amazonaws.com/role: aws-service-operator
207209
labels:
208210
app: aws-service-operator
209211
spec:

pkg/operators/base/base.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,48 +10,59 @@ import (
1010
"github.com/awslabs/aws-service-operator/pkg/operators/snssubscription"
1111
"github.com/awslabs/aws-service-operator/pkg/operators/snstopic"
1212
"github.com/awslabs/aws-service-operator/pkg/operators/sqsqueue"
13+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1314
)
1415

1516
type base struct {
16-
config *config.Config
17+
config *config.Config
18+
queueManager *queuemanager.QueueManager
19+
cloudformationtemplate *cloudformationtemplate.Operator
20+
dynamodb *dynamodb.Operator
21+
ecrrepository *ecrrepository.Operator
22+
s3bucket *s3bucket.Operator
23+
snssubscription *snssubscription.Operator
24+
snstopic *snstopic.Operator
25+
sqsqueue *sqsqueue.Operator
1726
}
1827

1928
func New(
2029
config *config.Config,
30+
queueManager *queuemanager.QueueManager,
2131
) *base {
2232
return &base{
23-
config: config,
33+
config: config,
34+
queueManager: queueManager,
35+
cloudformationtemplate: cloudformationtemplate.NewOperator(config, queueManager),
36+
dynamodb: dynamodb.NewOperator(config, queueManager),
37+
ecrrepository: ecrrepository.NewOperator(config, queueManager),
38+
s3bucket: s3bucket.NewOperator(config, queueManager),
39+
snssubscription: snssubscription.NewOperator(config, queueManager),
40+
snstopic: snstopic.NewOperator(config, queueManager),
41+
sqsqueue: sqsqueue.NewOperator(config, queueManager),
2442
}
2543
}
2644

2745
func (b *base) Watch(ctx context.Context, namespace string) {
2846
if b.config.Resources["cloudformationtemplate"] {
29-
cloudformationtemplateoperator := cloudformationtemplate.NewOperator(b.config)
30-
go cloudformationtemplateoperator.StartWatch(ctx, namespace)
47+
go b.cloudformationtemplate.StartWatch(ctx, namespace)
3148
}
3249
if b.config.Resources["dynamodb"] {
33-
dynamodboperator := dynamodb.NewOperator(b.config)
34-
go dynamodboperator.StartWatch(ctx, namespace)
50+
go b.dynamodb.StartWatch(ctx, namespace)
3551
}
3652
if b.config.Resources["ecrrepository"] {
37-
ecrrepositoryoperator := ecrrepository.NewOperator(b.config)
38-
go ecrrepositoryoperator.StartWatch(ctx, namespace)
53+
go b.ecrrepository.StartWatch(ctx, namespace)
3954
}
4055
if b.config.Resources["s3bucket"] {
41-
s3bucketoperator := s3bucket.NewOperator(b.config)
42-
go s3bucketoperator.StartWatch(ctx, namespace)
56+
go b.s3bucket.StartWatch(ctx, namespace)
4357
}
4458
if b.config.Resources["snssubscription"] {
45-
snssubscriptionoperator := snssubscription.NewOperator(b.config)
46-
go snssubscriptionoperator.StartWatch(ctx, namespace)
59+
go b.snssubscription.StartWatch(ctx, namespace)
4760
}
4861
if b.config.Resources["snstopic"] {
49-
snstopicoperator := snstopic.NewOperator(b.config)
50-
go snstopicoperator.StartWatch(ctx, namespace)
62+
go b.snstopic.StartWatch(ctx, namespace)
5163
}
5264
if b.config.Resources["sqsqueue"] {
53-
sqsqueueoperator := sqsqueue.NewOperator(b.config)
54-
go sqsqueueoperator.StartWatch(ctx, namespace)
65+
go b.sqsqueue.StartWatch(ctx, namespace)
5566
}
5667

5768
<-ctx.Done()

pkg/operators/cloudformationtemplate/operator.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/awslabs/aws-service-operator/pkg/config"
1212
"github.com/awslabs/aws-service-operator/pkg/operator"
13+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1314
"k8s.io/client-go/tools/cache"
1415

1516
awsV1alpha1 "github.com/awslabs/aws-service-operator/pkg/apis/service-operator.aws/v1alpha1"
@@ -18,14 +19,17 @@ import (
1819

1920
// Operator represents a controller object for object store custom resources
2021
type Operator struct {
21-
config *config.Config
22-
topicARN string
22+
config *config.Config
23+
topicARN string
24+
queueManager *queuemanager.QueueManager
2325
}
2426

2527
// NewOperator create controller for watching object store custom resources created
26-
func NewOperator(config *config.Config) *Operator {
28+
func NewOperator(config *config.Config, queueManager *queuemanager.QueueManager) *Operator {
29+
2730
return &Operator{
28-
config: config,
31+
config: config,
32+
queueManager: queueManager,
2933
}
3034
}
3135

pkg/operators/dynamodb/operator.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/awslabs/aws-service-operator/pkg/config"
1616
"github.com/awslabs/aws-service-operator/pkg/operator"
1717
"github.com/awslabs/aws-service-operator/pkg/queue"
18+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1819
"github.com/iancoleman/strcase"
1920
corev1 "k8s.io/api/core/v1"
2021
"k8s.io/client-go/tools/cache"
@@ -25,14 +26,21 @@ import (
2526

2627
// Operator represents a controller object for object store custom resources
2728
type Operator struct {
28-
config *config.Config
29-
topicARN string
29+
config *config.Config
30+
topicARN string
31+
queueManager *queuemanager.QueueManager
3032
}
3133

3234
// NewOperator create controller for watching object store custom resources created
33-
func NewOperator(config *config.Config) *Operator {
35+
func NewOperator(config *config.Config, queueManager *queuemanager.QueueManager) *Operator {
36+
queuectrl := queue.New(config, config.AWSClientset, 10)
37+
topicARN, _ := queuectrl.Register("dynamodb")
38+
queueManager.Add(topicARN, queuemanager.HandlerFunc(QueueUpdater))
39+
3440
return &Operator{
35-
config: config,
41+
config: config,
42+
topicARN: topicARN,
43+
queueManager: queueManager,
3644
}
3745
}
3846

@@ -43,16 +51,13 @@ func (c *Operator) StartWatch(ctx context.Context, namespace string) {
4351
UpdateFunc: c.onUpdate,
4452
DeleteFunc: c.onDelete,
4553
}
46-
queuectrl := queue.New(c.config, c.config.AWSClientset, 1)
47-
c.topicARN, _, _, _ = queuectrl.Register("dynamodb", &awsV1alpha1.DynamoDB{})
48-
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), ctx.Done())
4954

5055
oper := operator.New("dynamodbs", namespace, resourceHandlers, c.config.AWSClientset.RESTClient())
5156
oper.Watch(&awsV1alpha1.DynamoDB{}, ctx.Done())
5257
}
5358

5459
// QueueUpdater will take the messages from the queue and process them
55-
func QueueUpdater(config *config.Config, msg *queue.MessageBody) error {
60+
func QueueUpdater(config *config.Config, msg *queuemanager.MessageBody) error {
5661
logger := config.Logger
5762
var name, namespace string
5863
if msg.Updatable {

pkg/operators/ecrrepository/operator.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/awslabs/aws-service-operator/pkg/config"
1616
"github.com/awslabs/aws-service-operator/pkg/operator"
1717
"github.com/awslabs/aws-service-operator/pkg/queue"
18+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1819
"github.com/iancoleman/strcase"
1920
corev1 "k8s.io/api/core/v1"
2021
"k8s.io/client-go/tools/cache"
@@ -25,14 +26,21 @@ import (
2526

2627
// Operator represents a controller object for object store custom resources
2728
type Operator struct {
28-
config *config.Config
29-
topicARN string
29+
config *config.Config
30+
topicARN string
31+
queueManager *queuemanager.QueueManager
3032
}
3133

3234
// NewOperator create controller for watching object store custom resources created
33-
func NewOperator(config *config.Config) *Operator {
35+
func NewOperator(config *config.Config, queueManager *queuemanager.QueueManager) *Operator {
36+
queuectrl := queue.New(config, config.AWSClientset, 10)
37+
topicARN, _ := queuectrl.Register("ecrrepository")
38+
queueManager.Add(topicARN, queuemanager.HandlerFunc(QueueUpdater))
39+
3440
return &Operator{
35-
config: config,
41+
config: config,
42+
topicARN: topicARN,
43+
queueManager: queueManager,
3644
}
3745
}
3846

@@ -43,16 +51,13 @@ func (c *Operator) StartWatch(ctx context.Context, namespace string) {
4351
UpdateFunc: c.onUpdate,
4452
DeleteFunc: c.onDelete,
4553
}
46-
queuectrl := queue.New(c.config, c.config.AWSClientset, 1)
47-
c.topicARN, _, _, _ = queuectrl.Register("ecrrepository", &awsV1alpha1.ECRRepository{})
48-
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), ctx.Done())
4954

5055
oper := operator.New("ecrrepositories", namespace, resourceHandlers, c.config.AWSClientset.RESTClient())
5156
oper.Watch(&awsV1alpha1.ECRRepository{}, ctx.Done())
5257
}
5358

5459
// QueueUpdater will take the messages from the queue and process them
55-
func QueueUpdater(config *config.Config, msg *queue.MessageBody) error {
60+
func QueueUpdater(config *config.Config, msg *queuemanager.MessageBody) error {
5661
logger := config.Logger
5762
var name, namespace string
5863
if msg.Updatable {

pkg/operators/s3bucket/operator.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/awslabs/aws-service-operator/pkg/config"
1616
"github.com/awslabs/aws-service-operator/pkg/operator"
1717
"github.com/awslabs/aws-service-operator/pkg/queue"
18+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1819
"github.com/iancoleman/strcase"
1920
corev1 "k8s.io/api/core/v1"
2021
"k8s.io/client-go/tools/cache"
@@ -25,14 +26,21 @@ import (
2526

2627
// Operator represents a controller object for object store custom resources
2728
type Operator struct {
28-
config *config.Config
29-
topicARN string
29+
config *config.Config
30+
topicARN string
31+
queueManager *queuemanager.QueueManager
3032
}
3133

3234
// NewOperator create controller for watching object store custom resources created
33-
func NewOperator(config *config.Config) *Operator {
35+
func NewOperator(config *config.Config, queueManager *queuemanager.QueueManager) *Operator {
36+
queuectrl := queue.New(config, config.AWSClientset, 10)
37+
topicARN, _ := queuectrl.Register("s3bucket")
38+
queueManager.Add(topicARN, queuemanager.HandlerFunc(QueueUpdater))
39+
3440
return &Operator{
35-
config: config,
41+
config: config,
42+
topicARN: topicARN,
43+
queueManager: queueManager,
3644
}
3745
}
3846

@@ -43,16 +51,13 @@ func (c *Operator) StartWatch(ctx context.Context, namespace string) {
4351
UpdateFunc: c.onUpdate,
4452
DeleteFunc: c.onDelete,
4553
}
46-
queuectrl := queue.New(c.config, c.config.AWSClientset, 1)
47-
c.topicARN, _, _, _ = queuectrl.Register("s3bucket", &awsV1alpha1.S3Bucket{})
48-
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), ctx.Done())
4954

5055
oper := operator.New("s3buckets", namespace, resourceHandlers, c.config.AWSClientset.RESTClient())
5156
oper.Watch(&awsV1alpha1.S3Bucket{}, ctx.Done())
5257
}
5358

5459
// QueueUpdater will take the messages from the queue and process them
55-
func QueueUpdater(config *config.Config, msg *queue.MessageBody) error {
60+
func QueueUpdater(config *config.Config, msg *queuemanager.MessageBody) error {
5661
logger := config.Logger
5762
var name, namespace string
5863
if msg.Updatable {

pkg/operators/snssubscription/operator.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/awslabs/aws-service-operator/pkg/config"
1616
"github.com/awslabs/aws-service-operator/pkg/operator"
1717
"github.com/awslabs/aws-service-operator/pkg/queue"
18+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1819
"github.com/iancoleman/strcase"
1920
corev1 "k8s.io/api/core/v1"
2021
"k8s.io/client-go/tools/cache"
@@ -25,14 +26,21 @@ import (
2526

2627
// Operator represents a controller object for object store custom resources
2728
type Operator struct {
28-
config *config.Config
29-
topicARN string
29+
config *config.Config
30+
topicARN string
31+
queueManager *queuemanager.QueueManager
3032
}
3133

3234
// NewOperator create controller for watching object store custom resources created
33-
func NewOperator(config *config.Config) *Operator {
35+
func NewOperator(config *config.Config, queueManager *queuemanager.QueueManager) *Operator {
36+
queuectrl := queue.New(config, config.AWSClientset, 10)
37+
topicARN, _ := queuectrl.Register("snssubscription")
38+
queueManager.Add(topicARN, queuemanager.HandlerFunc(QueueUpdater))
39+
3440
return &Operator{
35-
config: config,
41+
config: config,
42+
topicARN: topicARN,
43+
queueManager: queueManager,
3644
}
3745
}
3846

@@ -43,16 +51,13 @@ func (c *Operator) StartWatch(ctx context.Context, namespace string) {
4351
UpdateFunc: c.onUpdate,
4452
DeleteFunc: c.onDelete,
4553
}
46-
queuectrl := queue.New(c.config, c.config.AWSClientset, 1)
47-
c.topicARN, _, _, _ = queuectrl.Register("snssubscription", &awsV1alpha1.SNSSubscription{})
48-
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), ctx.Done())
4954

5055
oper := operator.New("snssubscriptions", namespace, resourceHandlers, c.config.AWSClientset.RESTClient())
5156
oper.Watch(&awsV1alpha1.SNSSubscription{}, ctx.Done())
5257
}
5358

5459
// QueueUpdater will take the messages from the queue and process them
55-
func QueueUpdater(config *config.Config, msg *queue.MessageBody) error {
60+
func QueueUpdater(config *config.Config, msg *queuemanager.MessageBody) error {
5661
logger := config.Logger
5762
var name, namespace string
5863
if msg.Updatable {

pkg/operators/snstopic/operator.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/awslabs/aws-service-operator/pkg/config"
1616
"github.com/awslabs/aws-service-operator/pkg/operator"
1717
"github.com/awslabs/aws-service-operator/pkg/queue"
18+
"github.com/awslabs/aws-service-operator/pkg/queuemanager"
1819
"github.com/iancoleman/strcase"
1920
corev1 "k8s.io/api/core/v1"
2021
"k8s.io/client-go/tools/cache"
@@ -25,14 +26,21 @@ import (
2526

2627
// Operator represents a controller object for object store custom resources
2728
type Operator struct {
28-
config *config.Config
29-
topicARN string
29+
config *config.Config
30+
topicARN string
31+
queueManager *queuemanager.QueueManager
3032
}
3133

3234
// NewOperator create controller for watching object store custom resources created
33-
func NewOperator(config *config.Config) *Operator {
35+
func NewOperator(config *config.Config, queueManager *queuemanager.QueueManager) *Operator {
36+
queuectrl := queue.New(config, config.AWSClientset, 10)
37+
topicARN, _ := queuectrl.Register("snstopic")
38+
queueManager.Add(topicARN, queuemanager.HandlerFunc(QueueUpdater))
39+
3440
return &Operator{
35-
config: config,
41+
config: config,
42+
topicARN: topicARN,
43+
queueManager: queueManager,
3644
}
3745
}
3846

@@ -43,16 +51,13 @@ func (c *Operator) StartWatch(ctx context.Context, namespace string) {
4351
UpdateFunc: c.onUpdate,
4452
DeleteFunc: c.onDelete,
4553
}
46-
queuectrl := queue.New(c.config, c.config.AWSClientset, 1)
47-
c.topicARN, _, _, _ = queuectrl.Register("snstopic", &awsV1alpha1.SNSTopic{})
48-
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), ctx.Done())
4954

5055
oper := operator.New("snstopics", namespace, resourceHandlers, c.config.AWSClientset.RESTClient())
5156
oper.Watch(&awsV1alpha1.SNSTopic{}, ctx.Done())
5257
}
5358

5459
// QueueUpdater will take the messages from the queue and process them
55-
func QueueUpdater(config *config.Config, msg *queue.MessageBody) error {
60+
func QueueUpdater(config *config.Config, msg *queuemanager.MessageBody) error {
5661
logger := config.Logger
5762
var name, namespace string
5863
if msg.Updatable {

0 commit comments

Comments
 (0)