From c9f7bca0e3f2d91c19524e9d782db8895f7a4d04 Mon Sep 17 00:00:00 2001 From: Dan Schultzer <1254724+danschultzer@users.noreply.github.com> Date: Sat, 14 Dec 2024 23:23:19 -0800 Subject: [PATCH] Conform attributes to semantic conventions 1.27 --- .../opentelemetry_oban/CHANGELOG.md | 11 ++ .../lib/opentelemetry_oban.ex | 78 ++++++++++--- .../lib/opentelemetry_oban/job_handler.ex | 25 +++-- .../lib/opentelemetry_oban/plugin_handler.ex | 9 +- .../plugin_handler_test.exs | 8 +- .../test/opentelemetry_oban_test.exs | 106 ++++++++++-------- 6 files changed, 163 insertions(+), 74 deletions(-) diff --git a/instrumentation/opentelemetry_oban/CHANGELOG.md b/instrumentation/opentelemetry_oban/CHANGELOG.md index 259f6b9..ac4956e 100644 --- a/instrumentation/opentelemetry_oban/CHANGELOG.md +++ b/instrumentation/opentelemetry_oban/CHANGELOG.md @@ -7,3 +7,14 @@ Forked from [`:opentelemetry_oban` `1.1.1`](https://github.com/open-telemetry/op ### Changed - Updated to support `:opentelemetry_semantic_conventions` `1.27` +- Conforms to `1.27` semantic conventions: + - Changed `messaging.destination` to `messaging.consumer.group.name` + - Removed `messaging.destination.kind` + - Changed span name `{destination} {operation}` to `{operation} {destination}` + - Changed span name `{plugin} process` to `oban.plugin {plugin}` + - Changed span name `Oban bulk insert` to `{operation} {destination}` when all job types are the same + - Changed span name `Oban bulk insert` to `{operation}` when job types are the different + - Added `messaging.destination.name` + - Added `messaging.operation.name` + - Added `messaging.operation.type` + - Added `messaging.message.id` diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex index 79451de..f58ce50 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -18,9 +18,8 @@ defmodule OpentelemetryOban do alias Ecto.Changeset alias OpenTelemetry.Span - alias OpenTelemetry.SemanticConventions.Trace + alias OpenTelemetry.SemConv.Incubating.MessagingAttributes - require Trace require OpenTelemetry.Tracer @doc """ @@ -53,9 +52,9 @@ defmodule OpentelemetryOban do def insert(name \\ Oban, %Changeset{} = changeset) do attributes = attributes_before_insert(changeset) - worker = Changeset.get_field(changeset, :worker) + span_name = span_name(attributes) - OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do + OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do changeset = add_tracing_information_to_meta(changeset) case Oban.insert(name, changeset) do @@ -75,9 +74,9 @@ defmodule OpentelemetryOban do def insert!(name \\ Oban, %Changeset{} = changeset) do attributes = attributes_before_insert(changeset) - worker = Changeset.get_field(changeset, :worker) + span_name = span_name(attributes) - OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do + OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do changeset = add_tracing_information_to_meta(changeset) try do @@ -101,10 +100,10 @@ defmodule OpentelemetryOban do end def insert_all(name, changesets) when is_list(changesets) do - # changesets in insert_all can include different workers and different - # queues. This means we cannot provide much information here, but we can - # still record the insert and propagate the context information. - OpenTelemetry.Tracer.with_span :"Oban bulk insert", kind: :producer do + attributes = attributes_before_insert(changesets) + span_name = span_name(attributes) + + OpenTelemetry.Tracer.with_span span_name, kind: :producer, attributes: attributes do changesets = Enum.map(changesets, &add_tracing_information_to_meta/1) Oban.insert_all(name, changesets) end @@ -125,23 +124,76 @@ defmodule OpentelemetryOban do Changeset.change(changeset, %{meta: new_meta}) end + defp attributes_before_insert(changesets) when is_list(changesets) do + {queues, workers} = + Enum.reduce(changesets, {[], []}, fn changeset, {queues, workers} -> + queue = Changeset.get_field(changeset, :queue) + worker = Changeset.get_field(changeset, :worker) + + {Enum.uniq([queue | queues]), Enum.uniq([worker | workers])} + end) + + %{ + unquote(MessagingAttributes.messaging_system()) => :oban, + unquote(MessagingAttributes.messaging_operation_name()) => :send, + unquote(MessagingAttributes.messaging_operation_type()) => + unquote(MessagingAttributes.messaging_operation_type_values().publish) + } + # If the attribute value is the same for all messages in the batch, the instrumentation SHOULD set such attribute on the span representing the batch operation. + |> then(fn attributes -> + case queues do + [queue] -> + Map.put(attributes, unquote(MessagingAttributes.messaging_consumer_group_name()), queue) + + _ -> + attributes + end + end) + |> then(fn attributes -> + case workers do + [worker] -> + Map.put(attributes, unquote(MessagingAttributes.messaging_destination_name()), worker) + + _ -> + attributes + end + end) + end + defp attributes_before_insert(changeset) do queue = Changeset.get_field(changeset, :queue) worker = Changeset.get_field(changeset, :worker) %{ - :"messaging.system" => :oban, - :"messaging.destination" => queue, - :"messaging.destination_kind" => :queue, + unquote(MessagingAttributes.messaging_system()) => :oban, + unquote(MessagingAttributes.messaging_consumer_group_name()) => queue, + unquote(MessagingAttributes.messaging_destination_name()) => worker, + unquote(MessagingAttributes.messaging_operation_name()) => :send, + unquote(MessagingAttributes.messaging_operation_type()) => + unquote(MessagingAttributes.messaging_operation_type_values().publish), :"oban.job.worker" => worker } end defp attributes_after_insert(job) do %{ + unquote(MessagingAttributes.messaging_message_id()) => job.id, "oban.job.job_id": job.id, "oban.job.priority": job.priority, "oban.job.max_attempts": job.max_attempts } end + + # `messaging.destination.name` SHOULD be used when the destination is known to be neither temporary nor anonymous. + defp span_name(%{ + unquote(MessagingAttributes.messaging_operation_name()) => operation, + unquote(MessagingAttributes.messaging_destination_name()) => destination + }) do + "#{operation} #{destination}" + end + + # If a corresponding `{destination}` value is not available for a specific operation, the instrumentation SHOULD omit the {destination}. + defp span_name(%{unquote(MessagingAttributes.messaging_operation_name()) => operation}) do + "#{operation}" + end end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex index 5394ebf..d10aad1 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -1,8 +1,6 @@ defmodule OpentelemetryOban.JobHandler do alias OpenTelemetry.Span - alias OpenTelemetry.SemanticConventions.Trace - - require Trace + alias OpenTelemetry.SemConv.Incubating.MessagingAttributes @tracer_id __MODULE__ @@ -60,10 +58,12 @@ defmodule OpentelemetryOban.JobHandler do OpenTelemetry.Tracer.set_current_span(:undefined) attributes = %{ - :"messaging.system" => :oban, - :"messaging.destination" => queue, - :"messaging.destination_kind" => :queue, - :"messaging.operation" => :process, + unquote(MessagingAttributes.messaging_system()) => :oban, + unquote(MessagingAttributes.messaging_destination_name()) => worker, + unquote(MessagingAttributes.messaging_consumer_group_name()) => queue, + unquote(MessagingAttributes.messaging_operation_name()) => :process, + unquote(MessagingAttributes.messaging_operation_type()) => :process, + unquote(MessagingAttributes.messaging_message_id()) => id, :"oban.job.job_id" => id, :"oban.job.worker" => worker, :"oban.job.priority" => priority, @@ -73,15 +73,20 @@ defmodule OpentelemetryOban.JobHandler do :"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at) } - span_name = "#{worker} process" - - OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{ + OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name(attributes), metadata, %{ kind: :consumer, links: links, attributes: attributes }) end + defp span_name(%{ + unquote(MessagingAttributes.messaging_destination_name()) => destination_name, + unquote(MessagingAttributes.messaging_operation_name()) => operation + }) do + "#{operation} #{destination_name}" + end + def handle_job_stop(_event, _measurements, metadata, _config) do OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex index e77b8cb..d1c2108 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -38,14 +38,19 @@ defmodule OpentelemetryOban.PluginHandler do end def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do + attributes = %{"oban.plugin": plugin} + span_name = span_name(attributes) + OpentelemetryTelemetry.start_telemetry_span( @tracer_id, - "#{plugin} process", + span_name, metadata, - %{attributes: %{"oban.plugin": plugin}} + %{attributes: attributes} ) end + defp span_name(%{"oban.plugin": plugin}), do: "oban.plugin #{inspect(plugin)}" + def handle_plugin_stop(_event, _measurements, metadata, _config) do Tracer.set_attributes(end_span_plugin_attrs(metadata)) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs index aceab17..5a18bbb 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs @@ -45,7 +45,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do %{plugin: Elixir.Oban.Plugins.Stager} ) - refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + refute_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")} end test "records span on plugin execution" do @@ -61,7 +61,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do %{plugin: Elixir.Oban.Plugins.Stager} ) - assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + assert_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")} end test "records span on plugin error" do @@ -96,7 +96,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do assert_receive {:span, span( - name: "Elixir.Oban.Plugins.Stager process", + name: "oban.plugin Oban.Plugins.Stager", events: events, status: ^expected_status )} @@ -210,7 +210,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do end defp receive_span_attrs(name) do - name = "#{name} process" + name = "oban.plugin #{inspect(name)}" assert_receive( {:span, span(name: ^name, attributes: attributes)}, diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs index e12603c..7e5afa3 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs @@ -37,7 +37,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", attributes: attributes, parent_span_id: :undefined, kind: :producer, @@ -45,13 +45,16 @@ defmodule OpentelemetryObanTest do )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, - "oban.job.job_id": _job_id, + "messaging.system": :oban, + "messaging.destination.name": "TestJob", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :send, + "messaging.operation.type": :publish, + "messaging.message.id": job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, - "oban.job.worker": "TestJob", - "messaging.system": :oban + "oban.job.worker": "TestJob" } = :otel_attributes.map(attributes) end @@ -66,7 +69,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", attributes: _attributes, trace_id: ^root_trace_id, parent_span_id: ^root_span_id, @@ -89,7 +92,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", attributes: _attributes, trace_id: send_trace_id, span_id: send_span_id, @@ -99,7 +102,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: _attributes, kind: :consumer, status: :undefined, @@ -121,7 +124,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: _attributes, kind: :consumer, status: :undefined, @@ -138,24 +141,26 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: attributes, kind: :consumer, status: :undefined )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, + "messaging.system": :oban, + "messaging.destination.name": "TestJob", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :process, + "messaging.operation.type": :process, + "messaging.message.id": job_id, "oban.job.attempt": 1, "oban.job.inserted_at": _inserted_at, - "oban.job.job_id": _job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, "oban.job.scheduled_at": _scheduled_at, - "oban.job.worker": "TestJob", - "messaging.operation": :process, - "messaging.system": :oban + "oban.job.worker": "TestJob" } = :otel_attributes.map(attributes) end @@ -167,7 +172,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatReturnsError process", + name: "process TestJobThatReturnsError", attributes: attributes, kind: :consumer, events: events, @@ -175,17 +180,19 @@ defmodule OpentelemetryObanTest do )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, + "messaging.system": :oban, + "messaging.destination.name": "TestJobThatReturnsError", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :process, + "messaging.operation.type": :process, + "messaging.message.id": job_id, "oban.job.attempt": 1, "oban.job.inserted_at": _inserted_at, - "oban.job.job_id": _job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, "oban.job.scheduled_at": _scheduled_at, - "oban.job.worker": "TestJobThatReturnsError", - "messaging.operation": :process, - "messaging.system": :oban + "oban.job.worker": "TestJobThatReturnsError" } = :otel_attributes.map(attributes) [ @@ -209,14 +216,14 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatReturnsError send", + name: "send TestJobThatReturnsError", trace_id: send_trace_id, span_id: send_span_id )} assert_receive {:span, span( - name: "TestJobThatReturnsError process", + name: "process TestJobThatReturnsError", status: ^expected_status, trace_id: first_process_trace_id, links: job_1_links @@ -226,7 +233,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatReturnsError process", + name: "process TestJobThatReturnsError", status: ^expected_status, trace_id: second_process_trace_id, links: job_2_links @@ -245,7 +252,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatThrowsException process", + name: "process TestJobThatThrowsException", attributes: attributes, kind: :consumer, events: events, @@ -253,17 +260,19 @@ defmodule OpentelemetryObanTest do )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, + "messaging.system": :oban, + "messaging.destination.name": "TestJobThatThrowsException", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :process, + "messaging.operation.type": :process, + "messaging.message.id": job_id, "oban.job.attempt": 1, "oban.job.inserted_at": _inserted_at, - "oban.job.job_id": _job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, "oban.job.scheduled_at": _scheduled_at, - "oban.job.worker": "TestJobThatThrowsException", - "messaging.operation": :process, - "messaging.system": :oban + "oban.job.worker": "TestJobThatThrowsException" } = :otel_attributes.map(attributes) [ @@ -283,7 +292,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobWithInnerSpan process", + name: "process TestJobWithInnerSpan", kind: :consumer, trace_id: trace_id, span_id: process_span_id @@ -301,8 +310,8 @@ defmodule OpentelemetryObanTest do test "OpentelemetryOban.insert!/2 returns job on successful insert" do %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, span(name: "TestJob send")} - assert_receive {:span, span(name: "TestJob process")} + assert_receive {:span, span(name: "send TestJob")} + assert_receive {:span, span(name: "process TestJob")} end test "OpentelemetryOban.insert!/2 raises an error on failed insert" do @@ -317,7 +326,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", events: events, status: ^expected_status )} @@ -332,30 +341,37 @@ defmodule OpentelemetryObanTest do assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - refute_received {:span, span(name: "TestJob process")} + refute_received {:span, span(name: "process TestJob")} end test "tracing information is propagated when using insert_all/2" do OpentelemetryOban.insert_all([ TestJob.new(%{}), - TestJob.new(%{}) + TestJobWithInnerSpan.new(%{}) ]) assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) assert_receive {:span, span( - name: :"Oban bulk insert", - attributes: _attributes, + name: "send", + attributes: attributes, trace_id: send_trace_id, span_id: send_span_id, kind: :producer, status: :undefined )} + assert %{ + "messaging.system": :oban, + "messaging.operation.name": :send, + "messaging.operation.type": :publish, + "messaging.consumer.group.name": "events" + } = :otel_attributes.map(attributes) + assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: _attributes, kind: :consumer, status: :undefined, @@ -367,7 +383,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJobWithInnerSpan", attributes: _attributes, kind: :consumer, status: :undefined, @@ -387,6 +403,6 @@ defmodule OpentelemetryObanTest do test "works with Oban.Testing.perform_job helper function" do Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) - assert_receive {:span, span(name: "TestJob process")} + assert_receive {:span, span(name: "process TestJob")} end end