diff --git a/instrumentation/opentelemetry_broadway/README.md b/instrumentation/opentelemetry_broadway/README.md
index 73c1783a..bf2bc119 100644
--- a/instrumentation/opentelemetry_broadway/README.md
+++ b/instrumentation/opentelemetry_broadway/README.md
@@ -4,11 +4,13 @@
 [![Hex.pm](https://img.shields.io/hexpm/v/opentelemetry_cowboy)](https://hex.pm/packages/opentelemetry_cowboy)
 ![Build Status](https://github.com/open-telemetry/opentelemetry-erlang-contrib/workflows/Erlang/badge.svg)
 
-OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines.
+OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines with support for distributed tracing.
 
 ## Usage
 
-After installing, set up the handler in your application's `Application.start/2` callback, before your top-level supervisor starts:
+### Basic Setup
+
+For basic Broadway instrumentation, set up the handler in your application's `Application.start/2` callback:
 
 ```elixir
 def start(_type, _args) do
@@ -18,6 +20,19 @@ def start(_type, _args) do
 end
 ```
 
+### With Trace Propagation
+
+For Broadway pipelines that need distributed tracing with linked spans across services (extracts context from message headers and creates trace links):
+
+```elixir
+def start(_type, _args) do
+  # Enable trace propagation from message headers
+  OpentelemetryBroadway.setup(propagation: true)
+
+  Supervisor.start_link(...)
+end
+```
+
 ## Installation
 
 This library is available on Hex:
diff --git a/instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex b/instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex
index 7aefad1c..f8313232 100644
--- a/instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex
+++ b/instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex
@@ -1,11 +1,13 @@
 defmodule OpentelemetryBroadway do
   @moduledoc """
-  OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines.
+  OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines with optional trace propagation support.
 
-  It supports job start, stop, and exception events.
+  It supports job start, stop, and exception events, and can optionally extract distributed tracing context from message headers.
 
   ## Usage
 
+  ### Basic Setup
+
   In your application's `c:Application.start/2` callback:
 
       def start(_type, _args) do
@@ -14,8 +16,58 @@ defmodule OpentelemetryBroadway do
         # ...
       end
 
+  ### With Trace Propagation
+
+  For Broadway pipelines that need distributed tracing context extraction from message headers/attributes:
+
+      def start(_type, _args) do
+        :ok = OpentelemetryBroadway.setup(propagation: true)
+
+        # ...
+      end
+
+  **Important**: When using trace propagation, your producer must be configured to extract headers/attributes.
+
+  #### RabbitMQ
+
+  For RabbitMQ, configure your `BroadwayRabbitMQ.Producer` with `metadata: [:headers]`:
+
+      Broadway.start_link(MyBroadway,
+        name: MyBroadway,
+        producer: [
+          module: {BroadwayRabbitMQ.Producer,
+            metadata: [:headers]  # Required for trace propagation!
+          }
+        ]
+      )
+
+  #### Amazon SQS
+
+  For Amazon SQS, configure your `BroadwaySQS.Producer` to extract trace context from standard attributes or custom message attributes:
+
+      # For standard AWS X-Ray trace headers
+      Broadway.start_link(MyBroadway,
+        name: MyBroadway,
+        producer: [
+          module: {BroadwaySQS.Producer,
+            attribute_names: [:aws_trace_header]  # Standard attribute
+          }
+        ]
+      )
+
+      # For custom trace headers in message attributes
+      Broadway.start_link(MyBroadway,
+        name: MyBroadway,
+        producer: [
+          module: {BroadwaySQS.Producer,
+            message_attribute_names: ["traceparent", "tracestate"]  # Custom attributes
+          }
+        ]
+      )
   """
 
+  alias OpenTelemetry.Ctx
+  alias OpenTelemetry.Tracer
   alias OpenTelemetry.SemanticConventions
   alias OpenTelemetry.Span
   alias OpenTelemetry.SemanticConventions.Trace
@@ -24,34 +76,53 @@ defmodule OpentelemetryBroadway do
 
   @tracer_id __MODULE__
 
+  @type setup_opts :: [propagation: boolean()]
+
   @doc """
   Attaches the Telemetry handlers, returning `:ok` if successful.
+
+  ## Options
+
+  - `propagation` - Enable trace propagation from message headers (default: `false`)
+
+  ## Examples
+
+      # Basic setup
+      OpentelemetryBroadway.setup()
+
+      # With trace propagation
+      OpentelemetryBroadway.setup(propagation: true)
+
   """
-  @spec setup :: :ok
-  def setup do
-    :ok =
-      :telemetry.attach(
-        "#{__MODULE__}.message_start",
-        [:broadway, :processor, :message, :start],
-        &__MODULE__.handle_message_start/4,
-        []
-      )
+  @spec setup(setup_opts()) :: :ok
+  def setup(opts \\ [])
 
-    :ok =
-      :telemetry.attach(
-        "#{__MODULE__}.message_stop",
-        [:broadway, :processor, :message, :stop],
-        &__MODULE__.handle_message_stop/4,
-        []
-      )
+  def setup(opts) do
+    opts =
+      opts
+      |> Enum.into(%{})
+      |> Map.put_new(:propagation, false)
 
-    :ok =
-      :telemetry.attach(
-        "#{__MODULE__}.job_exception",
-        [:broadway, :processor, :message, :exception],
-        &__MODULE__.handle_message_exception/4,
-        []
-      )
+    :telemetry.attach(
+      "#{__MODULE__}.message_start",
+      [:broadway, :processor, :message, :start],
+      &__MODULE__.handle_message_start/4,
+      opts
+    )
+
+    :telemetry.attach(
+      "#{__MODULE__}.message_stop",
+      [:broadway, :processor, :message, :stop],
+      &__MODULE__.handle_message_stop/4,
+      opts
+    )
+
+    :telemetry.attach(
+      "#{__MODULE__}.message_exception",
+      [:broadway, :processor, :message, :exception],
+      &__MODULE__.handle_message_exception/4,
+      opts
+    )
 
     :ok
   end
@@ -66,11 +137,13 @@ defmodule OpentelemetryBroadway do
           name: name,
           message: %Broadway.Message{} = message
         } = metadata,
-        _config
+        config
       ) do
     span_name = "#{inspect(topology_name)}/#{Atom.to_string(processor_key)} process"
     client_id = inspect(name)
 
+    links = get_propagated_ctx(message, config)
+
     attributes = %{
       SemanticConventions.Trace.messaging_system() => :broadway,
       SemanticConventions.Trace.messaging_operation() => :process,
@@ -90,6 +163,7 @@ defmodule OpentelemetryBroadway do
 
     OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
       kind: :consumer,
+      links: links,
       attributes: attributes
     })
   end
