Skip to content

Commit a2f5fcc

Browse files
bugfixes
1 parent 8c3ab4a commit a2f5fcc

File tree

3 files changed

+17
-60
lines changed

3 files changed

+17
-60
lines changed

README.md

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,7 @@ curl --location --request POST 'rest_gateway:4444/stations/<station_name>/consum
238238
"consumer_name": <consumer_name> string required,
239239
"consumer_group": <consumer_group> string defaults to <consumer_name>,
240240
"batch_size": <batch_size> integer defaults to 10,
241-
"batch_max_wait_time_ms": <batch_max_wait_time> integer defaults to 5 secs,
242-
"max_msg_deliveries": <max_msg_deliveries> integer defaults to 10
241+
"batch_max_wait_time_ms": <batch_max_wait_time> integer defaults to 5 secs
243242
}'
244243
```
245244

@@ -250,49 +249,15 @@ Expected output:
250249
{
251250
"message": "{\n \"message\": \"message x\"\n}",
252251
"headers": {
253-
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
254-
"$memphis_producedBy": "rest_gateway",
255252
"Accept": "*/*",
256-
"Accept-Encoding": "gzip, deflate, br",
257-
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
258-
"Connection": "keep-alive",
259-
"Content-Length": "30",
260-
"Content-Type": "application/json",
261-
"Host": "localhost:4444",
262-
"Postman-Token": "1c1947d9-5dfb-4c3a-8da7-a31fbf993be6",
263-
"User-Agent": "PostmanRuntime/7.32.3"
253+
"Accept-Encoding": "gzip, deflate, br"
264254
}
265255
},
266256
{
267257
"message": "{\n \"message\": \"message y\"\n}",
268258
"headers": {
269-
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
270-
"$memphis_producedBy": "rest_gateway",
271-
"Accept": "*/*",
272-
"Accept-Encoding": "gzip, deflate, br",
273-
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
274-
"Connection": "keep-alive",
275-
"Content-Length": "30",
276-
"Content-Type": "application/json",
277-
"Host": "localhost:4444",
278-
"Postman-Token": "9f76af3e-a861-483b-a2f0-b3f7e9e72125",
279-
"User-Agent": "PostmanRuntime/7.32.3"
280-
}
281-
},
282-
{
283-
"message": "{\n \"message\": \"Hello world\"\n}",
284-
"headers": {
285-
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
286-
"$memphis_producedBy": "rest_gateway",
287-
"Accept": "*/*",
288-
"Accept-Encoding": "gzip, deflate, br",
289-
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
290-
"Connection": "keep-alive",
291-
"Content-Length": "32",
292259
"Content-Type": "application/json",
293-
"Host": "localhost:4444",
294-
"Postman-Token": "7fe95b24-edae-4415-bc4c-258e6c93a51d",
295-
"User-Agent": "PostmanRuntime/7.32.3"
260+
"Host": "localhost:4444"
296261
}
297262
}
298263
]

handlers/consumer.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,35 @@ package handlers
22

33
import (
44
"fmt"
5-
"github.com/gofiber/fiber/v2"
6-
"github.com/memphisdev/memphis.go"
75
"rest-gateway/logger"
86
"rest-gateway/models"
97
"strconv"
108
"strings"
119
"time"
10+
11+
"github.com/gofiber/fiber/v2"
12+
"github.com/memphisdev/memphis.go"
1213
)
1314

1415
type requestBody struct {
1516
ConsumerName string `json:"consumer_name"`
1617
ConsumerGroup string `json:"consumer_group"`
1718
BatchSize int `json:"batch_size"`
1819
BatchMaxWaitTimeMs int `json:"batch_max_wait_time_ms"`
19-
MaxMsgDeliveries int `json:"max_msg_deliveries"`
2020
}
2121

2222
func (r *requestBody) initializeDefaults() {
2323
if r.ConsumerGroup == "" {
24-
r.ConsumerGroup = r.ConsumerName
24+
r.ConsumerGroup = "rest-gateway"
25+
} else {
26+
r.ConsumerGroup = fmt.Sprintf("%s-rest-gateway", r.ConsumerGroup)
2527
}
2628
if r.BatchSize == 0 {
2729
r.BatchSize = 10
2830
}
2931
if r.BatchMaxWaitTimeMs == 0 {
3032
r.BatchMaxWaitTimeMs = 5000
3133
}
32-
if r.MaxMsgDeliveries == 0 {
33-
r.MaxMsgDeliveries = 10
34-
}
3534
}
3635

3736
func ConsumeHandleMessage() func(*fiber.Ctx) error {
@@ -71,9 +70,7 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
7170
accountIdStr := strconv.Itoa(int(accountId))
7271
conn := ConnectionsCache[accountIdStr][username].Connection
7372
if conn == nil {
74-
errMsg := fmt.Sprintf("Connection does not exist")
75-
log.Errorf("ConsumeHandleMessage - consume: %s", errMsg)
76-
73+
log.Warnf("ConsumeHandleMessage - consume: Connection does not exist")
7774
c.Status(fiber.StatusInternalServerError)
7875
return c.JSON(&fiber.Map{
7976
"success": false,
@@ -85,14 +82,14 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
8582
memphis.FetchBatchSize(reqBody.BatchSize),
8683
memphis.FetchConsumerGroup(reqBody.ConsumerGroup),
8784
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTimeMs)*time.Millisecond),
88-
memphis.FetchMaxMsgDeliveries(reqBody.MaxMsgDeliveries))
85+
memphis.FetchMaxMsgDeliveries(1))
8986

9087
if err != nil {
9188
log.Errorf("ConsumeHandleMessage - fetch messages: %s", err.Error())
92-
c.Status(fiber.StatusInternalServerError)
89+
c.Status(fiber.StatusBadRequest)
9390
return c.JSON(&fiber.Map{
9491
"success": false,
95-
"error": "Server error",
92+
"error": err.Error(),
9693
})
9794
}
9895

handlers/producer.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package handlers
33
import (
44
"encoding/json"
55
"errors"
6-
"fmt"
76
"strconv"
87

98
"rest-gateway/logger"
@@ -72,16 +71,14 @@ func CreateHandleMessage() func(*fiber.Ctx) error {
7271
accountIdStr := strconv.Itoa(int(accountId))
7372
conn := ConnectionsCache[accountIdStr][username].Connection
7473
if conn == nil {
75-
errMsg := fmt.Sprintf("Connection does not exist")
76-
log.Errorf("CreateHandleMessage - produce: %s", errMsg)
77-
74+
log.Warnf("CreateHandleMessage - produce: Connection does not exist")
7875
c.Status(fiber.StatusInternalServerError)
7976
return c.JSON(&fiber.Map{
8077
"success": false,
8178
"error": "Server error",
8279
})
8380
}
84-
err = conn.Produce(stationName, "rest_gateway", message, []memphis.ProducerOpt{}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)})
81+
err = conn.Produce(stationName, "rest-gateway", message, []memphis.ProducerOpt{}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)})
8582
if err != nil {
8683
log.Errorf("CreateHandleMessage - produce: %s", err.Error())
8784
c.Status(fiber.StatusInternalServerError)
@@ -147,9 +144,7 @@ func CreateHandleBatch() func(*fiber.Ctx) error {
147144
accountIdStr := strconv.Itoa(int(accountId))
148145
conn := ConnectionsCache[accountIdStr][username].Connection
149146
if conn == nil {
150-
errMsg := fmt.Sprintf("Connection does not exist")
151-
log.Errorf("CreateHandleBatch - produce: %s", errMsg)
152-
147+
log.Warnf("CreateHandleBatch - produce: Connection does not exist")
153148
c.Status(fiber.StatusInternalServerError)
154149
return c.JSON(&fiber.Map{
155150
"success": false,
@@ -166,7 +161,7 @@ func CreateHandleBatch() func(*fiber.Ctx) error {
166161
allErr = append(allErr, err.Error())
167162
continue
168163
}
169-
if err := conn.Produce(stationName, "rest_gateway", rawRes, []memphis.ProducerOpt{}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)}); err != nil {
164+
if err := conn.Produce(stationName, "rest-gateway", rawRes, []memphis.ProducerOpt{}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)}); err != nil {
170165
log.Errorf("CreateHandleBatch - produce: %s", err.Error())
171166
errCount++
172167
allErr = append(allErr, err.Error())

0 commit comments

Comments
 (0)