Skip to content

Commit 2973bee

Browse files
committed
adding easy callback to check number of pending tasks on thread workers; this can be used to guarantee the download gets finished
1 parent d67fdd6 commit 2973bee

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,14 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):
400400
self._download_pool.shutdown()
401401
self._download_pool = None
402402

403+
if self._download_pool is not None:
404+
# Wait for downloads to complete before shutting down
405+
_log.info("Waiting for download tasks to complete before stopping...")
406+
while self._download_pool.num_pending_tasks() > 0:
407+
time.sleep(0.5)
408+
self._download_pool.shutdown()
409+
self._download_pool = None
410+
403411
if self._thread is not None:
404412
self._stop_thread = True
405413
if timeout_seconds is _UNSET:
@@ -513,6 +521,8 @@ def run_jobs(
513521
).values()
514522
)
515523
> 0
524+
525+
or self._worker_pool.num_pending_tasks() > 0
516526
):
517527
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
518528
stats["run_jobs loop"] += 1
@@ -523,7 +533,20 @@ def run_jobs(
523533
stats["sleep"] += 1
524534

525535
# TODO; run post process after shutdown once more to ensure completion?
526-
self.stop_job_thread()
536+
# Wait for all download tasks to complete
537+
if self._download_results and self._download_pool is not None:
538+
_log.info("Waiting for download tasks to complete...")
539+
while self._download_pool.num_pending_tasks() > 0:
540+
self._process_threadworker_updates(
541+
worker_pool=self._download_pool,
542+
job_db=job_db,
543+
stats=stats
544+
)
545+
time.sleep(1) # Brief pause to avoid busy waiting
546+
_log.info("All download tasks completed.")
547+
548+
self._worker_pool.shutdown()
549+
self._download_pool.shutdown()
527550

528551
return stats
529552

openeo/extra/job_management/_thread_worker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,10 @@ def process_futures(self, timeout: Union[float, None] = 0) -> Tuple[List[_TaskRe
251251

252252
self._future_task_pairs = to_keep
253253
return results, len(to_keep)
254+
255+
def num_pending_tasks(self) -> int:
256+
"""Return the number of tasks that are still pending (not completed)."""
257+
return len(self._future_task_pairs)
254258

255259
def shutdown(self) -> None:
256260
"""Shuts down the thread pool gracefully."""

0 commit comments

Comments
 (0)