Skip to content

Commit 3234f4f

Browse files
committed
working on unit tests
1 parent 597997f commit 3234f4f

File tree

2 files changed

+248
-137
lines changed

2 files changed

+248
-137
lines changed

openeo/extra/job_management/_thread_worker.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ def __init__(self, max_workers: int = 1, name: str = 'default'):
202202
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
203203
self._future_task_pairs: List[Tuple[concurrent.futures.Future, Task]] = []
204204
self._name = name
205+
self._max_workers = max_workers
205206

206207
def submit_task(self, task: Task) -> None:
207208
"""
@@ -272,31 +273,34 @@ class _JobManagerWorkerThreadPool:
272273

273274
def __init__(self, pool_configs: Optional[Dict[str, int]] = None):
274275
"""
275-
:param pool_configs: Dict of task_class_name -> max_workers
276-
Example: {"_JobStartTask": 1, "_JobDownloadTask": 2}
276+
:param pool_configs: Dict of pool_name -> max_workers
277+
Example: {"job_start": 1, "download": 2}
277278
"""
278279
self._pools: Dict[str, _TaskThreadPool] = {}
279280
self._pool_configs = pool_configs or {}
280-
281-
def _get_pool_name_for_task(self, task: Task) -> str:
282-
"""
283-
Get pool name from task class name.
284-
"""
285-
return task.__class__.__name__
281+
282+
# Create all pools upfront from config
283+
for pool_name, max_workers in self._pool_configs.items():
284+
self._pools[pool_name] = _TaskThreadPool(max_workers=max_workers)
285+
_log.info(f"Created pool '{pool_name}' with {max_workers} workers")
286286

287287
def submit_task(self, task: Task, pool_name: str = "default") -> None:
288288
"""
289289
Submit a task to a specific pool.
290-
Creates pool dynamically if it doesn't exist.
291-
292-
:param task: The task to execute
293-
:param pool_name: Which pool to use (default, download, etc.)
290+
Creates pool dynamically only if not in config.
294291
"""
295292
if pool_name not in self._pools:
296-
# Create pool on-demand
297-
max_workers = self._pool_configs.get(pool_name, 1) # Default 1 worker
298-
self._pools[pool_name] = _TaskThreadPool(max_workers=max_workers)
299-
_log.info(f"Created pool '{pool_name}' with {max_workers} workers")
293+
# Check if pool_name is in config but somehow wasn't created
294+
if pool_name in self._pool_configs:
295+
# This shouldn't happen, but create it
296+
max_workers = self._pool_configs[pool_name]
297+
self._pools[pool_name] = _TaskThreadPool(max_workers=max_workers, name=pool_name)
298+
_log.warning(f"Created missing pool '{pool_name}' from config with {max_workers} workers")
299+
else:
300+
# Not in config - create with default
301+
max_workers = 1
302+
self._pools[pool_name] = _TaskThreadPool(max_workers=max_workers, name=pool_name)
303+
_log.info(f"Created dynamic pool '{pool_name}' with {max_workers} workers")
300304

301305
self._pools[pool_name].submit_task(task)
302306

@@ -329,10 +333,16 @@ def shutdown(self, pool_name: Optional[str] = None) -> None:
329333
if pool_name in self._pools:
330334
self._pools[pool_name].shutdown()
331335
del self._pools[pool_name]
336+
if pool_name in self._pool_configs:
337+
del self._pool_configs[pool_name]
332338
else:
333339
for pool_name, pool in list(self._pools.items()):
334340
pool.shutdown()
335341
del self._pools[pool_name]
342+
if pool_name in self._pool_configs:
343+
del self._pool_configs[pool_name]
344+
345+
self._pool_configs.clear()
336346

337347
def list_pools(self) -> List[str]:
338348
"""List all active pool names."""

0 commit comments

Comments
 (0)