@@ -81,7 +81,7 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency iface.Resizeable
8181 // No messages, prevent fast looping
8282 time .Sleep (100 * time .Millisecond )
8383 }
84- //return back to pool right away
84+ // return back to pool right away
8585 concurrency .Return ()
8686 }
8787 }
@@ -162,7 +162,7 @@ func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error {
162162 signature .AzureMessageContent ,
163163 & azqueue.UpdateMessageOptions {VisibilityTimeout : & delayS })
164164 if err != nil {
165- log .ERROR .Printf ("ignoring error attempting to change visibility timeout. will re-attempt after default period. task %s (%s)" , signature .UUID , signature .Name )
165+ log .ERROR .Printf ("ignoring error attempting to extend visibility timeout. will re-attempt after default period. task %s (%s): %s " , signature .UUID , signature .Name , err )
166166 }
167167
168168 return nil
@@ -180,13 +180,12 @@ func (b *Broker) RetryMessage(signature *tasks.Signature) {
180180 signature .AzureMessageContent ,
181181 & azqueue.UpdateMessageOptions {VisibilityTimeout : & delayS })
182182 if err != nil {
183- log .ERROR .Printf ("ignoring error attempting to change visibility timeout. will re-attempt after default period. task %s (%s)" , signature .UUID , signature .Name )
183+ log .ERROR .Printf ("ignoring error attempting to change visibility timeout. will re-attempt after default period. task %s (%s): %s " , signature .UUID , signature .Name , err )
184184 }
185185}
186186
187187// consume is a method which keeps consuming deliveries from a channel, until there is an error or a stop signal
188188func (b * Broker ) consume (deliveries <- chan azqueue.DequeueMessagesResponse , taskProcessor iface.TaskProcessor , concurrency iface.ResizeablePool ) error {
189-
190189 errorsChan := make (chan error )
191190
192191 for {
@@ -271,7 +270,6 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces
271270// deleteOne is a method delete a delivery from AWS SQS
272271func (b * Broker ) deleteOne (message * azqueue.DequeuedMessage ) error {
273272 _ , err := b .cfg .Client .NewQueueClient (b .queueName ).DeleteMessage (context .Background (), * message .MessageID , * message .PopReceipt , nil )
274-
275273 if err != nil {
276274 return err
277275 }
@@ -308,7 +306,6 @@ func (b *Broker) consumeDeliveries(deliveries <-chan azqueue.DequeueMessagesResp
308306 // Consume the task inside a goroutine so multiple tasks
309307 // can be processed concurrently
310308 go func () {
311-
312309 if err := b .consumeOne (d , taskProcessor ); err != nil {
313310 errorsChan <- err
314311 }
0 commit comments