Skip to content

Commit 24b44c0

Browse files
committed
Trace all Oban events
1 parent 2225a9b commit 24b44c0

File tree

5 files changed

+216
-13
lines changed

5 files changed

+216
-13
lines changed

instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,17 @@ defmodule OpentelemetryOban do
2626
@doc """
2727
Initializes and configures telemetry handlers.
2828
29-
By default jobs and plugins are traced. If you wish to trace only jobs then
30-
use:
29+
By default everything is traced. If you wish to trace only jobs then use:
3130
3231
OpentelemetryOban.setup(trace: [:jobs])
3332
34-
Note that if you don't trace plugins, but inside the plugins, there are spans
35-
from other instrumentation libraries (e.g. ecto) then these will still be
36-
traced. This setting controls only the spans that are created by
37-
opentelemetry_oban.
33+
Note that if you don't trace plugins or internal, there will be spans from
34+
other instrumentation libraries (e.g. ecto) that would be traced. This setting
35+
controls only the spans that are created by opentelemetry_oban.
3836
"""
3937
@spec setup() :: :ok
4038
def setup(opts \\ []) do
41-
trace = Keyword.get(opts, :trace, [:jobs, :plugins])
39+
trace = Keyword.get(opts, :trace, [:jobs, :plugins, :internal])
4240

4341
if Enum.member?(trace, :jobs) do
4442
OpentelemetryOban.JobHandler.attach()
@@ -48,6 +46,10 @@ defmodule OpentelemetryOban do
4846
OpentelemetryOban.PluginHandler.attach()
4947
end
5048

49+
if Enum.member?(trace, :internal) do
50+
OpentelemetryOban.InternalHandler.attach()
51+
end
52+
5153
:ok
5254
end
5355

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
defmodule OpentelemetryOban.InternalHandler do
2+
alias OpenTelemetry.Span
3+
alias OpenTelemetry.SemanticConventions.Trace
4+
5+
require Trace
6+
7+
@tracer_id __MODULE__
8+
9+
def attach() do
10+
:telemetry.attach_many(
11+
{__MODULE__, :internal},
12+
Enum.flat_map(
13+
[
14+
[:engine, :init],
15+
[:engine, :refresh],
16+
[:engine, :put_meta],
17+
[:engine, :check_available],
18+
[:engine, :cancel_all_jobs],
19+
[:engine, :fetch_jobs],
20+
[:engine, :insert_all_jobs],
21+
[:engine, :prune_all_jobs],
22+
[:engine, :stage_jobs],
23+
[:engine, :cancel_job],
24+
[:engine, :complete_job],
25+
[:engine, :discard_job],
26+
[:engine, :error_job],
27+
[:engine, :insert_job],
28+
[:engine, :snooze_job],
29+
[:notifier, :notify],
30+
[:peer, :election]
31+
],
32+
fn event ->
33+
[
34+
[:oban | event ++ [:start]],
35+
[:oban | event ++ [:stop]],
36+
[:oban | event ++ [:exception]]
37+
]
38+
end
39+
),
40+
&__MODULE__.handle_oban_event/4,
41+
[]
42+
)
43+
end
44+
45+
def handle_oban_event(event, _measurements, metadata, _config) do
46+
[op | rest] = Enum.reverse(event)
47+
48+
case op do
49+
:start ->
50+
OpentelemetryTelemetry.start_telemetry_span(__MODULE__, Enum.join(Enum.reverse(rest), "."), metadata, %{kind: :consumer})
51+
52+
:stop ->
53+
OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata)
54+
55+
:exception ->
56+
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
57+
58+
Span.record_exception(ctx, metadata.reason, metadata.stacktrace)
59+
Span.set_status(ctx, OpenTelemetry.status(:error, Exception.message(metadata.reason)))
60+
61+
OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata)
62+
end
63+
end
64+
end

instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ defmodule OpentelemetryOban.JobHandler do
1414

