Skip to content

Commit a649d9e

Browse files
Merge pull request #7 from memphisdev/batch-message-support
Batch message support
2 parents 28de0fc + fb3d73e commit a649d9e

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

handlers/producer.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package handler
22

33
import (
4+
"encoding/json"
45
"errors"
6+
"fmt"
57
"strings"
68

79
"github.com/gofiber/fiber/v2"
@@ -83,3 +85,63 @@ func CreateHandleMessage(conn *memphis.Conn) func(*fiber.Ctx) error {
8385
})
8486
}
8587
}
88+
89+
func CreateHandleBatch(conn *memphis.Conn) func(*fiber.Ctx) error {
90+
return func(c *fiber.Ctx) error {
91+
stationName := c.Params("stationName")
92+
var producer *memphis.Producer
93+
94+
producer, err := createProducer(conn, producers, stationName)
95+
if err != nil {
96+
return err
97+
}
98+
99+
bodyReq := c.Body()
100+
headers := c.GetReqHeaders()
101+
contentType := string(c.Request().Header.ContentType())
102+
103+
switch contentType {
104+
case "application/json":
105+
var batchReq []map[string]string
106+
err := json.Unmarshal(bodyReq, &batchReq)
107+
if err != nil {
108+
return errors.New("unsupported request")
109+
}
110+
hdrs, err := handleHeaders(headers)
111+
if err != nil {
112+
return err
113+
}
114+
115+
errCount := 0
116+
var lastErr error
117+
for _, msg := range batchReq {
118+
rawRes, err := json.Marshal(msg)
119+
if err != nil {
120+
errCount++
121+
lastErr = err
122+
continue
123+
}
124+
if err := producer.Produce(rawRes, memphis.MsgHeaders(hdrs)); err != nil {
125+
errCount++
126+
lastErr = err
127+
}
128+
}
129+
130+
if errCount > 0 {
131+
c.Status(400)
132+
return c.JSON(&fiber.Map{
133+
"success": false,
134+
"error": fmt.Sprintf("send failed for %d/%d messages, last error: %v", errCount, len(batchReq), lastErr.Error()),
135+
})
136+
}
137+
default:
138+
return errors.New("unsupported content type")
139+
}
140+
141+
c.Status(200)
142+
return c.JSON(&fiber.Map{
143+
"success": true,
144+
"error": nil,
145+
})
146+
}
147+
}

router/router.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package router
22

33
import (
4+
handler "http-proxy/handlers"
5+
46
"github.com/gofiber/fiber/v2"
57
"github.com/gofiber/fiber/v2/middleware/logger"
68
"github.com/memphisdev/memphis.go"
7-
handler "http-proxy/handlers"
89
)
910

1011
// SetupRoutes setup router api
1112
func SetupRoutes(app *fiber.App, conn *memphis.Conn) {
1213
api := app.Group("/stations", logger.New())
1314
api.Post("/:stationName/produce/single", handler.CreateHandleMessage(conn))
15+
api.Post("/:stationName/produce/batch", handler.CreateHandleBatch(conn))
1416
}

0 commit comments

Comments
 (0)