Skip to content

Commit b85f6a2

Browse files
committed
make consume method to auto acknowledge the messages and no equivalent acknowledge method for the gateway
1 parent a912d38 commit b85f6a2

File tree

4 files changed

+56
-8
lines changed

4 files changed

+56
-8
lines changed

README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,54 @@ Expected output:
217217
```json
218218
{"errors":["Schema validation has failed: jsonschema: '' does not validate with file:///Users/user/memphisdev/memphis-rest-gateway/123#/required: missing properties: 'field1'","Schema validation has failed: jsonschema: '' does not validate with file:///Users/user/memphisdev/memphis-rest-gateway/123#/required: missing properties: 'field1'"],"fail":2,"sent":1,"success":false}
219219
```
220+
### Consume a batch of messages 
221+
222+
Attach the JWT token to every request.\
223+
JWT token as '`Bearer`' as a header.
224+
225+
The messages are auto acknowledged by the rest gateway.
226+
227+
#### Supported content types:
228+
229+
* application/json
230+
231+
#### Example:
232+
233+
```bash
234+
curl --location --request POST 'rest_gateway:4444/stations/<station_name>/consume/batch' \
235+
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4KOGRhUaqvm-qSHnmMwX5VrLKsvHo33u3UdJ0qYP0kI' \
236+
--header 'Content-Type: application/json' \
237+
--data-raw '{
238+
"consumer_name": <consumer_name> string required,
239+
"consumer_group_name": <consumer_group_name> string defaults to <consumer_name>,
240+
"batch_size": <batch_size> integer defaults to 10,
241+
"batch_max_wait_time": <batch_max_wait_time> integer defaults to 5 secs,
242+
"max_ack_time": <max_ack_time> integer defaults to 30 secs,
243+
"max_msg_deliveries": <max_msg_deliveries> integer defaults to 10
244+
}'
245+
```
246+
247+
Expected output:
248+
249+
```json
250+
[
251+
{
252+
"message": "{\n \"message\": \"How're you doing\"\n}"
253+
},
254+
{
255+
"message": "{\n \"message\": \"How far\"\n}"
256+
}
257+
]
258+
```
259+
260+
#### Error Examples:
261+
262+
```json
263+
{
264+
"error": "Consumer name is required",
265+
"success": false
266+
}
267+
```
220268

221269
## Support 🙋‍♂️🤝
222270

handlers/consumer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type requestBody struct {
2020
MaxMsgDeliveries int `json:"max_msg_deliveries"`
2121
}
2222

23-
func (r requestBody) initializeDefaults() {
23+
func (r *requestBody) initializeDefaults() {
2424
if r.ConsumerGroupName == "" {
2525
r.ConsumerGroupName = r.ConsumerName
2626
}
@@ -88,28 +88,28 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
8888
msgs, err := conn.FetchMessages(stationName, reqBody.ConsumerName,
8989
memphis.FetchBatchSize(reqBody.BatchSize),
9090
memphis.FetchConsumerGroup(reqBody.ConsumerGroupName),
91-
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTime)),
92-
memphis.FetchMaxAckTime(time.Duration(reqBody.MaxAckTime)),
91+
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTime)*time.Second),
92+
memphis.FetchMaxAckTime(time.Duration(reqBody.MaxAckTime)*time.Second),
9393
memphis.FetchMaxMsgDeliveries(reqBody.MaxMsgDeliveries))
9494

9595
if err != nil {
9696
log.Errorf("ConsumeHandleMessage - fetch messages: %s", err.Error())
97-
c.Status(fiber.StatusBadRequest)
97+
c.Status(fiber.StatusInternalServerError)
9898
return c.JSON(&fiber.Map{
9999
"success": false,
100100
"error": "Server error",
101101
})
102102
}
103103

104104
type message struct {
105-
Data string `json:"data"`
105+
Message string `json:"message"`
106106
}
107107
messages := []message{}
108108

109109
for _, msg := range msgs {
110110
err := msg.Ack()
111111
if err != nil {
112-
log.Errorf("ConsumeHandleMessage - consume: %s", err)
112+
log.Errorf("ConsumeHandleMessage - acknowledge message: %s", err)
113113
}
114114
messages = append(messages, message{string(msg.Data())})
115115
}

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func initializeLogger() *logger.Logger {
1818
creds := configuration.CONNECTION_TOKEN
1919
username := configuration.ROOT_USER
2020
if configuration.USER_PASS_BASED_AUTH {
21-
username = "$$memphis"
21+
username = "$memphis"
2222
creds = configuration.CONNECTION_TOKEN + "_" + configuration.ROOT_PASSWORD
2323
}
2424
l, err := logger.CreateLogger(configuration.MEMPHIS_HOST, username, creds)

router/stations.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ func InitializeStationsRoutes(app *fiber.App) {
1111
api := app.Group("/stations", logger.New())
1212
api.Post("/:stationName/produce/single", handlers.CreateHandleMessage())
1313
api.Post("/:stationName/produce/batch", handlers.CreateHandleBatch())
14-
api.Get("/:stationName/consume/batch", handlers.ConsumeHandleMessage())
14+
api.Post("/:stationName/consume/batch", handlers.ConsumeHandleMessage())
1515
}

0 commit comments

Comments
 (0)