Skip to content

Commit 48930ea

Browse files
fix support application/x-protobuf
1 parent a5ea39a commit 48930ea

File tree

1 file changed

+23
-56
lines changed

1 file changed

+23
-56
lines changed

handlers/producer.go

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,116 +3,83 @@ package handler
33
import (
44
"encoding/json"
55
"errors"
6-
"log"
76
"strings"
87

98
"github.com/gofiber/fiber/v2"
109
"github.com/memphisdev/memphis.go"
11-
"google.golang.org/protobuf/proto"
1210
)
1311

14-
type Handler struct{ P *memphis.Producer }
12+
func handleHeaders(headers map[string]string) (memphis.Headers, error) {
13+
hdrs := memphis.Headers{}
14+
hdrs.New()
1515

16-
func (p Handler) produce(message []byte, hdrs memphis.Headers) error {
17-
if err := p.P.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
18-
return err
16+
for key, value := range headers {
17+
err := hdrs.Add(key, value)
18+
if err != nil {
19+
return memphis.Headers{}, err
20+
}
1921
}
20-
return nil
21-
22+
return hdrs, nil
2223
}
2324

24-
func handleMessageHdrs(bodyReq []byte, hdrs memphis.Headers) ([]byte, memphis.Headers, error) {
25+
func handleJsonMessage(bodyReq []byte, headers map[string]string) ([]byte, memphis.Headers, error) {
2526
type body struct {
2627
Message string `json:"message"`
27-
Headers string `json:"headers"`
2828
}
29-
var b body
30-
err := json.Unmarshal(bodyReq, &b)
29+
var bodyRequest body
30+
err := json.Unmarshal(bodyReq, &bodyRequest)
3131
if err != nil {
3232
return nil, memphis.Headers{}, err
3333
}
3434

35-
var headers map[string]string
36-
err = json.Unmarshal([]byte(b.Headers), &headers)
35+
hdrs, err := handleHeaders(headers)
3736
if err != nil {
3837
return nil, memphis.Headers{}, err
3938
}
4039

41-
var k, v string
42-
for key, value := range headers {
43-
k = key
44-
v = value
45-
46-
err = hdrs.Add(k, v)
47-
if err != nil {
48-
return nil, memphis.Headers{}, err
49-
}
50-
}
51-
52-
message, err := json.Marshal(b.Message)
40+
message, err := json.Marshal(bodyRequest.Message)
5341
if err != nil {
5442
return nil, memphis.Headers{}, err
5543
}
5644
return message, hdrs, nil
5745
}
5846

59-
func CreateHandleMessage(p *memphis.Producer) func(*fiber.Ctx) error {
47+
func CreateHandleMessage(producer *memphis.Producer) func(*fiber.Ctx) error {
6048
return func(c *fiber.Ctx) error {
6149
bodyReq := c.Body()
50+
headers := c.GetReqHeaders()
6251
contentType := string(c.Request().Header.ContentType())
6352
var message []byte
64-
var err error
6553
hdrs := memphis.Headers{}
66-
hdrs.New()
54+
var err error
6755
caseText := strings.Contains(contentType, "text")
68-
6956
if caseText {
7057
contentType = "text/"
7158
}
7259

7360
switch contentType {
7461
case "application/json":
75-
message, hdrs, err = handleMessageHdrs(bodyReq, hdrs)
62+
message, hdrs, err = handleJsonMessage(bodyReq, headers)
7663
if err != nil {
7764
return err
7865
}
7966
case "text/":
80-
message, hdrs, err = handleMessageHdrs(bodyReq, hdrs)
67+
message = bodyReq
68+
hdrs, err = handleHeaders(headers)
8169
if err != nil {
8270
return err
8371
}
8472
case "application/x-protobuf":
85-
msg := &Msg{}
86-
err := proto.Unmarshal(bodyReq, msg)
87-
if err != nil {
88-
log.Fatal("unmarshaling error: ", err)
89-
}
90-
91-
message, err = json.Marshal(msg.Message)
73+
message = bodyReq
74+
hdrs, err = handleHeaders(headers)
9275
if err != nil {
9376
return err
9477
}
95-
96-
var headers map[string]string
97-
err = json.Unmarshal([]byte(msg.Headers), &headers)
98-
if err != nil {
99-
return err
100-
}
101-
102-
var k, v string
103-
for key, value := range headers {
104-
k = key
105-
v = value
106-
err = hdrs.Add(k, v)
107-
if err != nil {
108-
return err
109-
}
110-
}
11178
default:
11279
return errors.New("unsupported content type")
11380
}
11481

115-
if err := p.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
82+
if err := producer.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
11683
c.Status(400)
11784
return c.JSON(&fiber.Map{
11885
"success": false,

0 commit comments

Comments
 (0)