Skip to content

Commit bd02aad

Browse files
Add a log message when SQS queue is empty and log sample of wait times (#40)
* Add a log message when SQS queue is empty * Log wait time for SQS read for sample of 0.01% of messages
1 parent 9ff7ab2 commit bd02aad

File tree

1 file changed

+24
-12
lines changed

1 file changed

+24
-12
lines changed

v1/brokers/sqs/sqs.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"math/rand"
89
"strconv"
910
"strings"
1011
"sync"
@@ -26,6 +27,7 @@ import (
2627
const (
2728
maxAWSSQSDelay = time.Minute * 15 // Max supported SQS delay is 15 min: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
2829
maxAWSSQSVisibilityTimeout = time.Hour * 12 // Max supported SQS visibility timeout is 12 hours: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibility.html
30+
logSQSReceiveSampleRate = 0.0001 // 0.01% of messages received from SQS will be logged
2931
)
3032

3133
// Broker represents a AWS SQS broker
@@ -94,7 +96,20 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency iface.Resizeable
9496
return false, nil
9597
case <-pool:
9698
output, err := b.receiveMessage(qURL)
97-
if err == nil && len(output.Messages) > 0 {
99+
if err != nil {
100+
log.ERROR.Printf("Queue consume error on %s: %s", *qURL, err)
101+
102+
// Avoid repeating this
103+
if strings.Contains(err.Error(), "AWS.SimpleQueueService.NonExistentQueue") {
104+
time.Sleep(30 * time.Second)
105+
}
106+
//return back to pool right away
107+
concurrency.Return()
108+
} else if len(output.Messages) == 0 {
109+
log.INFO.Printf("Received Messages returned empty on %s", *qURL)
110+
//return back to pool right away
111+
concurrency.Return()
112+
} else {
98113
b.processingWG.Add(1)
99114
go func() {
100115
consumeError := b.consumeOne(output, taskProcessor)
@@ -104,17 +119,6 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency iface.Resizeable
104119
concurrency.Return()
105120
b.processingWG.Done()
106121
}()
107-
} else {
108-
if err != nil {
109-
log.ERROR.Printf("Queue consume error on %s: %s", *qURL, err)
110-
111-
// Avoid repeating this
112-
if strings.Contains(err.Error(), "AWS.SimpleQueueService.NonExistentQueue") {
113-
time.Sleep(30 * time.Second)
114-
}
115-
}
116-
//return back to pool right away
117-
concurrency.Return()
118122
}
119123
}
120124
}
@@ -375,7 +379,15 @@ func (b *Broker) receiveMessage(qURL *string) (*awssqs.ReceiveMessageOutput, err
375379
if visibilityTimeout != nil {
376380
input.VisibilityTimeout = aws.Int64(int64(*visibilityTimeout))
377381
}
382+
start := time.Now()
378383
result, err := b.service.ReceiveMessage(input)
384+
if rand.Float64() < logSQSReceiveSampleRate {
385+
messageID := "unknown"
386+
if err == nil && len(result.Messages) > 0 {
387+
messageID = *result.Messages[0].MessageId
388+
}
389+
log.INFO.Printf("Sampled SQS ReceiveMessage (messageID: %s) for queue (%s) took %d (%.2f%% sample rate). Was error? %t", messageID, *qURL, time.Since(start).Milliseconds(), logSQSReceiveSampleRate*100, err != nil)
390+
}
379391
if err != nil {
380392
return nil, err
381393
}

0 commit comments

Comments
 (0)