Skip to content

Commit e922d4a

Browse files
mrmarcsmithMarc Smithmaennchen
authored
Solution: Add telemetry to track metrics (#453)
* finished adding, deleting and updating jobs telemetry tests * Fixed CI async overlay by passing a unique value for each test * lint * added logs to debug CI failure * added Job_name to prevent collision in CI * fixed typo * added telemetry job span tracking * fixed readme * fixed dialyzer error * requested changes * lint * Injected scheduler to include in telemetry metrics * lint * Apply suggestions from code review Co-authored-by: Jonatan Männchen <[email protected]> * fixed tests, dialyzer doesn't like Module.t * Module.t() to atom Co-authored-by: Marc Smith <[email protected]> Co-authored-by: Jonatan Männchen <[email protected]>
1 parent 5467582 commit e922d4a

File tree

11 files changed

+474
-40
lines changed

11 files changed

+474
-40
lines changed

lib/quantum/executor.ex

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@ defmodule Quantum.Executor do
2525
@spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok
2626
# Execute task on all given nodes without checking for overlap
2727
defp execute(
28-
%StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging},
28+
%StartOpts{
29+
task_supervisor_reference: task_supervisor,
30+
debug_logging: debug_logging,
31+
scheduler: scheduler
32+
},
2933
%Job{overlap: true} = job,
3034
node
3135
) do
32-
run(node, job, task_supervisor, debug_logging)
36+
run(node, job, task_supervisor, debug_logging, scheduler)
3337

3438
:ok
3539
end
@@ -39,7 +43,8 @@ defmodule Quantum.Executor do
3943
%StartOpts{
4044
task_supervisor_reference: task_supervisor,
4145
task_registry_reference: task_registry,
42-
debug_logging: debug_logging
46+
debug_logging: debug_logging,
47+
scheduler: scheduler
4348
},
4449
%Job{overlap: false, name: job_name} = job,
4550
node
@@ -51,7 +56,7 @@ defmodule Quantum.Executor do
5156

5257
case TaskRegistry.mark_running(task_registry, job_name, node) do
5358
:marked_running ->
54-
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging)
59+
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging, scheduler)
5560

5661
receive do
5762
{^ref, _} ->
@@ -69,8 +74,8 @@ defmodule Quantum.Executor do
6974
end
7075

7176
# Execute the given function on a given node via the task supervisor
72-
@spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: Task.t()
73-
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do
77+
@spec run(Node.t(), Job.t(), GenServer.server(), boolean(), atom()) :: Task.t()
78+
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do
7479
debug_logging &&
7580
Logger.debug(fn ->
7681
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{
@@ -84,6 +89,15 @@ defmodule Quantum.Executor do
8489
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
8590
end)
8691

92+
# Note: we are intentionally mimicking the ":telemetry.span" here to keep current functionality
93+
start_monotonic_time = :erlang.monotonic_time()
94+
95+
:telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{
96+
job_name: job_name,
97+
node: inspect(node),
98+
scheduler: scheduler
99+
})
100+
87101
try do
88102
execute_task(task)
89103
catch
@@ -94,6 +108,16 @@ defmodule Quantum.Executor do
94108
inspect(job_name)
95109
}, which failed due to: #{Exception.format(type, value, __STACKTRACE__)}"
96110
end)
111+
112+
duration = :erlang.monotonic_time() - start_monotonic_time
113+
114+
:telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{
115+
job_name: job_name,
116+
node: inspect(node),
117+
reason: value,
118+
stacktrace: __STACKTRACE__,
119+
scheduler: scheduler
120+
})
97121
else
98122
result ->
99123
debug_logging &&
@@ -102,6 +126,14 @@ defmodule Quantum.Executor do
102126
inspect(job_name)
103127
}, which yielded result: #{inspect(result)}"
104128
end)
129+
130+
duration = :erlang.monotonic_time() - start_monotonic_time
131+
132+
:telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{
133+
job_name: job_name,
134+
node: inspect(node),
135+
scheduler: scheduler
136+
})
105137
end
106138

107139
:ok

lib/quantum/executor/start_opts.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ defmodule Quantum.Executor.StartOpts do
66
@type t :: %__MODULE__{
77
task_supervisor_reference: GenServer.server(),
88
task_registry_reference: GenServer.server(),
9-
debug_logging: boolean
9+
debug_logging: boolean,
10+
scheduler: atom()
1011
}
1112

1213
@enforce_keys [
1314
:task_supervisor_reference,
1415
:task_registry_reference,
15-
:debug_logging
16+
:debug_logging,
17+
:scheduler
1618
]
1719
defstruct @enforce_keys
1820
end

