diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go index 7c3fe5a88d..de97fc6da4 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go @@ -195,6 +195,11 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er span := p.tracer.StartProduceSpan(tMsg) var errChan chan error + + if deliveryChan == nil { + deliveryChan = p.events + } + deliveryChan, errChan = kafkatrace.WrapDeliveryChannel(p.tracer, deliveryChan, span, wrapEvent) p.tracer.SetProduceCheckpoint(tMsg) diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go index fa19998f23..66875d0837 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go @@ -106,8 +106,9 @@ func TestConsumerChannel(t *testing.T) { func TestConsumerFunctional(t *testing.T) { for _, tt := range []struct { - name string - action consumerActionFn + name string + action consumerActionFn + useProducerEventsChannel bool }{ { name: "Poll", @@ -126,9 +127,16 @@ func TestConsumerFunctional(t *testing.T) { return c.ReadMessage(3000 * time.Millisecond) }, }, + { + name: "UseProducerEventsChannel", + action: func(c *Consumer) (*kafka.Message, error) { + return c.ReadMessage(3000 * time.Millisecond) + }, + useProducerEventsChannel: true, + }, } { t.Run(tt.name, func(t *testing.T) { - spans, msg := produceThenConsume(t, tt.action, []Option{WithAnalyticsRate(0.1), WithDataStreams()}, []Option{WithDataStreams()}) + spans, msg := produceThenConsume(t, tt.action, []Option{WithAnalyticsRate(0.1), WithDataStreams()}, []Option{WithDataStreams()}, tt.useProducerEventsChannel) s0 := spans[0] // produce assert.Equal(t, "kafka.produce", s0.OperationName()) @@ -143,6 +151,8 @@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers)) assert.Equal(t, "gotest", s0.Tag("messaging.destination.name")) + assert.Nil(t, s0.Tag(ext.ErrorMsg)) + assert.Nil(t, s0.Tag(ext.ErrorType)) s1 := spans[1] // consume assert.Equal(t, "kafka.consume", s1.OperationName()) @@ -332,9 +342,13 @@ func TestProduceError(t *testing.T) { spans := mt.FinishedSpans() assert.Len(t, spans, 1) + s0 := spans[0] + assert.Equal(t, "kafka.produce", s0.OperationName()) + assert.Equal(t, "Local: Invalid argument or configuration", s0.Tag(ext.ErrorMsg)) + assert.Equal(t, "kafka.Error", s0.Tag(ext.ErrorType)) } -func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]*mocktracer.Span, *kafka.Message) { +func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option, useProducerEventsChannel bool) ([]*mocktracer.Span, *kafka.Message) { if _, ok := os.LookupEnv("INTEGRATION"); !ok { t.Skip("to enable integration test, set the INTEGRATION environment variable") } @@ -348,7 +362,11 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO }, producerOpts...) require.NoError(t, err) - delivery := make(chan kafka.Event, 1) + var delivery chan kafka.Event = nil + if !useProducerEventsChannel { + delivery = make(chan kafka.Event, 1) + } + err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &testTopic, @@ -359,7 +377,16 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO }, delivery) require.NoError(t, err) - msg1, _ := (<-delivery).(*kafka.Message) + var evt kafka.Event + select { + case evt = <-p.Events(): + case evt = <-delivery: + } + msg1, ok := evt.(*kafka.Message) + require.True(t, ok) + assert.Equal(t, "key2", string(msg1.Key)) + assert.Equal(t, "value2", string(msg1.Value)) + p.Close() // next attempt to consume the message