Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2355](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2355))
- AwsLambdaInstrumentor sets `cloud.account.id` span attribute
([#2367](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2367))
- Create span only after record is received while polling
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to unreleased section

([#1678](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1678))


### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,27 +377,29 @@ def wrap_produce(func, instance, tracer, args, kwargs):

@staticmethod
def wrap_poll(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
_end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
record = func(*args, **kwargs)
if record:
_create_new_consume_span(instance, tracer, [record])
record = func(*args, **kwargs)
if record:
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
span_name = _get_span_name("recv", record.topic())
with tracer.start_as_current_span(
span_name,
end_on_exit=True,
kind=trace.SpanKind.CONSUMER,
context=ctx,
) as current_consume_span:
_enrich_span(
instance._current_consume_span,
current_consume_span,
record.topic(),
record.partition(),
record.offset(),
operation=MessagingOperationValues.PROCESS,
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)

return record
propagate.inject(
record.headers(),
setter=_kafka_setter,
)
return record
return None

@staticmethod
def wrap_consume(func, instance, tracer, args, kwargs):
Expand Down
Loading