Description
Describe your environment
Python 3.8
confluent-kafka 1.8.2
opentelemetry-instrumentation-confluent-kafka 0.37b0
Steps to reproduce
Called the poll method on the ProxiedConsumer from a while True loop, as this is the recommended usage shown in the confluent-kafka Consumer example: https://github.com/confluentinc/confluent-kafka-python#basic-consumer-example
What is the expected behavior?
The poll method on ProxiedConsumer calls the wrap_poll method, which should:
- First ensure that the record returned by the underlying confluent_kafka.Consumer's poll method exists.
- After checking that the record exists, extract the context from the record headers and then start a span using this context, so that the span stays linked to the spans that were created before the consumer received the current Kafka message.
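A minimal sketch of the ordering described above. It is written against hypothetical stand-ins (FakeMessage, RecordingTracer, extract_parent), not the real confluent_kafka or OpenTelemetry APIs: the underlying poll is called first, a None result returns immediately without creating a span, and only a real record has its context extracted from the headers and used as the new span's parent. The sketch passes None back unchanged so that a caller's "if msg is None" check keeps working.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Tuple


# Hypothetical stand-in for confluent_kafka.Message.
@dataclass
class FakeMessage:
    topic: str
    headers: List[Tuple[str, bytes]]


# Hypothetical stand-in for a tracer: it only records which spans were started.
@dataclass
class RecordingTracer:
    spans: List[dict] = field(default_factory=list)

    def start_span(self, name: str, parent: Optional[str]) -> dict:
        span = {"name": name, "parent": parent}
        self.spans.append(span)
        return span


def extract_parent(headers: List[Tuple[str, bytes]]) -> Optional[str]:
    """Return the upstream trace context stored in the headers, if any."""
    for key, value in headers:
        if key == "traceparent":
            return value.decode("utf-8")
    return None


def wrap_poll(poll: Callable[[float], Optional[FakeMessage]],
              tracer: RecordingTracer, timeout: float) -> Optional[FakeMessage]:
    record = poll(timeout)            # 1. call the underlying poll first
    if record is None:
        return None                   # 2. nothing received: no span at all
    parent = extract_parent(record.headers)           # 3. extract header context
    tracer.start_span(f"{record.topic} process", parent=parent)  # 4. then start span
    return record
```

For example, feeding it two None results followed by one message yields exactly one recorded span, whose parent is the value of that message's traceparent header.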
What is the actual behavior?
- The wrap_poll method creates a span every time it is called. It starts this span before extracting the context from the Kafka message, so the span is not linked to any previous spans. Instead, it should only create a span after checking that the received record is not None (here) and is an actual Kafka message.
- Because the span started before the record check (here) is started as the current span, the span that is started after the record is received uses the current span's context as its parent, even though its links contain the context from the message headers.
- Lastly, wrap_poll returns the record even if the record is None; it should only return the record if the record exists.
Additional context
I want to confirm if the way I'm using ProxiedConsumer and its poll method is correct. Here's my understanding and how I'm using it:
ProxiedConsumer is a wrapper around the confluent-kafka Consumer class. ProxiedConsumer has a method called poll, which calls ConfluentKafkaInstrumentor's wrap_poll method, and wrap_poll in turn calls the underlying Consumer's poll method with the user-specified timeout.
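The delegation described above can be sketched as a plain proxy. This is an illustration under stated assumptions, not the library's actual ProxiedConsumer: FakeConsumer is a hypothetical stand-in for confluent_kafka.Consumer, poll() is routed through a wrap function that receives the real poll and the user's timeout, and every other attribute (subscribe and so on) is forwarded by __getattr__.

```python
from typing import Any, Callable, Optional


class FakeConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer."""

    def __init__(self) -> None:
        self.timeouts = []  # every timeout poll() was called with

    def poll(self, timeout: Optional[float] = None) -> Any:
        self.timeouts.append(timeout)
        return None  # behaves like an idle topic

    def subscribe(self, topics) -> None:
        self.topics = list(topics)


class ProxiedConsumerSketch:
    """poll() goes through a wrap function; everything else is forwarded."""

    def __init__(self, consumer: FakeConsumer, wrap_poll: Callable[..., Any]) -> None:
        self._consumer = consumer
        self._wrap_poll = wrap_poll

    def poll(self, timeout: Optional[float] = None) -> Any:
        # The wrapper receives the real poll plus the user's timeout.
        return self._wrap_poll(self._consumer.poll, timeout)

    def __getattr__(self, name: str) -> Any:
        # Only called for attributes not found on the proxy itself.
        return getattr(self._consumer, name)


def passthrough_wrap_poll(poll: Callable[..., Any], timeout: Optional[float]) -> Any:
    # A tracing wrapper would create spans here; this one just delegates.
    return poll(timeout)
```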
As a user of opentelemetry-instrumentation-confluent-kafka, one would create the ProxiedConsumer and use it as follows based on the docs:
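```python
import confluent_kafka
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from opentelemetry.sdk.trace import TracerProvider

tracer_provider = TracerProvider()
# A 'group.id' is normally needed to subscribe(); the value here is illustrative
c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:29092', 'group.id': 'mygroup'})
consumer = ConfluentKafkaInstrumentor().instrument_consumer(c, tracer_provider=tracer_provider)
consumer.subscribe(['mytopic'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue  # no message arrived within the 1 second timeout
    process(msg)  # process is just some method to handle messages
```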
As in the code above, the ProxiedConsumer's poll method is called from a while True loop with a timeout of 1 second. On an idle topic this produces roughly one span per second, even though the message returned from Kafka is None.
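To put numbers on the idle case, this sketch counts the spans a wrapper would create for a sequence of poll() results under two orderings: a span on every call plus one per received message (the behavior described above), versus one span per received message only. The 60-element list is a hypothetical idle minute of poll(1.0) calls that all return None.

```python
from typing import Iterable, Optional


def count_spans(poll_results: Iterable[Optional[str]], span_before_check: bool) -> int:
    """Count spans created for a sequence of poll() results.

    span_before_check=True models the ordering described in this issue;
    False models starting a span only when a message is returned.
    """
    spans = 0
    for msg in poll_results:
        if span_before_check:
            spans += 1  # span started on every call, before the record check
        if msg is not None:
            spans += 1  # span started once a message is actually received
    return spans


# An idle topic: each poll(1.0) waits up to about 1 s and returns None.
idle_minute = [None] * 60
```

On that idle minute the first ordering yields 60 spans and the second yields none.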
- The wrap_poll method should instead start the span only if the record it gets from calling poll actually exists.
- It should start this span after extracting the context from the record headers, using that context as the parent when starting the span as the current span.
- After it starts the span, it should also inject the current context into the record headers, so that future spans for this record are created from the current span's tracing headers.
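The three steps above (extract, start, inject) can be sketched with the W3C traceparent header format, 00-<trace-id>-<parent-id>-<flags>. The format is hand-rolled here purely for illustration; real instrumentation would go through OpenTelemetry's propagators, and the sketch does not model making the span current. Headers are modelled as confluent-kafka-style lists of (key, bytes) tuples, and extract_traceparent, start_consumer_span and inject_traceparent are hypothetical helpers.

```python
import secrets
from typing import List, Optional, Tuple

Headers = List[Tuple[str, bytes]]


def extract_traceparent(headers: Optional[Headers]) -> Optional[Tuple[str, str]]:
    """Return (trace_id, parent_span_id) from a traceparent header, if present."""
    for key, value in headers or []:
        if key == "traceparent":
            _, trace_id, span_id, _ = value.decode("ascii").split("-")
            return trace_id, span_id
    return None


def start_consumer_span(headers: Optional[Headers]) -> Tuple[str, str, Optional[str]]:
    """Create ids for a new span that continues the upstream trace, if any."""
    upstream = extract_traceparent(headers)
    trace_id = upstream[0] if upstream else secrets.token_hex(16)  # 32 hex chars
    parent_span_id = upstream[1] if upstream else None
    span_id = secrets.token_hex(8)  # 16 hex chars
    return trace_id, span_id, parent_span_id


def inject_traceparent(headers: Optional[Headers], trace_id: str, span_id: str) -> Headers:
    """Replace any existing traceparent with one pointing at the new span."""
    kept = [(k, v) for k, v in (headers or []) if k != "traceparent"]
    # Flags "01" marks the span as sampled (an assumption in this sketch).
    kept.append(("traceparent", f"00-{trace_id}-{span_id}-01".encode("ascii")))
    return kept
```

With confluent_kafka, the returned list could then be written back to the message, for example with Message.set_headers().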