@@ -139,4 +213,55 @@ defmodule OpentelemetryBroadway do
 
   defp format_error(err) when is_binary(err), do: err
   defp format_error(err), do: inspect(err)
+
+  defp get_propagated_ctx(message, %{propagation: true} = _config) do
+    message
+    |> get_message_headers()
+    |> Enum.map(&normalize_header/1)
+    |> Enum.reject(&is_nil/1)
+    |> extract_to_ctx()
+  end
+
+  defp get_propagated_ctx(_message, _config), do: []
+
+  defp extract_to_ctx([]) do
+    []
+  end
+
+  defp extract_to_ctx(headers) do
+    # Extract context into separate context to avoid polluting current context
+    parent_ctx =
+      :otel_propagator_text_map.extract_to(Ctx.new(), headers)
+      |> Tracer.current_span_ctx()
+
+    # Create links to parent if it exists
+    case parent_ctx do
+      :undefined -> []
+      _ -> [OpenTelemetry.link(parent_ctx)]
+    end
+  end
+
+  # RabbitMQ: headers are in metadata.headers as a list
+  defp get_message_headers(%Broadway.Message{metadata: %{headers: headers}}) when is_list(headers), do: headers
+
+  # SQS: both standard attributes and custom message attributes can contain trace context
+  defp get_message_headers(%Broadway.Message{metadata: metadata}) when is_map(metadata) do
+    attributes = Map.get(metadata, :attributes, %{})
+    message_attributes = Map.get(metadata, :message_attributes, %{})
+
+    # Combine both attribute types and convert to list
+    attributes
+    |> Map.merge(message_attributes)
+    |> Enum.to_list()
+  end
+
+  defp get_message_headers(_message), do: []
+
+  # RabbitMQ format: {key, type, value}
+  defp normalize_header({key, _type, value}) when is_binary(key) and is_binary(value), do: {key, value}
+  # Standard format: {key, value}
+  defp normalize_header({key, value}) when is_binary(key) and is_binary(value), do: {key, value}
+  # NOTE(review): presumably SQS message attributes arrive as maps carrying :string_value - confirm against BroadwaySQS
+  defp normalize_header({key, %{string_value: value}}) when is_binary(key) and is_binary(value), do: {key, value}
+  defp normalize_header(_value), do: nil
 end
