Skip to content

Commit 8c3ab4a

Browse files
Merge pull request #49 from EbubeCode/master
Add consume messages endpoints
2 parents a5f5b09 + 1a0ba41 commit 8c3ab4a

File tree

4 files changed

+210
-0
lines changed

4 files changed

+210
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.idea/
2+
rest-gateway

README.md

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,95 @@ Expected output:
217217
```json
218218
{"errors":["Schema validation has failed: jsonschema: '' does not validate with file:///Users/user/memphisdev/memphis-rest-gateway/123#/required: missing properties: 'field1'","Schema validation has failed: jsonschema: '' does not validate with file:///Users/user/memphisdev/memphis-rest-gateway/123#/required: missing properties: 'field1'"],"fail":2,"sent":1,"success":false}
219219
```
220+
### Consume a batch of messages 
221+
222+
Attach the JWT token to every request.\
223+
JWT token as '`Bearer`' as a header.
224+
225+
The messages are auto acknowledged by the rest gateway.
226+
227+
#### Supported content types:
228+
229+
* application/json
230+
231+
#### Example:
232+
233+
```bash
234+
curl --location --request POST 'rest_gateway:4444/stations/<station_name>/consume/batch' \
235+
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4KOGRhUaqvm-qSHnmMwX5VrLKsvHo33u3UdJ0qYP0kI' \
236+
--header 'Content-Type: application/json' \
237+
--data-raw '{
238+
"consumer_name": <consumer_name> string required,
239+
"consumer_group": <consumer_group> string defaults to <consumer_name>,
240+
"batch_size": <batch_size> integer defaults to 10,
241+
"batch_max_wait_time_ms": <batch_max_wait_time> integer defaults to 5 secs,
242+
"max_msg_deliveries": <max_msg_deliveries> integer defaults to 10
243+
}'
244+
```
245+
246+
Expected output:
247+
248+
```json
249+
[
250+
{
251+
"message": "{\n \"message\": \"message x\"\n}",
252+
"headers": {
253+
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
254+
"$memphis_producedBy": "rest_gateway",
255+
"Accept": "*/*",
256+
"Accept-Encoding": "gzip, deflate, br",
257+
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
258+
"Connection": "keep-alive",
259+
"Content-Length": "30",
260+
"Content-Type": "application/json",
261+
"Host": "localhost:4444",
262+
"Postman-Token": "1c1947d9-5dfb-4c3a-8da7-a31fbf993be6",
263+
"User-Agent": "PostmanRuntime/7.32.3"
264+
}
265+
},
266+
{
267+
"message": "{\n \"message\": \"message y\"\n}",
268+
"headers": {
269+
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
270+
"$memphis_producedBy": "rest_gateway",
271+
"Accept": "*/*",
272+
"Accept-Encoding": "gzip, deflate, br",
273+
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
274+
"Connection": "keep-alive",
275+
"Content-Length": "30",
276+
"Content-Type": "application/json",
277+
"Host": "localhost:4444",
278+
"Postman-Token": "9f76af3e-a861-483b-a2f0-b3f7e9e72125",
279+
"User-Agent": "PostmanRuntime/7.32.3"
280+
}
281+
},
282+
{
283+
"message": "{\n \"message\": \"Hello world\"\n}",
284+
"headers": {
285+
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
286+
"$memphis_producedBy": "rest_gateway",
287+
"Accept": "*/*",
288+
"Accept-Encoding": "gzip, deflate, br",
289+
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
290+
"Connection": "keep-alive",
291+
"Content-Length": "32",
292+
"Content-Type": "application/json",
293+
"Host": "localhost:4444",
294+
"Postman-Token": "7fe95b24-edae-4415-bc4c-258e6c93a51d",
295+
"User-Agent": "PostmanRuntime/7.32.3"
296+
}
297+
}
298+
]
299+
```
300+
301+
#### Error Examples:
302+
303+
```json
304+
{
305+
"error": "Consumer name is required",
306+
"success": false
307+
}
308+
```
220309

221310
## Support 🙋‍♂️🤝
222311

