Skip to content

Commit fe0253a

Browse files
authored
Solution: Support manual job triggering (#459)
* Seems that run_job is working. Needs a test * scheduler run_job tests and some docs * changes requested in PR 459
1 parent 8f00232 commit fe0253a

File tree

6 files changed

+70
-0
lines changed

6 files changed

+70
-0
lines changed

lib/quantum.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ defmodule Quantum do
148148
"""
149149
@callback activate_job(GenStage.stage(), atom) :: :ok
150150

151+
@doc """
152+
Runs a job by name once
153+
"""
154+
@callback run_job(GenStage.stage(), atom) :: :ok
155+
151156
@doc """
152157
Resolves a job by name
153158
"""
@@ -289,6 +294,12 @@ defmodule Quantum do
289294
GenStage.cast(server, {:change_state, name, :active})
290295
end
291296

297+
@impl behaviour
298+
def run_job(server \\ __job_broadcaster__(), name)
299+
when is_atom(name) or is_reference(name) do
300+
GenStage.cast(server, {:run_job, name})
301+
end
302+
292303
@impl behaviour
293304
def find_job(server \\ __job_broadcaster__(), name)
294305
when is_atom(name) or is_reference(name) do

lib/quantum/execution_broadcaster.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,18 @@ defmodule Quantum.ExecutionBroadcaster do
121121
{[], %{state | uninitialized_jobs: [job | uninitialized_jobs]}}
122122
end
123123

124+
def handle_event(
125+
{:run, %Job{name: name} = job},
126+
%State{debug_logging: debug_logging} = state
127+
) do
128+
debug_logging &&
129+
Logger.debug(fn ->
130+
"[#{inspect(Node.self())}][#{__MODULE__}] Running job #{inspect(name)} once"
131+
end)
132+
133+
{[%ExecuteEvent{job: job}], state}
134+
end
135+
124136
def handle_event(
125137
{:remove, name},
126138
%State{

lib/quantum/job_broadcaster.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,27 @@ defmodule Quantum.JobBroadcaster do
300300
end
301301
end
302302

303+
def handle_cast(
304+
{:run_job, name},
305+
%State{
306+
jobs: jobs,
307+
debug_logging: debug_logging
308+
} = state
309+
) do
310+
debug_logging &&
311+
Logger.debug(fn ->
312+
"[#{inspect(Node.self())}][#{__MODULE__}] Running job #{inspect(name)} once"
313+
end)
314+
315+
case Map.fetch(jobs, name) do
316+
:error ->
317+
{:noreply, [], state}
318+
319+
{:ok, job} ->
320+
{:noreply, [{:run, job}], state}
321+
end
322+
end
323+
303324
def handle_cast(
304325
:delete_all,
305326
%State{

pages/runtime.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ Activate an inactive job:
3030
YourApp.Scheduler.activate_job(:ticker)
3131
```
3232

33+
Run a job once outside of normal schedule:
34+
```elixir
35+
YourApp.Scheduler.run_job(:ticker)
36+
```
37+
3338
Find a job:
3439
```elixir
3540
YourApp.Scheduler.find_job(:ticker)

test/quantum/execution_broadcaster_test.exs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ defmodule Quantum.ExecutionBroadcasterTest do
7777
end)
7878
end
7979

80+
test "run_job triggers job to run once", %{producer: producer} do
81+
job = TestScheduler.new_job()
82+
83+
TestProducer.send(producer, {:run, job})
84+
85+
assert_receive {:received, %ExecuteEvent{job: ^job}}
86+
end
87+
8088
test "normal schedule triggers once per second", %{producer: producer} do
8189
job =
8290
TestScheduler.new_job()

test/quantum/job_broadcaster_test.exs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,19 @@ defmodule Quantum.JobBroadcasterTest do
213213
end) =~ "Adding job #Reference"
214214
end
215215

216+
@tag listen_storage: true
217+
test "run", %{broadcaster: broadcaster, active_job: active_job} do
218+
test_id = "run-job-handler"
219+
220+
:ok = attach_telemetry(:run, test_id, self())
221+
222+
TestScheduler.add_job(broadcaster, active_job)
223+
TestScheduler.run_job(broadcaster, active_job.name)
224+
225+
assert_receive {:received, {:add, ^active_job}}
226+
assert_receive {:received, {:run, ^active_job}}
227+
end
228+
216229
@tag listen_storage: true
217230
test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do
218231
test_id = "add-inactive-job-handler"

0 commit comments

Comments
 (0)