1
1
package queue
2
2
3
3
import (
4
+ "context"
4
5
"encoding/json"
6
+ "os"
7
+
5
8
"github.com/aws/aws-sdk-go/aws"
6
9
"github.com/aws/aws-sdk-go/aws/session"
7
10
"github.com/aws/aws-sdk-go/service/sns"
8
11
"github.com/aws/aws-sdk-go/service/sqs"
9
12
awsclient "github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/typed/service-operator.aws/v1alpha1"
10
13
"github.com/awslabs/aws-service-operator/pkg/config"
11
- "github.com/awslabs/aws-service-operator/pkg/helpers"
12
- "os"
13
- "strings"
14
+ "github.com/awslabs/aws-service-operator/pkg/queuemanager"
14
15
)
15
16
16
- // HandlerFunc allows you to define a custom function for when a message is stored
17
- type HandlerFunc func (config * config.Config , msg * MessageBody ) error
18
-
19
- // HandleMessage will stub the handler for processing messages
20
- func (f HandlerFunc ) HandleMessage (config * config.Config , msg * MessageBody ) error {
21
- return f (config , msg )
22
- }
23
-
24
- // Handler allows a custom function to be passed
25
- type Handler interface {
26
- HandleMessage (config * config.Config , msg * MessageBody ) error
27
- }
28
-
29
- // ParseMessage will take the message attribute and make it readable
30
- func (m * MessageBody ) ParseMessage () error {
31
- m .Updatable = false
32
- resp := make (map [string ]string )
33
- items := strings .Split (m .Message , "\n " )
34
- for _ , item := range items {
35
- x := strings .Split (item , "=" )
36
- key := x [0 ]
37
- if key != "" {
38
- s := x [1 ]
39
- s = s [1 : len (s )- 1 ]
40
- resp [key ] = s
41
- }
42
- }
43
- m .ParsedMessage = resp
44
-
45
- var resourceProperties ResourceProperties
46
- if resp ["ResourceProperties" ] != "null" {
47
- err := json .Unmarshal ([]byte (resp ["ResourceProperties" ]), & resourceProperties )
48
- if err != nil {
49
- return err
50
- }
51
- m .ResourceProperties = resourceProperties
52
- for _ , tag := range resourceProperties .Tags {
53
- switch tag .Key {
54
- case "Namespace" :
55
- m .Namespace = tag .Value
56
- case "ResourceName" :
57
- m .ResourceName = tag .Value
58
- }
59
- }
60
- if m .Namespace != "" && m .ResourceName != "" {
61
- m .Updatable = true
62
- }
63
- }
64
- return nil
65
- }
66
-
67
- // IsComplete returns a simple status instead of the raw CFT resp
68
- func (m * MessageBody ) IsComplete () bool {
69
- return helpers .IsStackComplete (m .ParsedMessage ["ResourceStatus" ], true )
70
- }
71
-
72
17
// New will initialize the Queue object for watching
73
18
func New (config * config.Config , awsclientset awsclient.ServiceoperatorV1alpha1Interface , timeout int ) * Queue {
74
19
return & Queue {
@@ -78,21 +23,11 @@ func New(config *config.Config, awsclientset awsclient.ServiceoperatorV1alpha1In
78
23
}
79
24
}
80
25
81
- // Register will create all the affilate resources
82
- func (q * Queue ) Register (name string , obj interface {}) (topicARN string , queueURL string , queueAttrs map [string ]* string , subARN string ) {
83
- config := q .config
84
- logger := config .Logger
85
- keyname := keyName (config .ClusterName , name )
86
- svc := sns .New (config .AWSSession )
87
- topicInputs := sns.CreateTopicInput {Name : aws .String (keyname )}
88
- output , err := svc .CreateTopic (& topicInputs )
89
- if err != nil {
90
- logger .Errorf ("Error creating SNS Topic with error '%s'" , err .Error ())
91
- }
92
- topicARN = * output .TopicArn
93
- logger .Infof ("Created sns topic '%s'" , topicARN )
94
-
26
+ // RegisterQueue wkll create the Queue so it is accessible to SNS.
27
+ func RegisterQueue (config * config.Config , name string ) (queueURL , queueARN string , manager * queuemanager.QueueManager , err error ) {
28
+ manager = queuemanager .New ()
95
29
sqsSvc := sqs .New (config .AWSSession )
30
+ keyname := keyName (config .ClusterName , name )
96
31
queueInputs := sqs.CreateQueueInput {
97
32
QueueName : aws .String (keyname ),
98
33
Attributes : map [string ]* string {
@@ -102,14 +37,10 @@ func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueUR
102
37
}
103
38
sqsOutput , err := sqsSvc .CreateQueue (& queueInputs )
104
39
if err != nil {
105
- logger . Errorf ( "Error creating SQS Queue with error '%s'" , err . Error ())
40
+ return "" , "" , manager , err
106
41
}
107
42
queueURL = * sqsOutput .QueueUrl
108
- logger .Infof ("Created sqs queue '%s'" , queueURL )
109
- q .queueURL = queueURL
110
43
111
- // Get Queue Information
112
- // This will later happen with the CRD cft subscription
113
44
queueQueryInputs := sqs.GetQueueAttributesInput {
114
45
QueueUrl : aws .String (queueURL ),
115
46
AttributeNames : []* string {
@@ -118,29 +49,46 @@ func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueUR
118
49
}
119
50
sqsQueueOutput , err := sqsSvc .GetQueueAttributes (& queueQueryInputs )
120
51
if err != nil {
121
- logger . Errorf ( "Error getting SQS Information with error '%s'" , err . Error ())
52
+ return "" , "" , manager , err
122
53
}
123
- queueAttrs = sqsQueueOutput .Attributes
54
+ queueARN = * sqsQueueOutput .Attributes ["QueueArn" ]
55
+ return queueURL , queueARN , manager , nil
56
+ }
124
57
125
- policy := newPolicy (* queueAttrs ["QueueArn" ], keyname , topicARN )
126
- policyb , err := json .Marshal (policy )
58
+ // Subscribe will listen to the global queue and distribute messages
59
+ func Subscribe (config * config.Config , manager * queuemanager.QueueManager , ctx context.Context ) {
60
+ logger := config .Logger
61
+ sess , err := session .NewSession (& aws.Config {
62
+ Region : aws .String (config .Region ),
63
+ })
127
64
if err != nil {
128
- logger .WithError (err ).Error ("error encoding policy to json" )
65
+ logger .WithError (err ).Error ("error creating AWS session" )
66
+ os .Exit (1 )
129
67
}
130
- addPermInput := & sqs.SetQueueAttributesInput {
131
- QueueUrl : aws .String (queueURL ),
132
- Attributes : map [string ]* string {
133
- "Policy" : aws .String (string (policyb )),
134
- }}
135
- _ , err = sqsSvc .SetQueueAttributes (addPermInput )
68
+
69
+ svc := sqs .New (sess )
70
+
71
+ process (config , svc , manager , ctx )
72
+ }
73
+
74
+ // Register will create all the affilate resources
75
+ func (q * Queue ) Register (name string ) (topicARN string , subARN string ) {
76
+ config := q .config
77
+ logger := config .Logger
78
+ keyname := keyName (config .ClusterName , name )
79
+ svc := sns .New (config .AWSSession )
80
+ topicInputs := sns.CreateTopicInput {Name : aws .String (keyname )}
81
+ output , err := svc .CreateTopic (& topicInputs )
136
82
if err != nil {
137
- logger .WithError ( err ). Error ( "error setting queue policy" )
83
+ logger .Errorf ( "Error creating SNS Topic with error '%s'" , err . Error () )
138
84
}
85
+ topicARN = * output .TopicArn
86
+ logger .Infof ("Created sns topic '%s'" , topicARN )
139
87
140
88
// Move to somewhere else.
141
89
subInput := sns.SubscribeInput {
142
90
TopicArn : aws .String (topicARN ),
143
- Endpoint : aws .String (* queueAttrs [ "QueueArn" ] ),
91
+ Endpoint : aws .String (config . QueueARN ),
144
92
Protocol : aws .String ("sqs" ),
145
93
}
146
94
subOutput , err := svc .Subscribe (& subInput )
@@ -153,84 +101,96 @@ func (q *Queue) Register(name string, obj interface{}) (topicARN string, queueUR
153
101
return
154
102
}
155
103
156
- // StartWatch will start a go routine to watch for new events to store in etcd
157
- func (q * Queue ) StartWatch (h Handler , stopCh <- chan struct {}) {
158
- config := q .config
159
- logger := config .Logger
160
- sess , err := session .NewSession (& aws.Config {
161
- Region : aws .String (config .Region ),
162
- })
104
+ func SetQueuePolicy (config * config.Config , manager * queuemanager.QueueManager ) error {
105
+ sqsSvc := sqs .New (config .AWSSession )
106
+ topicARNs := manager .Keys ()
107
+
108
+ policy := newPolicy (config .QueueARN , config .ClusterName , topicARNs )
109
+ policyb , err := json .Marshal (policy )
163
110
if err != nil {
164
- logger .WithError (err ).Infof ("error creating AWS session" )
165
- os .Exit (1 )
111
+ return err
166
112
}
167
113
168
- svc := sqs .New (sess )
169
-
170
- process (q , svc , h , stopCh )
114
+ addPermInput := & sqs.SetQueueAttributesInput {
115
+ QueueUrl : aws .String (config .QueueURL ),
116
+ Attributes : map [string ]* string {
117
+ "Policy" : aws .String (string (policyb )),
118
+ }}
119
+ _ , err = sqsSvc .SetQueueAttributes (addPermInput )
120
+ if err != nil {
121
+ return err
122
+ }
123
+ return nil
171
124
}
172
125
173
126
func keyName (clusterName string , name string ) string {
174
127
return clusterName + "-" + name
175
128
}
176
129
177
- func newPolicy (queueARN string , keyname string , topicARN string ) Policy {
178
- return Policy {
179
- Version : "2012-10-17" ,
180
- ID : queueARN + "/" + keyname + "-policy" ,
181
- Statement : []Statement {
182
- Statement {
183
- Sid : keyname ,
184
- Effect : "Allow" ,
185
- Principal : "*" ,
186
- Action : []string {"SQS:ReceiveMessage" , "SQS:SendMessage" },
187
- Resource : queueARN ,
188
- Condition : Condition {
189
- ArnEquals : ArnEquals {
190
- AwsSourceArn : topicARN ,
191
- },
130
+ func newPolicy (queueARN string , name string , topicARNs []string ) Policy {
131
+ statements := []Statement {}
132
+ for _ , topicARN := range topicARNs {
133
+ statements = append (statements , Statement {
134
+ Sid : topicARN + " statment" ,
135
+ Effect : "Allow" ,
136
+ Principal : "*" ,
137
+ Action : []string {"SQS:ReceiveMessage" , "SQS:SendMessage" },
138
+ Resource : queueARN ,
139
+ Condition : Condition {
140
+ ArnEquals : ArnEquals {
141
+ AwsSourceArn : topicARN ,
192
142
},
193
143
},
194
- },
144
+ })
145
+ }
146
+
147
+ return Policy {
148
+ Version : "2012-10-17" ,
149
+ ID : queueARN + "/" + name + "-policy" ,
150
+ Statement : statements ,
195
151
}
196
152
}
197
153
198
- func process (q * Queue , svc * sqs.SQS , h Handler , stopCh <- chan struct {}) error {
199
- config := q .config
154
+ func process (config * config.Config , svc * sqs.SQS , manager * queuemanager.QueueManager , ctx context.Context ) error {
200
155
logger := config .Logger
201
156
for {
202
157
result , err := svc .ReceiveMessage (& sqs.ReceiveMessageInput {
203
- QueueUrl : aws .String (q . queueURL ),
158
+ QueueUrl : aws .String (config . QueueURL ),
204
159
AttributeNames : aws .StringSlice ([]string {
205
160
"SentTimestamp" ,
206
161
}),
207
162
MaxNumberOfMessages : aws .Int64 (1 ),
208
163
MessageAttributeNames : aws .StringSlice ([]string {
209
164
"All" ,
210
165
}),
211
- WaitTimeSeconds : aws .Int64 (q . timeout ),
166
+ WaitTimeSeconds : aws .Int64 (10 ),
212
167
})
213
168
if err != nil {
214
169
logger .WithError (err ).Error ("unable to receive messages from queue" )
215
170
os .Exit (1 )
216
171
}
217
172
218
173
for _ , message := range result .Messages {
219
- mb := & MessageBody {}
174
+ mb := & queuemanager. MessageBody {}
220
175
err := json .Unmarshal ([]byte (* message .Body ), mb )
221
176
if err != nil {
222
177
logger .WithError (err ).Error ("error parsing message body" )
223
178
}
224
179
mb .ParseMessage ()
225
180
logger .Debugf ("%+v" , mb )
226
181
182
+ h , ok := manager .Get (mb .TopicARN )
183
+ if ! ok {
184
+ logger .Errorf ("error missing handler function for topic %s" , mb .TopicARN )
185
+ }
186
+
227
187
err = h .HandleMessage (config , mb )
228
188
if err != nil {
229
189
logger .WithError (err ).Error ("error processing message" )
230
190
}
231
191
logger .Debugf ("stackID %v updated status to %v" , mb .ParsedMessage ["StackId" ], mb .ParsedMessage ["ResourceStatus" ])
232
192
_ , err = svc .DeleteMessage (& sqs.DeleteMessageInput {
233
- QueueUrl : aws .String (q . queueURL ),
193
+ QueueUrl : aws .String (config . QueueURL ),
234
194
ReceiptHandle : message .ReceiptHandle ,
235
195
})
236
196
@@ -239,11 +199,5 @@ func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error {
239
199
os .Exit (1 )
240
200
}
241
201
}
242
-
243
- select {
244
- case <- stopCh :
245
- os .Exit (1 )
246
- default :
247
- }
248
202
}
249
203
}
0 commit comments