Skip to content

Commit 674a64f

Browse files
committed
code review
1 parent b85f6a2 commit 674a64f

File tree

2 files changed

+66
-26
lines changed

2 files changed

+66
-26
lines changed

README.md

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,9 @@ curl --location --request POST 'rest_gateway:4444/stations/<station_name>/consum
236236
--header 'Content-Type: application/json' \
237237
--data-raw '{
238238
"consumer_name": <consumer_name> string required,
239-
"consumer_group_name": <consumer_group_name> string defaults to <consumer_name>,
239+
"consumer_group": <consumer_group> string defaults to <consumer_name>,
240240
"batch_size": <batch_size> integer defaults to 10,
241-
"batch_max_wait_time": <batch_max_wait_time> integer defaults to 5 secs,
242-
"max_ack_time": <max_ack_time> integer defaults to 30 secs,
241+
"batch_max_wait_time_ms": <batch_max_wait_time> integer defaults to 5 secs,
243242
"max_msg_deliveries": <max_msg_deliveries> integer defaults to 10
244243
}'
245244
```
@@ -248,12 +247,54 @@ Expected output:
248247

249248
```json
250249
[
251-
{
252-
"message": "{\n \"message\": \"How're you doing\"\n}"
253-
},
254-
{
255-
"message": "{\n \"message\": \"How far\"\n}"
250+
{
251+
"message": "{\n \"message\": \"message x\"\n}",
252+
"headers": {
253+
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
254+
"$memphis_producedBy": "rest_gateway",
255+
"Accept": "*/*",
256+
"Accept-Encoding": "gzip, deflate, br",
257+
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
258+
"Connection": "keep-alive",
259+
"Content-Length": "30",
260+
"Content-Type": "application/json",
261+
"Host": "localhost:4444",
262+
"Postman-Token": "1c1947d9-5dfb-4c3a-8da7-a31fbf993be6",
263+
"User-Agent": "PostmanRuntime/7.32.3"
256264
}
265+
},
266+
{
267+
"message": "{\n \"message\": \"message y\"\n}",
268+
"headers": {
269+
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
270+
"$memphis_producedBy": "rest_gateway",
271+
"Accept": "*/*",
272+
"Accept-Encoding": "gzip, deflate, br",
273+
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
274+
"Connection": "keep-alive",
275+
"Content-Length": "30",
276+
"Content-Type": "application/json",
277+
"Host": "localhost:4444",
278+
"Postman-Token": "9f76af3e-a861-483b-a2f0-b3f7e9e72125",
279+
"User-Agent": "PostmanRuntime/7.32.3"
280+
}
281+
},
282+
{
283+
"message": "{\n \"message\": \"Hello world\"\n}",
284+
"headers": {
285+
"$memphis_connectionId": "ab083953-a3b1-467b-8487-dfcd24e2ba59",
286+
"$memphis_producedBy": "rest_gateway",
287+
"Accept": "*/*",
288+
"Accept-Encoding": "gzip, deflate, br",
289+
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2NvdW50X2lkIjoxLCJjb25uZWN0aW9uX3Rva2VuIjoiIiwiZXhwIjoxNjkzNzc2MzYzLCJwYXNzd29yZCI6IkBlYnViZUBBZ3UxIiwidXNlcm5hbWUiOiJlYnViZWFndSJ9.QW2sx1mVyyQ1g88Gclj9VNfwvXU4Fr__M1XXbjc6jis",
290+
"Connection": "keep-alive",
291+
"Content-Length": "32",
292+
"Content-Type": "application/json",
293+
"Host": "localhost:4444",
294+
"Postman-Token": "7fe95b24-edae-4415-bc4c-258e6c93a51d",
295+
"User-Agent": "PostmanRuntime/7.32.3"
296+
}
297+
}
257298
]
258299
```
259300

handlers/consumer.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,22 @@ import (
1212
)
1313

1414
type requestBody struct {
15-
ConsumerName string `json:"consumer_name"`
16-
ConsumerGroupName string `json:"consumer_group_name"`
17-
BatchSize int `json:"batch_size"`
18-
MaxAckTime int `json:"max_ack_time"`
19-
BatchMaxWaitTime int `json:"batch_max_wait_time"`
20-
MaxMsgDeliveries int `json:"max_msg_deliveries"`
15+
ConsumerName string `json:"consumer_name"`
16+
ConsumerGroup string `json:"consumer_group"`
17+
BatchSize int `json:"batch_size"`
18+
BatchMaxWaitTimeMs int `json:"batch_max_wait_time_ms"`
19+
MaxMsgDeliveries int `json:"max_msg_deliveries"`
2120
}
2221

2322
func (r *requestBody) initializeDefaults() {
24-
if r.ConsumerGroupName == "" {
25-
r.ConsumerGroupName = r.ConsumerName
23+
if r.ConsumerGroup == "" {
24+
r.ConsumerGroup = r.ConsumerName
2625
}
2726
if r.BatchSize == 0 {
2827
r.BatchSize = 10
2928
}
30-
if r.BatchMaxWaitTime == 0 {
31-
r.BatchMaxWaitTime = 5
32-
}
33-
if r.MaxAckTime == 0 {
34-
r.MaxAckTime = 30
29+
if r.BatchMaxWaitTimeMs == 0 {
30+
r.BatchMaxWaitTimeMs = 5000
3531
}
3632
if r.MaxMsgDeliveries == 0 {
3733
r.MaxMsgDeliveries = 10
@@ -87,9 +83,8 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
8783
reqBody.initializeDefaults()
8884
msgs, err := conn.FetchMessages(stationName, reqBody.ConsumerName,
8985
memphis.FetchBatchSize(reqBody.BatchSize),
90-
memphis.FetchConsumerGroup(reqBody.ConsumerGroupName),
91-
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTime)*time.Second),
92-
memphis.FetchMaxAckTime(time.Duration(reqBody.MaxAckTime)*time.Second),
86+
memphis.FetchConsumerGroup(reqBody.ConsumerGroup),
87+
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTimeMs)*time.Millisecond),
9388
memphis.FetchMaxMsgDeliveries(reqBody.MaxMsgDeliveries))
9489

9590
if err != nil {
@@ -102,7 +97,8 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
10297
}
10398

10499
type message struct {
105-
Message string `json:"message"`
100+
Message string `json:"message"`
101+
Headers map[string]string `json:"headers"`
106102
}
107103
messages := []message{}
108104

@@ -111,7 +107,10 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
111107
if err != nil {
112108
log.Errorf("ConsumeHandleMessage - acknowledge message: %s", err)
113109
}
114-
messages = append(messages, message{string(msg.Data())})
110+
messages = append(messages, message{
111+
Message: string(msg.Data()),
112+
Headers: msg.GetHeaders(),
113+
})
115114
}
116115
c.Status(fiber.StatusOK)
117116
return c.JSON(&messages)

0 commit comments

Comments
 (0)