5 changes: 5 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
@@ -195,6 +195,11 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er
     span := p.tracer.StartProduceSpan(tMsg)
 
     var errChan chan error
+
+    if deliveryChan == nil {
+        deliveryChan = p.events
+    }
+
     deliveryChan, errChan = kafkatrace.WrapDeliveryChannel(p.tracer, deliveryChan, span, wrapEvent)
 
     p.tracer.SetProduceCheckpoint(tMsg)
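Reviewer note, not part of the diff: with the nil check above, a caller can pass a nil delivery channel to the traced Produce and the delivery report is routed to the producer's own Events() channel while the produce span is still tracked. A minimal usage sketch under stated assumptions — the broker address, topic name, and the contrib import path are assumptions, not taken from this PR:

package main

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"

    // Assumed import path for this contrib package; adjust to your dd-trace-go version.
    kafkatrace "github.com/DataDog/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka.v2/v2"
)

func main() {
    // Assumes a broker is reachable at 127.0.0.1:9092.
    p, err := kafkatrace.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "127.0.0.1:9092"})
    if err != nil {
        log.Fatal(err)
    }
    defer p.Close()

    topic := "gotest"
    // Passing nil as the delivery channel: with the change above, the traced
    // delivery event is delivered on the producer's events channel instead.
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("hello"),
    }, nil)
    if err != nil {
        log.Fatal(err)
    }

    // The delivery report arrives on p.Events() rather than a caller-supplied channel.
    if m, ok := (<-p.Events()).(*kafka.Message); ok {
        log.Printf("delivered to %v", m.TopicPartition)
    }
}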
39 changes: 33 additions & 6 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
@@ -106,8 +106,9 @@ func TestConsumerChannel(t *testing.T) {
 
 func TestConsumerFunctional(t *testing.T) {
     for _, tt := range []struct {
-        name   string
-        action consumerActionFn
+        name                     string
+        action                   consumerActionFn
+        useProducerEventsChannel bool
     }{
         {
             name: "Poll",
@@ -126,9 +127,16 @@ func TestConsumerFunctional(t *testing.T) {
                 return c.ReadMessage(3000 * time.Millisecond)
             },
         },
+        {
+            name: "UseProducerEventsChannel",
+            action: func(c *Consumer) (*kafka.Message, error) {
+                return c.ReadMessage(3000 * time.Millisecond)
+            },
+            useProducerEventsChannel: true,
+        },
     } {
         t.Run(tt.name, func(t *testing.T) {
-            spans, msg := produceThenConsume(t, tt.action, []Option{WithAnalyticsRate(0.1), WithDataStreams()}, []Option{WithDataStreams()})
+            spans, msg := produceThenConsume(t, tt.action, []Option{WithAnalyticsRate(0.1), WithDataStreams()}, []Option{WithDataStreams()}, tt.useProducerEventsChannel)
 
             s0 := spans[0] // produce
             assert.Equal(t, "kafka.produce", s0.OperationName())
@@ -143,6 +151,8 @@ func TestConsumerFunctional(t *testing.T) {
             assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
             assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers))
             assert.Equal(t, "gotest", s0.Tag("messaging.destination.name"))
+            assert.Nil(t, s0.Tag(ext.ErrorMsg))
+            assert.Nil(t, s0.Tag(ext.ErrorType))
 
             s1 := spans[1] // consume
             assert.Equal(t, "kafka.consume", s1.OperationName())
@@ -332,9 +342,13 @@ func TestProduceError(t *testing.T) {
 
     spans := mt.FinishedSpans()
     assert.Len(t, spans, 1)
+    s0 := spans[0]
+    assert.Equal(t, "kafka.produce", s0.OperationName())
+    assert.Equal(t, "Local: Invalid argument or configuration", s0.Tag(ext.ErrorMsg))
+    assert.Equal(t, "kafka.Error", s0.Tag(ext.ErrorType))
 }
 
-func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]*mocktracer.Span, *kafka.Message) {
+func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option, useProducerEventsChannel bool) ([]*mocktracer.Span, *kafka.Message) {
     if _, ok := os.LookupEnv("INTEGRATION"); !ok {
         t.Skip("to enable integration test, set the INTEGRATION environment variable")
     }
@@ -348,7 +362,11 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
     }, producerOpts...)
     require.NoError(t, err)
 
-    delivery := make(chan kafka.Event, 1)
+    var delivery chan kafka.Event = nil
+    if !useProducerEventsChannel {
+        delivery = make(chan kafka.Event, 1)
+    }
+
     err = p.Produce(&kafka.Message{
         TopicPartition: kafka.TopicPartition{
             Topic: &testTopic,
@@ -359,7 +377,16 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
     }, delivery)
     require.NoError(t, err)
 
-    msg1, _ := (<-delivery).(*kafka.Message)
+    var evt kafka.Event
+    select {
+    case evt = <-p.Events():
+    case evt = <-delivery:
+    }
+    msg1, ok := evt.(*kafka.Message)
+    require.True(t, ok)
+    assert.Equal(t, "key2", string(msg1.Key))
+    assert.Equal(t, "value2", string(msg1.Value))
+
     p.Close()
 
     // next attempt to consume the message
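Aside, not from the PR: the single select in produceThenConsume handles both test variants because a receive from a nil channel blocks forever in Go, so when delivery is left nil only the p.Events() case can ever fire, and when delivery is non-nil that is where the delivery report arrives. A standalone sketch of that language behaviour (hypothetical channels a and b, unrelated to the test):

package main

import "fmt"

func main() {
    var a chan string         // nil channel: receiving from it blocks forever
    b := make(chan string, 1) // non-nil, buffered
    b <- "from b"

    select {
    case v := <-a: // never ready while a is nil
        fmt.Println(v)
    case v := <-b:
        fmt.Println(v) // prints "from b"
    }
}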