lib/quantum/executor_supervisor.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ defmodule Quantum.ExecutorSupervisor do
1919
:node_selector_broadcaster_reference,
2020
:task_supervisor_reference,
2121
:task_registry_reference,
22-
:debug_logging
22+
:debug_logging,
23+
:scheduler
2324
])
2425
),
2526
name: name
@@ -49,7 +50,8 @@ defmodule Quantum.ExecutorSupervisor do
4950
Map.take(opts, [
5051
:task_supervisor_reference,
5152
:task_registry_reference,
52-
:debug_logging
53+
:debug_logging,
54+
:scheduler
5355
])
5456
)
5557

lib/quantum/executor_supervisor/init_opts.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do
77
node_selector_broadcaster_reference: GenServer.server(),
88
task_supervisor_reference: GenServer.server(),
99
task_registry_reference: GenServer.server(),
10-
debug_logging: boolean
10+
debug_logging: boolean,
11+
scheduler: atom()
1112
}
1213

1314
@enforce_keys [
1415
:node_selector_broadcaster_reference,
1516
:task_supervisor_reference,
1617
:task_registry_reference,
17-
:debug_logging
18+
:debug_logging,
19+
:scheduler
1820
]
1921
defstruct @enforce_keys
2022
end

lib/quantum/executor_supervisor/start_opts.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do
88
node_selector_broadcaster_reference: GenServer.server(),
99
task_supervisor_reference: GenServer.server(),
1010
task_registry_reference: GenServer.server(),
11-
debug_logging: boolean()
11+
debug_logging: boolean(),
12+
scheduler: atom()
1213
}
1314

1415
@enforce_keys [
1516
:name,
1617
:node_selector_broadcaster_reference,
1718
:task_supervisor_reference,
1819
:task_registry_reference,
19-
:debug_logging
20+
:debug_logging,
21+
:scheduler
2022
]
2123
defstruct @enforce_keys
2224
end

lib/quantum/job_broadcaster.ex

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ defmodule Quantum.JobBroadcaster do
6060
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
6161
end)
6262

63+
for %Job{state: :active, name: name} = job <- storage_jobs do
64+
# Send event to telemetry in case the end user wants to monitor events
65+
:telemetry.execute([:quantum, :job, :add], %{}, %{
66+
job_name: name,
67+
job: job,
68+
node: inspect(Node.self()),
69+
scheduler: scheduler
70+
})
71+
end
72+
6373
storage_jobs
6474
end
6575

@@ -99,6 +109,14 @@ defmodule Quantum.JobBroadcaster do
99109
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
100110
end)
101111

112+
# Send event to telemetry in case the end user wants to monitor events
113+
:telemetry.execute([:quantum, :job, :update], %{}, %{
114+
job_name: job_name,
115+
job: job,
116+
node: inspect(Node.self()),
117+
scheduler: state.scheduler
118+
})
119+
102120
:ok = storage.delete_job(storage_pid, job_name)
103121
:ok = storage.add_job(storage_pid, job)
104122

@@ -111,6 +129,14 @@ defmodule Quantum.JobBroadcaster do
111129
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
112130
end)
113131

132+
# Send event to telemetry in case the end user wants to monitor events
133+
:telemetry.execute([:quantum, :job, :update], %{}, %{
134+
job_name: job_name,
135+
job: job,
136+
node: inspect(Node.self()),
137+
scheduler: state.scheduler
138+
})
139+
114140
:ok = storage.delete_job(storage_pid, job_name)
115141
:ok = storage.add_job(storage_pid, job)
116142

@@ -122,6 +148,14 @@ defmodule Quantum.JobBroadcaster do
122148
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
123149
end)
124150

151+
# Send event to telemetry in case the end user wants to monitor events
152+
:telemetry.execute([:quantum, :job, :add], %{}, %{
153+
job_name: job_name,
154+
job: job,
155+
node: inspect(Node.self()),
156+
scheduler: state.scheduler
157+
})
158+
125159
:ok = storage.add_job(storage_pid, job)
126160

127161
{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
@@ -144,6 +178,14 @@ defmodule Quantum.JobBroadcaster do
144178
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
145179
end)
146180

181+
# Send event to telemetry in case the end user wants to monitor events
182+
:telemetry.execute([:quantum, :job, :update], %{}, %{
183+
job_name: job_name,
184+
job: job,
185+
node: inspect(Node.self()),
186+
scheduler: state.scheduler
187+
})
188+
147189
:ok = storage.delete_job(storage_pid, job_name)
148190
:ok = storage.add_job(storage_pid, job)
149191

