Skip to content

Commit df946de

Browse files
Merge pull request #2 from memphisdev/add-single-produce-endpoint
Add single produce endpoint
2 parents ff8a67f + 20a24fc commit df946de

File tree

8 files changed

+73
-101
lines changed

8 files changed

+73
-101
lines changed

.vscode/launch.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
"env": {
1111
"JWT_SECRET": "35nhvjfosfklgmfg56+fdsgzvfnjksacvbhfksfkgofadsjfgjkoldsdkfvpl'jbgio;dfsjgkl;'XZFVMifobd;dlgjv[sfvjmiodfkvs2fh;fhk44gfdhksdkfdffk",
1212
"REFRESH_JWT_SECRET": "35c7b3eb969db065bfa3c66b38e4323e8f73113f3965dfb55c6bc585dcb0ba62bd399e2588fdc8f709ae0b63fb24be32590f134506ca1d7a4314339f11b8045a",
13+
"CONNECTION_TOKEN": "memphis",
14+
"ROOT_USER": "root",
15+
"MEMPHIS_HOST":"localhost",
16+
"HTTP_PORT": ":3000"
1317
}
1418
}
1519
]

conf/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ type Configuration struct {
99
JWT_EXPIRES_IN_MINUTES int
1010
REFRESH_JWT_SECRET string
1111
REFRESH_JWT_EXPIRES_IN_MINUTES int
12+
HTTP_PORT string
13+
ROOT_USER string
14+
CONNECTION_TOKEN string
15+
MEMPHIS_HOST string
1216
}
1317

