Skip to content

Commit ebeddd4

Browse files
authored
Merge pull request #34 from meero-com/feat/configurable-backend
Configurable Backend
2 parents 26ba513 + bb474be commit ebeddd4

File tree

13 files changed

+251
-57
lines changed

13 files changed

+251
-57
lines changed

docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ services:
2020
AWS_REGION: eu-west-1
2121
AWS_ENDPOINT_URL: http://localstack:4566/
2222
SQS_SOURCE_QUEUE: input-queue
23-
SQS_DESTINATION_QUEUE: output-queue
23+
SQS_DESTINATION_QUEUE: result-queue
2424
DDB_REQUEST_TABLE: request-ddb-table
2525
DDB_RESPONSE_TABLE: response-ddb-table
2626
build:

dummy-service/envrc.dist

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
export SQS_SOURCE_QUEUE=input-queue
2-
export SQS_DESTINATION_QUEUE=output-queue
2+
export SQS_DESTINATION_QUEUE=result-queue
33

44
export AWS_ACCESS_KEY_ID=test
55
export AWS_SECRET_ACCESS_KEY=test

e2e/test_ddb.sh

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#!/usr/bin/env bash
2+
3+
set +e # Disable exit on error, we want to clean up after an error
4+
5+
export AWS_ACCESS_KEY_ID="test"
6+
export AWS_SECRET_ACCESS_KEY="test"
7+
export AWS_DEFAULT_REGION="eu-west-1"
8+
export AWS_ENDPOINT_URL="http://localhost:4566"
9+
10+
uuid=$(uuidgen)
11+
12+
# Background process: posts item to DynamoDB to trigger the response
13+
(
14+
# Generate random number between 1-5
15+
sleep_time=$(( (RANDOM % 5) + 1 ))
16+
echo "Will execute AWS command after ${sleep_time} seconds..."
17+
sleep ${sleep_time}
18+
aws dynamodb put-item \
19+
--endpoint-url http://localhost:4566 \
20+
--table-name response-ddb-table \
21+
--item '{"uuid": {"S": "'$uuid'"}, "payload": { "M": { "name": { "S": "response payload from service" } } }}'
22+
echo "AWS command executed."
23+
) &
24+
background_pid=$!
25+
26+
echo "Request UUID: $uuid"
27+
28+
curl -XPOST --retry 5 --fail-with-body \
29+
localhost:8080/api/process \
30+
-H "Content-Type: application/json" \
31+
-d "{\"uuid\": \"$uuid\", \"payload\": {\"name\": \"default\", \"timeout\": 20}}"
32+
curl_exit_status=$?
33+
34+
if [ $curl_exit_status -ne 0 ]; then
35+
echo "\nCurl failed with status $curl_exit_status. Killing background process..."
36+
kill $background_pid
37+
wait $background_pid 2>/dev/null || true
38+
echo "Background process terminated."
39+
echo "Test failed."
40+
exit $curl_exit_status
41+
fi
42+
43+
echo "\nWaiting for AWS command to complete..."
44+
wait $background_pid
45+
echo "Test successful."

e2e/test_sqs.sh

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#!/usr/bin/env bash
2+
3+
set +e # Disable exit on error, we want to clean up after an error
4+
5+
export AWS_ACCESS_KEY_ID="test"
6+
export AWS_SECRET_ACCESS_KEY="test"
7+
export AWS_DEFAULT_REGION="eu-west-1"
8+
export AWS_ENDPOINT_URL="http://localhost:4566"
9+
10+
uuid=$(uuidgen)
11+
12+
# get the queue URL
13+
queue_url=$(aws sqs get-queue-url --queue-name result-queue | jq -r '.QueueUrl')
14+
echo "Queue URL: $queue_url"
15+
16+
# Background process: posts item to SQS queue to trigger the response
17+
(
18+
# Generate random number between 1-5
19+
sleep_time=$(( (RANDOM % 5) + 1 ))
20+
echo "Will execute AWS command after ${sleep_time} seconds..."
21+
sleep ${sleep_time}
22+
aws sqs send-message \
23+
--endpoint-url http://localhost:4566 \
24+
--queue-url http://localhost:4566/000000000000/result-queue \
25+
--message-body "{\"uuid\": \"$uuid\", \"payload\": {\"name\": \"response payload from service\"}}"
26+
echo "AWS command executed."
27+
) &
28+
background_pid=$!
29+
30+
echo "Request UUID: $uuid"
31+
32+
curl -XPOST --retry 5 --fail-with-body \
33+
localhost:8080/api/process \
34+
-H "Content-Type: application/json" \
35+
-d "{\"uuid\": \"$uuid\", \"payload\": {\"name\": \"default\", \"timeout\": 20}}"
36+
curl_exit_status=$?
37+
38+
if [ $curl_exit_status -ne 0 ]; then
39+
echo "\nCurl failed with status $curl_exit_status. Killing background process..."
40+
kill $background_pid
41+
wait $background_pid 2>/dev/null || true
42+
echo "Background process terminated."
43+
echo "Test failed."
44+
exit $curl_exit_status
45+
fi
46+
47+
echo "\nWaiting for AWS command to complete..."
48+
wait $background_pid
49+
echo "Test successful."

