@@ -2,6 +2,8 @@ package ppsarama
22
33import (
44 "context"
5+ "sync"
6+
57 "github.com/Shopify/sarama"
68 "github.com/pinpoint-apm/pinpoint-go-agent"
79)
@@ -24,6 +26,8 @@ type asyncProducer struct {
2426 successes chan * sarama.ProducerMessage
2527 errors chan * sarama.ProducerError
2628 ctx context.Context
29+ spans map [string ]pinpoint.Tracer
30+ spansLock sync.Mutex
2731}
2832
2933// InputContext sends a given message with tracer context to the input channel of sarama.AsyncProducer.
@@ -78,37 +82,52 @@ func NewAsyncProducer(addrs []string, config *sarama.Config) (AsyncProducer, err
7882 successes : make (chan * sarama.ProducerMessage ),
7983 errors : make (chan * sarama.ProducerError ),
8084 ctx : context .Background (),
85+ spans : make (map [string ]pinpoint.Tracer ),
86+ spansLock : sync.Mutex {},
8187 }
8288
8389 go func () {
84- spans := make (map [string ]pinpoint.Tracer )
85- defer close (wrapped .successes )
86- defer close (wrapped .errors )
87-
8890 for {
8991 select {
90- case msgCtx := <- wrapped .inputContext :
92+ case msgCtx , ok := <- wrapped .inputContext :
93+ if ! ok {
94+ return
95+ }
9196 span := newAsyncProducerTracer (msgCtx .ctx , addrs , msgCtx .msg , config )
9297 producer .Input () <- msgCtx .msg
93- saveAsyncProducerTracer (config , spans , span )
98+ saveAsyncProducerTracer (config , wrapped , span )
9499
95- case msg := <- wrapped .input :
100+ case msg , ok := <- wrapped .input :
101+ if ! ok {
102+ return
103+ }
96104 span := newAsyncProducerTracer (wrapped .ctx , addrs , msg , config )
97105 producer .Input () <- msg
98- saveAsyncProducerTracer (config , spans , span )
106+ saveAsyncProducerTracer (config , wrapped , span )
107+ }
108+ }
109+ }()
110+
111+ go func () {
112+ defer close (wrapped .inputContext )
113+ defer close (wrapped .input )
114+ defer close (wrapped .successes )
115+ defer close (wrapped .errors )
99116
117+ for {
118+ select {
100119 case msg , ok := <- producer .Successes ():
101120 if ! ok {
102- return // producer was closed, so exit
121+ return
103122 }
104- endAsyncProducerTracer (spans , msg , nil )
123+ endAsyncProducerTracer (wrapped , msg , nil )
105124 wrapped .successes <- msg
106125
107126 case e , ok := <- producer .Errors ():
108127 if ! ok {
109- return // producer was closed
128+ return
110129 }
111- endAsyncProducerTracer (spans , e .Msg , e .Err )
130+ endAsyncProducerTracer (wrapped , e .Msg , e .Err )
112131 wrapped .errors <- e
113132 }
114133 }
@@ -139,27 +158,33 @@ func newAsyncProducerTracer(ctx context.Context, addrs []string, msg *sarama.Pro
139158 return tracer
140159}
141160
142- func saveAsyncProducerTracer (config * sarama.Config , spans map [ string ]pinpoint. Tracer , span pinpoint.Tracer ) {
161+ func saveAsyncProducerTracer (config * sarama.Config , wrapped * asyncProducer , span pinpoint.Tracer ) {
143162 if config .Producer .Return .Successes && span .IsSampled () {
144- spans [span .AsyncSpanId ()] = span
163+ wrapped .spansLock .Lock ()
164+ defer wrapped .spansLock .Unlock ()
165+
166+ wrapped .spans [span .AsyncSpanId ()] = span
145167 } else {
146168 span .EndSpanEvent ()
147169 span .EndSpan ()
148170 }
149171}
150172
151- func endAsyncProducerTracer (spans map [ string ]pinpoint. Tracer , msg * sarama.ProducerMessage , err error ) {
173+ func endAsyncProducerTracer (wrapped * asyncProducer , msg * sarama.ProducerMessage , err error ) {
152174 headers := & distributedTracingContextWriterProducer {msg }
153175 if id := headers .Get (HeaderAsyncSpanId ); id != "" {
154- if span , ok := spans [id ]; ok {
176+ wrapped .spansLock .Lock ()
177+ defer wrapped .spansLock .Unlock ()
178+
179+ if span , ok := wrapped .spans [id ]; ok {
155180 //fmt.Printf("Get HeaderAsyncSpanId :%s, topic : %s\n", id, msg.Topic)
156181 if err != nil {
157182 span .SpanEvent ().SetError (err )
158183 }
159184 span .EndSpanEvent ()
160185 span .EndSpan ()
161186
162- delete (spans , id )
187+ delete (wrapped . spans , id )
163188 }
164189 }
165190}
0 commit comments