Skip to content

Commit 3c56d12

Browse files
committed
implement consumer endpoints
1 parent a9b6b52 commit 3c56d12

File tree

10 files changed

+414
-0
lines changed

10 files changed

+414
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.idea/
2+
rest-gateway

cache/cache.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package cache
2+
3+
import (
4+
"fmt"
5+
"gorm.io/driver/postgres"
6+
"gorm.io/gorm"
7+
"rest-gateway/conf"
8+
"rest-gateway/logger"
9+
"sync"
10+
)
11+
12+
type Message struct {
13+
ID uint `gorm:"primary key;autoIncrement" json:"id"`
14+
StationName string `json:"-"`
15+
ConsumerName string `json:"-"`
16+
Username string `json:"-"`
17+
Data string `json:"data"`
18+
}
19+
20+
var messageCacheLock sync.Mutex
21+
22+
type MessageCache interface {
23+
GetMessages(stationName, consumerName string, batchSize int) ([]Message, error)
24+
GetMessageById(stationName, consumerName string, id uint) (*Message, error)
25+
AddMessage(message *Message) error
26+
RemoveMessage(message *Message) error
27+
}
28+
29+
func New(config conf.Configuration, log *logger.Logger) MessageCache {
30+
if config.USE_DB_CACHE {
31+
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=%v TimeZone=%s",
32+
config.DB_HOST, config.DB_USER, config.DB_PASSWORD, config.DB_NAME, config.DB_PORT, config.DB_SSLMODE, config.DB_TIME_ZONE)
33+
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
34+
if err != nil {
35+
log.Errorf("MessageCache.New - %s", err.Error())
36+
log.Noticef("Defaulting to in-memory cache")
37+
} else {
38+
repo := Repository{db}
39+
err := repo.MigrateMessages()
40+
if err != nil {
41+
log.Errorf("MessageCache.New - %s", err.Error())
42+
}
43+
return repo
44+
}
45+
}
46+
return InMemoryCache{map[string]map[string][]Message{}}
47+
}

cache/database.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package cache
2+
3+
import "gorm.io/gorm"
4+
5+
type Repository struct {
6+
DB *gorm.DB
7+
}
8+
9+
func (r Repository) GetMessageById(stationName, consumerName string, id uint) (*Message, error) {
10+
message := Message{}
11+
err := r.DB.Model(&Message{}).Find(&message, "id = ? AND station_name = ? AND consumer_name = ?", id, stationName, consumerName).Error
12+
return &message, err
13+
}
14+
15+
func (r Repository) GetMessages(stationName, consumerName string, batchSize int) ([]Message, error) {
16+
messages := []Message{}
17+
err := r.DB.Model(&Message{}).Limit(batchSize).Find(&messages, "station_name = ? AND consumer_name = ?", stationName, consumerName).Error
18+
return messages, err
19+
}
20+
21+
func (r Repository) AddMessage(message *Message) error {
22+
messageCacheLock.Lock()
23+
defer messageCacheLock.Unlock()
24+
err := r.DB.Create(message).Error
25+
return err
26+
}
27+
28+
func (r Repository) RemoveMessage(message *Message) error {
29+
messageCacheLock.Lock()
30+
defer messageCacheLock.Unlock()
31+
err := r.DB.Delete(message, message.ID).Error
32+
return err
33+
}
34+
35+
func (r Repository) MigrateMessages() error {
36+
err := r.DB.AutoMigrate(&Message{})
37+
return err
38+
}

