@@ -14,12 +14,13 @@ import (
14
14
{{- end}}
15
15
16
16
" github.com/awslabs/aws-service-operator/pkg/config"
17
- {{- if .Spec .Queue }}
18
- " github.com/awslabs/aws-service-operator/pkg/queue"
19
- corev1 " k8s.io/api/core/v1"
20
- " github.com/iancoleman/strcase"
21
- " strings"
22
- awsclient " github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/typed/service-operator.aws/v1alpha1"
17
+ " github.com/awslabs/aws-service-operator/pkg/queuemanager"
18
+ {{- if .Spec .Queue }}
19
+ corev1 " k8s.io/api/core/v1"
20
+ " github.com/iancoleman/strcase"
21
+ " strings"
22
+ awsclient " github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/typed/service-operator.aws/v1alpha1"
23
+ " github.com/awslabs/aws-service-operator/pkg/queue"
23
24
{{- end}}
24
25
" github.com/awslabs/aws-service-operator/pkg/operator"
25
26
" k8s.io/client-go/tools/cache"
@@ -34,12 +35,23 @@ import (
34
35
type Operator struct {
35
36
config *config.Config
36
37
topicARN string
38
+ queueManager *queuemanager.QueueManager
37
39
}
38
40
39
41
// NewOperator create controller for watching object store custom resources created
40
- func NewOperator (config *config .Config ) *Operator {
42
+ func NewOperator (config *config .Config , queueManager *queuemanager .QueueManager ) *Operator {
43
+ {{- if .Spec .Queue }}
44
+ queuectrl := queue.New (config, config.AWSClientset , 10 )
45
+ topicARN , _ := queuectrl.Register (" {{.Spec.Resource.Name}}" )
46
+ queueManager.Add (topicARN, queuemanager.HandlerFunc (QueueUpdater))
47
+ {{- end}}
48
+
41
49
return &Operator{
42
50
config: config,
51
+ {{- if .Spec .Queue }}
52
+ topicARN: topicARN,
53
+ {{- end}}
54
+ queueManager: queueManager,
43
55
}
44
56
}
45
57
@@ -50,19 +62,14 @@ func (c *Operator) StartWatch(ctx context.Context, namespace string) {
50
62
UpdateFunc: c.onUpdate ,
51
63
DeleteFunc: c.onDelete ,
52
64
}
53
- {{- if .Spec .Queue }}
54
- queuectrl := queue.New (c.config , c.config .AWSClientset , 1 )
55
- c.topicARN , _, _, _ = queuectrl.Register (" {{.Spec.Resource.Name}}" , &awsV1alpha1.{{.Spec .Kind }}{})
56
- go queuectrl.StartWatch (queue.HandlerFunc (QueueUpdater), ctx.Done ())
57
- {{- end}}
58
65
59
66
oper := operator.New (" {{.Spec.Resource.Plural}}" , namespace, resourceHandlers, c.config .AWSClientset .RESTClient ())
60
67
oper.Watch (&awsV1alpha1.{{.Spec .Kind }}{}, ctx.Done ())
61
68
}
62
69
63
70
{{- if .Spec .Queue }}
64
71
// QueueUpdater will take the messages from the queue and process them
65
- func QueueUpdater (config *config .Config , msg *queue .MessageBody ) error {
72
+ func QueueUpdater (config *config .Config , msg *queuemanager .MessageBody ) error {
66
73
logger := config.Logger
67
74
var name , namespace string
68
75
if msg.Updatable {
0 commit comments