Skip to content

Commit fe185b7

Browse files
authored
Allow setting job_id on cron jobs (#293)
* Allow setting job_id on cron jobs * Add tests for cron job singleton
1 parent 7e9198a commit fe185b7

File tree

3 files changed

+70
-2
lines changed

3 files changed

+70
-2
lines changed

arq/cron.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class CronJob:
102102
microsecond: int
103103
run_at_startup: bool
104104
unique: bool
105+
job_id: Optional[str]
105106
timeout_s: Optional[float]
106107
keep_result_s: Optional[float]
107108
keep_result_forever: Optional[bool]
@@ -137,6 +138,7 @@ def cron(
137138
microsecond: int = 123_456,
138139
run_at_startup: bool = False,
139140
unique: bool = True,
141+
job_id: Optional[str] = None,
140142
timeout: Optional[SecondsTimedelta] = None,
141143
keep_result: Optional[float] = 0,
142144
keep_result_forever: Optional[bool] = False,
@@ -159,7 +161,8 @@ def cron(
159161
:param microsecond: microsecond(s) to run the job on,
160162
defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
161163
:param run_at_startup: whether to run as worker starts
162-
:param unique: whether the job should be only be executed once at each time
164+
:param unique: whether the job should only be executed once at each time (useful if you have multiple workers)
165+
:param job_id: ID of the job, can be used to enforce job uniqueness, spanning multiple cron schedules
163166
:param timeout: job timeout
164167
:param keep_result: how long to keep the result for
165168
:param keep_result_forever: whether to keep results forever
@@ -188,6 +191,7 @@ def cron(
188191
microsecond,
189192
run_at_startup,
190193
unique,
194+
job_id,
191195
timeout,
192196
keep_result,
193197
keep_result_forever,

arq/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,10 @@ async def run_cron(self, n: datetime, delay: float, num_windows: int = 2) -> Non
687687
# We queue up the cron if the next execution time is in the next
688688
# delay * num_windows (by default 0.5 * 2 = 1 second).
689689
if cron_job.next_run < this_hb_cutoff:
690-
job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None
690+
if cron_job.job_id:
691+
job_id: Optional[str] = cron_job.job_id
692+
else:
693+
job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None
691694
job_futures.add(
692695
self.pool.enqueue_job(
693696
cron_job.name, _job_id=job_id, _queue_name=self.queue_name, _defer_until=cron_job.next_run

tests/test_cron.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,64 @@ async def try_sleep(ctx):
161161
assert worker.jobs_complete == 1
162162
assert worker.jobs_retried == 1
163163
assert worker.jobs_failed == 0
164+
165+
166+
async def barfoo(ctx):
167+
"""In order to test cron job singleton, we must have two different functions when bursting"""
168+
return 24
169+
170+
171+
async def test_job_custom_id(worker):
172+
"""
173+
Test that two different functions with the same job_id, will only be executed once.
174+
"""
175+
worker: Worker = worker(
176+
cron_jobs=[
177+
cron(barfoo, minute=10, run_at_startup=True, job_id='singleton_job'),
178+
cron(foobar, minute=20, run_at_startup=True, job_id='singleton_job'),
179+
],
180+
poll_delay=0.01,
181+
)
182+
await worker.main()
183+
184+
assert worker.jobs_complete == 1
185+
assert worker.jobs_failed == 0
186+
assert worker.jobs_retried == 0
187+
188+
189+
async def test_job_same_function_different_id(worker):
190+
"""
191+
Set a different ID on the same function job, which runs at startup and ensure both functions are run.
192+
See next test for behaviour without setting job_id.
193+
"""
194+
worker: Worker = worker(
195+
cron_jobs=[
196+
cron(foobar, minute=10, run_at_startup=True, job_id='custom_id'),
197+
cron(foobar, minute=20, run_at_startup=True, job_id='custom_id2'),
198+
],
199+
poll_delay=0.01,
200+
)
201+
await worker.main()
202+
203+
assert worker.jobs_complete == 2
204+
assert worker.jobs_failed == 0
205+
assert worker.jobs_retried == 0
206+
207+
208+
async def test_run_at_startup_no_id_only_runs_once(worker):
209+
"""
210+
Without a custom job ID, and `run_at_startup=True` on two jobs, it will only execute once, since the job_id
211+
will be equal, and should only run once.
212+
"""
213+
worker: Worker = worker(
214+
cron_jobs=[
215+
cron(foobar, minute=10, run_at_startup=True),
216+
cron(foobar, minute=20, run_at_startup=True),
217+
],
218+
poll_delay=0.01,
219+
)
220+
await worker.main()
221+
222+
assert worker.jobs_complete == 1
223+
assert worker.jobs_failed == 0
224+
assert worker.jobs_retried == 0

0 commit comments

Comments
 (0)