Skip to content

Commit 0eafb1a

Browse files
committed
[#106] apply IBM-Sarama plugin
1 parent 5ab26d4 commit 0eafb1a

File tree

1 file changed

+42
-17
lines changed

1 file changed

+42
-17
lines changed

plugin/sarama-IBM/asyncproducer.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package ppsaramaibm
22

33
import (
44
"context"
5+
"sync"
6+
57
"github.com/IBM/sarama"
68
"github.com/pinpoint-apm/pinpoint-go-agent"
79
)
@@ -24,6 +26,8 @@ type asyncProducer struct {
2426
successes chan *sarama.ProducerMessage
2527
errors chan *sarama.ProducerError
2628
ctx context.Context
29+
spans map[string]pinpoint.Tracer
30+
spansLock sync.Mutex
2731
}
2832

2933
// InputContext sends a given message with tracer context to the input channel of sarama.AsyncProducer.
@@ -78,37 +82,52 @@ func NewAsyncProducer(addrs []string, config *sarama.Config) (AsyncProducer, err
7882
successes: make(chan *sarama.ProducerMessage),
7983
errors: make(chan *sarama.ProducerError),
8084
ctx: context.Background(),
85+
spans: make(map[string]pinpoint.Tracer),
86+
spansLock: sync.Mutex{},
8187
}
8288

8389
go func() {
84-
spans := make(map[string]pinpoint.Tracer)
85-
defer close(wrapped.successes)
86-
defer close(wrapped.errors)
87-
8890
for {
8991
select {
90-
case msgCtx := <-wrapped.inputContext:
92+
case msgCtx, ok := <-wrapped.inputContext:
93+
if !ok {
94+
return
95+
}
9196
span := newAsyncProducerTracer(msgCtx.ctx, addrs, msgCtx.msg, config)
9297
producer.Input() <- msgCtx.msg
93-
saveAsyncProducerTracer(config, spans, span)
98+
saveAsyncProducerTracer(config, wrapped, span)
9499

95-
case msg := <-wrapped.input:
100+
case msg, ok := <-wrapped.input:
101+
if !ok {
102+
return
103+
}
96104
span := newAsyncProducerTracer(wrapped.ctx, addrs, msg, config)
97105
producer.Input() <- msg
98-
saveAsyncProducerTracer(config, spans, span)
106+
saveAsyncProducerTracer(config, wrapped, span)
107+
}
108+
}
109+
}()
110+
111+
go func() {
112+
defer close(wrapped.inputContext)
113+
defer close(wrapped.input)
114+
defer close(wrapped.successes)
115+
defer close(wrapped.errors)
99116

117+
for {
118+
select {
100119
case msg, ok := <-producer.Successes():
101120
if !ok {
102-
return // producer was closed, so exit
121+
return
103122
}
104-
endAsyncProducerTracer(spans, msg, nil)
123+
endAsyncProducerTracer(wrapped, msg, nil)
105124
wrapped.successes <- msg
106125

107126
case e, ok := <-producer.Errors():
108127
if !ok {
109-
return // producer was closed
128+
return
110129
}
111-
endAsyncProducerTracer(spans, e.Msg, e.Err)
130+
endAsyncProducerTracer(wrapped, e.Msg, e.Err)
112131
wrapped.errors <- e
113132
}
114133
}
@@ -139,27 +158,33 @@ func newAsyncProducerTracer(ctx context.Context, addrs []string, msg *sarama.Pro
139158
return tracer
140159
}
141160

142-
func saveAsyncProducerTracer(config *sarama.Config, spans map[string]pinpoint.Tracer, span pinpoint.Tracer) {
161+
func saveAsyncProducerTracer(config *sarama.Config, wrapped *asyncProducer, span pinpoint.Tracer) {
143162
if config.Producer.Return.Successes && span.IsSampled() {
144-
spans[span.AsyncSpanId()] = span
163+
wrapped.spansLock.Lock()
164+
defer wrapped.spansLock.Unlock()
165+
166+
wrapped.spans[span.AsyncSpanId()] = span
145167
} else {
146168
span.EndSpanEvent()
147169
span.EndSpan()
148170
}
149171
}
150172

151-
func endAsyncProducerTracer(spans map[string]pinpoint.Tracer, msg *sarama.ProducerMessage, err error) {
173+
func endAsyncProducerTracer(wrapped *asyncProducer, msg *sarama.ProducerMessage, err error) {
152174
headers := &distributedTracingContextWriterProducer{msg}
153175
if id := headers.Get(HeaderAsyncSpanId); id != "" {
154-
if span, ok := spans[id]; ok {
176+
wrapped.spansLock.Lock()
177+
defer wrapped.spansLock.Unlock()
178+
179+
if span, ok := wrapped.spans[id]; ok {
155180
//fmt.Printf("Get HeaderAsyncSpanId :%s, topic : %s\n", id, msg.Topic)
156181
if err != nil {
157182
span.SpanEvent().SetError(err)
158183
}
159184
span.EndSpanEvent()
160185
span.EndSpan()
161186

162-
delete(spans, id)
187+
delete(wrapped.spans, id)
163188
}
164189
}
165190
}

0 commit comments

Comments
 (0)