diff --git a/instrumentation/opentelemetry_broadway/test/opentelemetry_broadway_test.exs b/instrumentation/opentelemetry_broadway/test/opentelemetry_broadway_test.exs
index 8c6b48be..9f1caacb 100644
--- a/instrumentation/opentelemetry_broadway/test/opentelemetry_broadway_test.exs
+++ b/instrumentation/opentelemetry_broadway/test/opentelemetry_broadway_test.exs
@@ -30,10 +30,10 @@ defmodule OpentelemetryBroadwayTest do
     :ok
   end
 
-  test "records span on succesful message" do
+  # Basic Broadway instrumentation tests (without propagation)
+  test "records span on successful message" do
     ref = Broadway.test_message(TestBroadway, "success")
 
-    #  Confirm the message was processed
     assert_receive {:ack, ^ref, [%{data: "success"}], []}
 
     expected_status = OpenTelemetry.status(:ok)
@@ -57,7 +57,6 @@ defmodule OpentelemetryBroadwayTest do
   test "records span on message which fails" do
     ref = Broadway.test_message(TestBroadway, "error")
 
-    #  Confirm the message was processed
     assert_receive {:ack, ^ref, [], [%{data: "error"}]}
 
     expected_status = OpenTelemetry.status(:error, "something went wrong")
@@ -74,7 +73,6 @@ defmodule OpentelemetryBroadwayTest do
   test "records span on an exception being thrown" do
     ref = Broadway.test_message(TestBroadway, "exception")
 
-    #  Confirm the message was processed
     assert_receive {:ack, ^ref, [], [%{data: "exception"}]}
 
     expected_status = OpenTelemetry.status(:error, "** (RuntimeError) an exception occurred")
@@ -87,4 +85,243 @@ defmodule OpentelemetryBroadwayTest do
                       status: ^expected_status
                     )}
   end
