Skip to content

Commit 9911e71

Browse files
committed
implement message acknowledgement
1 parent 3c56d12 commit 9911e71

File tree

6 files changed

+9
-5
lines changed

6 files changed

+9
-5
lines changed

cache/cache.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@ func New(config conf.Configuration, log *logger.Logger) MessageCache {
3333
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
3434
if err != nil {
3535
log.Errorf("MessageCache.New - %s", err.Error())
36-
log.Noticef("Defaulting to in-memory cache")
3736
} else {
3837
repo := Repository{db}
3938
err := repo.MigrateMessages()
4039
if err != nil {
4140
log.Errorf("MessageCache.New - %s", err.Error())
4241
}
42+
log.Noticef("Using database for message cache")
4343
return repo
4444
}
4545
}
46+
log.Noticef("Defaulting to in-memory cache")
4647
return InMemoryCache{map[string]map[string][]Message{}}
4748
}

conf/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Configuration struct {
2828
DB_PASSWORD string
2929
DB_NAME string
3030
DB_PORT string
31-
DB_SSLMODE bool
31+
DB_SSLMODE string
3232
DB_TIME_ZONE string
3333
}
3434

handlers/consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func getQueryParamValue(urlString, paramName string) (string, error) {
185185
}
186186

187187
func getConsumerNameParameterValue(urlString, defaultValue string) string {
188-
consumerName, err := getQueryParamValue(urlString, "consumerName")
188+
consumerName, err := getQueryParamValue(urlString, "consumer_name")
189189
if err != nil {
190190
consumerName = defaultValue
191191
}
@@ -194,7 +194,7 @@ func getConsumerNameParameterValue(urlString, defaultValue string) string {
194194
}
195195

196196
func getBatchSizeParameterValue(urlString string, defaultValue int) int {
197-
batchSizeString, _ := getQueryParamValue(urlString, "batchSize")
197+
batchSizeString, _ := getQueryParamValue(urlString, "batch_size")
198198
batchSize, err := strconv.ParseInt(batchSizeString, 10, 64)
199199
if err != nil {
200200
return defaultValue

main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ func main() {
3737
l := initalizeLogger()
3838
go handlers.CleanConnectionsCache()
3939
app := router.SetupRoutes(l)
40-
handlers.InitializeMessageCache(l)
4140
l.Noticef("Memphis REST gateway is up and running")
4241
l.Noticef("Version %s", configuration.VERSION)
4342
app.Listen(":" + configuration.HTTP_PORT)

router/router.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package router
22

33
import (
4+
"rest-gateway/handlers"
45
"rest-gateway/logger"
56
"rest-gateway/middlewares"
67
"rest-gateway/utils"
@@ -15,6 +16,7 @@ func SetupRoutes(l *logger.Logger) *fiber.App {
1516
app := fiber.New(fiber.Config{
1617
DisableStartupMessage: true,
1718
})
19+
handlers.InitializeMessageCache(l)
1820

1921
logger.SetLogger(app, l)
2022
app.Use(cors.New())

router/stations.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ func InitializeStationsRoutes(app *fiber.App) {
1111
api := app.Group("/stations", logger.New())
1212
api.Post("/:stationName/produce/single", handlers.CreateHandleMessage())
1313
api.Post("/:stationName/produce/batch", handlers.CreateHandleBatch())
14+
api.Get("/:stationName/consume", handlers.ConsumeHandleMessage())
15+
api.Post("/:stationName/ack", handlers.AcknowledgeMessage())
1416
}

0 commit comments

Comments
 (0)