
Conversation

@mrajashree (Contributor) commented Feb 16, 2023

Description

Fixes #1674

The package confluent-kafka has a class Consumer, which has a method called poll. As per the docs, the recommended usage is to call consumer.poll in an infinite loop so that it keeps polling the brokers for messages; each time it is called, the code is supposed to check whether the message is None or has an error before trying to process it. (Ref: https://github.com/confluentinc/confluent-kafka-python#basic-consumer-example)

The package opentelemetry-instrumentation-confluent-kafka offers a wrapper around confluent-kafka's consumer, called ProxiedConsumer. ProxiedConsumer also has a method called poll, which calls ConfluentKafkaInstrumentor's wrap_poll method. This wrap_poll method calls the underlying consumer's poll method with the user-specified timeout.
The confluent_kafka.Consumer.poll method is supposed to be called by an application from within an infinite loop, which means the wrap_poll method will also be called on each iteration of that loop. This is the observed behavior with the current implementation of wrap_poll:

  1. The wrap_poll method creates a span each time it is called. If we go by this example, where consumer.poll is called with a timeout of 1 second, the current wrap_poll implementation will create a span per second.
  2. It starts this span before extracting context from the kafka message, so the span is not linked to any previous spans. It should only create a span after checking that the received record is not None (here) and is an actual kafka message.
  3. Since the span started before checking whether the record exists (here) is started as the current span, the span that is started after the record is received will use the current span's context, even if the links contain the context from the message headers.
  4. Lastly, wrap_poll returns the record even if the record is None; it should only return the record if it exists.

This PR fixes the above issues. This assumes my understanding and usage of ProxiedConsumer is right; please correct me if not. Here's a sample code snippet based on the docs:

import confluent_kafka
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

# tracer_provider is assumed to be configured elsewhere in the application
c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:29092'})
consumer = ConfluentKafkaInstrumentor().instrument_consumer(c, tracer_provider=tracer_provider)
consumer.subscribe(['mytopic'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    process(msg)  # process is just some method to handle messages
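
To make the proposed behavior concrete, here is a minimal, hedged sketch of what the wrapped poll should roughly do; the function name instrumented_poll and its structure are illustrative only and are not the actual implementation in this PR or the library:

from opentelemetry import propagate, trace
from opentelemetry.trace import SpanKind

def instrumented_poll(consumer, tracer, timeout):
    record = consumer.poll(timeout)  # call the real confluent_kafka poll
    if record is None:
        # nothing received: no span is created and nothing is returned
        return None
    # extract the producer's context from the message headers first ...
    headers = {k: v.decode("utf-8") if isinstance(v, bytes) else v
               for k, v in (record.headers() or [])}
    ctx = propagate.extract(headers)
    # ... then start the consumer span as a child of that context
    with tracer.start_as_current_span(
        f"{record.topic()} process", context=ctx, kind=SpanKind.CONSUMER
    ):
        # topic/partition/offset attributes would be set on the span here
        pass
    return record

With this ordering, a span exists only when a record actually arrives, and it joins the trace started by the producer instead of starting a new one every second.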

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Tested this change and verified that the span created by wrap_poll stays linked to all spans previously created for the current trace. The spans created after wrap_poll also stay linked to the same trace for a given kafka message.

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

@shalevr (Member) commented Feb 27, 2023

Please add a changelog entry

@srikanthccv (Member) commented:

There was probably a reason why span links were used. Please take a look at the change again.

@mrajashree (Contributor, Author) commented:
> There was probably a reason why span links were used. Please take a look at the change again.

@srikanthccv yes, I wrote a bit in the PR description about the usage of links here and why I think it's not the right choice. Here's what I think:

  1. The span links were used to link the span started here to the span associated with the sender of the kafka message.
  2. Links, by definition, can point to spans within the same trace or across traces. In this case we want the new span to be part of the same trace as the span from which the kafka message was sent.
  3. Passing in links alone won't ensure that the new span belongs to the current trace, because as per the start_span code here, the context argument is checked first, and if it's not passed, the new span is treated as a root span. So while it stays linked to the previous spans, it won't be part of the same trace.
  4. So, as the proposed change in this PR, I'm starting the new span by passing it the context extracted from the previous spans. This way I've confirmed that all spans associated with a kafka message belong to the same trace (a small sketch of the difference follows this list).
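
Here's a small, self-contained sketch (using the OpenTelemetry SDK directly, with a made-up traceparent header) of the difference between passing links only and passing the extracted context; it is for illustration and is not code from this PR:

from opentelemetry import propagate, trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace import Link

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# pretend these headers were read from the kafka message
carrier = {"traceparent": "00-11111111111111111111111111111111-2222222222222222-01"}
parent_ctx = propagate.extract(carrier)
parent_span_ctx = trace.get_current_span(parent_ctx).get_span_context()

# links only: the new span becomes the root of a brand new trace
with tracer.start_as_current_span("poll", links=[Link(parent_span_ctx)]) as span:
    print(span.get_span_context().trace_id == parent_span_ctx.trace_id)  # False

# context passed in: the new span joins the producer's trace
with tracer.start_as_current_span("poll", context=parent_ctx) as span:
    print(span.get_span_context().trace_id == parent_span_ctx.trace_id)  # True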

Let me know what you think

@owenhaynes commented:
@mrajashree I think the link code needs to stay; it was mostly correct, since it follows https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/messaging/#batch-receiving and Kafka is a batch receiver.

Also there is some more background here https://opentelemetry.io/blog/2022/instrument-kafka-clients/
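
(For reference only, here is a rough, hypothetical sketch of that batch-receiving pattern, using confluent_kafka's consume API; the helper name receive_batch and the topic name are made up, and this is not code from this PR:)

from opentelemetry import propagate, trace
from opentelemetry.trace import Link, SpanKind

tracer = trace.get_tracer(__name__)

def receive_batch(consumer, num_messages=10, timeout=1.0):
    messages = consumer.consume(num_messages, timeout)  # confluent_kafka batch API
    links = []
    for msg in messages:
        headers = {k: v.decode("utf-8") if isinstance(v, bytes) else v
                   for k, v in (msg.headers() or [])}
        remote = trace.get_current_span(propagate.extract(headers)).get_span_context()
        if remote.is_valid:
            links.append(Link(remote))  # one link per message in the batch
    # a single "receive" span linked to every producer span in the batch
    with tracer.start_as_current_span(
        "mytopic receive", kind=SpanKind.CONSUMER, links=links
    ):
        return messages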

@ocelotl (Contributor) commented Jul 12, 2023

@mrajashree do you have any thoughts on this after the comment from @owenhaynes?

@Samira-El commented Aug 17, 2023

Hi, is there anything that can be done to push this through?

It would be great to have the consumer use the context from the kafka message and be in the same trace as the producer; this would be much more useful in a distributed system for tracing async flows.

In this example, https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/#topic-with-multiple-consumers, the spans created by the consumers have the same parent as the producer.

@alexchowle commented:
It would be very useful to resolve this issue. As it stands, we don't have viable distributed tracing, just separate producer and consumer spans.

@mrajashree (Contributor, Author) commented:
Hi @ocelotl and @owenhaynes, as per the OpenTelemetry semantic conventions for messaging, context propagation is a must, which is not being done currently: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#context-propagation

> A message may traverse many different components and layers in one or more intermediaries when it is propagated from the producer to the consumer(s). To be able to correlate consumer traces with producer traces using the existing context propagation mechanisms, all components must propagate context down the chain.

There's some more background on this here: https://github.com/open-telemetry/oteps/blob/main/text/trace/0205-messaging-semantic-conventions-context-propagation.md

This issue was also fixed in the java lib (open-telemetry/opentelemetry-java-instrumentation#3529)

I think if we need to use links, we also need to figure out a way of propagating context so that we don't start a new trace each time the consumer receives a message.
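
For example, on the producer side, injecting the current context into the message headers is what makes consumer-side extraction possible. A hedged sketch (the topic, payload, and config are illustrative, and this is not code from this PR):

import confluent_kafka
from opentelemetry import propagate

producer = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'})

headers = {}
propagate.inject(headers)  # writes e.g. the W3C traceparent header into the dict
producer.produce('mytopic', value=b'payload', headers=list(headers.items()))
producer.flush()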

@alolita (Member) commented Jun 4, 2024

Hi @lzchen @ocelotl @shalevr - can you please take a look at this PR and merge it? As the contributor @mrajashree mentioned, this fix has already been made in other language libraries and is needed downstream by end users. Thanks for your help in getting this fix in.

@luisRubiera commented:
Can someone please review this PR? It has been waiting for so long :(

@emdneto requested a review from a team as a code owner on October 16, 2024 at 22:17
@xrmx (Contributor) commented Oct 22, 2024

@luisRubiera the tests are red; the PR needs to be updated to match whatever has changed in the meantime.

A Member left a review comment on the CHANGELOG diff:

  ([#2355](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2355))
- AwsLambdaInstrumentor sets `cloud.account.id` span attribute
  ([#2367](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2367))
- Create span only after record is received while polling

Move this to unreleased section


Labels

None yet

Projects

Status: Reviewed PRs that need fixes

Development

Successfully merging this pull request may close these issues.

ProxiedConsumer is creating spans each time it polls instead of only when it receives a message