Skip to content

Commit c4bba22

Browse files
authored
Merge branch 'master' into save_safety
2 parents ee0486b + 8fb4b6e commit c4bba22

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

scheduler/redis_models/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def get(cls, name: str, connection: ConnectionType) -> Optional[Self]:
176176
try:
177177
return cls.deserialize(decode_dict(res, set()))
178178
except Exception as e:
179-
logger.warning(f"Failed to deserialize {name}: {e}")
179+
logger.warning(f"Failed to deserialize {name}: {e}", exc_info=True)
180180
return None
181181

182182
@classmethod

scheduler/redis_models/job.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ def prepare_for_execution(self, worker_name: str, registry: JobNamesRegistry, co
122122
before execution begins.
123123
:param worker_name: The name of the worker
124124
:param registry: The registry to add the job to
125-
:param current_pid: The current process id
126125
:param connection: The connection to the broker
127126
"""
128127
self.worker_name = worker_name

scheduler/worker/worker.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ def handle_job_failure(self, job: JobModel, queue: Queue, exc_string: str = "")
281281
if job.status == JobStatus.FAILED:
282282
self._model.failed_job_count += 1
283283
self._model.completed_jobs += 1
284-
if job.started_at and job.ended_at:
284+
if job.started_at is not None and job.ended_at is not None:
285285
self._model.total_working_time_ms += (job.ended_at - job.started_at).microseconds / 1000.0
286286
self._model.save(connection=queue.connection)
287287

@@ -613,9 +613,11 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
613613
break
614614
except JobExecutionMonitorTimeoutException:
615615
# job execution process has not exited yet and is still running. Send a heartbeat to keep the worker alive.
616-
working_time = (utcnow() - job.started_at).total_seconds()
617-
self._model.set_current_job_working_time(working_time, self.connection)
618-
616+
if job.started_at is not None:
617+
working_time = (utcnow() - job.started_at).total_seconds()
618+
self._model.set_current_job_working_time(working_time, self.connection)
619+
else:
620+
logger.warning("[Worker {self.name}/{self._pid}]: job.started_at is None, cannot set working time")
619621
# Kill the job from this side if something is really wrong (interpreter lock/etc).
620622
if job.timeout != -1 and self._model.current_job_working_time > (job.timeout + 60):
621623
self._model.heartbeat(self.connection, self.job_monitoring_interval + 60)
@@ -703,7 +705,7 @@ def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
703705
random.seed()
704706
self.setup_job_execution_process_signals()
705707
self._is_job_execution_process = True
706-
job = JobModel.get(job.name, self.connection)
708+
job = JobModel.get(job.name, queue.connection)
707709
try:
708710
self.perform_job(job, queue)
709711
except: # noqa

0 commit comments

Comments
 (0)