@@ -18,9 +18,8 @@ defmodule OpentelemetryOban do
1818
1919 alias Ecto.Changeset
2020 alias OpenTelemetry.Span
21- alias OpenTelemetry.SemanticConventions.Trace
21+ alias OpenTelemetry.SemConv.Incubating.MessagingAttributes
2222
23- require Trace
2423 require OpenTelemetry.Tracer
2524
2625 @ doc """
@@ -53,9 +52,9 @@ defmodule OpentelemetryOban do
5352
5453 def insert ( name \\ Oban , % Changeset { } = changeset ) do
5554 attributes = attributes_before_insert ( changeset )
56- worker = Changeset . get_field ( changeset , :worker )
55+ span_name = span_name ( attributes )
5756
58- OpenTelemetry.Tracer . with_span " #{ worker } send" , attributes: attributes , kind: :producer do
57+ OpenTelemetry.Tracer . with_span span_name , attributes: attributes , kind: :producer do
5958 changeset = add_tracing_information_to_meta ( changeset )
6059
6160 case Oban . insert ( name , changeset ) do
@@ -75,9 +74,9 @@ defmodule OpentelemetryOban do
7574
7675 def insert! ( name \\ Oban , % Changeset { } = changeset ) do
7776 attributes = attributes_before_insert ( changeset )
78- worker = Changeset . get_field ( changeset , :worker )
77+ span_name = span_name ( attributes )
7978
80- OpenTelemetry.Tracer . with_span " #{ worker } send" , attributes: attributes , kind: :producer do
79+ OpenTelemetry.Tracer . with_span span_name , attributes: attributes , kind: :producer do
8180 changeset = add_tracing_information_to_meta ( changeset )
8281
8382 try do
@@ -101,10 +100,10 @@ defmodule OpentelemetryOban do
101100 end
102101
103102 def insert_all ( name , changesets ) when is_list ( changesets ) do
104- # changesets in insert_all can include different workers and different
105- # queues. This means we cannot provide much information here, but we can
106- # still record the insert and propagate the context information.
107- OpenTelemetry.Tracer . with_span :"Oban bulk insert" , kind: :producer do
103+ attributes = attributes_before_insert ( changesets )
104+ span_name = span_name ( attributes )
105+
106+ OpenTelemetry.Tracer . with_span span_name , kind: :producer , attributes: attributes do
108107 changesets = Enum . map ( changesets , & add_tracing_information_to_meta / 1 )
109108 Oban . insert_all ( name , changesets )
110109 end
@@ -125,23 +124,76 @@ defmodule OpentelemetryOban do
125124 Changeset . change ( changeset , % { meta: new_meta } )
126125 end
127126
127+ defp attributes_before_insert ( changesets ) when is_list ( changesets ) do
128+ { queues , workers } =
129+ Enum . reduce ( changesets , { [ ] , [ ] } , fn changeset , { queues , workers } ->
130+ queue = Changeset . get_field ( changeset , :queue )
131+ worker = Changeset . get_field ( changeset , :worker )
132+
133+ { Enum . uniq ( [ queue | queues ] ) , Enum . uniq ( [ worker | workers ] ) }
134+ end )
135+
136+ % {
137+ unquote ( MessagingAttributes . messaging_system ( ) ) => :oban ,
138+ unquote ( MessagingAttributes . messaging_operation_name ( ) ) => :send ,
139+ unquote ( MessagingAttributes . messaging_operation_type ( ) ) =>
140+ unquote ( MessagingAttributes . messaging_operation_type_values ( ) . publish )
141+ }
142+ # If the attribute value is the same for all messages in the batch, the instrumentation SHOULD set such attribute on the span representing the batch operation.
143+ |> then ( fn attributes ->
144+ case queues do
145+ [ queue ] ->
146+ Map . put ( attributes , unquote ( MessagingAttributes . messaging_consumer_group_name ( ) ) , queue )
147+
148+ _ ->
149+ attributes
150+ end
151+ end )
152+ |> then ( fn attributes ->
153+ case workers do
154+ [ worker ] ->
155+ Map . put ( attributes , unquote ( MessagingAttributes . messaging_destination_name ( ) ) , worker )
156+
157+ _ ->
158+ attributes
159+ end
160+ end )
161+ end
162+
128163 defp attributes_before_insert ( changeset ) do
129164 queue = Changeset . get_field ( changeset , :queue )
130165 worker = Changeset . get_field ( changeset , :worker )
131166
132167 % {
133- Trace . messaging_system ( ) => :oban ,
134- Trace . messaging_destination ( ) => queue ,
135- Trace . messaging_destination_kind ( ) => :queue ,
168+ unquote ( MessagingAttributes . messaging_system ( ) ) => :oban ,
169+ unquote ( MessagingAttributes . messaging_consumer_group_name ( ) ) => queue ,
170+ unquote ( MessagingAttributes . messaging_destination_name ( ) ) => worker ,
171+ unquote ( MessagingAttributes . messaging_operation_name ( ) ) => :send ,
172+ unquote ( MessagingAttributes . messaging_operation_type ( ) ) =>
173+ unquote ( MessagingAttributes . messaging_operation_type_values ( ) . publish ) ,
136174 :"oban.job.worker" => worker
137175 }
138176 end
139177
140178 defp attributes_after_insert ( job ) do
141179 % {
180+ unquote ( MessagingAttributes . messaging_message_id ( ) ) => job . id ,
142181 "oban.job.job_id": job . id ,
143182 "oban.job.priority": job . priority ,
144183 "oban.job.max_attempts": job . max_attempts
145184 }
146185 end
186+
187+ # `messaging.destination.name` SHOULD be used when the destination is known to be neither temporary nor anonymous.
188+ defp span_name ( % {
189+ unquote ( MessagingAttributes . messaging_operation_name ( ) ) => operation ,
190+ unquote ( MessagingAttributes . messaging_destination_name ( ) ) => destination
191+ } ) do
192+ "#{ operation } #{ destination } "
193+ end
194+
195+ # If a corresponding `{destination}` value is not available for a specific operation, the instrumentation SHOULD omit the {destination}.
196+ defp span_name ( % { unquote ( MessagingAttributes . messaging_operation_name ( ) ) => operation } ) do
197+ "#{ operation } "
198+ end
147199end
0 commit comments