Skip to content

Commit 61086db

Browse files
authored
Improve backend error tolerance (#33)
* Update SQS test to reflect resizable concurrency pool signature * Improve SQS broker reliability when backend returns errors Originally, the SQS broker created a goroutine with an infinite loop. That goroutine would poll SQS and put any received messages on a channel. A separate goroutine would poll that channel, spawning a goroutine for any received items. A separate channel reported errors between these different goroutines. In theory, all the goroutines would listen for channel closure and error channel items to handle errors. Unfortunately, if the backend reports errors then polling the queue would quickly stop, because in this case the currency pool wasn't properly returned. To address this, the multiple goroutines are collapsed. When the initial pollying goroutine receives a message, it immediately spawns a goroutine to process that message, rather than using a channel and intermediate goroutine. In addition, it always returns to the concurrency pool immediately after either encountering an error or processing a message. This allows polling and processing to continue even if the backend reports an error.
1 parent 6c32ce7 commit 61086db

File tree

3 files changed

+51
-81
lines changed

3 files changed

+51
-81
lines changed

v1/brokers/sqs/sqs.go

Lines changed: 42 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -66,48 +66,57 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency iface.Resizeable
6666
//save it so that it can be used later when attempting to delete task
6767
b.queueUrl = qURL
6868

69-
deliveries := make(chan *awssqs.ReceiveMessageOutput)
70-
7169
b.stopReceivingChan = make(chan int)
7270
b.receivingWG.Add(1)
7371

74-
go func() {
75-
defer b.receivingWG.Done()
76-
77-
log.INFO.Printf("[*] Waiting for messages on queue: %s. To exit press CTRL+C\n", *qURL)
78-
79-
pool := concurrency.Pool()
72+
defer b.receivingWG.Done()
8073

81-
for {
82-
select {
83-
// A way to stop this goroutine from b.StopConsuming
84-
case <-b.stopReceivingChan:
85-
close(deliveries)
74+
log.INFO.Printf("[*] Waiting for messages on queue: %s. To exit press CTRL+C\n", *qURL)
8675

87-
return
88-
case <-pool:
89-
output, err := b.receiveMessage(qURL)
90-
if err == nil && len(output.Messages) > 0 {
91-
deliveries <- output
76+
pool := concurrency.Pool()
9277

93-
} else {
94-
if err != nil {
95-
log.ERROR.Printf("Queue consume error on %s: %s", *qURL, err)
78+
errorsChan := make(chan error)
79+
defer func() {
80+
// There could be an outstanding consumeOne call in a goroutine that will return an error that
81+
// get put on the channel, so make sure there are no oustanding consumeOne calls before closing the channel
82+
b.processingWG.Wait()
83+
close(errorsChan)
84+
}()
9685

97-
// Avoid repeating this
98-
if strings.Contains(err.Error(), "AWS.SimpleQueueService.NonExistentQueue") {
99-
time.Sleep(30 * time.Second)
100-
}
86+
for {
87+
select {
88+
case workerError := <-errorsChan:
89+
return b.GetRetry(), workerError
90+
// A way to stop this goroutine from b.StopConsuming
91+
case <-b.stopReceivingChan:
92+
// If someone called b.stopReceivingChannel, they are trying to shut down the process
93+
// so we don't want to retry the StartConsuming call
94+
return false, nil
95+
case <-pool:
96+
output, err := b.receiveMessage(qURL)
97+
if err == nil && len(output.Messages) > 0 {
98+
b.processingWG.Add(1)
99+
go func() {
100+
consumeError := b.consumeOne(output, taskProcessor)
101+
if consumeError != nil {
102+
errorsChan <- consumeError
101103
}
102-
//return back to pool right away
103104
concurrency.Return()
105+
b.processingWG.Done()
106+
}()
107+
} else {
108+
if err != nil {
109+
log.ERROR.Printf("Queue consume error on %s: %s", *qURL, err)
110+
111+
// Avoid repeating this
112+
if strings.Contains(err.Error(), "AWS.SimpleQueueService.NonExistentQueue") {
113+
time.Sleep(30 * time.Second)
114+
}
104115
}
116+
//return back to pool right away
117+
concurrency.Return()
105118
}
106119
}
107-
}()
108-
109-
if err := b.consume(deliveries, taskProcessor, concurrency); err != nil {
110-
return b.GetRetry(), err
111120
}
112121

113122
return b.GetRetry(), nil
@@ -231,22 +240,6 @@ func restrictVisibilityTimeoutDelay(delay time.Duration, receivedAt time.Time) t
231240
return delay
232241
}
233242

234-
// consume is a method which keeps consuming deliveries from a channel, until there is an error or a stop signal
235-
func (b *Broker) consume(deliveries <-chan *awssqs.ReceiveMessageOutput, taskProcessor iface.TaskProcessor, concurrency iface.ResizeablePool) error {
236-
237-
errorsChan := make(chan error)
238-
239-
for {
240-
whetherContinue, err := b.consumeDeliveries(deliveries, taskProcessor, concurrency, errorsChan)
241-
if err != nil {
242-
return err
243-
}
244-
if whetherContinue == false {
245-
return nil
246-
}
247-
}
248-
}
249-
250243
// consumeOne is a method consumes a delivery. If a delivery was consumed successfully, it will be deleted from AWS SQS
251244
func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor iface.TaskProcessor) error {
252245
if len(delivery.Messages) == 0 {
@@ -390,34 +383,6 @@ func (b *Broker) initializePool(pool chan struct{}, concurrency int) {
390383
}
391384
}
392385

393-
// consumeDeliveries is a method consuming deliveries from deliveries channel
394-
func (b *Broker) consumeDeliveries(deliveries <-chan *awssqs.ReceiveMessageOutput, taskProcessor iface.TaskProcessor, concurrency iface.ResizeablePool, errorsChan chan error) (bool, error) {
395-
select {
396-
case err := <-errorsChan:
397-
return false, err
398-
case d := <-deliveries:
399-
400-
b.processingWG.Add(1)
401-
402-
// Consume the task inside a goroutine so multiple tasks
403-
// can be processed concurrently
404-
go func() {
405-
406-
if err := b.consumeOne(d, taskProcessor); err != nil {
407-
errorsChan <- err
408-
}
409-
410-
b.processingWG.Done()
411-
412-
// give worker back to pool
413-
concurrency.Return()
414-
}()
415-
case <-b.GetStopChan():
416-
return false, nil
417-
}
418-
return true, nil
419-
}
420-
421386
// continueReceivingMessages is a method returns a continue signal
422387
func (b *Broker) continueReceivingMessages(qURL *string, deliveries chan *awssqs.ReceiveMessageOutput) (bool, error) {
423388
select {
@@ -432,7 +397,9 @@ func (b *Broker) continueReceivingMessages(qURL *string, deliveries chan *awssqs
432397
if len(output.Messages) == 0 {
433398
return true, nil
434399
}
435-
go func() { deliveries <- output }()
400+
go func() {
401+
deliveries <- output
402+
}()
436403
}
437404
return true, nil
438405
}

v1/brokers/sqs/sqs_export_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ func NewTestErrorBroker() *Broker {
147147
}
148148

149149
func (b *Broker) ConsumeForTest(deliveries <-chan *awssqs.ReceiveMessageOutput, taskProcessor iface.TaskProcessor, concurrency iface.ResizeablePool) error {
150-
return b.consume(deliveries, taskProcessor, concurrency)
150+
panic("not implemented")
151+
return nil
151152
}
152153

153154
func (b *Broker) ConsumeOneForTest(delivery *awssqs.ReceiveMessageOutput, taskProcessor iface.TaskProcessor) error {
@@ -171,11 +172,13 @@ func (b *Broker) InitializePoolForTest(pool chan struct{}, concurrency int) {
171172
}
172173

173174
func (b *Broker) ConsumeDeliveriesForTest(deliveries <-chan *awssqs.ReceiveMessageOutput, taskProcessor iface.TaskProcessor, concurrency iface.ResizeablePool, errorsChan chan error) (bool, error) {
174-
return b.consumeDeliveries(deliveries, taskProcessor, concurrency, errorsChan)
175+
panic("not implemented")
176+
return false, nil
175177
}
176178

177179
func (b *Broker) ContinueReceivingMessagesForTest(qURL *string, deliveries chan *awssqs.ReceiveMessageOutput) (bool, error) {
178-
return b.continueReceivingMessages(qURL, deliveries)
180+
panic("not implemented")
181+
return false, nil
179182
}
180183

181184
func (b *Broker) StopReceivingForTest() {

v1/brokers/sqs/sqs_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func TestPrivateFunc_consume(t *testing.T) {
102102
t.Fatal(err)
103103
}
104104

105-
concurrency := common.NewResizablePool(1)
105+
concurrency, _ := common.NewResizablePool(1)
106106
wk := server1.NewWorker("sms_worker", 0)
107107
deliveries := make(chan *awssqs.ReceiveMessageOutput)
108108
outputCopy := *receiveMessageOutput
@@ -204,7 +204,7 @@ func TestPrivateFunc_receiveMessage(t *testing.T) {
204204

205205
func TestPrivateFunc_consumeDeliveries(t *testing.T) {
206206

207-
concurrency := common.NewResizablePool(0)
207+
concurrency, _ := common.NewResizablePool(0)
208208
errorsChan := make(chan error)
209209
deliveries := make(chan *awssqs.ReceiveMessageOutput)
210210
server1, err := machinery.NewServer(cnf)
@@ -319,7 +319,7 @@ func TestPrivateFunc_consumeWithConcurrency(t *testing.T) {
319319
broker.SetRegisteredTaskNames([]string{"test-task"})
320320
assert.NoError(t, err)
321321

322-
concurrency := common.NewResizablePool(1)
322+
concurrency, _ := common.NewResizablePool(1)
323323
wk := server1.NewWorker("sms_worker", 1)
324324
deliveries := make(chan *awssqs.ReceiveMessageOutput)
325325
outputCopy := *receiveMessageOutput

0 commit comments

Comments
 (0)