Skip to content

Commit dc288ea

Browse files
committed
Refactor code
1 parent 8acfe26 commit dc288ea

File tree

6 files changed

+250
-54
lines changed

6 files changed

+250
-54
lines changed

README.md

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,78 @@
11
# sqs
2+
A fully managed message queue service offered by AWS. It provides a reliable, scalable, and cost-effective way to decouple and coordinate distributed software systems and microservices.
3+
4+
### Libraries for Amazon SQS (Simple Queue Service)
5+
- GO: [sqs](https://github.com/core-go/sqs), to wrap and simplify [aws-sdk-go/service/sqs](https://github.com/aws/aws-sdk-go/tree/main/service/sqs). Example is at [go-amazon-sqs-sample](https://github.com/project-samples/go-amazon-sqs-sample)
6+
7+
#### A common flow to consume a message from a message queue
8+
![A common flow to consume a message from a message queue](https://cdn-images-1.medium.com/max/800/1*Y4QUN6QnfmJgaKigcNHbQA.png)
9+
- The libraries to implement this flow are:
10+
- [mq](https://github.com/core-go/mq) for GOLANG. Example is at [go-amazon-sqs-sample](https://github.com/project-samples/go-amazon-sqs-sample)
11+
12+
### Use Cases of Amazon SQS (Simple Queue Service)
13+
![Microservice Architecture](https://cdn-images-1.medium.com/max/800/1*vKeePO_UC73i7tfymSmYNA.png)
14+
#### Decoupling Microservices
15+
- <b>Scenario</b>: Separating different parts of an application to ensure that a failure in one part does not affect others.
16+
- <b>Benefit</b>: Enhances fault tolerance and scalability by allowing asynchronous communication between services.
17+
#### Asynchronous Processing
18+
- <b>Scenario</b>: Handling tasks that do not need immediate processing, such as batch processing or background tasks.
19+
- <b>Benefit</b>: Improves system efficiency and response times for end-users.
20+
![A typical micro service](https://cdn-images-1.medium.com/max/800/1*d9kyekAbQYBxH-C6w38XZQ.png)
21+
#### Job Queuing
22+
- <b>Scenario</b>: Managing and distributing jobs to worker processes.
23+
- <b>Benefit</b>: Balances load and ensures all tasks are completed without overloading any single worker.
24+
#### Order Processing Systems
25+
- <b>Scenario</b>: Processing customer orders, where each order can be handled as a separate task.
26+
- <b>Benefit</b>: Ensures reliable and scalable processing of orders, even during high demand.
27+
#### Message Buffering
28+
- <b>Scenario</b>: Smoothing out bursty traffic in applications to prevent overload.
29+
- <b>Benefit</b>: Protects the system from spikes in traffic by buffering messages.
30+
#### Workflow Orchestration
31+
- <b>Scenario</b>: Orchestrating steps in a complex workflow, such as image processing pipelines.
32+
- <b>Benefit</b>: Coordinates different stages of processing in a reliable and scalable manner.
33+
34+
## Comparison of Amazon SQS, Google Pub/Sub and Apache Kafka
35+
#### Amazon SQS
36+
- <b>Type</b>: Managed message queuing service.
37+
- <b>Use Case</b>: Decoupling and scaling microservices, asynchronous tasks.
38+
- <b>Scalability</b>: Automatically scales.
39+
- <b>Delivery Guarantees</b>: At-least-once, FIFO (exactly-once).
40+
- <b>Integration</b>: Deep integration with AWS services.
41+
- <b>Delivery Models</b>: Primarily pull, with long polling.
42+
43+
#### Google Pub/Sub:
44+
- <b>Type</b>: Managed real-time messaging service.
45+
- <b>Use Case</b>: Event-driven architectures, real-time analytics.
46+
- <b>Scalability</b>: Automatically scales.
47+
- <b>Delivery Guarantees</b>: At-least-once delivery.
48+
- <b>Integration</b>: Tight with Google Cloud services.
49+
- <b>Delivery Models</b>: Push and pull.
50+
51+
#### Apache Kafka
52+
- <b>Type</b>: Open-source event streaming platform.
53+
- <b>Use Case</b>: High-throughput messaging, event sourcing, log aggregation.
54+
- <b>Scalability</b>: High with partitioned topics.
55+
- <b>Delivery Guarantees</b>: Configurable (at-least-once, exactly-once).
56+
- <b>Integration</b>: Broad ecosystem with various connectors.
57+
- <b>Delivery Models</b>: Pull-based consumer groups.
58+
59+
### Key Differences
60+
- <b>Management</b>: Pub/Sub and SQS are managed services, while Kafka is typically self-managed or via managed services like Confluent.
61+
- <b>Use Case Focus</b>: Pub/Sub and Kafka are ideal for real-time processing, whereas SQS is great for decoupling microservices and handling asynchronous tasks.
62+
- <b>Delivery Models</b>: Pub/Sub supports push and pull, SQS supports pull with long polling, and Kafka primarily uses pull with consumer groups.
63+
- <b>Scalability</b>: All three are highly scalable, but Kafka offers the most control over performance tuning.
64+
- <b>Integration</b>: Pub/Sub integrates well with Google Cloud, SQS with AWS, and Kafka has a broad integration ecosystem.
265

366
## Installation
467

5-
Please make sure to initialize a Go module before installing common-go/sqs:
68+
Please make sure to initialize a Go module before installing core-go/sqs:
669

770
```shell
8-
go get -u github.com/common-go/sqs
71+
go get -u github.com/core-go/sqs
972
```
1073

1174
Import:
1275

1376
```go
14-
import "github.com/common-go/sqs"
77+
import "github.com/core-go/sqs"
1578
```

health_checker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func (h *HealthChecker) Name() string {
3838

3939
func (h *HealthChecker) Check(ctx context.Context) (map[string]interface{}, error) {
4040
res := make(map[string]interface{})
41+
h.Client.Config.HTTPClient.Timeout = h.Timeout
4142
_, err := h.Client.GetQueueUrl(&sqs.GetQueueUrlInput{
4243
QueueName: h.QueueName,
4344
})
@@ -48,6 +49,9 @@ func (h *HealthChecker) Build(ctx context.Context, data map[string]interface{},
4849
if err == nil {
4950
return data
5051
}
52+
if data == nil {
53+
data = make(map[string]interface{}, 0)
54+
}
5155
data["error"] = err.Error()
5256
return data
5357
}

queue_sender.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package sqs
2+
3+
import (
4+
"context"
5+
"github.com/aws/aws-sdk-go/aws"
6+
"github.com/aws/aws-sdk-go/service/sqs"
7+
)
8+
9+
type QueueSender struct {
10+
Client *sqs.SQS
11+
DelaySeconds *int64 //could be 10
12+
}
13+
14+
func NewQueueSender(client *sqs.SQS, delaySeconds int64) *QueueSender {
15+
return &QueueSender{Client: client, DelaySeconds: &delaySeconds}
16+
}
17+
func (p *QueueSender) Send(ctx context.Context, queueName string, data []byte, attributes map[string]string) error {
18+
queueUrl, er0 := GetQueueUrl(p.Client, queueName)
19+
if er0 != nil {
20+
return er0
21+
}
22+
attrs := MapToAttributes(attributes)
23+
s := string(data)
24+
_, err := p.Client.SendMessage(&sqs.SendMessageInput{
25+
DelaySeconds: p.DelaySeconds,
26+
MessageAttributes: attrs,
27+
MessageBody: aws.String(s),
28+
QueueUrl: &queueUrl,
29+
})
30+
return err
31+
}
32+
func (p *QueueSender) SendBody(ctx context.Context, queueName string, data []byte) error {
33+
queueUrl, er0 := GetQueueUrl(p.Client, queueName)
34+
if er0 != nil {
35+
return er0
36+
}
37+
s := string(data)
38+
_, err := p.Client.SendMessage(&sqs.SendMessageInput{
39+
DelaySeconds: p.DelaySeconds,
40+
MessageBody: aws.String(s),
41+
QueueUrl: &queueUrl,
42+
})
43+
return err
44+
}

receiver.go

Lines changed: 110 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"github.com/aws/aws-sdk-go/aws"
66
"github.com/aws/aws-sdk-go/service/sqs"
7-
"github.com/common-go/mq"
87
)
98

109
type Receiver struct {
@@ -13,6 +12,7 @@ type Receiver struct {
1312
AckOnConsume bool
1413
VisibilityTimeout int64 // should be 20 (seconds)
1514
WaitTimeSeconds int64 // should be 0
15+
LogError func(ctx context.Context, msg string)
1616
}
1717

1818
func NewReceiverByQueueName(client *sqs.SQS, queueName string, ackOnConsume bool, visibilityTimeout int64, waitTimeSeconds int64) (*Receiver, error) {
@@ -27,53 +27,123 @@ func NewReceiver(client *sqs.SQS, queueURL string, ackOnConsume bool, visibility
2727
return &Receiver{Client: client, QueueURL: &queueURL, AckOnConsume: ackOnConsume, VisibilityTimeout: visibilityTimeout, WaitTimeSeconds: waitTimeSeconds}
2828
}
2929

30-
func (c *Receiver) Receive(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
30+
func (c *Receiver) Receive(ctx context.Context, handle func(context.Context, []byte, map[string]string)) {
3131
var result *sqs.ReceiveMessageOutput
3232
var er1 error
33-
loop:
34-
result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
35-
AttributeNames: []*string{
36-
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
37-
},
38-
MessageAttributeNames: []*string{
39-
aws.String(sqs.QueueAttributeNameAll),
40-
},
41-
QueueUrl: c.QueueURL,
42-
MaxNumberOfMessages: aws.Int64(1),
43-
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
44-
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
45-
})
46-
if er1 != nil {
47-
handle(ctx, nil, er1)
48-
} else {
49-
if len(result.Messages) > 0 {
50-
m := result.Messages[0]
51-
data := []byte(*m.Body)
52-
attributes := PtrToMap(m.Attributes)
53-
message := mq.Message{
54-
Id: *m.MessageId,
55-
Data: data,
56-
Attributes: attributes,
57-
Raw: m,
33+
loop:
34+
result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
35+
AttributeNames: []*string{
36+
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
37+
},
38+
MessageAttributeNames: []*string{
39+
aws.String(sqs.QueueAttributeNameAll),
40+
},
41+
QueueUrl: c.QueueURL,
42+
MaxNumberOfMessages: aws.Int64(1),
43+
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
44+
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
45+
})
46+
if er1 != nil {
47+
c.LogError(ctx, "Error when subscribe: "+er1.Error())
48+
} else {
49+
if len(result.Messages) > 0 {
50+
m := result.Messages[0]
51+
data := []byte(*m.Body)
52+
attributes := PtrToMap(m.Attributes)
53+
if c.AckOnConsume {
54+
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
55+
QueueUrl: c.QueueURL,
56+
ReceiptHandle: result.Messages[0].ReceiptHandle,
57+
})
58+
if er2 != nil {
59+
c.LogError(ctx, "Error when delete message: "+er2.Error())
60+
} else {
61+
handle(ctx, data, attributes)
5862
}
59-
if c.AckOnConsume {
60-
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
61-
QueueUrl: c.QueueURL,
62-
ReceiptHandle: result.Messages[0].ReceiptHandle,
63-
})
64-
if er2 != nil {
65-
handle(ctx, nil, er2)
66-
} else {
67-
handle(ctx, &message, nil)
68-
}
63+
} else {
64+
handle(ctx, data, attributes)
65+
}
66+
}
67+
}
68+
goto loop
69+
}
70+
func (c *Receiver) ReceiveBody(ctx context.Context, handle func(context.Context, []byte)) {
71+
var result *sqs.ReceiveMessageOutput
72+
var er1 error
73+
loop:
74+
result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
75+
AttributeNames: []*string{
76+
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
77+
},
78+
MessageAttributeNames: []*string{
79+
aws.String(sqs.QueueAttributeNameAll),
80+
},
81+
QueueUrl: c.QueueURL,
82+
MaxNumberOfMessages: aws.Int64(1),
83+
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
84+
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
85+
})
86+
if er1 != nil {
87+
c.LogError(ctx, "Error when subscribe: "+er1.Error())
88+
} else {
89+
if len(result.Messages) > 0 {
90+
m := result.Messages[0]
91+
data := []byte(*m.Body)
92+
if c.AckOnConsume {
93+
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
94+
QueueUrl: c.QueueURL,
95+
ReceiptHandle: result.Messages[0].ReceiptHandle,
96+
})
97+
if er2 != nil {
98+
c.LogError(ctx, "Error when delete message: "+er2.Error())
6999
} else {
70-
handle(ctx, &message, nil)
100+
handle(ctx, data)
71101
}
102+
} else {
103+
handle(ctx, data)
72104
}
73105
}
74-
goto loop
106+
}
107+
goto loop
108+
}
109+
func (c *Receiver) ReceiveMessage(ctx context.Context, handle func(context.Context, *sqs.Message)) {
110+
var result *sqs.ReceiveMessageOutput
111+
var er1 error
112+
loop:
113+
result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
114+
AttributeNames: []*string{
115+
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
116+
},
117+
MessageAttributeNames: []*string{
118+
aws.String(sqs.QueueAttributeNameAll),
119+
},
120+
QueueUrl: c.QueueURL,
121+
MaxNumberOfMessages: aws.Int64(1),
122+
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
123+
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
124+
})
125+
if er1 != nil {
126+
c.LogError(ctx, "Error when subscribe: "+er1.Error())
127+
} else {
128+
if len(result.Messages) > 0 {
129+
m := result.Messages[0]
130+
if c.AckOnConsume {
131+
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
132+
QueueUrl: c.QueueURL,
133+
ReceiptHandle: result.Messages[0].ReceiptHandle,
134+
})
135+
if er2 != nil {
136+
c.LogError(ctx, "Error when delete message: "+er2.Error())
137+
} else {
138+
handle(ctx, m)
139+
}
140+
} else {
141+
handle(ctx, m)
142+
}
143+
}
144+
}
145+
goto loop
75146
}
76-
77147
func PtrToMap(m map[string]*string) map[string]string {
78148
attributes := make(map[string]string)
79149
for k, v := range m {

sender.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,42 @@ type Sender struct {
1212
DelaySeconds *int64 //could be 10
1313
}
1414

15-
func NewSenderByQueueName(client *sqs.SQS, queueName string, delaySeconds *int64) (*Sender, error) {
15+
func NewSenderByQueueName(client *sqs.SQS, queueName string, delaySeconds int64) (*Sender, error) {
1616
queueUrl, err := GetQueueUrl(client, queueName)
1717
if err != nil {
1818
return nil, err
1919
}
2020
return NewSender(client, queueUrl, delaySeconds), nil
2121
}
2222

23-
func NewSender(client *sqs.SQS, queueURL string, delaySeconds *int64) *Sender {
24-
return &Sender{Client: client, QueueURL: &queueURL, DelaySeconds: delaySeconds}
23+
func NewSender(client *sqs.SQS, queueURL string, delaySeconds int64) *Sender {
24+
return &Sender{Client: client, QueueURL: &queueURL, DelaySeconds: &delaySeconds}
2525
}
26-
27-
func (p *Sender) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
26+
func (p *Sender) Send(ctx context.Context, data []byte, attributes map[string]string) error {
2827
attrs := MapToAttributes(attributes)
2928
s := string(data)
30-
result, err := p.Client.SendMessage(&sqs.SendMessageInput{
29+
_, err := p.Client.SendMessage(&sqs.SendMessageInput{
3130
DelaySeconds: p.DelaySeconds,
3231
MessageAttributes: attrs,
3332
MessageBody: aws.String(s),
3433
QueueUrl: p.QueueURL,
3534
})
35+
return err
36+
}
37+
func (p *Sender) SendBody(ctx context.Context, data []byte) error {
38+
s := string(data)
39+
_, err := p.Client.SendMessage(&sqs.SendMessageInput{
40+
DelaySeconds: p.DelaySeconds,
41+
MessageBody: aws.String(s),
42+
QueueUrl: p.QueueURL,
43+
})
44+
return err
45+
}
46+
func (p *Sender) SendMessage(msg *sqs.SendMessageInput) (string, error) {
47+
if msg == nil {
48+
return "", nil
49+
}
50+
result, err := p.Client.SendMessage(msg)
3651
if result != nil && result.MessageId != nil {
3752
return *result.MessageId, err
3853
} else {

sqs.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99

1010
type (
1111
Config struct {
12-
Region string `mapstructure:"region" json:"region,omitempty" gorm:"column:region" bson:"region,omitempty" dynamodbav:"region,omitempty" firestore:"region,omitempty"`
13-
AccessKeyID string `mapstructure:"access_key_id" json:"accessKeyID,omitempty" gorm:"column:accessKeyID" bson:"accessKeyID,omitempty" dynamodbav:"accessKeyID,omitempty" firestore:"accessKeyID,omitempty"`
14-
SecretAccessKey string `mapstructure:"secret_access_key" json:"secretAccessKey,omitempty" gorm:"column:secretaccesskey" bson:"secretAccessKey,omitempty" dynamodbav:"secretAccessKey,omitempty" firestore:"secretAccessKey,omitempty"`
15-
QueueName string `mapstructure:"queue_name" json:"queueName,omitempty" gorm:"column:token" bson:"queueName,omitempty" dynamodbav:"queueName,omitempty" firestore:"queueName,omitempty"`
12+
Region string `yaml:"region" mapstructure:"region" json:"region,omitempty" gorm:"column:region" bson:"region,omitempty" dynamodbav:"region,omitempty" firestore:"region,omitempty"`
13+
AccessKeyID string `yaml:"access_key_id" mapstructure:"access_key_id" json:"accessKeyID,omitempty" gorm:"column:accessKeyID" bson:"accessKeyID,omitempty" dynamodbav:"accessKeyID,omitempty" firestore:"accessKeyID,omitempty"`
14+
SecretAccessKey string `yaml:"secret_access_key" mapstructure:"secret_access_key" json:"secretAccessKey,omitempty" gorm:"column:secretaccesskey" bson:"secretAccessKey,omitempty" dynamodbav:"secretAccessKey,omitempty" firestore:"secretAccessKey,omitempty"`
15+
QueueName string `yaml:"a" mapstructure:"queue_name" json:"queueName,omitempty" gorm:"column:token" bson:"queueName,omitempty" dynamodbav:"queueName,omitempty" firestore:"queueName,omitempty"`
1616
}
1717
)
1818

@@ -35,4 +35,4 @@ func Connect(config Config) (*sqs.SQS, error) {
3535

3636
func ConnectWithSession(session *session.Session) *sqs.SQS {
3737
return sqs.New(session)
38-
}
38+
}

0 commit comments

Comments
 (0)