Skip to content

Commit d91f667

Browse files
Merge pull request #30 from memphisdev/remove-create-producer
use conn.produce instead of creating producer
2 parents a97b1dd + f5c8b7a commit d91f667

File tree

1 file changed

+17
-99
lines changed

1 file changed

+17
-99
lines changed

handlers/producer.go

Lines changed: 17 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"github.com/memphisdev/memphis.go"
1111
)
1212

13-
var producers = make(map[string]*memphis.Producer)
14-
1513
func handleHeaders(headers map[string]string) (memphis.Headers, error) {
1614
hdrs := memphis.Headers{}
1715
hdrs.New()
@@ -25,35 +23,10 @@ func handleHeaders(headers map[string]string) (memphis.Headers, error) {
2523
return hdrs, nil
2624
}
2725

28-
func createProducer(conn *memphis.Conn, producers map[string]*memphis.Producer, stationName string) (*memphis.Producer, error) {
29-
producerName := "rest_gateway"
30-
var producer *memphis.Producer
31-
var err error
32-
if _, ok := producers[stationName]; !ok {
33-
producer, err = conn.CreateProducer(stationName, producerName, memphis.ProducerGenUniqueSuffix())
34-
if err != nil {
35-
return nil, err
36-
}
37-
producers[stationName] = producer
38-
} else {
39-
producer = producers[stationName]
40-
}
41-
42-
return producer, nil
43-
}
44-
4526
func CreateHandleMessage(conn *memphis.Conn) func(*fiber.Ctx) error {
4627
return func(c *fiber.Ctx) error {
4728
log := logger.GetLogger(c)
4829
stationName := c.Params("stationName")
49-
var producer *memphis.Producer
50-
51-
producer, err := createProducer(conn, producers, stationName)
52-
if err != nil {
53-
log.Errorf("CreateHandleMessage - createProducer: %s", err.Error())
54-
return err
55-
}
56-
5730
bodyReq := c.Body()
5831
headers := c.GetReqHeaders()
5932
contentType := string(c.Request().Header.ContentType())
@@ -73,35 +46,14 @@ func CreateHandleMessage(conn *memphis.Conn) func(*fiber.Ctx) error {
7346
log.Errorf("CreateHandleMessage - handleHeaders: %s", err.Error())
7447
return err
7548
}
76-
if err := producer.Produce(message, memphis.MsgHeaders(hdrs)); err != nil {
77-
if strings.Contains(err.Error(), "memphis: no responders available for request") {
78-
delete(producers, stationName)
79-
producer, err = createProducer(conn, producers, stationName)
80-
if err != nil {
81-
log.Errorf("CreateHandleMessage - createProducer retry: %s", err.Error())
82-
c.Status(500)
83-
return c.JSON(&fiber.Map{
84-
"success": false,
85-
"error": err.Error(),
86-
})
87-
}
88-
err = producer.Produce(message, memphis.MsgHeaders(hdrs))
89-
if err != nil {
90-
log.Errorf("CreateHandleMessage - produce retry: %s", err.Error())
91-
c.Status(500)
92-
return c.JSON(&fiber.Map{
93-
"success": false,
94-
"error": err.Error(),
95-
})
96-
}
97-
} else {
98-
log.Errorf("CreateHandleMessage - produce: %s", err.Error())
99-
c.Status(500)
100-
return c.JSON(&fiber.Map{
101-
"success": false,
102-
"error": err.Error(),
103-
})
104-
}
49+
err = conn.Produce(stationName, "rest_gateway", message, []memphis.ProducerOpt{memphis.ProducerGenUniqueSuffix()}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)})
50+
if err != nil {
51+
log.Errorf("CreateHandleMessage - produce: %s", err.Error())
52+
c.Status(500)
53+
return c.JSON(&fiber.Map{
54+
"success": false,
55+
"error": err.Error(),
56+
})
10557
}
10658
default:
10759
return errors.New("unsupported content type")
@@ -119,14 +71,6 @@ func CreateHandleBatch(conn *memphis.Conn) func(*fiber.Ctx) error {
11971
return func(c *fiber.Ctx) error {
12072
log := logger.GetLogger(c)
12173
stationName := c.Params("stationName")
122-
var producer *memphis.Producer
123-
124-
producer, err := createProducer(conn, producers, stationName)
125-
if err != nil {
126-
log.Errorf("CreateHandleBatch - createProducer: %s", err.Error())
127-
return err
128-
}
129-
13074
bodyReq := c.Body()
13175
headers := c.GetReqHeaders()
13276
contentType := string(c.Request().Header.ContentType())
@@ -154,41 +98,15 @@ func CreateHandleBatch(conn *memphis.Conn) func(*fiber.Ctx) error {
15498
allErr = append(allErr, err.Error())
15599
continue
156100
}
157-
if err := producer.Produce(rawRes, memphis.MsgHeaders(hdrs)); err != nil {
158-
if strings.Contains(err.Error(), "memphis: no responders available for request") {
159-
delete(producers, stationName)
160-
producer, err = createProducer(conn, producers, stationName)
161-
if err != nil {
162-
log.Errorf("CreateHandleBatch - createProducer retry: %s", err.Error())
163-
errCount++
164-
allErr = append(allErr, err.Error())
165-
c.Status(400)
166-
return c.JSON(&fiber.Map{
167-
"success": false,
168-
"error": allErr,
169-
})
170-
}
171-
err = producer.Produce(rawRes, memphis.MsgHeaders(hdrs))
172-
if err != nil {
173-
log.Errorf("CreateHandleBatch - produce retry: %s", err.Error())
174-
errCount++
175-
allErr = append(allErr, err.Error())
176-
c.Status(400)
177-
return c.JSON(&fiber.Map{
178-
"success": false,
179-
"error": allErr,
180-
})
181-
}
182-
} else {
183-
log.Errorf("CreateHandleBatch - produce: %s", err.Error())
184-
errCount++
185-
allErr = append(allErr, err.Error())
186-
c.Status(400)
187-
return c.JSON(&fiber.Map{
188-
"success": false,
189-
"error": allErr,
190-
})
191-
}
101+
if err := conn.Produce(stationName, "rest_gateway", rawRes, []memphis.ProducerOpt{memphis.ProducerGenUniqueSuffix()}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)}); err != nil {
102+
log.Errorf("CreateHandleBatch - produce: %s", err.Error())
103+
errCount++
104+
allErr = append(allErr, err.Error())
105+
c.Status(400)
106+
return c.JSON(&fiber.Map{
107+
"success": false,
108+
"error": allErr,
109+
})
192110
}
193111
}
194112

0 commit comments

Comments
 (0)