@@ -137,13 +137,14 @@ async def _wait_for_completion(
137137 rabbitmq_rpc_client : RabbitMQRPCClient ,
138138 * ,
139139 rpc_namespace : RPCNamespace ,
140+ method_name : RPCMethodName ,
140141 job_id : AsyncJobId ,
141142 job_id_data : AsyncJobNameData ,
142- client_timeout : int ,
143+ client_timeout : datetime . timedelta ,
143144) -> AsyncGenerator [AsyncJobStatus , None ]:
144145 try :
145146 async for attempt in AsyncRetrying (
146- stop = stop_after_delay (client_timeout ),
147+ stop = stop_after_delay (client_timeout . total_seconds () ),
147148 reraise = True ,
148149 retry = retry_if_exception_type (TryAgain ),
149150 before_sleep = before_sleep_log (_logger , logging .DEBUG ),
@@ -163,7 +164,7 @@ async def _wait_for_completion(
163164
164165 except TryAgain as exc :
165166 # this is a timeout
166- msg = f"Long running task { job_id = } , calling to timed-out after { client_timeout } seconds "
167+ msg = f"Async job { job_id = } , calling to ' { method_name } ' timed-out after { client_timeout } "
167168 raise TimeoutError (msg ) from exc
168169
169170
@@ -201,31 +202,36 @@ async def submit_and_wait(
201202 job_id_data = job_id_data ,
202203 ** kwargs ,
203204 )
205+ job_status : AsyncJobStatus | None = None
204206 async for job_status in _wait_for_completion (
205207 rabbitmq_rpc_client ,
206208 rpc_namespace = rpc_namespace ,
209+ method_name = method_name ,
207210 job_id = async_job_rpc_get .job_id ,
208211 job_id_data = job_id_data ,
209212 client_timeout = client_timeout ,
210213 ):
214+ assert job_status is not None # nosec
211215 yield AsyncJobComposedResult (job_status )
212-
213- assert job_status # nosec
214- yield AsyncJobComposedResult (
215- job_status ,
216- result (
217- rabbitmq_rpc_client ,
218- rpc_namespace = rpc_namespace ,
219- job_id = async_job_rpc_get .job_id ,
220- job_id_data = job_id_data ,
221- ),
222- )
223- except (TimeoutError , CancelledError ):
224- if async_job_rpc_get is not None :
225- await cancel (
226- rabbitmq_rpc_client ,
227- rpc_namespace = rpc_namespace ,
228- job_id = async_job_rpc_get .job_id ,
229- job_id_data = job_id_data ,
216+ if job_status :
217+ yield AsyncJobComposedResult (
218+ job_status ,
219+ result (
220+ rabbitmq_rpc_client ,
221+ rpc_namespace = rpc_namespace ,
222+ job_id = async_job_rpc_get .job_id ,
223+ job_id_data = job_id_data ,
224+ ),
230225 )
226+ except (TimeoutError , CancelledError ) as error :
227+ if async_job_rpc_get is not None :
228+ try :
229+ await cancel (
230+ rabbitmq_rpc_client ,
231+ rpc_namespace = rpc_namespace ,
232+ job_id = async_job_rpc_get .job_id ,
233+ job_id_data = job_id_data ,
234+ )
235+ except Exception as exc :
236+ raise exc from error
231237 raise
0 commit comments