Skip to content

Commit 191b5e9

Browse files
committed
Update to semantic conventions 1.27
1 parent a60f775 commit 191b5e9

File tree

5 files changed

+152
-74
lines changed

5 files changed

+152
-74
lines changed

instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ defmodule OpentelemetryOban do
1818

1919
alias Ecto.Changeset
2020
alias OpenTelemetry.Span
21-
alias OpenTelemetry.SemanticConventions.Trace
21+
alias OpenTelemetry.SemConv.Incubating.MessagingAttributes
2222

23-
require Trace
2423
require OpenTelemetry.Tracer
2524

2625
@doc """
@@ -53,9 +52,9 @@ defmodule OpentelemetryOban do
5352

5453
def insert(name \\ Oban, %Changeset{} = changeset) do
5554
attributes = attributes_before_insert(changeset)
56-
worker = Changeset.get_field(changeset, :worker)
55+
span_name = span_name(attributes)
5756

58-
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
57+
OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do
5958
changeset = add_tracing_information_to_meta(changeset)
6059

6160
case Oban.insert(name, changeset) do
@@ -75,9 +74,9 @@ defmodule OpentelemetryOban do
7574

7675
def insert!(name \\ Oban, %Changeset{} = changeset) do
7776
attributes = attributes_before_insert(changeset)
78-
worker = Changeset.get_field(changeset, :worker)
77+
span_name = span_name(attributes)
7978

80-
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
79+
OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do
8180
changeset = add_tracing_information_to_meta(changeset)
8281

8382
try do
@@ -101,10 +100,10 @@ defmodule OpentelemetryOban do
101100
end
102101

103102
def insert_all(name, changesets) when is_list(changesets) do
104-
# changesets in insert_all can include different workers and different
105-
# queues. This means we cannot provide much information here, but we can
106-
# still record the insert and propagate the context information.
107-
OpenTelemetry.Tracer.with_span :"Oban bulk insert", kind: :producer do
103+
attributes = attributes_before_insert(changesets)
104+
span_name = span_name(attributes)
105+
106+
OpenTelemetry.Tracer.with_span span_name, kind: :producer, attributes: attributes do
108107
changesets = Enum.map(changesets, &add_tracing_information_to_meta/1)
109108
Oban.insert_all(name, changesets)
110109
end
@@ -125,23 +124,76 @@ defmodule OpentelemetryOban do
125124
Changeset.change(changeset, %{meta: new_meta})
126125
end
127126

127+
defp attributes_before_insert(changesets) when is_list(changesets) do
128+
{queues, workers} =
129+
Enum.reduce(changesets, {[], []}, fn changeset, {queues, workers} ->
130+
queue = Changeset.get_field(changeset, :queue)
131+
worker = Changeset.get_field(changeset, :worker)
132+
133+
{Enum.uniq([queue | queues]), Enum.uniq([worker | workers])}
134+
end)
135+
136+
%{
137+
unquote(MessagingAttributes.messaging_system()) => :oban,
138+
unquote(MessagingAttributes.messaging_operation_name()) => :send,
139+
unquote(MessagingAttributes.messaging_operation_type()) =>
140+
unquote(MessagingAttributes.messaging_operation_type_values().publish)
141+
}
142+
# If the attribute value is the same for all messages in the batch, the instrumentation SHOULD set such attribute on the span representing the batch operation.
143+
|> then(fn attributes ->
144+
case queues do
145+
[queue] ->
146+
Map.put(attributes, unquote(MessagingAttributes.messaging_consumer_group_name()), queue)
147+
148+
_ ->
149+
attributes
150+
end
151+
end)
152+
|> then(fn attributes ->
153+
case workers do
154+
[worker] ->
155+
Map.put(attributes, unquote(MessagingAttributes.messaging_destination_name()), worker)
156+
157+
_ ->
158+
attributes
159+
end
160+
end)
161+
end
162+
128163
defp attributes_before_insert(changeset) do
129164
queue = Changeset.get_field(changeset, :queue)
130165
worker = Changeset.get_field(changeset, :worker)
131166

132167
%{
133-
Trace.messaging_system() => :oban,
134-
Trace.messaging_destination() => queue,
135-
Trace.messaging_destination_kind() => :queue,
168+
unquote(MessagingAttributes.messaging_system()) => :oban,
169+
unquote(MessagingAttributes.messaging_consumer_group_name()) => queue,
170+
unquote(MessagingAttributes.messaging_destination_name()) => worker,
171+
unquote(MessagingAttributes.messaging_operation_name()) => :send,
172+
unquote(MessagingAttributes.messaging_operation_type()) =>
173+
unquote(MessagingAttributes.messaging_operation_type_values().publish),
136174
:"oban.job.worker" => worker
137175
}
138176
end
139177

140178
defp attributes_after_insert(job) do
141179
%{
180+
unquote(MessagingAttributes.messaging_message_id()) => job.id,
142181
"oban.job.job_id": job.id,
143182
"oban.job.priority": job.priority,
144183
"oban.job.max_attempts": job.max_attempts
145184
}
146185
end
186+
187+
# `messaging.destination.name` SHOULD be used when the destination is known to be neither temporary nor anonymous.
188+
defp span_name(%{
189+
unquote(MessagingAttributes.messaging_operation_name()) => operation,
190+
unquote(MessagingAttributes.messaging_destination_name()) => destination
191+
}) do
192+
"#{operation} #{destination}"
193+
end
194+
195+
# If a corresponding `{destination}` value is not available for a specific operation, the instrumentation SHOULD omit the {destination}.
196+
defp span_name(%{unquote(MessagingAttributes.messaging_operation_name()) => operation}) do
197+
"#{operation}"
198+
end
147199
end

instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
defmodule OpentelemetryOban.JobHandler do
22
alias OpenTelemetry.Span
3-
alias OpenTelemetry.SemanticConventions.Trace
4-
5-
require Trace
3+
alias OpenTelemetry.SemConv.Incubating.MessagingAttributes
64

75
@tracer_id __MODULE__
86

@@ -60,10 +58,12 @@ defmodule OpentelemetryOban.JobHandler do
6058
OpenTelemetry.Tracer.set_current_span(:undefined)
6159

6260
attributes = %{
63-
Trace.messaging_system() => :oban,
64-
Trace.messaging_destination() => queue,
65-
Trace.messaging_destination_kind() => :queue,
66-
Trace.messaging_operation() => :process,
61+
unquote(MessagingAttributes.messaging_system()) => :oban,
62+
unquote(MessagingAttributes.messaging_destination_name()) => worker,
63+
unquote(MessagingAttributes.messaging_consumer_group_name()) => queue,
64+
unquote(MessagingAttributes.messaging_operation_name()) => :process,
65+
unquote(MessagingAttributes.messaging_operation_type()) => :process,
66+
unquote(MessagingAttributes.messaging_message_id()) => id,
6767
:"oban.job.job_id" => id,
6868
:"oban.job.worker" => worker,
6969
:"oban.job.priority" => priority,
@@ -73,15 +73,20 @@ defmodule OpentelemetryOban.JobHandler do
7373
:"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at)
7474
}
7575

76-
span_name = "#{worker} process"
77-
78-
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
76+
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name(attributes), metadata, %{
7977
kind: :consumer,
8078
links: links,
8179
attributes: attributes
8280
})
8381
end
8482

83+
defp span_name(%{
84+
unquote(MessagingAttributes.messaging_destination_name()) => destination_name,
85+
unquote(MessagingAttributes.messaging_operation_name()) => operation
86+
}) do
87+
"#{operation} #{destination_name}"
88+
end
89+
8590
def handle_job_stop(_event, _measurements, metadata, _config) do
8691
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
8792
end

instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,19 @@ defmodule OpentelemetryOban.PluginHandler do
3838
end
3939

4040
def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do
41+
attributes = %{"oban.plugin": plugin}
42+
span_name = span_name(attributes)
43+
4144
OpentelemetryTelemetry.start_telemetry_span(
4245
@tracer_id,
43-
"#{plugin} process",
46+
span_name,
4447
metadata,
45-
%{attributes: %{"oban.plugin": plugin}}
48+
%{attributes: attributes}
4649
)
4750
end
4851

52+
defp span_name(%{"oban.plugin": plugin}), do: "oban.plugin #{inspect(plugin)}"
53+
4954
def handle_plugin_stop(_event, _measurements, metadata, _config) do
5055
Tracer.set_attributes(end_span_plugin_attrs(metadata))
5156
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)

instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
4545
%{plugin: Elixir.Oban.Plugins.Stager}
4646
)
4747

48-
refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
48+
refute_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")}
4949
end
5050

5151
test "records span on plugin execution" do
@@ -61,7 +61,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
6161
%{plugin: Elixir.Oban.Plugins.Stager}
6262
)
6363

64-
assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
64+
assert_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")}
6565
end
6666

6767
test "records span on plugin error" do
@@ -96,7 +96,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
9696

9797
assert_receive {:span,
9898
span(
99-
name: "Elixir.Oban.Plugins.Stager process",
99+
name: "oban.plugin Oban.Plugins.Stager",
100100
events: events,
101101
status: ^expected_status
102102
)}
@@ -210,7 +210,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
210210
end
211211

212212
defp receive_span_attrs(name) do
213-
name = "#{name} process"
213+
name = "oban.plugin #{inspect(name)}"
214214

215215
assert_receive(
216216
{:span, span(name: ^name, attributes: attributes)},

0 commit comments

Comments
 (0)