diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex index 7a4a41a5..55e57439 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -18,9 +18,8 @@ defmodule OpentelemetryOban do alias Ecto.Changeset alias OpenTelemetry.Span - alias OpenTelemetry.SemanticConventions.Trace + alias OpenTelemetry.SemConv.Incubating.MessagingAttributes - require Trace require OpenTelemetry.Tracer @doc """ @@ -53,9 +52,9 @@ defmodule OpentelemetryOban do def insert(name \\ Oban, %Changeset{} = changeset) do attributes = attributes_before_insert(changeset) - worker = Changeset.get_field(changeset, :worker) + span_name = span_name(attributes) - OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do + OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do changeset = add_tracing_information_to_meta(changeset) case Oban.insert(name, changeset) do @@ -75,9 +74,9 @@ defmodule OpentelemetryOban do def insert!(name \\ Oban, %Changeset{} = changeset) do attributes = attributes_before_insert(changeset) - worker = Changeset.get_field(changeset, :worker) + span_name = span_name(attributes) - OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do + OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do changeset = add_tracing_information_to_meta(changeset) try do @@ -101,10 +100,10 @@ defmodule OpentelemetryOban do end def insert_all(name, changesets) when is_list(changesets) do - # changesets in insert_all can include different workers and different - # queues. This means we cannot provide much information here, but we can - # still record the insert and propagate the context information. - OpenTelemetry.Tracer.with_span :"Oban bulk insert", kind: :producer do + attributes = attributes_before_insert(changesets) + span_name = span_name(attributes) + + OpenTelemetry.Tracer.with_span span_name, kind: :producer, attributes: attributes do changesets = Enum.map(changesets, &add_tracing_information_to_meta/1) Oban.insert_all(name, changesets) end @@ -125,23 +124,76 @@ defmodule OpentelemetryOban do Changeset.change(changeset, %{meta: new_meta}) end + defp attributes_before_insert(changesets) when is_list(changesets) do + {queues, workers} = + Enum.reduce(changesets, {[], []}, fn changeset, {queues, workers} -> + queue = Changeset.get_field(changeset, :queue) + worker = Changeset.get_field(changeset, :worker) + + {Enum.uniq([queue | queues]), Enum.uniq([worker | workers])} + end) + + %{ + unquote(MessagingAttributes.messaging_system()) => :oban, + unquote(MessagingAttributes.messaging_operation_name()) => :send, + unquote(MessagingAttributes.messaging_operation_type()) => + unquote(MessagingAttributes.messaging_operation_type_values().publish) + } + # If the attribute value is the same for all messages in the batch, the instrumentation SHOULD set such attribute on the span representing the batch operation. + |> then(fn attributes -> + case queues do + [queue] -> + Map.put(attributes, unquote(MessagingAttributes.messaging_consumer_group_name()), queue) + + _ -> + attributes + end + end) + |> then(fn attributes -> + case workers do + [worker] -> + Map.put(attributes, unquote(MessagingAttributes.messaging_destination_name()), worker) + + _ -> + attributes + end + end) + end + defp attributes_before_insert(changeset) do queue = Changeset.get_field(changeset, :queue) worker = Changeset.get_field(changeset, :worker) %{ - Trace.messaging_system() => :oban, - Trace.messaging_destination() => queue, - Trace.messaging_destination_kind() => :queue, + unquote(MessagingAttributes.messaging_system()) => :oban, + unquote(MessagingAttributes.messaging_consumer_group_name()) => queue, + unquote(MessagingAttributes.messaging_destination_name()) => worker, + unquote(MessagingAttributes.messaging_operation_name()) => :send, + unquote(MessagingAttributes.messaging_operation_type()) => + unquote(MessagingAttributes.messaging_operation_type_values().publish), :"oban.job.worker" => worker } end defp attributes_after_insert(job) do %{ + unquote(MessagingAttributes.messaging_message_id()) => job.id, "oban.job.job_id": job.id, "oban.job.priority": job.priority, "oban.job.max_attempts": job.max_attempts } end + + # `messaging.destination.name` SHOULD be used when the destination is known to be neither temporary nor anonymous. + defp span_name(%{ + unquote(MessagingAttributes.messaging_operation_name()) => operation, + unquote(MessagingAttributes.messaging_destination_name()) => destination + }) do + "#{operation} #{destination}" + end + + # If a corresponding `{destination}` value is not available for a specific operation, the instrumentation SHOULD omit the {destination}. + defp span_name(%{unquote(MessagingAttributes.messaging_operation_name()) => operation}) do + "#{operation}" + end end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex index 8d14f781..d10aad1d 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -1,8 +1,6 @@ defmodule OpentelemetryOban.JobHandler do alias OpenTelemetry.Span - alias OpenTelemetry.SemanticConventions.Trace - - require Trace + alias OpenTelemetry.SemConv.Incubating.MessagingAttributes @tracer_id __MODULE__ @@ -60,10 +58,12 @@ defmodule OpentelemetryOban.JobHandler do OpenTelemetry.Tracer.set_current_span(:undefined) attributes = %{ - Trace.messaging_system() => :oban, - Trace.messaging_destination() => queue, - Trace.messaging_destination_kind() => :queue, - Trace.messaging_operation() => :process, + unquote(MessagingAttributes.messaging_system()) => :oban, + unquote(MessagingAttributes.messaging_destination_name()) => worker, + unquote(MessagingAttributes.messaging_consumer_group_name()) => queue, + unquote(MessagingAttributes.messaging_operation_name()) => :process, + unquote(MessagingAttributes.messaging_operation_type()) => :process, + unquote(MessagingAttributes.messaging_message_id()) => id, :"oban.job.job_id" => id, :"oban.job.worker" => worker, :"oban.job.priority" => priority, @@ -73,15 +73,20 @@ defmodule OpentelemetryOban.JobHandler do :"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at) } - span_name = "#{worker} process" - - OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{ + OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name(attributes), metadata, %{ kind: :consumer, links: links, attributes: attributes }) end + defp span_name(%{ + unquote(MessagingAttributes.messaging_destination_name()) => destination_name, + unquote(MessagingAttributes.messaging_operation_name()) => operation + }) do + "#{operation} #{destination_name}" + end + def handle_job_stop(_event, _measurements, metadata, _config) do OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex index e77b8cb3..d1c2108b 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -38,14 +38,19 @@ defmodule OpentelemetryOban.PluginHandler do end def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do + attributes = %{"oban.plugin": plugin} + span_name = span_name(attributes) + OpentelemetryTelemetry.start_telemetry_span( @tracer_id, - "#{plugin} process", + span_name, metadata, - %{attributes: %{"oban.plugin": plugin}} + %{attributes: attributes} ) end + defp span_name(%{"oban.plugin": plugin}), do: "oban.plugin #{inspect(plugin)}" + def handle_plugin_stop(_event, _measurements, metadata, _config) do Tracer.set_attributes(end_span_plugin_attrs(metadata)) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) diff --git a/instrumentation/opentelemetry_oban/mix.exs b/instrumentation/opentelemetry_oban/mix.exs index 8d28d566..da050240 100644 --- a/instrumentation/opentelemetry_oban/mix.exs +++ b/instrumentation/opentelemetry_oban/mix.exs @@ -45,10 +45,10 @@ defmodule OpentelemetryOban.MixProject do defp deps do [ {:oban, "~> 2.0"}, - {:opentelemetry_api, "~> 1.2"}, + {:opentelemetry_api, "~> 1.4"}, {:opentelemetry_telemetry, "~> 1.1"}, - {:opentelemetry_semantic_conventions, "~> 0.2"}, - {:opentelemetry, "~> 1.0", only: [:test]}, + {:opentelemetry_semantic_conventions, "~> 1.27"}, + {:opentelemetry, "~> 1.4", only: [:test]}, {:opentelemetry_exporter, "~> 1.0", only: [:test]}, {:telemetry, "~> 0.4 or ~> 1.0"}, {:ex_doc, "~> 0.36", only: [:dev], runtime: false}, diff --git a/instrumentation/opentelemetry_oban/mix.lock b/instrumentation/opentelemetry_oban/mix.lock index f3b71378..7277d199 100644 --- a/instrumentation/opentelemetry_oban/mix.lock +++ b/instrumentation/opentelemetry_oban/mix.lock @@ -17,10 +17,10 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "oban": {:hex, :oban, "2.17.4", "3ebe79dc0cad16f23e5feea418f9bc5b07d453b8fb7caf376d812be96157a5c5", [:mix], [{:ecto_sql, "~> 3.6", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "71a804abea3bb7e104782a5b5337cbab76c1a56b9689a6d5159a3873c93898b6"}, - "opentelemetry": {:hex, :opentelemetry, "1.3.1", "f0a342a74379e3540a634e7047967733da4bc8b873ec9026e224b2bd7369b1fc", [:rebar3], [{:opentelemetry_api, "~> 1.2.2", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "de476b2ac4faad3e3fe3d6e18b35dec9cb338c3b9910c2ce9317836dacad3483"}, - "opentelemetry_api": {:hex, :opentelemetry_api, "1.2.2", "693f47b0d8c76da2095fe858204cfd6350c27fe85d00e4b763deecc9588cf27a", [:mix, :rebar3], [{:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "dc77b9a00f137a858e60a852f14007bb66eda1ffbeb6c05d5fe6c9e678b05e9d"}, + "opentelemetry": {:hex, :opentelemetry, "1.5.0", "7dda6551edfc3050ea4b0b40c0d2570423d6372b97e9c60793263ef62c53c3c2", [:rebar3], [{:opentelemetry_api, "~> 1.4", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "cdf4f51d17b592fc592b9a75f86a6f808c23044ba7cf7b9534debbcc5c23b0ee"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"}, "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.6.0", "f4fbf69aa9f1541b253813221b82b48a9863bc1570d8ecc517bc510c0d1d3d8c", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.3", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.2", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "1802d1dca297e46f21e5832ecf843c451121e875f73f04db87355a6cb2ba1710"}, - "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "0.2.0", "b67fe459c2938fcab341cb0951c44860c62347c005ace1b50f8402576f241435", [:mix, :rebar3], [], "hexpm", "d61fa1f5639ee8668d74b527e6806e0503efc55a42db7b5f39939d84c07d6895"}, + "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.1", "4a73bfa29d7780ffe33db345465919cef875034854649c37ac789eb8e8f38b21", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ee43b14e6866123a3ee1344e3c0d3d7591f4537542c2a925fcdbf46249c9b50b"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs index 87e93aac..5a18bbb1 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs @@ -45,7 +45,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do %{plugin: Elixir.Oban.Plugins.Stager} ) - refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + refute_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")} end test "records span on plugin execution" do @@ -61,7 +61,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do %{plugin: Elixir.Oban.Plugins.Stager} ) - assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + assert_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")} end test "records span on plugin error" do @@ -96,14 +96,14 @@ defmodule OpentelemetryOban.PluginHandlerTest do assert_receive {:span, span( - name: "Elixir.Oban.Plugins.Stager process", + name: "oban.plugin Oban.Plugins.Stager", events: events, status: ^expected_status )} [ event( - name: "exception", + name: :exception, attributes: event_attributes ) ] = :otel_events.list(events) @@ -210,7 +210,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do end defp receive_span_attrs(name) do - name = "#{name} process" + name = "oban.plugin #{inspect(name)}" assert_receive( {:span, span(name: ^name, attributes: attributes)}, diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs index de3828de..7e5afa36 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs @@ -37,7 +37,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", attributes: attributes, parent_span_id: :undefined, kind: :producer, @@ -45,13 +45,16 @@ defmodule OpentelemetryObanTest do )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, - "oban.job.job_id": _job_id, + "messaging.system": :oban, + "messaging.destination.name": "TestJob", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :send, + "messaging.operation.type": :publish, + "messaging.message.id": job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, - "oban.job.worker": "TestJob", - "messaging.system": :oban + "oban.job.worker": "TestJob" } = :otel_attributes.map(attributes) end @@ -66,7 +69,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", attributes: _attributes, trace_id: ^root_trace_id, parent_span_id: ^root_span_id, @@ -89,7 +92,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", attributes: _attributes, trace_id: send_trace_id, span_id: send_span_id, @@ -99,7 +102,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: _attributes, kind: :consumer, status: :undefined, @@ -121,7 +124,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: _attributes, kind: :consumer, status: :undefined, @@ -138,24 +141,26 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: attributes, kind: :consumer, status: :undefined )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, + "messaging.system": :oban, + "messaging.destination.name": "TestJob", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :process, + "messaging.operation.type": :process, + "messaging.message.id": job_id, "oban.job.attempt": 1, "oban.job.inserted_at": _inserted_at, - "oban.job.job_id": _job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, "oban.job.scheduled_at": _scheduled_at, - "oban.job.worker": "TestJob", - "messaging.operation": :process, - "messaging.system": :oban + "oban.job.worker": "TestJob" } = :otel_attributes.map(attributes) end @@ -167,7 +172,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatReturnsError process", + name: "process TestJobThatReturnsError", attributes: attributes, kind: :consumer, events: events, @@ -175,22 +180,24 @@ defmodule OpentelemetryObanTest do )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, + "messaging.system": :oban, + "messaging.destination.name": "TestJobThatReturnsError", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :process, + "messaging.operation.type": :process, + "messaging.message.id": job_id, "oban.job.attempt": 1, "oban.job.inserted_at": _inserted_at, - "oban.job.job_id": _job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, "oban.job.scheduled_at": _scheduled_at, - "oban.job.worker": "TestJobThatReturnsError", - "messaging.operation": :process, - "messaging.system": :oban + "oban.job.worker": "TestJobThatReturnsError" } = :otel_attributes.map(attributes) [ event( - name: "exception", + name: :exception, attributes: event_attributes ) ] = :otel_events.list(events) @@ -209,14 +216,14 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatReturnsError send", + name: "send TestJobThatReturnsError", trace_id: send_trace_id, span_id: send_span_id )} assert_receive {:span, span( - name: "TestJobThatReturnsError process", + name: "process TestJobThatReturnsError", status: ^expected_status, trace_id: first_process_trace_id, links: job_1_links @@ -226,7 +233,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatReturnsError process", + name: "process TestJobThatReturnsError", status: ^expected_status, trace_id: second_process_trace_id, links: job_2_links @@ -245,7 +252,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobThatThrowsException process", + name: "process TestJobThatThrowsException", attributes: attributes, kind: :consumer, events: events, @@ -253,22 +260,24 @@ defmodule OpentelemetryObanTest do )} assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, + "messaging.system": :oban, + "messaging.destination.name": "TestJobThatThrowsException", + "messaging.consumer.group.name": "events", + "messaging.operation.name": :process, + "messaging.operation.type": :process, + "messaging.message.id": job_id, "oban.job.attempt": 1, "oban.job.inserted_at": _inserted_at, - "oban.job.job_id": _job_id, + "oban.job.job_id": job_id, "oban.job.max_attempts": 1, "oban.job.priority": 0, "oban.job.scheduled_at": _scheduled_at, - "oban.job.worker": "TestJobThatThrowsException", - "messaging.operation": :process, - "messaging.system": :oban + "oban.job.worker": "TestJobThatThrowsException" } = :otel_attributes.map(attributes) [ event( - name: "exception", + name: :exception, attributes: event_attributes ) ] = :otel_events.list(events) @@ -283,7 +292,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJobWithInnerSpan process", + name: "process TestJobWithInnerSpan", kind: :consumer, trace_id: trace_id, span_id: process_span_id @@ -301,8 +310,8 @@ defmodule OpentelemetryObanTest do test "OpentelemetryOban.insert!/2 returns job on successful insert" do %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, span(name: "TestJob send")} - assert_receive {:span, span(name: "TestJob process")} + assert_receive {:span, span(name: "send TestJob")} + assert_receive {:span, span(name: "process TestJob")} end test "OpentelemetryOban.insert!/2 raises an error on failed insert" do @@ -317,14 +326,14 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob send", + name: "send TestJob", events: events, status: ^expected_status )} [ event( - name: "exception", + name: :exception, attributes: event_attributes ) ] = :otel_events.list(events) @@ -332,30 +341,37 @@ defmodule OpentelemetryObanTest do assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - refute_received {:span, span(name: "TestJob process")} + refute_received {:span, span(name: "process TestJob")} end test "tracing information is propagated when using insert_all/2" do OpentelemetryOban.insert_all([ TestJob.new(%{}), - TestJob.new(%{}) + TestJobWithInnerSpan.new(%{}) ]) assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) assert_receive {:span, span( - name: :"Oban bulk insert", - attributes: _attributes, + name: "send", + attributes: attributes, trace_id: send_trace_id, span_id: send_span_id, kind: :producer, status: :undefined )} + assert %{ + "messaging.system": :oban, + "messaging.operation.name": :send, + "messaging.operation.type": :publish, + "messaging.consumer.group.name": "events" + } = :otel_attributes.map(attributes) + assert_receive {:span, span( - name: "TestJob process", + name: "process TestJob", attributes: _attributes, kind: :consumer, status: :undefined, @@ -367,7 +383,7 @@ defmodule OpentelemetryObanTest do assert_receive {:span, span( - name: "TestJob process", + name: "process TestJobWithInnerSpan", attributes: _attributes, kind: :consumer, status: :undefined, @@ -387,6 +403,6 @@ defmodule OpentelemetryObanTest do test "works with Oban.Testing.perform_job helper function" do Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) - assert_receive {:span, span(name: "TestJob process")} + assert_receive {:span, span(name: "process TestJob")} end end