Skip to content

Commit 373fceb

Browse files
committed
Support batch message request
1 parent df946de commit 373fceb

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

handlers/producer.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package handler
22

33
import (
4+
"encoding/json"
45
"errors"
6+
"fmt"
57
"strings"
68

79
"github.com/gofiber/fiber/v2"
@@ -83,3 +85,55 @@ func CreateHandleMessage(conn *memphis.Conn) func(*fiber.Ctx) error {
8385
})
8486
}
8587
}
88+
89+
func CreateHandleBatch(conn *memphis.Conn) func(*fiber.Ctx) error {
90+
return func(c *fiber.Ctx) error {
91+
stationName := c.Params("stationName")
92+
var producer *memphis.Producer
93+
94+
producer, err := createProducer(conn, producers, stationName)
95+
if err != nil {
96+
return err
97+
}
98+
99+
bodyReq := c.Body()
100+
headers := c.GetReqHeaders()
101+
contentType := string(c.Request().Header.ContentType())
102+
103+
switch contentType {
104+
case "application/json":
105+
var batchReq []string
106+
err := json.Unmarshal(bodyReq, &batchReq)
107+
if err != nil {
108+
return err
109+
}
110+
hdrs, err := handleHeaders(headers)
111+
if err != nil {
112+
return err
113+
}
114+
115+
errCount := 0
116+
for _, msg := range batchReq {
117+
if err = producer.Produce([]byte(msg), memphis.MsgHeaders(hdrs)); err != nil {
118+
errCount++
119+
}
120+
}
121+
122+
if errCount > 0 {
123+
c.Status(400)
124+
return c.JSON(&fiber.Map{
125+
"success": false,
126+
"error": fmt.Sprintf("send failed for %d/%d message, last error %v", errCount, len(batchReq), err.Error()),
127+
})
128+
}
129+
default:
130+
return errors.New("unsupported content type")
131+
}
132+
133+
c.Status(200)
134+
return c.JSON(&fiber.Map{
135+
"success": true,
136+
"error": nil,
137+
})
138+
}
139+
}

router/router.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package router
22

33
import (
4+
handler "http-proxy/handlers"
5+
46
"github.com/gofiber/fiber/v2"
57
"github.com/gofiber/fiber/v2/middleware/logger"
68
"github.com/memphisdev/memphis.go"
7-
handler "http-proxy/handlers"
89
)
910

1011
// SetupRoutes setup router api
1112
func SetupRoutes(app *fiber.App, conn *memphis.Conn) {
1213
api := app.Group("/stations", logger.New())
1314
api.Post("/:stationName/produce/single", handler.CreateHandleMessage(conn))
15+
api.Post("/:stationName/produce/batch", handler.CreateHandleBatch(conn))
1416
}

0 commit comments

Comments
 (0)