Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ def start_span(name, _id, payload)
span = tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links)
end

{ span: span, ctx_token: attach_consumer_context(span) }
{ span: span, ctx_token: attach_consumer_context(span, parent_context) }
end

# This method attaches a span to multiple contexts:
# 1. Registers the ingress span as the top level ActiveJob span.
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
# 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK.
# @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
# @param parent_context [Context] The context to use as the parent for the consumer context
# @return [Numeric] Context token that must be detached when finished
def attach_consumer_context(span)
consumer_context = OpenTelemetry::Trace.context_with_span(span)
def attach_consumer_context(span, parent_context)
consumer_context = OpenTelemetry::Trace.context_with_span(span, parent_context: parent_context)
internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context)

OpenTelemetry::Context.attach(internal_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,6 @@
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)

_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
Expand All @@ -225,6 +210,21 @@
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test case needs to be run with an async queue adapter

ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end
perform_enqueued_jobs

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
end

Expand All @@ -251,18 +251,6 @@
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end
_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
Expand All @@ -284,6 +272,20 @@
_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
BaggageJob.perform_later
end
perform_enqueued_jobs

_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
end

Expand Down
Loading