Skip to content

Commit 789b83c

Browse files
committed
refactoring
1 parent f4c5dad commit 789b83c

File tree

1 file changed

+25
-25
lines changed

1 file changed

+25
-25
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 25 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@
2121
Optional,
2222
Union,
2323
)
24-
import os
24+
2525
import numpy
2626
import pandas as pd
2727
import requests
@@ -492,7 +492,7 @@ def run_jobs(
492492
# TODO: support user-provided `stats`
493493
stats = collections.defaultdict(int)
494494

495-
while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running", "downloading"]).values()) > 0:
495+
while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
496496
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
497497
stats["run_jobs loop"] += 1
498498

@@ -523,7 +523,7 @@ def _job_update_loop(
523523
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
524524
if len(not_started) > 0:
525525
# Check number of jobs running at each backend
526-
running = job_db.get_by_status(statuses=["created", "queued", "running"]) #TODO I believe we need to get downloading out?
526+
running = job_db.get_by_status(statuses=["created", "queued", "running"])
527527
stats["job_db get_by_status"] += 1
528528
per_backend = running.groupby("backend_name").size().to_dict()
529529
_log.info(f"Running per backend: {per_backend}")
@@ -617,21 +617,32 @@ def on_job_done(self, job: BatchJob, row):
617617
:param job: The job that has finished.
618618
:param row: DataFrame row containing the job's metadata.
619619
"""
620+
_log.info(f"Job {job.job_id} completed. Preparing to handle completion.")
621+
620622
job_metadata = job.describe()
621623
job_dir = self.get_job_dir(job.job_id)
622624
metadata_path = self.get_job_metadata_path(job.job_id)
623625
self.ensure_job_dir_exists(job.job_id)
624626

625-
# Start download in a separate thread
626-
downloader = Thread(target=lambda: (
627-
self._job_download(job, job_dir, row) # Invoke the download logic directly
628-
))
629-
downloader.start()
630-
631-
# Write the job metadata to a file
627+
# Save metadata
628+
_log.info(f"Saving metadata for job {job.job_id} to {metadata_path}")
632629
with metadata_path.open("w", encoding="utf-8") as f:
633630
json.dump(job_metadata, f, ensure_ascii=False)
634631

632+
# Define download logic inline
633+
def download_task():
634+
try:
635+
_log.info(f"Starting download for job {job.job_id} to directory {job_dir}")
636+
job.get_results().download_files(target=job_dir)
637+
_log.info(f"Successfully downloaded job {job.job_id} results to {job_dir}")
638+
except Exception as e:
639+
_log.error(f"Error downloading job {job.job_id}: {e}")
640+
641+
# Start the download in a separate thread
642+
_log.info(f"Starting download thread for job {job.job_id}")
643+
downloader = Thread(target=download_task, daemon=True)
644+
downloader.start()
645+
635646
def _job_download(self, job, job_dir, row):
636647
"""
637648
Download the job's results and update the job status after the download completes.
@@ -713,15 +724,14 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
713724
if not job_dir.exists():
714725
job_dir.mkdir(parents=True)
715726

716-
717727
def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
718728
"""
719729
Tracks status (and stats) of running jobs (in place).
720730
Optionally cancels jobs when running too long.
721731
"""
722732
stats = stats if stats is not None else collections.defaultdict(int)
723733

724-
active = job_db.get_by_status(statuses=["created", "queued", "running", "downloading"]).copy()
734+
active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
725735
for i in active.index:
726736
job_id = active.loc[i, "id"]
727737
backend_name = active.loc[i, "backend_name"]
@@ -738,19 +748,9 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
738748
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
739749
)
740750

741-
742-
#---------------------------------------
743-
744-
if new_status == "finished" and previous_status != "downloading":
745-
new_status = "downloading"
746-
self.on_job_done(the_job, active.loc[i])
747-
748-
if previous_status == "downloading":
749-
if self.get_job_metadata_path(job_id).exists():
750-
new_status = "finished"
751-
stats["job finished"] += 1
752-
else:
753-
new_status = "downloading"
751+
if new_status == "finished":
752+
stats["job finished"] += 1
753+
self.on_job_done(the_job, active.loc[i])
754754

755755
if previous_status != "error" and new_status == "error":
756756
stats["job failed"] += 1

0 commit comments

Comments (0)