1515
defp attach_job_start_handler() do
1616
:telemetry.attach(
17-
"#{__MODULE__}.job_start",
17+
{__MODULE__, [:job, :start]},
1818
[:oban, :job, :start],
1919
&__MODULE__.handle_job_start/4,
2020
[]
@@ -23,7 +23,7 @@ defmodule OpentelemetryOban.JobHandler do
2323

2424
defp attach_job_stop_handler() do
2525
:telemetry.attach(
26-
"#{__MODULE__}.job_stop",
26+
{__MODULE__, [:job, :stop]},
2727
[:oban, :job, :stop],
2828
&__MODULE__.handle_job_stop/4,
2929
[]
@@ -32,7 +32,7 @@ defmodule OpentelemetryOban.JobHandler do
3232

3333
defp attach_job_exception_handler() do
3434
:telemetry.attach(
35-
"#{__MODULE__}.job_exception",
35+
{__MODULE__, [:job, :exception]},
3636
[:oban, :job, :exception],
3737
&__MODULE__.handle_job_exception/4,
3838
[]

instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ defmodule OpentelemetryOban.PluginHandler do
1212

1313
defp attach_plugin_start_handler() do
1414
:telemetry.attach(
15-
"#{__MODULE__}.plugin_start",
15+
{__MODULE__, [:plugin, :start]},
1616
[:oban, :plugin, :start],
1717
&__MODULE__.handle_plugin_start/4,
1818
[]
@@ -21,7 +21,7 @@ defmodule OpentelemetryOban.PluginHandler do
2121

2222
defp attach_plugin_stop_handler() do
2323
:telemetry.attach(
24-
"#{__MODULE__}.plugin_stop",
24+
{__MODULE__, [:plugin, :stop]},
2525
[:oban, :plugin, :stop],
2626
&__MODULE__.handle_plugin_stop/4,
2727
[]
@@ -30,7 +30,7 @@ defmodule OpentelemetryOban.PluginHandler do
3030

3131
defp attach_plugin_exception_handler() do
3232
:telemetry.attach(
33-
"#{__MODULE__}.plugin_exception",
33+
{__MODULE__, [:plugin, :exception]},
3434
[:oban, :plugin, :exception],
3535
&__MODULE__.handle_plugin_exception/4,
3636
[]
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
defmodule OpentelemetryOban.InternalHandlerTest do
2+
use DataCase
3+
4+
require OpenTelemetry.Tracer
5+
require OpenTelemetry.Span
6+
require Record
7+
8+
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
9+
Record.defrecord(name, spec)
10+
end
11+
12+
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
13+
Record.defrecord(name, spec)
14+
end
15+
16+
@events [
17+
[:engine, :init],
18+
[:engine, :refresh],
19+
[:engine, :put_meta],
20+
[:engine, :check_available],
21+
[:engine, :cancel_all_jobs],
22+
[:engine, :fetch_jobs],
23+
[:engine, :insert_all_jobs],
24+
[:engine, :prune_all_jobs],
25+
[:engine, :stage_jobs],
26+
[:engine, :cancel_job],
27+
[:engine, :complete_job],
28+
[:engine, :discard_job],
29+
[:engine, :error_job],
30+
[:engine, :insert_job],
31+
[:engine, :snooze_job],
32+
[:notifier, :notify],
33+
[:peer, :election]
34+
]
35+
36+
setup do
37+
:application.stop(:opentelemetry)
38+
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)
39+
40+
:application.set_env(:opentelemetry, :processors, [
41+
{:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}}
42+
])
43+
44+
:application.start(:opentelemetry)
45+
46+
TestHelpers.remove_oban_handlers()
47+
OpentelemetryOban.setup(trace: [:internal])
48+
49+
:ok
50+
end
51+
52+
test "does not create spans when internal tracing is disabled" do
53+
TestHelpers.remove_oban_handlers()
54+
OpentelemetryOban.setup(trace: [])
55+
56+
execute_internal_event([:peer, :election])
57+
58+
refute_receive {:span, span(name: "oban.peer.election")}
59+
end
60+
61+
test "records span on internal execution" do
62+
execute_internal_event([:peer, :election])
63+
64+
assert_receive {:span, span(name: "oban.peer.election")}
65+
end
66+
67+
test "records span on error" do
68+
:telemetry.execute(
69+
[:oban, :peer, :election, :start],
70+
%{system_time: System.system_time()},
71+
%{}
72+
)
73+
74+
exception = %UndefinedFunctionError{
75+
arity: 0,
76+
function: :error,
77+
message: nil,
78+
module: Some,
79+
reason: nil
80+
}
81+
82+
:telemetry.execute(
83+
[:oban, :peer, :election, :exception],
84+
%{duration: 444},
85+
%{
86+
kind: :error,
87+
stacktrace: [
88+
{Some, :error, [], []}
89+
],
90+
reason: exception
91+
}
92+
)
93+
94+
expected_status = OpenTelemetry.status(:error, Exception.message(exception))
95+
96+
assert_receive {:span,
97+
span(
98+
name: "oban.peer.election",
99+
events: events,
100+
status: ^expected_status
101+
)}
102+
103+
[
104+
event(
105+
name: :exception,
106+
attributes: event_attributes
107+
)
108+
] = :otel_events.list(events)
109+
110+
assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
111+
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
112+
end
113+
114+
for event <- @events do
115+
test "#{inspect([:oban | event])} spans" do
116+
execute_internal_event(unquote(event))
117+
118+
assert_receive {:span, span(name: "oban.#{unquote(Enum.join(event, "."))}")}
119+
120+
:ok
121+
end
122+
end
123+
124+
defp execute_internal_event(event) do
125+
:telemetry.execute(
126+
[:oban | event ++ [:start]],
127+
%{system_time: System.system_time()},
128+
%{}
129+
)
130+
131+
:telemetry.execute(
132+
[:oban | event ++ [:stop]],
133+
%{duration: 42069},
134+
%{}
135+
)
136+
end
137+
end

0 commit comments

Comments
 (0)