cache/in_memory_cache.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package cache
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
)
7+
8+
type InMemoryCache struct {
9+
cache map[string]map[string][]Message
10+
}
11+
12+
func (m InMemoryCache) GetMessages(stationName, consumerName string, batchSize int) ([]Message, error) {
13+
if m.cache == nil {
14+
return nil, errors.New("In-Memory cache wasn't initialized")
15+
}
16+
stationMessages, ok := m.cache[stationName]
17+
if !ok {
18+
return []Message{}, nil
19+
}
20+
consumerMessages, ok := stationMessages[consumerName]
21+
if !ok {
22+
return []Message{}, nil
23+
}
24+
if batchSize < len(consumerMessages) {
25+
return consumerMessages[:batchSize], nil
26+
}
27+
return consumerMessages, nil
28+
}
29+
30+
func (m InMemoryCache) GetMessageById(stationName, consumerName string, id uint) (*Message, error) {
31+
if m.cache == nil {
32+
return nil, errors.New("In-Memory cache wasn't initialized")
33+
}
34+
stationMessages, ok := m.cache[stationName]
35+
if !ok {
36+
return nil, fmt.Errorf("message with id = %v not found", id)
37+
}
38+
consumerMessages, ok := stationMessages[consumerName]
39+
if !ok {
40+
return nil, fmt.Errorf("message with id = %v not found", id)
41+
}
42+
for _, msg := range consumerMessages {
43+
if msg.ID == id {
44+
return &msg, nil
45+
}
46+
}
47+
return nil, fmt.Errorf("message with id = %v not found", id)
48+
}
49+
50+
func (m InMemoryCache) AddMessage(message *Message) error {
51+
messageCacheLock.Lock()
52+
defer messageCacheLock.Unlock()
53+
if m.cache == nil {
54+
m.cache = map[string]map[string][]Message{}
55+
}
56+
if m.cache[message.StationName] == nil {
57+
m.cache[message.StationName] = map[string][]Message{}
58+
}
59+
if m.cache[message.StationName][message.ConsumerName] == nil {
60+
m.cache[message.StationName][message.ConsumerName] = []Message{}
61+
}
62+
consumerMessages := m.cache[message.StationName][message.ConsumerName]
63+
messageId := len(consumerMessages) + 1
64+
message.ID = uint(messageId)
65+
m.cache[message.StationName][message.ConsumerName] = append(consumerMessages, *message)
66+
return nil
67+
}
68+
69+
func (m InMemoryCache) RemoveMessage(message *Message) error {
70+
if m.cache == nil {
71+
return errors.New("in-memory cache wasn't initialized")
72+
}
73+
stationMessages, ok := m.cache[message.StationName]
74+
if !ok {
75+
return fmt.Errorf("no cache found for station - %s", message.StationName)
76+
}
77+
consumerNameMessages, ok := stationMessages[message.ConsumerName]
78+
if !ok {
79+
return fmt.Errorf("no cache found for consumer - %s", message.StationName)
80+
}
81+
index := -1
82+
for i, msg := range consumerNameMessages {
83+
if msg.ID == message.ID {
84+
index = i
85+
}
86+
}
87+
if index != -1 {
88+
stationMessages[message.ConsumerName] = append(consumerNameMessages[:index], consumerNameMessages[index+1:]...)
89+
return nil
90+
}
91+
return errors.New("message not found in cache")
92+
}

conf/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ type Configuration struct {
2222
ROOT_PASSWORD string
2323
DEBUG bool
2424
CLOUD_ENV bool
25+
USE_DB_CACHE bool
26+
DB_HOST string
27+
DB_USER string
28+
DB_PASSWORD string
29+
DB_NAME string
30+
DB_PORT string
31+
DB_SSLMODE bool
32+
DB_TIME_ZONE string
2533
}
2634

2735
func GetConfig() Configuration {

go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ require (
99
github.com/memphisdev/memphis.go v1.1.1
1010
github.com/nats-io/nats.go v1.25.0
1111
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f
12+
gorm.io/driver/postgres v1.5.2
13+
gorm.io/gorm v1.25.4
1214
)
1315

1416
require (
@@ -20,6 +22,11 @@ require (
2022
github.com/google/uuid v1.3.0 // indirect
2123
github.com/graph-gophers/graphql-go v1.5.0 // indirect
2224
github.com/hamba/avro/v2 v2.13.0 // indirect
25+
github.com/jackc/pgpassfile v1.0.0 // indirect
26+
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
27+
github.com/jackc/pgx/v5 v5.3.1 // indirect
28+
github.com/jinzhu/inflection v1.0.0 // indirect
29+
github.com/jinzhu/now v1.1.5 // indirect
2330
github.com/json-iterator/go v1.1.12 // indirect
2431
github.com/klauspost/compress v1.16.3 // indirect
2532
github.com/leodido/go-urn v1.2.4 // indirect

go.sum

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMN
3535
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
3636
github.com/hamba/avro/v2 v2.13.0 h1:QY2uX2yvJTW0OoMKelGShvq4v1hqab6CxJrPwh0fnj0=
3737
github.com/hamba/avro/v2 v2.13.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
38+
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
39+
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
40+
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
41+
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
42+
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
43+
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
44+
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
45+
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
46+
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
47+
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
3848
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
3949
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
4050
github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
@@ -149,3 +159,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
149159
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
150160
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
151161
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
162+
gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0=
163+
gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8=
164+
gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw=
165+
gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=

0 commit comments

Comments
 (0)