+
+  test "extracts trace context from RabbitMQ headers when propagation enabled" do
+    TestHelpers.remove_handlers()
+    :ok = OpentelemetryBroadway.setup(propagation: true)
+
+    _parent_span_ctx =
+      OpenTelemetry.Tracer.start_span("upstream-service")
+      |> OpenTelemetry.Tracer.set_current_span()
+
+    trace_ctx = OpenTelemetry.Tracer.current_span_ctx()
+    trace_id = elem(trace_ctx, 1)
+    span_id = elem(trace_ctx, 2)
+
+    OpenTelemetry.Tracer.end_span()
+    OpenTelemetry.Ctx.clear()
+
+    assert_receive {:span, span(name: "upstream-service")}
+
+    message = %Broadway.Message{
+      data: "test message",
+      metadata: %{
+        headers: [
+          {"traceparent", :longstr,
+           "00-#{:io_lib.format("~32.16.0b", [trace_id])}-#{:io_lib.format("~16.16.0b", [span_id])}-01"},
+          {"x-custom-header", :longstr, "custom-value"}
+        ],
+        routing_key: "test.queue",
+        exchange: "test.exchange",
+        delivery_tag: 123,
+        correlation_id: "correlation-123"
+      },
+      acknowledger: {Broadway.NoopAcknowledger, nil, nil}
+    }
+
+    start_metadata = %{
+      processor_key: :default,
+      topology_name: :test_topology,
+      name: :"test_topology.Broadway.Consumer_0",
+      message: message
+    }
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :start],
+      %{},
+      start_metadata
+    )
+
+    completed_message = %{message | status: :ok}
+    stop_metadata = %{message: completed_message}
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :stop],
+      %{},
+      stop_metadata
+    )
+
+    assert_receive {:span, span(name: span_name, attributes: attributes, links: links)}
+
+    assert span_name == ":test_topology/default process"
+
+    attrs_map = :otel_attributes.map(attributes)
+    assert attrs_map[:"messaging.system"] == :broadway
+    assert attrs_map[:"messaging.operation"] == :process
+
+    links_list = elem(links, 5)
+    assert length(links_list) == 1
+    [link] = links_list
+    assert elem(link, 1) == trace_id
+    assert elem(link, 2) == span_id
+  end
+
+  test "creates proper trace relationship when propagation enabled" do
+    TestHelpers.remove_handlers()
+    :ok = OpentelemetryBroadway.setup(propagation: true)
+
+    # Create parent trace
+    parent_span = OpenTelemetry.Tracer.start_span("parent-service")
+    OpenTelemetry.Tracer.set_current_span(parent_span)
+    parent_ctx = OpenTelemetry.Tracer.current_span_ctx()
+    parent_trace_id = elem(parent_ctx, 1)
+    parent_span_id = elem(parent_ctx, 2)
+
+    # Create traceparent header from parent context
+    traceparent =
+      "00-#{:io_lib.format("~32.16.0b", [parent_trace_id])}-#{:io_lib.format("~16.16.0b", [parent_span_id])}-01"
+
+    OpenTelemetry.Tracer.end_span()
+    OpenTelemetry.Ctx.clear()
+
+    # Consume parent span
+    assert_receive {:span, span(name: "parent-service", trace_id: ^parent_trace_id, span_id: ^parent_span_id)}
+
+    # Create message with propagated context
+    message = %Broadway.Message{
+      data: "test",
+      metadata: %{headers: [{"traceparent", :longstr, traceparent}]},
+      acknowledger: {Broadway.NoopAcknowledger, nil, nil}
+    }
+
+    # Process the message
+    :telemetry.execute(
+      [:broadway, :processor, :message, :start],
+      %{},
+      %{
+        processor_key: :default,
+        topology_name: :test_topology,
+        name: :"test_topology.Broadway.Consumer_0",
+        message: message
+      }
+    )
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :stop],
+      %{},
+      %{message: %{message | status: :ok}}
+    )
+
+    # Verify the Broadway span
+    assert_receive {:span,
+                    span(
+                      name: ":test_topology/default process",
+                      trace_id: broadway_trace_id,
+                      span_id: broadway_span_id,
+                      links: links
+                    )}
+
+    # The Broadway span should have its own trace (not continue the parent trace)
+    assert broadway_trace_id != parent_trace_id
+    assert broadway_span_id != parent_span_id
+
+    # But it should have a link to the parent
+    links_list = elem(links, 5)
+    assert length(links_list) == 1
+    [link] = links_list
+    # Link points to parent trace
+    assert elem(link, 1) == parent_trace_id
+    # Link points to parent span
+    assert elem(link, 2) == parent_span_id
+  end
+
+  test "handles message without headers gracefully with propagation enabled" do
+    TestHelpers.remove_handlers()
+    :ok = OpentelemetryBroadway.setup(propagation: true)
+
+    message = %Broadway.Message{
+      data: "test message",
+      metadata: %{
+        routing_key: "test.queue"
+      },
+      acknowledger: {Broadway.NoopAcknowledger, nil, nil}
+    }
+
+    start_metadata = %{
+      processor_key: :default,
+      topology_name: :test_topology,
+      name: :"test_topology.Broadway.Consumer_0",
+      message: message
+    }
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :start],
+      %{},
+      start_metadata
+    )
+
+    completed_message = %{message | status: :ok}
+    stop_metadata = %{message: completed_message}
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :stop],
+      %{},
+      stop_metadata
+    )
+
+    assert_receive {:span, span(name: span_name, attributes: attributes, links: links)}
+
+    assert span_name == ":test_topology/default process"
+
+    attrs_map = :otel_attributes.map(attributes)
+    assert attrs_map[:"messaging.system"] == :broadway
+
+    links_list = elem(links, 5)
+    assert links_list == []
+  end
+
+  test "handles malformed headers gracefully with propagation enabled" do
+    TestHelpers.remove_handlers()
+    :ok = OpentelemetryBroadway.setup(propagation: true)
+
+    message = %Broadway.Message{
+      data: "test message",
+      metadata: %{
+        headers: [
+          {"traceparent", :longstr, "invalid-traceparent-format"},
+          # Invalid key type
+          {123, :longstr, "non-binary-key"},
+          # Invalid value type
+          {"valid-key", :longstr, 456},
+          # Invalid header format
+          nil
+        ]
+      },
+      acknowledger: {Broadway.NoopAcknowledger, nil, nil}
+    }
+
+    start_metadata = %{
+      processor_key: :default,
+      topology_name: :test_topology,
+      name: :"test_topology.Broadway.Consumer_0",
+      message: message
+    }
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :start],
+      %{},
+      start_metadata
+    )
+
+    completed_message = %{message | status: :ok}
+    stop_metadata = %{message: completed_message}
+
+    :telemetry.execute(
+      [:broadway, :processor, :message, :stop],
+      %{},
+      stop_metadata
+    )
+
+    # Should create a span without links since headers were malformed
+    assert_receive {:span, span(name: span_name, attributes: attributes, links: links)}
+
+    assert span_name == ":test_topology/default process"
+
+    attrs_map = :otel_attributes.map(attributes)
+    assert attrs_map[:"messaging.system"] == :broadway
+
+    # No valid trace context should result in no links
+    links_list = elem(links, 5)
+    assert links_list == []
+  end
 end