Skip to content

Commit 6f68086

Browse files
authored
Fix race condition causing status not_found (#362)
Checking job status consited of three separate calls to Redis, where the first one checked if result is present and the second checked if job is in progress. If a job got completed between these two calls Job.status() would erroneously return status not_found. The solution is to make all the calls to Redis in one transaction.
1 parent 1f91c0b commit 6f68086

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

arq/jobs.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,15 +137,20 @@ async def status(self) -> JobStatus:
137137
"""
138138
Status of the job.
139139
"""
140-
if await self._redis.exists(result_key_prefix + self.job_id):
140+
async with self._redis.pipeline(transaction=True) as tr:
141+
tr.exists(result_key_prefix + self.job_id) # type: ignore[unused-coroutine]
142+
tr.exists(in_progress_key_prefix + self.job_id) # type: ignore[unused-coroutine]
143+
tr.zscore(self._queue_name, self.job_id) # type: ignore[unused-coroutine]
144+
is_complete, is_in_progress, score = await tr.execute()
145+
146+
if is_complete:
141147
return JobStatus.complete
142-
elif await self._redis.exists(in_progress_key_prefix + self.job_id):
148+
elif is_in_progress:
143149
return JobStatus.in_progress
144-
else:
145-
score = await self._redis.zscore(self._queue_name, self.job_id)
146-
if not score:
147-
return JobStatus.not_found
150+
elif score:
148151
return JobStatus.deferred if score > timestamp_ms() else JobStatus.queued
152+
else:
153+
return JobStatus.not_found
149154

150155
async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0.5) -> bool:
151156
"""

0 commit comments

Comments
 (0)