@@ -259,7 +259,7 @@ def create_and_enqueue_job(
259259 scheduled_task_id = scheduled_task_id ,
260260 )
261261 if when is None :
262- job_model = self ._enqueue_job (job_model , connection = pipeline , at_front = at_front )
262+ job_model = self .enqueue_job (job_model , connection = pipeline , at_front = at_front )
263263 else :
264264 job_model .save (connection = self .connection )
265265 self .scheduled_job_registry .schedule (self .connection , job_model , when )
@@ -287,7 +287,7 @@ def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str,
287287 exc_string = exc_string
288288 )
289289
290- def run_job (self , job : JobModel ) -> Any :
290+ def run_job (self , job : JobModel ) -> JobModel :
291291 """Run the job
292292 :param job: The job to run
293293 :returns: The job result
@@ -300,22 +300,22 @@ def run_job(self, job: JobModel) -> Any:
300300 self .job_handle_success (job , result = result , result_ttl = result_ttl , connection = pipeline )
301301 job .expire (result_ttl , connection = pipeline )
302302 pipeline .execute ()
303- return result
304- except :
303+ except Exception as e :
304+ logger . warning ( f"Job { job . name } failed with exception: { e } " )
305305 with self .connection .pipeline () as pipeline :
306306 exc_string = "" .join (traceback .format_exception (* sys .exc_info ()))
307307 self .job_handle_failure (JobStatus .FAILED , job , exc_string , pipeline )
308308 pipeline .execute ()
309+ return job
309310
310311 def retry_job (self , job : JobModel , connection : ConnectionType ):
311312 """Requeue or schedule this job for execution.
312313 If the the `retry_interval` was set on the job itself,
313314 it will calculate a scheduled time for the job to run, and instead
314315 of just regularly `enqueing` the job, it will `schedule` it.
315316
316- Args:
317- job (JobModel): The queue to retry the job on
318- connection (ConnectionType): The Redis' pipeline to use
317+ :param job: The job to retry
318+ :param connection: The broker connection to use
319319 """
320320 number_of_intervals = len (job .retry_intervals )
321321 index = max (number_of_intervals - job .retries_left , 0 )
@@ -327,9 +327,9 @@ def retry_job(self, job: JobModel, connection: ConnectionType):
327327 job .save (connection = connection )
328328 self .scheduled_job_registry .schedule (connection , job , scheduled_datetime )
329329 else :
330- self ._enqueue_job (job , connection = connection )
330+ self .enqueue_job (job , connection = connection )
331331
332- def _enqueue_job (
332+ def enqueue_job (
333333 self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
334334 ) -> JobModel :
335335 """Enqueues a job for delayed execution without checking dependencies.
@@ -369,7 +369,7 @@ def run_sync(self, job: JobModel) -> JobModel:
369369 job .prepare_for_execution ("sync" , self .active_job_registry , self .connection )
370370
371371 try :
372- result = self .run_job (job )
372+ self .run_job (job )
373373 except : # noqa
374374 with self .connection .pipeline () as pipeline :
375375 exc_string = "" .join (traceback .format_exception (* sys .exc_info ()))
@@ -503,7 +503,7 @@ def requeue_jobs(self, *job_names: str, at_front: bool = False) -> int:
503503 job .started_at = None
504504 job .ended_at = None
505505 job .save (connection = pipe )
506- self ._enqueue_job (job , connection = pipe , at_front = at_front )
506+ self .enqueue_job (job , connection = pipe , at_front = at_front )
507507 jobs_requeued += 1
508508 pipe .execute ()
509509 return jobs_requeued
0 commit comments