 
 from scheduler.helpers.queues import Queue, perform_job
 from scheduler.helpers.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException
-from scheduler.helpers.utils import utcnow, current_timestamp
+from scheduler.helpers.utils import utcnow
 
 try:
     from setproctitle import setproctitle as setprocname
@@ -625,7 +625,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
                     self.wait_for_job_execution_process()
                     break
 
-                self.maintain_heartbeats(job, queue)
+                self._model.heartbeat(self.connection, self.job_monitoring_interval + 60)
 
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
@@ -685,17 +685,6 @@ def execute_job(self, job: JobModel, queue: Queue) -> None:
         self.perform_job(job, queue)
         self._model.set_field("state", WorkerStatus.IDLE, connection=self.connection)
 
-    def maintain_heartbeats(self, job: JobModel, queue: Queue) -> None:
-        """Updates worker and job's last heartbeat field."""
-        with self.connection.pipeline() as pipeline:
-            self._model.heartbeat(pipeline, self.job_monitoring_interval + 60)
-            ttl = self.get_heartbeat_ttl(job)
-
-            queue.active_job_registry.add(pipeline, self.name, current_timestamp() + ttl, update_existing_only=False)
-            results = pipeline.execute()
-            if results[2] == 1:
-                job.delete(self.connection)
-
     def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
         """This is the entry point of the newly spawned job execution process.
         After fork()'ing, assure we are generating random sequences that are different from the worker.
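
The monitoring loop now refreshes the worker heartbeat directly via self._model.heartbeat(self.connection, self.job_monitoring_interval + 60) instead of the removed pipelined maintain_heartbeats helper. The snippet below is an illustrative, self-contained sketch of that heartbeat pattern against a plain redis-py connection; the WorkerSketch class and the worker:heartbeat:<name> key are hypothetical stand-ins, not the project's actual model API.

import time
import redis

class WorkerSketch:
    """Minimal illustration of a worker heartbeat with a padded TTL (hypothetical, not the project's API)."""

    def __init__(self, name: str, connection: redis.Redis, job_monitoring_interval: int = 30):
        self.name = name
        self.connection = connection
        self.job_monitoring_interval = job_monitoring_interval

    def heartbeat(self) -> None:
        # Refresh the heartbeat key with a TTL slightly longer than the monitoring
        # interval, so a single missed beat does not immediately mark the worker as dead.
        ttl = self.job_monitoring_interval + 60
        self.connection.setex(f"worker:heartbeat:{self.name}", ttl, time.time())

# Usage: call heartbeat() each time the monitoring loop times out while waiting for
# the job execution process, keeping the worker registered while the job keeps running.
worker = WorkerSketch("worker-1", redis.Redis())
worker.heartbeat()
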
@@ -785,10 +774,8 @@ def perform_job(self, job: JobModel, queue: Queue) -> bool:
         logger.debug(f"[Worker {self.name}/{self._pid}]: Performing {job.name} code.")
 
         try:
-            with self.connection.pipeline() as pipeline:
-                self.worker_before_execution(job, connection=pipeline)
-                job.prepare_for_execution(self.name, queue.active_job_registry, connection=pipeline)
-                pipeline.execute()
+            self.worker_before_execution(job, connection=queue.connection)
+            job.prepare_for_execution(self.name, queue.active_job_registry, connection=queue.connection)
             timeout = job.timeout or SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT
             with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(timeout, JobTimeoutException, job_name=job.name):
                 logger.debug(f"[Worker {self.name}/{self._pid}]: Performing job `{job.name}`...")
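
In perform_job, the before-execution hook and prepare_for_execution are now issued directly on queue.connection rather than queued on a Redis pipeline. The sketch below illustrates the general difference with plain redis-py, under made-up key and field names: pipelined commands are buffered client-side and only sent (with their results returned as a list) at execute(), while direct calls round-trip immediately, one command at a time.

import time
import redis

r = redis.Redis()

# Pipelined variant: commands are buffered and sent in one batch; results come
# back as a list from execute(), in the order the commands were queued.
with r.pipeline() as pipe:
    pipe.hset("job:example", "status", "active")
    pipe.zadd("active_jobs", {"job:example": time.time() + 300})
    results = pipe.execute()  # e.g. [1, 1]

# Direct variant: each command executes immediately against the connection,
# which is simpler to follow but costs one round-trip per command.
r.hset("job:example", "status", "active")
r.zadd("active_jobs", {"job:example": time.time() + 300})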