Skip to content

Commit 6252dc1

Browse files
committed
fix:add some logging
1 parent 186d6dd commit 6252dc1

File tree

4 files changed

+24
-32
lines changed

4 files changed

+24
-32
lines changed

scheduler/helpers/queues/queue_logic.py

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,12 @@ def create_and_enqueue_job(
244244
raise TypeError(f"Invalid type for when=`{when}`")
245245
return job_model
246246

247-
def job_handle_success(self, job: JobModel, result: Any, result_ttl: int, connection: ConnectionType):
247+
def job_handle_success(
248+
self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int, connection: ConnectionType
249+
):
248250
"""Saves and cleanup job after successful execution"""
249251
job.after_execution(
250-
result_ttl,
252+
job_info_ttl,
251253
JobStatus.FINISHED,
252254
prev_registry=self.active_job_registry,
253255
new_registry=self.finished_job_registry,
@@ -280,40 +282,26 @@ def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str,
280282
exc_string=exc_string,
281283
)
282284

283-
def run_job(self, job: JobModel) -> JobModel:
284-
"""Run the job
285-
:param job: The job to run
286-
:returns: The job result
287-
"""
285+
def run_sync(self, job: JobModel) -> JobModel:
286+
"""Run a job synchronously, meaning on the same process the method was called."""
287+
job.prepare_for_execution("sync", self.active_job_registry, self.connection)
288288
try:
289289
result = perform_job(job, self.connection)
290290

291-
result_ttl = job.success_ttl
292291
with self.connection.pipeline() as pipeline:
293-
self.job_handle_success(job, result=result, result_ttl=result_ttl, connection=pipeline)
294-
job.expire(result_ttl, connection=pipeline)
292+
self.job_handle_success(
293+
job, result=result, job_info_ttl=job.job_info_ttl, result_ttl=job.success_ttl, connection=pipeline
294+
)
295+
295296
pipeline.execute()
296-
except Exception as e:
297+
except Exception as e: # noqa
297298
logger.warning(f"Job {job.name} failed with exception: {e}")
298299
with self.connection.pipeline() as pipeline:
299300
exc_string = "".join(traceback.format_exception(*sys.exc_info()))
300301
self.job_handle_failure(JobStatus.FAILED, job, exc_string, pipeline)
301302
pipeline.execute()
302303
return job
303304

304-
def run_sync(self, job: JobModel) -> JobModel:
305-
"""Run a job synchronously, meaning on the same process the method was called."""
306-
job.prepare_for_execution("sync", self.active_job_registry, self.connection)
307-
308-
try:
309-
self.run_job(job)
310-
except: # noqa
311-
with self.connection.pipeline() as pipeline:
312-
exc_string = "".join(traceback.format_exception(*sys.exc_info()))
313-
self.job_handle_failure(JobStatus.FAILED, job, exc_string, pipeline)
314-
pipeline.execute()
315-
return job
316-
317305
@classmethod
318306
def dequeue_any(
319307
cls,

scheduler/redis_models/job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def prepare_for_execution(self, worker_name: str, registry: JobNamesRegistry, co
126126

127127
def after_execution(
128128
self,
129-
result_ttl: int,
129+
job_info_ttl: int,
130130
status: JobStatus,
131131
connection: ConnectionType,
132132
prev_registry: Optional[JobNamesRegistry] = None,
@@ -138,8 +138,8 @@ def after_execution(
138138
self.last_heartbeat = self.ended_at
139139
if prev_registry is not None:
140140
prev_registry.delete(connection, self.name)
141-
if new_registry is not None and result_ttl != 0:
142-
new_registry.add(connection, self.name, current_timestamp() + result_ttl)
141+
if new_registry is not None and job_info_ttl != 0:
142+
new_registry.add(connection, self.name, current_timestamp() + job_info_ttl)
143143
self.save(connection=connection)
144144

145145
@property

scheduler/tests/test_views/test_job_detail_action.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ def test_single_job_action_enqueue_job_sync_queue(self):
8888
job = queue.create_and_enqueue_job(test_job)
8989
job_list.append(job)
9090

91-
# This job is deferred
92-
93-
self.assertEqual(job_list[-1].get_status(connection=queue.connection), JobStatus.FAILED)
91+
self.assertEqual(job_list[-1].status, JobStatus.FINISHED)
9492
self.assertIsNotNone(job_list[-1].enqueued_at)
9593

9694
# Try to force enqueue last job should do nothing
@@ -102,3 +100,4 @@ def test_single_job_action_enqueue_job_sync_queue(self):
102100
tmp = JobModel.get(job_list[-1].name, connection=queue.connection)
103101
self.assertEqual(tmp.get_status(connection=queue.connection), JobStatus.FINISHED)
104102
self.assertIsNotNone(tmp.enqueued_at)
103+
self.assertGreater(tmp.enqueued_at, job_list[-1].enqueued_at)

scheduler/worker/worker.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,13 @@ def handle_job_success(self, job: JobModel, return_value: Any, queue: Queue):
792792
with self.connection.pipeline() as pipeline:
793793
while True:
794794
try:
795-
queue.job_handle_success(job, result=return_value, result_ttl=job.success_ttl, connection=pipeline)
795+
queue.job_handle_success(
796+
job,
797+
result=return_value,
798+
job_info_ttl=job.job_info_ttl,
799+
result_ttl=job.success_ttl,
800+
connection=pipeline,
801+
)
796802
self._model.current_job_name = None
797803
self._model.successful_job_count += 1
798804
self._model.completed_jobs += 1
@@ -801,7 +807,6 @@ def handle_job_success(self, job: JobModel, return_value: Any, queue: Queue):
801807

802808
job.expire(job.success_ttl, connection=pipeline)
803809
logger.debug(f"[Worker {self.name}/{self._pid}]: Removing job {job.name} from active_job_registry")
804-
queue.active_job_registry.delete(pipeline, job.name)
805810
pipeline.execute()
806811
logger.debug(
807812
f"[Worker {self.name}/{self._pid}]: Finished handling successful execution of job {job.name}"

0 commit comments

Comments
 (0)