Skip to content

Commit 1fb9842

Browse files
kiriusm2Kirill Matveev
andauthored
Fix jobs timeout (#248)
* Fix jobs timeout #247 * Bump codecov-action to v1.5.2 * Fix test Co-authored-by: Kirill Matveev <[email protected]>
1 parent 759fe03 commit 1fb9842

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

arq/worker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -519,10 +519,9 @@ async def job_failed(exc: BaseException) -> None:
519519
logger.info('%6.2fs → %s(%s)%s', (start_ms - enqueue_time_ms) / 1000, ref, s, extra)
520520
self.job_tasks[job_id] = task = self.loop.create_task(function.coroutine(ctx, *args, **kwargs))
521521

522-
cancel_handler = self.loop.call_at(self.loop.time() + timeout_s, task.cancel)
523522
# run repr(result) and extra inside try/except as they can raise exceptions
524523
try:
525-
result = await task
524+
result = await asyncio.wait_for(task, timeout_s)
526525
except (Exception, asyncio.CancelledError) as e:
527526
exc_extra = getattr(e, 'extra', None)
528527
if callable(exc_extra):
@@ -532,7 +531,6 @@ async def job_failed(exc: BaseException) -> None:
532531
result_str = '' if result is None else truncate(repr(result))
533532
finally:
534533
del self.job_tasks[job_id]
535-
cancel_handler.cancel()
536534

537535
except (Exception, asyncio.CancelledError) as e:
538536
finished_ms = timestamp_ms()

tests/test_worker.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,3 +817,21 @@ async def wait_and_abort(job, delay=0.1):
817817
assert worker.aborting_tasks == set()
818818
assert worker.tasks == {}
819819
assert worker.job_tasks == {}
820+
821+
822+
async def test_job_timeout(arq_redis: ArqRedis, worker, caplog):
823+
async def longfunc(ctx):
824+
await asyncio.sleep(0.3)
825+
826+
caplog.set_level(logging.ERROR)
827+
await arq_redis.enqueue_job('longfunc', _job_id='testing')
828+
worker: Worker = worker(functions=[func(longfunc, name='longfunc')], job_timeout=0.2, poll_delay=0.1)
829+
assert worker.jobs_complete == 0
830+
assert worker.jobs_failed == 0
831+
assert worker.jobs_retried == 0
832+
await worker.main()
833+
assert worker.jobs_complete == 0
834+
assert worker.jobs_failed == 1
835+
assert worker.jobs_retried == 0
836+
log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records))
837+
assert 'X.XXs ! testing:longfunc failed, TimeoutError:' in log

0 commit comments

Comments
 (0)