@@ -160,6 +160,40 @@ async def test_job_successful(arq_redis: ArqRedis, worker, caplog):
160160 assert 'X.XXs → testing:foobar()\n X.XXs ← testing:foobar ● 42' in log
161161
162162
163+ async def test_job_retry_race_condition (arq_redis : ArqRedis , worker ):
164+ async def retry_job (ctx ):
165+ if ctx ['job_try' ] == 1 :
166+ raise Retry (defer = 10 )
167+
168+ job_id = 'testing'
169+ await arq_redis .enqueue_job ('retry_job' , _job_id = job_id )
170+
171+ worker_one : Worker = worker (functions = [func (retry_job , name = 'retry_job' )])
172+ worker_two : Worker = worker (functions = [func (retry_job , name = 'retry_job' )])
173+
174+ assert worker_one .jobs_complete == 0
175+ assert worker_one .jobs_failed == 0
176+ assert worker_one .jobs_retried == 0
177+
178+ assert worker_two .jobs_complete == 0
179+ assert worker_two .jobs_failed == 0
180+ assert worker_two .jobs_retried == 0
181+
182+ await worker_one .start_jobs ([job_id .encode ()])
183+ await asyncio .gather (* worker_one .tasks .values ())
184+
185+ await worker_two .start_jobs ([job_id .encode ()])
186+ await asyncio .gather (* worker_two .tasks .values ())
187+
188+ assert worker_one .jobs_complete == 0
189+ assert worker_one .jobs_failed == 0
190+ assert worker_one .jobs_retried == 1
191+
192+ assert worker_two .jobs_complete == 0
193+ assert worker_two .jobs_failed == 0
194+ assert worker_two .jobs_retried == 0
195+
196+
163197async def test_job_successful_no_result_logging (arq_redis : ArqRedis , worker , caplog ):
164198 caplog .set_level (logging .INFO )
165199 await arq_redis .enqueue_job ('foobar' , _job_id = 'testing' )
0 commit comments