Skip to content

Commit bb474be

Browse files
author
Oliver Allweyer
committed
feat: sqs producer and poller
1 parent 0be58e7 commit bb474be

File tree

10 files changed

+141
-42
lines changed

10 files changed

+141
-42
lines changed

docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ services:
2020
AWS_REGION: eu-west-1
2121
AWS_ENDPOINT_URL: http://localstack:4566/
2222
SQS_SOURCE_QUEUE: input-queue
23-
SQS_DESTINATION_QUEUE: output-queue
23+
SQS_DESTINATION_QUEUE: result-queue
2424
DDB_REQUEST_TABLE: request-ddb-table
2525
DDB_RESPONSE_TABLE: response-ddb-table
2626
build:

dummy-service/envrc.dist

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
export SQS_SOURCE_QUEUE=input-queue
2-
export SQS_DESTINATION_QUEUE=output-queue
2+
export SQS_DESTINATION_QUEUE=result-queue
33

44
export AWS_ACCESS_KEY_ID=test
55
export AWS_SECRET_ACCESS_KEY=test

e2e/test_sqs.sh

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#!/usr/bin/env bash
2+
3+
set +e # Disable exit on error, we want to clean up after an error
4+
5+
export AWS_ACCESS_KEY_ID="test"
6+
export AWS_SECRET_ACCESS_KEY="test"
7+
export AWS_DEFAULT_REGION="eu-west-1"
8+
export AWS_ENDPOINT_URL="http://localhost:4566"
9+
10+
uuid=$(uuidgen)
11+
12+
# get the queue URL
13+
queue_url=$(aws sqs get-queue-url --queue-name result-queue | jq -r '.QueueUrl')
14+
echo "Queue URL: $queue_url"
15+
16+
# Background process: posts item to SQS queue to trigger the response
17+
(
18+
# Generate random number between 1-5
19+
sleep_time=$(( (RANDOM % 5) + 1 ))
20+
echo "Will execute AWS command after ${sleep_time} seconds..."
21+
sleep ${sleep_time}
22+
aws sqs send-message \
23+
--endpoint-url http://localhost:4566 \
24+
--queue-url http://localhost:4566/000000000000/result-queue \
25+
--message-body "{\"uuid\": \"$uuid\", \"payload\": {\"name\": \"response payload from service\"}}"
26+
echo "AWS command executed."
27+
) &
28+
background_pid=$!
29+
30+
echo "Request UUID: $uuid"
31+
32+
curl -XPOST --retry 5 --fail-with-body \
33+
localhost:8080/api/process \
34+
-H "Content-Type: application/json" \
35+
-d "{\"uuid\": \"$uuid\", \"payload\": {\"name\": \"default\", \"timeout\": 20}}"
36+
curl_exit_status=$?
37+
38+
if [ $curl_exit_status -ne 0 ]; then
39+
echo "\nCurl failed with status $curl_exit_status. Killing background process..."
40+
kill $background_pid
41+
wait $background_pid 2>/dev/null || true
42+
echo "Background process terminated."
43+
echo "Test failed."
44+
exit $curl_exit_status
45+
fi
46+
47+
echo "\nWaiting for AWS command to complete..."
48+
wait $background_pid
49+
echo "Test successful."

pkg/api/process.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,16 @@ func process(ch chan string, p requestPayload) {
2323
poller = &pollers.DdbPoller{
2424
Ddb: ddb,
2525
}
26+
} else if backendType == "sqs" {
27+
sqs := aws.NewSqsCoordinator()
28+
producer = &producers.SqsProducer{
29+
Sqs: sqs,
30+
}
31+
poller = &pollers.SqsPoller{
32+
Sqs: sqs,
33+
}
2634
} else {
27-
// TODO: sqs producer and poller
28-
log.Fatalf("SQS backend not yet implemented")
35+
log.Fatalf("Unsupported backend type: %s", backendType)
2936
}
3037

3138
producer.Produce(p.Uuid, p.Payload.Name)

pkg/config/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ env: dev
33
server:
44
port: 8080
55

6-
backend_type: ddb
6+
backend_type: sqs
77

88
aws:
99
access_key_id: default

pkg/pollers/ddb_poller.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,6 @@ import (
1111
"github.com/meero-com/hmb-proxy/pkg/config"
1212
)
1313

14-
const (
15-
pollInterval = 6 * time.Second
16-
)
17-
1814
type DdbPoller struct {
1915
Ddb aws.DdbCoordinator
2016
}

pkg/pollers/model.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package pollers
22

3+
import "time"
4+
35
type Poller interface {
46
Poll(ch chan string, uuid string)
57
}
8+
9+
const (
10+
pollInterval = 6 * time.Second
11+
)

pkg/pollers/poller.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

pkg/pollers/sqs_poller.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package pollers
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"time"
8+
9+
"github.com/meero-com/hmb-proxy/pkg/aws"
10+
"github.com/meero-com/hmb-proxy/pkg/config"
11+
)
12+
13+
type SqsPoller struct {
14+
Sqs aws.SqsCoordinator
15+
}
16+
17+
func (p *SqsPoller) Poll(ch chan string, uuid string) {
18+
c := p.Sqs
19+
destinationQueue := config.GetConfig("sqs.destination_queue").(string)
20+
21+
log.Printf("Start polling\n")
22+
23+
for {
24+
messages, _ := c.GetMessages(context.Background(), destinationQueue, 10, 10)
25+
log.Printf("got %d messages \n", len(messages))
26+
for idx, message := range messages {
27+
log.Printf("got messages #%d id=%s %s\n", idx, *message.MessageId, *message.Body)
28+
29+
// TODO: check if message attributes contain uuid, if not reschedule
30+
31+
err := c.AckMessage(context.Background(), destinationQueue, message.ReceiptHandle)
32+
if err != nil {
33+
log.Fatalf("Failed to acknowledge message %s", *message.MessageId)
34+
}
35+
36+
si, err := json.Marshal(*message.Body)
37+
if err != nil {
38+
log.Fatalf("Failed to Marshal item: %s", *message.Body)
39+
}
40+
ch <- string(si)
41+
close(ch)
42+
return
43+
}
44+
45+
time.Sleep(pollInterval)
46+
}
47+
}

pkg/producers/sqs_producer.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package producers
2+
3+
import (
4+
"context"
5+
"log"
6+
7+
"github.com/meero-com/hmb-proxy/pkg/aws"
8+
"github.com/meero-com/hmb-proxy/pkg/config"
9+
)
10+
11+
type SqsProducer struct {
12+
Sqs aws.SqsCoordinator
13+
}
14+
15+
func (p *SqsProducer) Produce(uuid string, payload string) {
16+
sqs := p.Sqs
17+
queueUrl := config.GetConfig("sqs.source_queue").(string)
18+
19+
// TODO: forge sqs payload with uuid in message attributes
20+
sqsPayload := payload
21+
22+
err := sqs.PutMessage(context.Background(), queueUrl, &sqsPayload)
23+
24+
if err != nil {
25+
log.Fatalf("Failed to put message %s to sqs queue %s", uuid, queueUrl)
26+
}
27+
}

0 commit comments

Comments
 (0)