pkg/api/process.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,40 @@
11
package api
22

33
import (
4-
"context"
54
"log"
65

7-
"github.com/google/uuid"
86
"github.com/meero-com/hmb-proxy/pkg/aws"
97
"github.com/meero-com/hmb-proxy/pkg/config"
108
"github.com/meero-com/hmb-proxy/pkg/pollers"
9+
"github.com/meero-com/hmb-proxy/pkg/producers"
1110
)
1211

1312
func process(ch chan string, p requestPayload) {
14-
ddb := aws.NewDdbCoordinator()
15-
uuid := uuid.NewString()
16-
requestTable := config.GetConfig("ddb.request_table").(string)
13+
var producer producers.Producer
14+
var poller pollers.Poller
1715

18-
ddbPayload := aws.DdbPayload{
19-
Name: p.Payload.Name,
20-
}
21-
ddbi := aws.DdbItem{
22-
Uuid: p.Uuid,
23-
Payload: ddbPayload,
24-
}
25-
26-
_, err := ddb.Put(context.Background(), requestTable, ddbi)
16+
backendType := config.GetConfig("backend_type").(string)
2717

28-
if err != nil {
29-
log.Fatalf("Failed to put item %s into table %s", uuid, requestTable)
18+
if backendType == "ddb" {
19+
ddb := aws.NewDdbCoordinator()
20+
producer = &producers.DdbProducer{
21+
Ddb: ddb,
22+
}
23+
poller = &pollers.DdbPoller{
24+
Ddb: ddb,
25+
}
26+
} else if backendType == "sqs" {
27+
sqs := aws.NewSqsCoordinator()
28+
producer = &producers.SqsProducer{
29+
Sqs: sqs,
30+
}
31+
poller = &pollers.SqsPoller{
32+
Sqs: sqs,
33+
}
34+
} else {
35+
log.Fatalf("Unsupported backend type: %s", backendType)
3036
}
3137

32-
pollers.PollDdb(ch, p.Uuid, ddb)
38+
producer.Produce(p.Uuid, p.Payload.Name)
39+
poller.Poll(ch, p.Uuid)
3340
}

pkg/config/config.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ env: dev
33
server:
44
port: 8080
55

6+
backend_type: sqs
7+
68
aws:
79
access_key_id: default
810
secret_access_key: default
911
region: eu-west-1
1012
endpoint_url: http://localstack:4566/
1113
sqs:
1214
source_queue: input-queue
13-
destination_queue: output-queue
15+
destination_queue: result-queue
16+
ddb:
17+
request-table: request-ddb-table
18+
response-table: response-ddb-table

pkg/pollers/ddb_poller.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ import (
1111
"github.com/meero-com/hmb-proxy/pkg/config"
1212
)
1313

14-
const (
15-
pollInterval = 6 * time.Second
16-
)
14+
type DdbPoller struct {
15+
Ddb aws.DdbCoordinator
16+
}
1717

18-
func PollDdb(ch chan string, uuid string, ddb aws.DdbCoordinator) {
18+
func (p *DdbPoller) Poll(ch chan string, uuid string) {
19+
ddb := p.Ddb
1920
responseTable := config.GetConfig("ddb.response_table").(string)
2021
ctx := context.Background()
2122

pkg/pollers/model.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,11 @@
11
package pollers
2+
3+
import "time"
4+
5+
type Poller interface {
6+
Poll(ch chan string, uuid string)
7+
}
8+
9+
const (
10+
pollInterval = 6 * time.Second
11+
)

pkg/pollers/poller.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

pkg/pollers/sqs_poller.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package pollers
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"time"
8+
9+
"github.com/meero-com/hmb-proxy/pkg/aws"
10+
"github.com/meero-com/hmb-proxy/pkg/config"
11+
)
12+
13+
type SqsPoller struct {
14+
Sqs aws.SqsCoordinator
15+
}
16+
17+
func (p *SqsPoller) Poll(ch chan string, uuid string) {
18+
c := p.Sqs
19+
destinationQueue := config.GetConfig("sqs.destination_queue").(string)
20+
21+
log.Printf("Start polling\n")
22+
23+
for {
24+
messages, _ := c.GetMessages(context.Background(), destinationQueue, 10, 10)
25+
log.Printf("got %d messages \n", len(messages))
26+
for idx, message := range messages {
27+
log.Printf("got messages #%d id=%s %s\n", idx, *message.MessageId, *message.Body)
28+
29+
// TODO: check if message attributes contain uuid, if not reschedule
30+
31+
err := c.AckMessage(context.Background(), destinationQueue, message.ReceiptHandle)
32+
if err != nil {
33+
log.Fatalf("Failed to acknowledge message %s", *message.MessageId)
34+
}
35+
36+
si, err := json.Marshal(*message.Body)
37+
if err != nil {
38+
log.Fatalf("Failed to Marshal item: %s", *message.Body)
39+
}
40+
ch <- string(si)
41+
close(ch)
42+
return
43+
}
44+
45+
time.Sleep(pollInterval)
46+
}
47+
}

0 commit comments

Comments
 (0)