Skip to content

Commit 0eb6f4f

Browse files
author
Armin
committed
add RabbitMQ integration with connection and consumer setup
1 parent f99f7db commit 0eb6f4f

File tree

7 files changed

+314
-3
lines changed

7 files changed

+314
-3
lines changed

app/app.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"sms-gateway/config"
77
"sms-gateway/pkg/db"
8+
amqp "sms-gateway/pkg/queue"
89

910
_ "github.com/go-sql-driver/mysql"
1011
"github.com/labstack/echo/v4"
@@ -14,12 +15,14 @@ var (
1415
Echo *echo.Echo
1516
Logger *slog.Logger
1617
DB *db.DB
18+
Rabbit *amqp.RabbitConnection
1719
)
1820

1921
func Init() {
2022
initLogger()
2123
initDB()
2224
initEcho()
25+
iniRabbit()
2326
}
2427

2528
func initLogger() {
@@ -42,6 +45,14 @@ func initDB() {
4245
//TODO: migrate
4346
}
4447

48+
func iniRabbit() {
49+
var err error
50+
Rabbit, err = amqp.NewRabbitConnection(config.RabbitmqUri)
51+
if err != nil {
52+
panic(err)
53+
}
54+
}
55+
4556
func initEcho() {
4657
Echo = echo.New()
4758
}

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ var (
1414
DBHost string
1515
DBPort int
1616
DBName string
17+
RabbitmqUri string
1718
)
1819

1920
func init() {
@@ -28,4 +29,5 @@ func init() {
2829
}
2930
DBPort = port
3031
DBName = env.RequiredNotEmpty("DB_NAME")
32+
RabbitmqUri = env.RequiredNotEmpty("RABBIT_URI")
3133
}

docker-compose.yml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,24 @@ services:
2020
timeout: 5s
2121
retries: 5
2222

23+
rabbitmq:
24+
image: rabbitmq:3.13-management
25+
container_name: sms_gateway_rabbitmq
26+
restart: unless-stopped
27+
environment:
28+
RABBITMQ_DEFAULT_USER: rabbit_user
29+
RABBITMQ_DEFAULT_PASS: rabbit_pass
30+
ports:
31+
- "5672:5672"
32+
- "15672:15672"
33+
volumes:
34+
- rabbitmq_data:/var/lib/rabbitmq
35+
healthcheck:
36+
test: ["CMD", "rabbitmq-diagnostics", "ping"]
37+
interval: 10s
38+
timeout: 5s
39+
retries: 5
40+
2341
volumes:
2442
mysql_data:
25-
43+
rabbitmq_data:

go.mod

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,33 @@ module sms-gateway
33
go 1.25
44

55
require (
6+
github.com/go-sql-driver/mysql v1.9.3
7+
github.com/google/uuid v1.6.0
68
github.com/jmoiron/sqlx v1.4.0
79
github.com/joho/godotenv v1.5.1
810
github.com/labstack/echo/v4 v4.14.0
11+
github.com/prometheus/client_golang v1.23.2
12+
github.com/rabbitmq/amqp091-go v1.10.0
913
)
1014

1115
require (
1216
filippo.io/edwards25519 v1.1.0 // indirect
13-
github.com/go-sql-driver/mysql v1.9.3 // indirect
17+
github.com/beorn7/perks v1.0.1 // indirect
18+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
19+
github.com/kr/text v0.2.0 // indirect
1420
github.com/labstack/gommon v0.4.2 // indirect
1521
github.com/mattn/go-colorable v0.1.14 // indirect
1622
github.com/mattn/go-isatty v0.0.20 // indirect
23+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
24+
github.com/prometheus/client_model v0.6.2 // indirect
25+
github.com/prometheus/common v0.66.1 // indirect
26+
github.com/prometheus/procfs v0.16.1 // indirect
1727
github.com/valyala/bytebufferpool v1.0.0 // indirect
1828
github.com/valyala/fasttemplate v1.2.2 // indirect
29+
go.yaml.in/yaml/v2 v2.4.2 // indirect
1930
golang.org/x/crypto v0.46.0 // indirect
2031
golang.org/x/net v0.48.0 // indirect
2132
golang.org/x/sys v0.39.0 // indirect
2233
golang.org/x/text v0.32.0 // indirect
34+
google.golang.org/protobuf v1.36.8 // indirect
2335
)

go.sum

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
11
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
22
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
3+
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
4+
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
5+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
6+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
38
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
49
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5-
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
610
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
711
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
812
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
13+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
14+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
15+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
16+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
917
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
1018
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
1119
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
1220
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
21+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
22+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
23+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
24+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
1325
github.com/labstack/echo/v4 v4.14.0 h1:+tiMrDLxwv6u0oKtD03mv+V1vXXB3wCqPHJqPuIe+7M=
1426
github.com/labstack/echo/v4 v4.14.0/go.mod h1:xmw1clThob0BSVRX1CRQkGQ/vjwcpOMjQZSZa9fKA/c=
1527
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
@@ -22,14 +34,32 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
2234
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
2335
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
2436
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
37+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
38+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
2539
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2640
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
41+
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
42+
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
43+
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
44+
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
45+
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
46+
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
47+
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
48+
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
49+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
50+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
51+
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
52+
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
2753
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
2854
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
2955
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
3056
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
3157
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
3258
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
59+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
60+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
61+
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
62+
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
3363
golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU=
3464
golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0=
3565
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
@@ -39,5 +69,10 @@ golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
3969
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
4070
golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU=
4171
golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY=
72+
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
73+
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
74+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
75+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
76+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
4277
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
4378
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pkg/queue/rabbitmq.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package amqp
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"time"
8+
9+
"github.com/google/uuid"
10+
"github.com/rabbitmq/amqp091-go"
11+
)
12+
13+
type RabbitConnection struct {
14+
Conn *amqp091.Connection
15+
}
16+
17+
func NewRabbitConnection(rabbitURI string) (*RabbitConnection, error) {
18+
cfg := amqp091.Config{
19+
Properties: amqp091.NewConnectionProperties(),
20+
}
21+
22+
conn, err := amqp091.DialConfig(rabbitURI, cfg)
23+
if err != nil {
24+
slog.Error("cannot connect to rabbit", "err", err)
25+
return nil, err
26+
}
27+
28+
return &RabbitConnection{Conn: conn}, nil
29+
}
30+
31+
func (rp *RabbitConnection) PublishContext(ctx context.Context, exchange string, key string, msg []byte) error {
32+
ch, err := rp.Conn.Channel()
33+
if err != nil {
34+
slog.Error("cannot create channel from rabbit mq connection", "err", err)
35+
return err
36+
}
37+
defer func() {
38+
err := ch.Close()
39+
if err != nil {
40+
slog.Error("cannot close channel from rabbit mq connection", "err", err)
41+
}
42+
}()
43+
44+
err = ch.PublishWithContext(ctx, exchange, key, false, false, amqp091.Publishing{
45+
Timestamp: time.Now(),
46+
Body: msg,
47+
})
48+
if err != nil {
49+
return err
50+
}
51+
return nil
52+
}
53+
54+
func (rp *RabbitConnection) ConsumeContext(ctx context.Context, appName string, queueName string, routingKey string, exchangeName string, prefetch int,
55+
) (<-chan amqp091.Delivery, error) {
56+
ch, err := rp.Conn.Channel()
57+
if err != nil {
58+
slog.Error("cannot create channel from rabbit mq connection", "err", err)
59+
return nil, err
60+
}
61+
62+
_, err = ch.QueueDeclare(queueName, true, false, false, false, amqp091.Table{})
63+
if err != nil {
64+
slog.Error("cannot declare rabbit queue", "err", err)
65+
return nil, err
66+
}
67+
68+
err = ch.QueueBind(queueName, routingKey, exchangeName, false, amqp091.Table{})
69+
if err != nil {
70+
slog.Error("cannot bind rabbit queue", "err", err)
71+
return nil, err
72+
}
73+
if prefetch != 0 {
74+
err = ch.Qos(
75+
prefetch, // prefetch count
76+
0, // prefetch size
77+
false, // global
78+
)
79+
80+
if err != nil {
81+
slog.Error("cannot set qos (prefetch) rabbit queue", "err", err)
82+
return nil, err
83+
}
84+
}
85+
86+
delivery, err := ch.ConsumeWithContext(ctx, queueName, fmt.Sprintf("consumer-%s-%s", appName, uuid.NewString()),
87+
false,
88+
false,
89+
false,
90+
false,
91+
amqp091.Table{})
92+
if err != nil {
93+
slog.Error("cannot consume rabbit queue", "err", err)
94+
return nil, err
95+
}
96+
97+
return delivery, nil
98+
}

0 commit comments

Comments
 (0)