Skip to content

Commit fd72232

Browse files
authored
feat:(opentelemetry_broadway): add context propagation integration (#540)
Signed-off-by: Yordis Prieto <[email protected]>
1 parent 07ec3d6 commit fd72232

File tree

3 files changed

+408
-32
lines changed

3 files changed

+408
-32
lines changed

instrumentation/opentelemetry_broadway/README.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
[![Hex.pm](https://img.shields.io/hexpm/v/opentelemetry_cowboy)](https://hex.pm/packages/opentelemetry_cowboy)
55
![Build Status](https://github.com/open-telemetry/opentelemetry-erlang-contrib/workflows/Erlang/badge.svg)
66

7-
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines.
7+
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines with support for distributed tracing.
88

99
## Usage
1010

11-
After installing, set up the handler in your application's `Application.start/2` callback, before your top-level supervisor starts:
11+
### Basic Setup
12+
13+
For basic Broadway instrumentation, set up the handler in your application's `Application.start/2` callback:
1214

1315
```elixir
1416
def start(_type, _args) do
@@ -18,6 +20,19 @@ def start(_type, _args) do
1820
end
1921
```
2022

23+
### With Trace Propagation
24+
25+
For Broadway pipelines that need distributed tracing with linked spans across services (extracts context from message headers and creates trace links):
26+
27+
```elixir
28+
def start(_type, _args) do
29+
# Enable trace propagation from message headers
30+
OpentelemetryBroadway.setup(propagation: true)
31+
32+
Supervisor.start_link(...)
33+
end
34+
```
35+
2136
## Installation
2237

2338
This library is available on Hex:

instrumentation/opentelemetry_broadway/lib/opentelemetry_broadway.ex

Lines changed: 150 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
defmodule OpentelemetryBroadway do
22
@moduledoc """
3-
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines.
3+
OpenTelemetry tracing for [Broadway](https://elixir-broadway.org/) pipelines with optional trace propagation support.
44
5-
It supports job start, stop, and exception events.
5+
It supports job start, stop, and exception events with automatic distributed tracing context extraction.
66
77
## Usage
88
9+
### Basic Setup
10+
911
In your application's `c:Application.start/2` callback:
1012
1113
def start(_type, _args) do
@@ -14,8 +16,59 @@ defmodule OpentelemetryBroadway do
1416
# ...
1517
end
1618
19+
### With Trace Propagation
20+
21+
For Broadway pipelines that need distributed tracing context extraction from message headers/attributes:
22+
23+
def start(_type, _args) do
24+
:ok = OpentelemetryBroadway.setup(propagation: true)
25+
26+
# ...
27+
end
28+
29+
> #### Extracting Headers and Attributes {: .info}
30+
> When using trace propagation, your producer must be configured to extract headers/attributes.
31+
32+
#### RabbitMQ
33+
34+
For RabbitMQ, configure your `BroadwayRabbitMQ.Producer` with `metadata: [:headers]`:
35+
36+
Broadway.start_link(MyBroadway,
37+
name: MyBroadway,
38+
producer: [
39+
module: {BroadwayRabbitMQ.Producer,
40+
metadata: [:headers], # Required for trace propagation!
41+
}
42+
]
43+
)
44+
45+
#### Amazon SQS
46+
47+
For Amazon SQS, configure your `BroadwaySQS.Producer` to extract trace context from standard attributes or custom message attributes:
48+
49+
# For standard AWS X-Ray trace headers
50+
Broadway.start_link(MyBroadway,
51+
name: MyBroadway,
52+
producer: [
53+
module: {BroadwaySQS.Producer,
54+
attribute_names: [:aws_trace_header] # Standard attribute
55+
}
56+
]
57+
)
58+
59+
# For custom trace headers in message attributes
60+
Broadway.start_link(MyBroadway,
61+
name: MyBroadway,
62+
producer: [
63+
module: {BroadwaySQS.Producer,
64+
message_attribute_names: ["traceparent", "tracestate"] # Custom attributes
65+
}
66+
]
67+
)
1768
"""
1869

70+
alias OpenTelemetry.Ctx
71+
alias OpenTelemetry.Tracer
1972
alias OpenTelemetry.SemanticConventions
2073
alias OpenTelemetry.Span
2174
alias OpenTelemetry.SemanticConventions.Trace
@@ -24,34 +77,53 @@ defmodule OpentelemetryBroadway do
2477

2578
@tracer_id __MODULE__
2679

80+
@type setup_opts :: [propagation: boolean()]
81+
2782
@doc """
2883
Attaches the Telemetry handlers, returning `:ok` if successful.
84+
85+
## Options
86+
87+
- `propagation` - Enable trace propagation from message headers
88+
89+
## Examples
90+
91+
# Basic setup
92+
OpentelemetryBroadway.setup()
93+
94+
# With trace propagation
95+
OpentelemetryBroadway.setup(propagation: true)
96+
2997
"""
30-
@spec setup :: :ok
31-
def setup do
32-
:ok =
33-
:telemetry.attach(
34-
"#{__MODULE__}.message_start",
35-
[:broadway, :processor, :message, :start],
36-
&__MODULE__.handle_message_start/4,
37-
[]
38-
)
98+
@spec setup(setup_opts()) :: :ok
99+
def setup(opts \\ [])
39100

40-
:ok =
41-
:telemetry.attach(
42-
"#{__MODULE__}.message_stop",
43-
[:broadway, :processor, :message, :stop],
44-
&__MODULE__.handle_message_stop/4,
45-
[]
46-
)
101+
def setup(opts) do
102+
opts =
103+
opts
104+
|> Enum.into(%{})
105+
|> Map.put_new(:propagation, true)
47106

48-
:ok =
49-
:telemetry.attach(
50-
"#{__MODULE__}.job_exception",
51-
[:broadway, :processor, :message, :exception],
52-
&__MODULE__.handle_message_exception/4,
53-
[]
54-
)
107+
:telemetry.attach(
108+
"#{__MODULE__}.message_start",
109+
[:broadway, :processor, :message, :start],
110+
&__MODULE__.handle_message_start/4,
111+
opts
112+
)
113+
114+
:telemetry.attach(
115+
"#{__MODULE__}.message_stop",
116+
[:broadway, :processor, :message, :stop],
117+
&__MODULE__.handle_message_stop/4,
118+
opts
119+
)
120+
121+
:telemetry.attach(
122+
"#{__MODULE__}.message_exception",
123+
[:broadway, :processor, :message, :exception],
124+
&__MODULE__.handle_message_exception/4,
125+
opts
126+
)
55127

56128
:ok
57129
end
@@ -66,11 +138,13 @@ defmodule OpentelemetryBroadway do
66138
name: name,
67139
message: %Broadway.Message{} = message
68140
} = metadata,
69-
_config
141+
config
70142
) do
71143
span_name = "#{inspect(topology_name)}/#{Atom.to_string(processor_key)} process"
72144
client_id = inspect(name)
73145

146+
links = get_propagated_ctx(message, config)
147+
74148
attributes = %{
75149
SemanticConventions.Trace.messaging_system() => :broadway,
76150
SemanticConventions.Trace.messaging_operation() => :process,
@@ -90,6 +164,7 @@ defmodule OpentelemetryBroadway do
90164

91165
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
92166
kind: :consumer,
167+
links: links,
93168
attributes: attributes
94169
})
95170
end
@@ -139,4 +214,53 @@ defmodule OpentelemetryBroadway do
139214

140215
defp format_error(err) when is_binary(err), do: err
141216
defp format_error(err), do: inspect(err)
217+
218+
defp get_propagated_ctx(message, %{propagation: true} = _config) do
219+
message
220+
|> get_message_headers()
221+
|> Enum.map(&normalize_header/1)
222+
|> Enum.reject(&is_nil/1)
223+
|> extract_to_ctx()
224+
end
225+
226+
defp get_propagated_ctx(_message, _config), do: []
227+
228+
defp extract_to_ctx([]) do
229+
[]
230+
end
231+
232+
defp extract_to_ctx(headers) do
233+
# Extract context into separate context to avoid polluting current context
234+
parent_ctx =
235+
:otel_propagator_text_map.extract_to(Ctx.new(), headers)
236+
|> Tracer.current_span_ctx()
237+
238+
# Create links to parent if it exists
239+
case parent_ctx do
240+
:undefined -> []
241+
_ -> [OpenTelemetry.link(parent_ctx)]
242+
end
243+
end
244+
245+
# RabbitMQ: headers are in metadata.headers as a list
246+
defp get_message_headers(%Broadway.Message{metadata: %{headers: headers}}) when is_list(headers), do: headers
247+
248+
# SQS: both standard attributes and custom message attributes can contain trace context
249+
defp get_message_headers(%Broadway.Message{metadata: metadata}) do
250+
attributes = Map.get(metadata, :attributes, %{})
251+
message_attributes = Map.get(metadata, :message_attributes, %{})
252+
253+
# Combine both attribute types and convert to list
254+
attributes
255+
|> Map.merge(message_attributes)
256+
|> Enum.to_list()
257+
end
258+
259+
defp get_message_headers(_message), do: []
260+
261+
# RabbitMQ format: {key, type, value}
262+
defp normalize_header({key, _type, value}) when is_binary(key) and is_binary(value), do: {key, value}
263+
# Standard format: {key, value}
264+
defp normalize_header({key, value}) when is_binary(key) and is_binary(value), do: {key, value}
265+
defp normalize_header(_value), do: nil
142266
end

0 commit comments

Comments
 (0)