@@ -155,6 +197,14 @@ defmodule Quantum.JobBroadcaster do
155197
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
156198
end)
157199

200+
# Send event to telemetry in case the end user wants to monitor events
201+
:telemetry.execute([:quantum, :job, :update], %{}, %{
202+
job_name: job_name,
203+
job: job,
204+
node: inspect(Node.self()),
205+
scheduler: state.scheduler
206+
})
207+
158208
:ok = storage.delete_job(storage_pid, job_name)
159209
:ok = storage.add_job(storage_pid, job)
160210

@@ -166,6 +216,14 @@ defmodule Quantum.JobBroadcaster do
166216
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
167217
end)
168218

219+
# Send event to telemetry in case the end user wants to monitor events
220+
:telemetry.execute([:quantum, :job, :add], %{}, %{
221+
job_name: job_name,
222+
job: job,
223+
node: inspect(Node.self()),
224+
scheduler: state.scheduler
225+
})
226+
169227
:ok = storage.add_job(storage_pid, job)
170228

171229
{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
@@ -187,12 +245,28 @@ defmodule Quantum.JobBroadcaster do
187245
end)
188246

189247
case Map.fetch(jobs, name) do
190-
{:ok, %{state: :active}} ->
248+
{:ok, %{state: :active, name: name} = job} ->
249+
# Send event to telemetry in case the end user wants to monitor events
250+
:telemetry.execute([:quantum, :job, :delete], %{}, %{
251+
job_name: name,
252+
job: job,
253+
node: inspect(Node.self()),
254+
scheduler: state.scheduler
255+
})
256+
191257
:ok = storage.delete_job(storage_pid, name)
192258

193259
{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}
194260

195-
{:ok, %{state: :inactive}} ->
261+
{:ok, %{state: :inactive, name: name} = job} ->
262+
# Send event to telemetry in case the end user wants to monitor events
263+
:telemetry.execute([:quantum, :job, :delete], %{}, %{
264+
job_name: name,
265+
job: job,
266+
node: inspect(Node.self()),
267+
scheduler: state.scheduler
268+
})
269+
196270
:ok = storage.delete_job(storage_pid, name)
197271

198272
{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}
@@ -224,6 +298,14 @@ defmodule Quantum.JobBroadcaster do
224298
{:noreply, [], state}
225299

226300
{:ok, job} ->
301+
# Send event to telemetry in case the end user wants to monitor events
302+
:telemetry.execute([:quantum, :job, :update], %{}, %{
303+
job_name: name,
304+
job: job,
305+
node: inspect(Node.self()),
306+
scheduler: state.scheduler
307+
})
308+
227309
jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))
228310

229311
:ok = storage.update_job_state(storage_pid, job.name, new_state)
@@ -252,6 +334,16 @@ defmodule Quantum.JobBroadcaster do
252334
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
253335
end)
254336

337+
for {name, %Job{} = job} <- jobs do
338+
# Send event to telemetry in case the end user wants to monitor events
339+
:telemetry.execute([:quantum, :job, :delete], %{}, %{
340+
job_name: name,
341+
job: job,
342+
node: inspect(Node.self()),
343+
scheduler: state.scheduler
344+
})
345+
end
346+
255347
messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}
256348

257349
:ok = storage.purge(storage_pid)

lib/quantum/supervisor.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ defmodule Quantum.Supervisor do
8888
|> Map.put(:task_supervisor_reference, task_supervisor_name)
8989
|> Map.put(:task_registry_reference, task_registry_name)
9090
|> Map.put(:name, executor_supervisor_name)
91+
|> Map.put(:scheduler, scheduler)
9192
)
9293

9394
Supervisor.init(

mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ defmodule Quantum.Mixfile do
101101
{:ex_doc, "~> 0.19", only: [:dev, :docs], runtime: false},
102102
{:excoveralls, "~> 0.5", only: [:test], runtime: false},
103103
{:dialyxir, "~> 1.0-rc", only: [:dev], runtime: false},
104-
{:credo, "~> 1.0", only: [:dev], runtime: false}
104+
{:credo, "~> 1.0", only: [:dev], runtime: false},
105+
{:telemetry, "~> 0.4"}
105106
]
106107
end
107108
end

pages/configuration.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,6 @@ Timezones can also be configured on a per-job basis. This overrides the default
154154
timezone: "America/New_York"
155155
}
156156
```
157+
158+
## Telemetry Support
159+

0 commit comments

Comments
 (0)