Skip to content

Commit 63d742c

Browse files
committed
abstract task name in thread pool
1 parent 1f027cd commit 63d742c

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
658658
df_idx=i,
659659
)
660660
_log.info(f"Submitting task {task} to thread pool")
661-
self._worker_pool.submit_task(task)
661+
self._worker_pool.submit_task(task=task, pool_name="job_start")
662662

663663
stats["job_queued_for_start"] += 1
664664
df.loc[i, "status"] = "queued_for_start"
@@ -771,7 +771,7 @@ def on_job_done(self, job: BatchJob, row):
771771
if self._worker_pool is None:
772772
self._worker_pool = _JobManagerWorkerThreadPool()
773773

774-
self._worker_pool.submit_task(task)
774+
self._worker_pool.submit_task(task=task, pool_name="job_download")
775775

776776
def on_job_error(self, job: BatchJob, row):
777777
"""

openeo/extra/job_management/_thread_worker.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,6 @@ class _JobManagerWorkerThreadPool:
268268

269269
"""
270270
Generic wrapper that manages multiple thread pools with a dict.
271-
Uses task class names as pool names automatically.
272271
"""
273272

274273
def __init__(self, pool_configs: Optional[Dict[str, int]] = None):
@@ -285,17 +284,18 @@ def _get_pool_name_for_task(self, task: Task) -> str:
285284
"""
286285
return task.__class__.__name__
287286

288-
def submit_task(self, task: Task) -> None:
287+
def submit_task(self, task: Task, pool_name: str = "default") -> None:
289288
"""
290-
Submit a task to a pool named after its class.
289+
Submit a task to a specific pool.
291290
Creates pool dynamically if it doesn't exist.
292-
"""
293-
pool_name = self._get_pool_name_for_task(task)
294291
292+
:param task: The task to execute
293+
:param pool_name: Which pool to use (default, download, etc.)
294+
"""
295295
if pool_name not in self._pools:
296296
# Create pool on-demand
297297
max_workers = self._pool_configs.get(pool_name, 1) # Default 1 worker
298-
self._pools[pool_name] = _TaskThreadPool(max_workers=max_workers, name=pool_name)
298+
self._pools[pool_name] = _TaskThreadPool(max_workers=max_workers)
299299
_log.info(f"Created pool '{pool_name}' with {max_workers} workers")
300300

301301
self._pools[pool_name].submit_task(task)

0 commit comments

Comments
 (0)