@@ -12,7 +12,9 @@ import (
1212type AliQueueManager interface {
1313 CreateSimpleQueue (queueName string ) (err error )
1414 CreateQueue (queueName string , delaySeconds int32 , maxMessageSize int32 , messageRetentionPeriod int32 , visibilityTimeout int32 , pollingWaitSeconds int32 , slices int32 ) (err error )
15+ CreateQueueWithOptions (queueName string , options ... QueueOption ) (err error )
1516 SetQueueAttributes (queueName string , delaySeconds int32 , maxMessageSize int32 , messageRetentionPeriod int32 , visibilityTimeout int32 , pollingWaitSeconds int32 , slices int32 ) (err error )
17+ SetQueueAttributesWithOptions (queueName string , options ... QueueOption ) (err error )
1618 GetQueueAttributes (queueName string ) (attr QueueAttribute , err error )
1719 DeleteQueue (queueName string ) (err error )
1820 ListQueue (nextMarker string , retNumber int32 , prefix string ) (queues Queues , err error )
@@ -24,6 +26,59 @@ type MNSQueueManager struct {
2426 decoder MNSDecoder
2527}
2628
29+ type QueueOptions struct {
30+ delaySeconds int32
31+ maxMessageSize int32
32+ messageRetentionPeriod int32
33+ visibilityTimeout int32
34+ pollingWaitSeconds int32
35+ loggingEnabled bool
36+ }
37+
38+ type QueueOption func (* QueueOptions , map [string ]bool )
39+
40+ func WithDelaySeconds (delay int32 ) QueueOption {
41+ return func (o * QueueOptions , tracker map [string ]bool ) {
42+ o .delaySeconds = delay
43+ tracker ["delaySeconds" ] = true
44+ }
45+ }
46+
47+ func WithMaxMessageSize (size int32 ) QueueOption {
48+ return func (o * QueueOptions , tracker map [string ]bool ) {
49+ o .maxMessageSize = size
50+ tracker ["maxMessageSize" ] = true
51+ }
52+ }
53+
54+ func WithMessageRetentionPeriod (period int32 ) QueueOption {
55+ return func (o * QueueOptions , tracker map [string ]bool ) {
56+ o .messageRetentionPeriod = period
57+ tracker ["messageRetentionPeriod" ] = true
58+ }
59+ }
60+
61+ func WithVisibilityTimeout (timeout int32 ) QueueOption {
62+ return func (o * QueueOptions , tracker map [string ]bool ) {
63+ o .visibilityTimeout = timeout
64+ tracker ["visibilityTimeout" ] = true
65+ }
66+ }
67+
68+ func WithPollingWaitSeconds (seconds int32 ) QueueOption {
69+ return func (o * QueueOptions , tracker map [string ]bool ) {
70+ o .pollingWaitSeconds = seconds
71+ tracker ["pollingWaitSeconds" ] = true
72+ }
73+ }
74+
75+ func WithLoggingEnabled (enabled bool ) QueueOption {
76+ return func (o * QueueOptions , tracker map [string ]bool ) {
77+ o .loggingEnabled = enabled
78+ tracker ["loggingEnabled" ] = true
79+ }
80+ }
81+
2782func checkQueueName (queueName string ) (err error ) {
2883 if len (queueName ) > 256 {
2984 err = ERR_MNS_QUEUE_NAME_IS_TOO_LONG .New ()
@@ -111,6 +166,7 @@ func (p *MNSQueueManager) CreateQueue(queueName string, delaySeconds int32, maxM
111166 MessageRetentionPeriod : messageRetentionPeriod ,
112167 VisibilityTimeout : visibilityTimeout ,
113168 PollingWaitSeconds : pollingWaitSeconds ,
169+ LoggingEnabled : false ,
114170 }
115171
116172 var code int
@@ -124,6 +180,41 @@ func (p *MNSQueueManager) CreateQueue(queueName string, delaySeconds int32, maxM
124180 return
125181}
126182
183+ func (p * MNSQueueManager ) CreateQueueWithOptions (queueName string , options ... QueueOption ) (err error ) {
184+ queueName = strings .TrimSpace (queueName )
185+ if err = checkQueueName (queueName ); err != nil {
186+ return
187+ }
188+ opts := defaultQueueOptions ()
189+ tracker := make (map [string ]bool )
190+ for _ , opt := range options {
191+ opt (& opts , tracker )
192+ }
193+
194+ if err = checkAttributes (opts .delaySeconds , opts .messageRetentionPeriod ,
195+ opts .visibilityTimeout , opts .pollingWaitSeconds ); err != nil {
196+ return
197+ }
198+
199+ message := CreateQueueRequest {
200+ DelaySeconds : opts .delaySeconds ,
201+ MaxMessageSize : opts .maxMessageSize ,
202+ MessageRetentionPeriod : opts .messageRetentionPeriod ,
203+ VisibilityTimeout : opts .visibilityTimeout ,
204+ PollingWaitSeconds : opts .pollingWaitSeconds ,
205+ LoggingEnabled : opts .loggingEnabled ,
206+ }
207+
208+ var code int
209+ code , err = send (p .cli , p .decoder , PUT , nil , & message , "queues/" + queueName , nil )
210+ if code == http .StatusNoContent {
211+ err = ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR .New (errors.Params {"name" : queueName })
212+ return
213+ }
214+
215+ return
216+ }
217+
127218func (p * MNSQueueManager ) SetQueueAttributes (queueName string , delaySeconds int32 , maxMessageSize int32 , messageRetentionPeriod int32 , visibilityTimeout int32 , pollingWaitSeconds int32 , slices int32 ) (err error ) {
128219 queueName = strings .TrimSpace (queueName )
129220
@@ -150,6 +241,58 @@ func (p *MNSQueueManager) SetQueueAttributes(queueName string, delaySeconds int3
150241 return
151242}
152243
244+ func (p * MNSQueueManager ) SetQueueAttributesWithOptions (queueName string , options ... QueueOption ) (err error ) {
245+ queueName = strings .TrimSpace (queueName )
246+ if err = checkQueueName (queueName ); err != nil {
247+ return
248+ }
249+ opts := QueueOptions {}
250+ tracker := make (map [string ]bool )
251+ for _ , opt := range options {
252+ opt (& opts , tracker )
253+ }
254+
255+ message := CreateQueueRequest {}
256+ if tracker ["delaySeconds" ] {
257+ if err = checkDelaySeconds (opts .delaySeconds ); err != nil {
258+ return
259+ }
260+ message .DelaySeconds = opts .delaySeconds
261+ }
262+
263+ if tracker ["maxMessageSize" ] {
264+ message .MaxMessageSize = opts .maxMessageSize
265+ }
266+
267+ if tracker ["messageRetentionPeriod" ] {
268+ if err = checkMessageRetentionPeriod (opts .messageRetentionPeriod ); err != nil {
269+ return
270+ }
271+ message .MessageRetentionPeriod = opts .messageRetentionPeriod
272+ }
273+
274+ if tracker ["visibilityTimeout" ] {
275+ if err = checkVisibilityTimeout (opts .visibilityTimeout ); err != nil {
276+ return
277+ }
278+ message .VisibilityTimeout = opts .visibilityTimeout
279+ }
280+
281+ if tracker ["pollingWaitSeconds" ] {
282+ if err = checkPollingWaitSeconds (opts .pollingWaitSeconds ); err != nil {
283+ return
284+ }
285+ message .PollingWaitSeconds = opts .pollingWaitSeconds
286+ }
287+
288+ if tracker ["loggingEnabled" ] {
289+ message .LoggingEnabled = opts .loggingEnabled
290+ }
291+
292+ _ , err = send (p .cli , p .decoder , PUT , nil , & message , fmt .Sprintf ("queues/%s?metaoverride=true" , queueName ), nil )
293+ return
294+ }
295+
153296func (p * MNSQueueManager ) GetQueueAttributes (queueName string ) (attr QueueAttribute , err error ) {
154297 queueName = strings .TrimSpace (queueName )
155298
@@ -235,3 +378,14 @@ func (p *MNSQueueManager) ListQueueDetail(nextMarker string, retNumber int32, pr
235378
236379 return
237380}
381+
382+ func defaultQueueOptions () QueueOptions {
383+ return QueueOptions {
384+ delaySeconds : 0 ,
385+ maxMessageSize : 65536 ,
386+ messageRetentionPeriod : 345600 ,
387+ visibilityTimeout : 30 ,
388+ pollingWaitSeconds : 0 ,
389+ loggingEnabled : false ,
390+ }
391+ }
0 commit comments