Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions instrumentation/opentelemetry_broadway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
[![Hex.pm](https://img.shields.io/hexpm/v/opentelemetry_cowboy)](https://hex.pm/packages/opentelemetry_cowboy)
![Build Status](https://github.com/open-telemetry/opentelemetry-erlang-contrib/workflows/Erlang/badge.svg)

OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines.
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines with support for distributed tracing.

## Usage

After installing, set up the handler in your application's `Application.start/2` callback, before your top-level supervisor starts:
### Basic Setup

For basic Broadway instrumentation, set up the handler in your application's `Application.start/2` callback:

```elixir
def start(_type, _args) do
Expand All @@ -18,6 +20,19 @@ def start(_type, _args) do
end
```

### With Trace Propagation

For Broadway pipelines that need distributed tracing with linked spans across services (extracts context from message headers and creates trace links):

```elixir
def start(_type, _args) do
# Enable trace propagation from message headers
OpentelemetryBroadway.setup(propagation: true)

Supervisor.start_link(...)
end
```

## Installation

This library is available on Hex:
Expand Down
175 changes: 149 additions & 26 deletions instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
defmodule OpentelemetryBroadway do
@moduledoc """
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines.
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines with optional trace propagation support.

It supports job start, stop, and exception events.
It supports job start, stop, and exception events with automatic distributed tracing context extraction.

## Usage

### Basic Setup

In your application's `c:Application.start/2` callback:

def start(_type, _args) do
Expand All @@ -14,8 +16,58 @@ defmodule OpentelemetryBroadway do
# ...
end

### With Trace Propagation

For Broadway pipelines that need distributed tracing context extraction from message headers/attributes:

def start(_type, _args) do
:ok = OpentelemetryBroadway.setup(propagation: true)

# ...
end

**Important**: When using trace propagation, your producer must be configured to extract headers/attributes.

#### RabbitMQ

For RabbitMQ, configure your `BroadwayRabbitMQ.Producer` with `metadata: [:headers]`:

Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwayRabbitMQ.Producer,
metadata: [:headers], # Required for trace propagation!
}
]
)

#### Amazon SQS

For Amazon SQS, configure your `BroadwaySQS.Producer` to extract trace context from standard attributes or custom message attributes:

# For standard AWS X-Ray trace headers
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwaySQS.Producer,
attribute_names: [:aws_trace_header] # Standard attribute
}
]
)

# For custom trace headers in message attributes
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwaySQS.Producer,
message_attribute_names: ["traceparent", "tracestate"] # Custom attributes
}
]
)
"""

alias OpenTelemetry.Ctx
alias OpenTelemetry.Tracer
alias OpenTelemetry.SemanticConventions
alias OpenTelemetry.Span
alias OpenTelemetry.SemanticConventions.Trace
Expand All @@ -24,34 +76,53 @@ defmodule OpentelemetryBroadway do

@tracer_id __MODULE__

@type setup_opts :: [propagation: boolean()]

@doc """
Attaches the Telemetry handlers, returning `:ok` if successful.

## Options

- `propagation` - Enable trace propagation from message headers

## Examples

# Basic setup
OpentelemetryBroadway.setup()

# With trace propagation
OpentelemetryBroadway.setup(propagation: true)

"""
@spec setup :: :ok
def setup do
:ok =
:telemetry.attach(
"#{__MODULE__}.message_start",
[:broadway, :processor, :message, :start],
&__MODULE__.handle_message_start/4,
[]
)
@spec setup(setup_opts()) :: :ok
def setup(opts \\ [])

:ok =
:telemetry.attach(
"#{__MODULE__}.message_stop",
[:broadway, :processor, :message, :stop],
&__MODULE__.handle_message_stop/4,
[]
)
def setup(opts) do
opts =
opts
|> Enum.into(%{})
|> Map.put_new(:propagation, true)

:ok =
:telemetry.attach(
"#{__MODULE__}.job_exception",
[:broadway, :processor, :message, :exception],
&__MODULE__.handle_message_exception/4,
[]
)
:telemetry.attach(
"#{__MODULE__}.message_start",
[:broadway, :processor, :message, :start],
&__MODULE__.handle_message_start/4,
opts
)

:telemetry.attach(
"#{__MODULE__}.message_stop",
[:broadway, :processor, :message, :stop],
&__MODULE__.handle_message_stop/4,
opts
)

:telemetry.attach(
"#{__MODULE__}.message_exception",
[:broadway, :processor, :message, :exception],
&__MODULE__.handle_message_exception/4,
opts
)

:ok
end
Expand All @@ -66,11 +137,13 @@ defmodule OpentelemetryBroadway do
name: name,
message: %Broadway.Message{} = message
} = metadata,
_config
config
) do
span_name = "#{inspect(topology_name)}/#{Atom.to_string(processor_key)} process"
client_id = inspect(name)

links = get_propagated_ctx(message, config)

attributes = %{
SemanticConventions.Trace.messaging_system() => :broadway,
SemanticConventions.Trace.messaging_operation() => :process,
Expand All @@ -90,6 +163,7 @@ defmodule OpentelemetryBroadway do

OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
kind: :consumer,
links: links,
attributes: attributes
})
end
Expand Down Expand Up @@ -139,4 +213,53 @@ defmodule OpentelemetryBroadway do

defp format_error(err) when is_binary(err), do: err
defp format_error(err), do: inspect(err)

defp get_propagated_ctx(message, %{propagation: true} = _config) do
message
|> get_message_headers()
|> Enum.map(&normalize_header/1)
|> Enum.reject(&is_nil/1)
|> extract_to_ctx()
end

defp get_propagated_ctx(_message, _config), do: []

defp extract_to_ctx([]) do
[]
end

defp extract_to_ctx(headers) do
# Extract context into separate context to avoid polluting current context
parent_ctx =
:otel_propagator_text_map.extract_to(Ctx.new(), headers)
|> Tracer.current_span_ctx()

# Create links to parent if it exists
case parent_ctx do
:undefined -> []
_ -> [OpenTelemetry.link(parent_ctx)]
end
end

# RabbitMQ: headers are in metadata.headers as a list
defp get_message_headers(%Broadway.Message{metadata: %{headers: headers}}) when is_list(headers), do: headers

# SQS: both standard attributes and custom message attributes can contain trace context
defp get_message_headers(%Broadway.Message{metadata: metadata}) do
attributes = Map.get(metadata, :attributes, %{})
message_attributes = Map.get(metadata, :message_attributes, %{})

# Combine both attribute types and convert to list
attributes
|> Map.merge(message_attributes)
|> Enum.to_list()
end

defp get_message_headers(_message), do: []

# RabbitMQ format: {key, type, value}
defp normalize_header({key, _type, value}) when is_binary(key) and is_binary(value), do: {key, value}
# Standard format: {key, value}
defp normalize_header({key, value}) when is_binary(key) and is_binary(value), do: {key, value}
defp normalize_header(_value), do: nil
end
Loading
Loading