Skip to content

Commit 069ef7d

Browse files
add loop for comsume , add sqs.go for connection
1 parent b7e5cd5 commit 069ef7d

File tree

2 files changed

+77
-35
lines changed

2 files changed

+77
-35
lines changed

consumer.go

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,46 +29,50 @@ func NewConsumer(client *sqs.SQS, queueURL string, ackOnConsume bool, visibility
2929
}
3030

3131
func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
32-
result, er1 := c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
33-
AttributeNames: []*string{
34-
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
35-
},
36-
MessageAttributeNames: []*string{
37-
aws.String(sqs.QueueAttributeNameAll),
38-
},
39-
QueueUrl: c.QueueURL,
40-
MaxNumberOfMessages: aws.Int64(1),
41-
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
42-
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
43-
})
44-
if er1 != nil {
45-
handle(ctx, nil, er1)
46-
} else {
47-
if len(result.Messages) > 0 {
48-
m := result.Messages[0]
49-
data := []byte(*m.Body)
50-
attributes := PtrToMap(m.Attributes)
51-
message := mq.Message{
52-
Id: *m.MessageId,
53-
Data: data,
54-
Attributes: attributes,
55-
Raw: m,
56-
}
57-
if c.AckOnConsume {
58-
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
59-
QueueUrl: c.QueueURL,
60-
ReceiptHandle: result.Messages[0].ReceiptHandle,
61-
})
62-
if er2 != nil {
63-
handle(ctx, nil, er2)
32+
var result *sqs.ReceiveMessageOutput
33+
var er1 error
34+
loop:
35+
result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
36+
AttributeNames: []*string{
37+
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
38+
},
39+
MessageAttributeNames: []*string{
40+
aws.String(sqs.QueueAttributeNameAll),
41+
},
42+
QueueUrl: c.QueueURL,
43+
MaxNumberOfMessages: aws.Int64(1),
44+
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
45+
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
46+
})
47+
if er1 != nil {
48+
handle(ctx, nil, er1)
49+
} else {
50+
if len(result.Messages) > 0 {
51+
m := result.Messages[0]
52+
data := []byte(*m.Body)
53+
attributes := PtrToMap(m.Attributes)
54+
message := mq.Message{
55+
Id: *m.MessageId,
56+
Data: data,
57+
Attributes: attributes,
58+
Raw: m,
59+
}
60+
if c.AckOnConsume {
61+
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
62+
QueueUrl: c.QueueURL,
63+
ReceiptHandle: result.Messages[0].ReceiptHandle,
64+
})
65+
if er2 != nil {
66+
handle(ctx, nil, er2)
67+
} else {
68+
handle(ctx, &message, nil)
69+
}
6470
} else {
6571
handle(ctx, &message, nil)
6672
}
67-
} else {
68-
handle(ctx, &message, nil)
6973
}
7074
}
71-
}
75+
goto loop
7276
}
7377

7478
func PtrToMap(m map[string]*string) map[string]string {

sqs.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package sqs
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/aws/session"
5+
"github.com/aws/aws-sdk-go/aws"
6+
"github.com/aws/aws-sdk-go/aws/credentials"
7+
"github.com/aws/aws-sdk-go/service/sqs"
8+
)
9+
10+
type (
11+
Config struct {
12+
Region string `mapstructure:"region" json:"region,omitempty" gorm:"column:region" bson:"region,omitempty" dynamodbav:"region,omitempty" firestore:"region,omitempty"`
13+
AccessKeyID string `mapstructure:"access_key_id" json:"accessKeyID,omitempty" gorm:"column:accessKeyID" bson:"accessKeyID,omitempty" dynamodbav:"accessKeyID,omitempty" firestore:"accessKeyID,omitempty"`
14+
SecretAccessKey string `mapstructure:"secret_access_key" json:"secretAccessKey,omitempty" gorm:"column:secretaccesskey" bson:"secretAccessKey,omitempty" dynamodbav:"secretAccessKey,omitempty" firestore:"secretAccessKey,omitempty"`
15+
QueueName string `mapstructure:"queue_name" json:"queueName,omitempty" gorm:"column:token" bson:"queueName,omitempty" dynamodbav:"queueName,omitempty" firestore:"queueName,omitempty"`
16+
}
17+
)
18+
19+
func NewSession(config Config) (*session.Session, error) {
20+
c := &aws.Config{
21+
Region: aws.String(config.Region),
22+
Credentials: credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, ""),
23+
}
24+
return session.NewSession(c)
25+
}
26+
27+
func Connect(config Config) (*sqs.SQS, error) {
28+
sess, err := NewSession(config)
29+
if err != nil {
30+
return nil, err
31+
}
32+
mySQS := sqs.New(sess)
33+
return mySQS, nil
34+
}
35+
36+
func ConnectWithSession(session *session.Session) *sqs.SQS {
37+
return sqs.New(session)
38+
}

0 commit comments

Comments
 (0)