Skip to content

Commit e9452c5

Browse files
Adds after_job_end hook (#355)
* Adds after_job_end hook * Fixes test
1 parent 6f68086 commit e9452c5

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

arq/worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ class Worker:
150150
:param on_shutdown: coroutine function to run at shutdown
151151
:param on_job_start: coroutine function to run on job start
152152
:param on_job_end: coroutine function to run on job end
153+
:param after_job_end: coroutine function to run after job has ended and results have been recorded
153154
:param handle_signals: default true, register signal handlers,
154155
set to false when running inside other async framework
155156
:param max_jobs: maximum number of jobs to run at a time
@@ -189,6 +190,7 @@ def __init__(
189190
on_shutdown: Optional['StartupShutdown'] = None,
190191
on_job_start: Optional['StartupShutdown'] = None,
191192
on_job_end: Optional['StartupShutdown'] = None,
193+
after_job_end: Optional['StartupShutdown'] = None,
192194
handle_signals: bool = True,
193195
max_jobs: int = 10,
194196
job_timeout: 'SecondsTimedelta' = 300,
@@ -227,6 +229,7 @@ def __init__(
227229
self.on_shutdown = on_shutdown
228230
self.on_job_start = on_job_start
229231
self.on_job_end = on_job_end
232+
self.after_job_end = after_job_end
230233
self.sem = asyncio.BoundedSemaphore(max_jobs)
231234
self.job_timeout_s = to_seconds(job_timeout)
232235
self.keep_result_s = to_seconds(keep_result)
@@ -635,6 +638,9 @@ async def job_failed(exc: BaseException) -> None:
635638
)
636639
)
637640

641+
if self.after_job_end:
642+
await self.after_job_end(ctx)
643+
638644
async def finish_job(
639645
self,
640646
job_id: str,

tests/test_worker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,10 @@ async def on_end(ctx):
920920
assert ctx['job_id'] == 'testing'
921921
result['called'] += 1
922922

923+
async def after_end(ctx):
924+
assert ctx['job_id'] == 'testing'
925+
result['called'] += 2
926+
923927
async def test(ctx):
924928
return
925929

@@ -928,6 +932,7 @@ async def test(ctx):
928932
functions=[func(test, name='func')],
929933
on_job_start=on_start,
930934
on_job_end=on_end,
935+
after_job_end=after_end,
931936
job_timeout=0.2,
932937
poll_delay=0.1,
933938
)
@@ -939,7 +944,7 @@ async def test(ctx):
939944
assert worker.jobs_complete == 1
940945
assert worker.jobs_failed == 0
941946
assert worker.jobs_retried == 0
942-
assert result['called'] == 2
947+
assert result['called'] == 4
943948

944949

945950
async def test_worker_timezone_defaults_to_system_timezone(worker):

0 commit comments

Comments
 (0)