Skip to content

Commit 39056a3

Browse files
authored
feat!(active-job): Normalize event messages (#1080)
1 parent 5f3f0a8 commit 39056a3

File tree

6 files changed

+28
-31
lines changed

6 files changed

+28
-31
lines changed

instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ module Instrumentation
1414
module ActiveJob
1515
# Module that contains custom event handlers, which are used to generate spans per event
1616
module Handlers
17+
EVENT_NAMESPACE = 'active_job'
18+
1719
module_function
1820

1921
# Subscribes Event Handlers to relevant ActiveJob notifications
@@ -57,7 +59,7 @@ def subscribe
5759
}
5860

5961
@subscriptions = handlers_by_pattern.map do |key, handler|
60-
::ActiveSupport::Notifications.subscribe("#{key}.active_job", handler)
62+
::ActiveSupport::Notifications.subscribe("#{key}.#{EVENT_NAMESPACE}", handler)
6163
end
6264
end
6365

instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ def start(name, id, payload)
4040
# @param payload [Hash] containing job run information
4141
# @return [Hash] with the span and generated context tokens
4242
def start_span(name, _id, payload)
43-
span = tracer.start_span(name, attributes: @mapper.call(payload))
43+
job = payload.fetch(:job)
44+
event_name = name.delete_suffix(".#{EVENT_NAMESPACE}")
45+
span_name = span_name(job, event_name)
46+
span = tracer.start_span(span_name, attributes: @mapper.call(payload))
4447
token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))
4548

4649
{ span: span, ctx_token: token }
@@ -106,6 +109,18 @@ def on_exception(exception, span)
106109
def tracer
107110
OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance.tracer
108111
end
112+
113+
private
114+
115+
def span_name(job, event_name)
116+
prefix = if @config[:span_naming] == :job_class
117+
job.class.name
118+
else
119+
job.queue_name
120+
end
121+
122+
"#{prefix} #{event_name}"
123+
end
109124
end
110125
end
111126
end

instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ module ActiveJob
1010
module Handlers
1111
# Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans
1212
class Enqueue < Default
13+
EVENT_NAME = 'publish'
14+
1315
# Overrides the `Default#start_span` method to create an egress span
1416
# and registers it with the current context
1517
#
@@ -19,22 +21,11 @@ class Enqueue < Default
1921
# @return [Hash] with the span and generated context tokens
2022
def start_span(name, _id, payload)
2123
job = payload.fetch(:job)
22-
span = tracer.start_span(span_name(job), kind: :producer, attributes: @mapper.call(payload))
24+
span_name = span_name(job, EVENT_NAME)
25+
span = tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
2326
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
2427
{ span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) }
2528
end
26-
27-
private
28-
29-
def span_name(job)
30-
prefix = if @config[:span_naming] == :job_class
31-
job.class.name
32-
else
33-
job.queue_name
34-
end
35-
36-
"#{prefix} publish"
37-
end
3829
end
3930
end
4031
end

instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ module ActiveJob
1010
module Handlers
1111
# Handles perform.active_job to generate ingress spans
1212
class Perform < Default
13+
EVENT_NAME = 'process'
14+
1315
# Overrides the `Default#start_span` method to create an ingress span
1416
# and registers it with the current context
1517
#
@@ -19,10 +21,9 @@ class Perform < Default
1921
# @return [Hash] with the span and generated context tokens
2022
def start_span(name, _id, payload)
2123
job = payload.fetch(:job)
24+
span_name = span_name(job, EVENT_NAME)
2225
parent_context = OpenTelemetry.propagation.extract(job.__otel_headers)
2326

24-
span_name = span_name(job)
25-
2627
# TODO: Refactor into a propagation strategy
2728
propagation_style = @config[:propagation_style]
2829
if propagation_style == :child
@@ -48,18 +49,6 @@ def attach_consumer_context(span)
4849

4950
OpenTelemetry::Context.attach(internal_context)
5051
end
51-
52-
private
53-
54-
def span_name(job)
55-
prefix = if @config[:span_naming] == :job_class
56-
job.class.name
57-
else
58-
job.queue_name
59-
end
60-
61-
"#{prefix} process"
62-
end
6352
end
6453
end
6554
end

instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
let(:spans) { exporter.finished_spans }
1616
let(:publish_span) { spans.find { |s| s.name == 'default publish' } }
1717
let(:process_span) { spans.find { |s| s.name == 'default process' } }
18-
let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } }
18+
let(:discard_span) { spans.find { |s| s.name == 'default discard' } }
1919

2020
before do
2121
OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe

instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
let(:spans) { exporter.finished_spans }
1616
let(:publish_span) { spans.find { |s| s.name == 'default publish' } }
1717
let(:process_span) { spans.find { |s| s.name == 'default process' } }
18-
let(:retry_span) { spans.find { |s| s.name == 'retry_stopped.active_job' } }
18+
let(:retry_span) { spans.find { |s| s.name == 'default retry_stopped' } }
1919

2020
before do
2121
OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe

0 commit comments

Comments
 (0)