handlers/consumer.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package handlers
2+
3+
import (
4+
"fmt"
5+
"github.com/gofiber/fiber/v2"
6+
"github.com/memphisdev/memphis.go"
7+
"rest-gateway/logger"
8+
"rest-gateway/models"
9+
"strconv"
10+
"strings"
11+
"time"
12+
)
13+
14+
type requestBody struct {
15+
ConsumerName string `json:"consumer_name"`
16+
ConsumerGroup string `json:"consumer_group"`
17+
BatchSize int `json:"batch_size"`
18+
BatchMaxWaitTimeMs int `json:"batch_max_wait_time_ms"`
19+
MaxMsgDeliveries int `json:"max_msg_deliveries"`
20+
}
21+
22+
func (r *requestBody) initializeDefaults() {
23+
if r.ConsumerGroup == "" {
24+
r.ConsumerGroup = r.ConsumerName
25+
}
26+
if r.BatchSize == 0 {
27+
r.BatchSize = 10
28+
}
29+
if r.BatchMaxWaitTimeMs == 0 {
30+
r.BatchMaxWaitTimeMs = 5000
31+
}
32+
if r.MaxMsgDeliveries == 0 {
33+
r.MaxMsgDeliveries = 10
34+
}
35+
}
36+
37+
func ConsumeHandleMessage() func(*fiber.Ctx) error {
38+
return func(c *fiber.Ctx) error {
39+
log := logger.GetLogger(c)
40+
url := c.Request().URI().String()
41+
urlParts := strings.Split(url, "/")
42+
stationName := urlParts[4]
43+
reqBody := requestBody{}
44+
err := c.BodyParser(&reqBody)
45+
if err != nil {
46+
log.Errorf("ConsumeHandleMessage - parse request body: %s", err.Error())
47+
c.Status(fiber.StatusBadRequest)
48+
return c.JSON(&fiber.Map{
49+
"success": false,
50+
"error": "Invalid request body",
51+
})
52+
}
53+
if reqBody.ConsumerName == "" {
54+
c.Status(fiber.StatusBadRequest)
55+
return c.JSON(&fiber.Map{
56+
"success": false,
57+
"error": "Consumer name is required",
58+
})
59+
}
60+
userData, ok := c.Locals("userData").(models.AuthSchema)
61+
if !ok {
62+
log.Errorf("ConsumeHandleMessage: failed to get the user data from the middleware")
63+
c.Status(fiber.StatusInternalServerError)
64+
return c.JSON(&fiber.Map{
65+
"success": false,
66+
"error": "Server error",
67+
})
68+
}
69+
username := userData.Username
70+
accountId := userData.AccountId
71+
accountIdStr := strconv.Itoa(int(accountId))
72+
conn := ConnectionsCache[accountIdStr][username].Connection
73+
if conn == nil {
74+
errMsg := fmt.Sprintf("Connection does not exist")
75+
log.Errorf("ConsumeHandleMessage - consume: %s", errMsg)
76+
77+
c.Status(fiber.StatusInternalServerError)
78+
return c.JSON(&fiber.Map{
79+
"success": false,
80+
"error": "Server error",
81+
})
82+
}
83+
reqBody.initializeDefaults()
84+
msgs, err := conn.FetchMessages(stationName, reqBody.ConsumerName,
85+
memphis.FetchBatchSize(reqBody.BatchSize),
86+
memphis.FetchConsumerGroup(reqBody.ConsumerGroup),
87+
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTimeMs)*time.Millisecond),
88+
memphis.FetchMaxMsgDeliveries(reqBody.MaxMsgDeliveries))
89+
90+
if err != nil {
91+
log.Errorf("ConsumeHandleMessage - fetch messages: %s", err.Error())
92+
c.Status(fiber.StatusInternalServerError)
93+
return c.JSON(&fiber.Map{
94+
"success": false,
95+
"error": "Server error",
96+
})
97+
}
98+
99+
type message struct {
100+
Message string `json:"message"`
101+
Headers map[string]string `json:"headers"`
102+
}
103+
messages := []message{}
104+
105+
for _, msg := range msgs {
106+
err := msg.Ack()
107+
if err != nil {
108+
log.Errorf("ConsumeHandleMessage - acknowledge message: %s", err)
109+
}
110+
messages = append(messages, message{
111+
Message: string(msg.Data()),
112+
Headers: msg.GetHeaders(),
113+
})
114+
}
115+
c.Status(fiber.StatusOK)
116+
return c.JSON(&messages)
117+
}
118+
}

router/stations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ func InitializeStationsRoutes(app *fiber.App) {
1111
api := app.Group("/stations", logger.New())
1212
api.Post("/:stationName/produce/single", handlers.CreateHandleMessage())
1313
api.Post("/:stationName/produce/batch", handlers.CreateHandleBatch())
14+
api.Post("/:stationName/consume/batch", handlers.ConsumeHandleMessage())
1415
}

0 commit comments

Comments
 (0)