1418
func GetConfig() Configuration {

conf/config.json

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
{
2-
"HTTP_PORT": "3000",
3-
"JWT_EXPIRES_IN_MINUTES": 15,
4-
"REFRESH_JWT_EXPIRES_IN_MINUTES": 300
5-
}
2+
"HTTP_PORT": ":3000",
3+
"MEMPHIS_HOST": "localhost",
4+
"ROOT_USER": "root",
5+
"CONNECTION_TOKEN": "memphis",
6+
"JWT_EXPIRES_IN_MINUTES": 15,
7+
"REFRESH_JWT_EXPIRES_IN_MINUTES": 300
8+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ go 1.18
55
require (
66
github.com/gofiber/fiber/v2 v2.40.1
77
github.com/memphisdev/memphis.go v0.1.6
8+
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f
89
)
910

1011
require (
1112
github.com/andybalholm/brotli v1.0.4 // indirect
13+
github.com/ghodss/yaml v1.0.0 // indirect
1214
github.com/klauspost/compress v1.15.9 // indirect
1315
github.com/mattn/go-colorable v0.1.13 // indirect
1416
github.com/mattn/go-isatty v0.0.16 // indirect
@@ -26,4 +28,5 @@ require (
2628
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
2729
golang.org/x/time v0.2.0 // indirect
2830
google.golang.org/protobuf v1.28.1 // indirect
31+
gopkg.in/yaml.v2 v2.4.0 // indirect
2932
)

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
22
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
3+
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
4+
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
35
github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4=
46
github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk=
57
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
@@ -29,6 +31,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
2931
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
3032
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
3133
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
34+
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f h1:xDFq4NVQD34ekH5UsedBSgfxsBuPU2aZf7v4t0tH2jY=
35+
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f/go.mod h1:DaZPBuToMc2eezA9R9nDAnmS2RMwL7yEa5YD36ESQdI=
3236
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
3337
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
3438
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
@@ -62,3 +66,5 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N
6266
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
6367
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
6468
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
69+
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
70+
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

handlers/producer.go

Lines changed: 41 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,125 +1,81 @@
11
package handler
22

33
import (
4-
"encoding/json"
54
"errors"
6-
"log"
75
"strings"
86

97
"github.com/gofiber/fiber/v2"
108
"github.com/memphisdev/memphis.go"
11-
"google.golang.org/protobuf/proto"
129
)
1310

14-
type Handler struct{ P *memphis.Producer }
11+
var producers = make(map[string]*memphis.Producer)
1512

16-
func (p Handler) produce(message []byte, hdrs memphis.Headers) error {
17-
if err := p.P.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
18-
return err
19-
}
20-
return nil
21-
22-
}
13+
func handleHeaders(headers map[string]string) (memphis.Headers, error) {
14+
hdrs := memphis.Headers{}
15+
hdrs.New()
2316

24-
func handleMessageHdrs(bodyReq []byte, hdrs memphis.Headers) ([]byte, memphis.Headers, error) {
25-
type body struct {
26-
Message string `json:"message"`
27-
Headers string `json:"headers"`
28-
}
29-
var b body
30-
err := json.Unmarshal(bodyReq, &b)
31-
if err != nil {
32-
return nil, memphis.Headers{}, err
33-
}
34-
35-
var headers map[string]string
36-
err = json.Unmarshal([]byte(b.Headers), &headers)
37-
if err != nil {
38-
return nil, memphis.Headers{}, err
39-
}
40-
41-
var k, v string
4217
for key, value := range headers {
43-
k = key
44-
v = value
45-
46-
err = hdrs.Add(k, v)
18+
err := hdrs.Add(key, value)
4719
if err != nil {
48-
return nil, memphis.Headers{}, err
20+
return memphis.Headers{}, err
4921
}
5022
}
23+
return hdrs, nil
24+
}
5125

52-
message, err := json.Marshal(b.Message)
53-
if err != nil {
54-
return nil, memphis.Headers{}, err
26+
func createProducer(conn *memphis.Conn, producers map[string]*memphis.Producer, stationName string) (*memphis.Producer, error) {
27+
producerName := "http_proxy"
28+
var producer *memphis.Producer
29+
var err error
30+
if _, ok := producers[stationName]; !ok {
31+
producer, err = conn.CreateProducer(stationName, producerName, memphis.ProducerGenUniqueSuffix())
32+
if err != nil {
33+
return nil, err
34+
}
35+
producers[stationName] = producer
36+
} else {
37+
producer = producers[stationName]
5538
}
56-
return message, hdrs, nil
39+
40+
return producer, nil
5741
}
5842

59-
func CreateHandleMessage(p *memphis.Producer) func(*fiber.Ctx) error {
43+
func CreateHandleMessage(conn *memphis.Conn) func(*fiber.Ctx) error {
6044
return func(c *fiber.Ctx) error {
45+
stationName := c.Params("stationName")
46+
var producer *memphis.Producer
47+
48+
producer, err := createProducer(conn, producers, stationName)
49+
if err != nil {
50+
return err
51+
}
52+
6153
bodyReq := c.Body()
54+
headers := c.GetReqHeaders()
6255
contentType := string(c.Request().Header.ContentType())
63-
var message []byte
64-
var err error
65-
hdrs := memphis.Headers{}
66-
hdrs.New()
6756
caseText := strings.Contains(contentType, "text")
68-
6957
if caseText {
7058
contentType = "text/"
7159
}
7260

7361
switch contentType {
74-
case "application/json":
75-
message, hdrs, err = handleMessageHdrs(bodyReq, hdrs)
62+
case "application/json", "text/", "application/x-protobuf":
63+
message := bodyReq
64+
hdrs, err := handleHeaders(headers)
7665
if err != nil {
7766
return err
7867
}
79-
case "text/":
80-
message, hdrs, err = handleMessageHdrs(bodyReq, hdrs)
81-
if err != nil {
82-
return err
83-
}
84-
case "application/x-protobuf":
85-
msg := &Msg{}
86-
err := proto.Unmarshal(bodyReq, msg)
87-
if err != nil {
88-
log.Fatal("unmarshaling error: ", err)
89-
}
90-
91-
message, err = json.Marshal(msg.Message)
92-
if err != nil {
93-
return err
94-
}
95-
96-
var headers map[string]string
97-
err = json.Unmarshal([]byte(msg.Headers), &headers)
98-
if err != nil {
99-
return err
100-
}
101-
102-
var k, v string
103-
for key, value := range headers {
104-
k = key
105-
v = value
106-
err = hdrs.Add(k, v)
107-
if err != nil {
108-
return err
109-
}
68+
if err := producer.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
69+
c.Status(400)
70+
return c.JSON(&fiber.Map{
71+
"success": false,
72+
"error": err.Error(),
73+
})
11074
}
11175
default:
11276
return errors.New("unsupported content type")
11377
}
11478

115-
if err := p.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
116-
c.Status(400)
117-
return c.JSON(&fiber.Map{
118-
"success": false,
119-
"error": err.Error(),
120-
})
121-
}
122-
12379
c.Status(200)
12480
return c.JSON(&fiber.Map{
12581
"success": true,

main.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
"http-proxy/conf"
56
"http-proxy/router"
67

78
"os"
@@ -15,17 +16,13 @@ func main() {
1516
app := fiber.New()
1617
app.Use(cors.New())
1718

18-
conn, err := memphis.Connect("localhost", "root", "memphis")
19-
if err != nil {
20-
fmt.Println(err.Error())
21-
os.Exit(1)
22-
}
23-
producer, err := conn.CreateProducer("test-fiber-go", "simple_go_producer")
19+
configuration := conf.GetConfig()
20+
conn, err := memphis.Connect(configuration.MEMPHIS_HOST, configuration.ROOT_USER, configuration.CONNECTION_TOKEN)
2421
if err != nil {
2522
fmt.Println(err.Error())
2623
os.Exit(1)
2724
}
2825

29-
router.SetupRoutes(app, producer)
30-
app.Listen(":3000")
26+
router.SetupRoutes(app, conn)
27+
app.Listen(configuration.HTTP_PORT)
3128
}

router/router.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ import (
88
)
99

1010
// SetupRoutes setup router api
11-
func SetupRoutes(app *fiber.App, producer *memphis.Producer) {
12-
13-
api := app.Group("/", logger.New())
14-
api.Post("/", handler.CreateHandleMessage(producer))
11+
func SetupRoutes(app *fiber.App, conn *memphis.Conn) {
12+
api := app.Group("/stations", logger.New())
13+
api.Post("/:stationName/produce/single", handler.CreateHandleMessage(conn))
1514
}

0 commit comments

Comments
 (0)