11package handler
22
33import (
4- "encoding/json"
54 "errors"
65 "strings"
76
87 "github.com/gofiber/fiber/v2"
98 "github.com/memphisdev/memphis.go"
109)
1110
11+ var producers = make (map [string ]* memphis.Producer )
12+
1213func handleHeaders (headers map [string ]string ) (memphis.Headers , error ) {
1314 hdrs := memphis.Headers {}
1415 hdrs .New ()
@@ -22,44 +23,31 @@ func handleHeaders(headers map[string]string) (memphis.Headers, error) {
2223 return hdrs , nil
2324}
2425
25- func handleJsonMessage (bodyReq []byte , headers map [string ]string ) ([]byte , memphis.Headers , error ) {
26- type body struct {
27- Message string `json:"message"`
28- }
29- var bodyRequest body
30- err := json .Unmarshal (bodyReq , & bodyRequest )
31- if err != nil {
32- return nil , memphis.Headers {}, err
33- }
34-
35- hdrs , err := handleHeaders (headers )
36- if err != nil {
37- return nil , memphis.Headers {}, err
26+ func createProducer (conn * memphis.Conn , producers map [string ]* memphis.Producer , stationName string ) (* memphis.Producer , error ) {
27+ producerName := "http_proxy"
28+ var producer * memphis.Producer
29+ var err error
30+ if _ , ok := producers [stationName ]; ! ok {
31+ producer , err = conn .CreateProducer (stationName , producerName , memphis .ProducerGenUniqueSuffix ())
32+ if err != nil {
33+ return nil , err
34+ }
35+ producers [stationName ] = producer
36+ } else {
37+ producer = producers [stationName ]
3838 }
3939
40- message , err := json .Marshal (bodyRequest .Message )
41- if err != nil {
42- return nil , memphis.Headers {}, err
43- }
44- return message , hdrs , nil
40+ return producer , nil
4541}
4642
4743func CreateHandleMessage (conn * memphis.Conn ) func (* fiber.Ctx ) error {
48- producers := make (map [string ]* memphis.Producer )
4944 return func (c * fiber.Ctx ) error {
5045 stationName := c .Params ("stationName" )
51- producerName := "http_proxy"
5246 var producer * memphis.Producer
53- var err error
5447
55- if _ , ok := producers [stationName ]; ! ok {
56- producer , err = conn .CreateProducer (stationName , producerName , memphis .ProducerGenUniqueSuffix ())
57- if err != nil {
58- return err
59- }
60- producers [stationName ] = producer
61- } else {
62- producer = producers [stationName ]
48+ producer , err := createProducer (conn , producers , stationName )
49+ if err != nil {
50+ return err
6351 }
6452
6553 bodyReq := c .Body ()
@@ -73,18 +61,7 @@ func CreateHandleMessage(conn *memphis.Conn) func(*fiber.Ctx) error {
7361 }
7462
7563 switch contentType {
76- case "application/json" :
77- message , hdrs , err = handleJsonMessage (bodyReq , headers )
78- if err != nil {
79- return err
80- }
81- case "text/" :
82- message = bodyReq
83- hdrs , err = handleHeaders (headers )
84- if err != nil {
85- return err
86- }
87- case "application/x-protobuf" :
64+ case "application/json" , "text/" , "application/x-protobuf" :
8865 message = bodyReq
8966 hdrs , err = handleHeaders (headers )
9067 if err != nil {
0 commit comments