Skip to content

Commit fb3d73e

Browse files
committed
Change message structure phrasing last error handling
1 parent 373fceb commit fb3d73e

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

handlers/producer.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,28 +102,36 @@ func CreateHandleBatch(conn *memphis.Conn) func(*fiber.Ctx) error {
102102

103103
switch contentType {
104104
case "application/json":
105-
var batchReq []string
105+
var batchReq []map[string]string
106106
err := json.Unmarshal(bodyReq, &batchReq)
107107
if err != nil {
108-
return err
108+
return errors.New("unsupported request")
109109
}
110110
hdrs, err := handleHeaders(headers)
111111
if err != nil {
112112
return err
113113
}
114114

115115
errCount := 0
116+
var lastErr error
116117
for _, msg := range batchReq {
117-
if err = producer.Produce([]byte(msg), memphis.MsgHeaders(hdrs)); err != nil {
118+
rawRes, err := json.Marshal(msg)
119+
if err != nil {
120+
errCount++
121+
lastErr = err
122+
continue
123+
}
124+
if err := producer.Produce(rawRes, memphis.MsgHeaders(hdrs)); err != nil {
118125
errCount++
126+
lastErr = err
119127
}
120128
}
121129

122130
if errCount > 0 {
123131
c.Status(400)
124132
return c.JSON(&fiber.Map{
125133
"success": false,
126-
"error": fmt.Sprintf("send failed for %d/%d message, last error %v", errCount, len(batchReq), err.Error()),
134+
"error": fmt.Sprintf("send failed for %d/%d messages, last error: %v", errCount, len(batchReq), lastErr.Error()),
127135
})
128136
}
129137
default:

0 commit comments

Comments
 (0)