Skip to content

Commit c72fd44

Browse files
committed
include initial unit testing
1 parent d7a87da commit c72fd44

File tree

4 files changed

+76
-7
lines changed

4 files changed

+76
-7
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ def _refresh_bearer_token(self, connection: Connection, *, max_age: float = 60)
674674
else:
675675
_log.warning("Failed to proactively refresh bearer token")
676676

677-
def _process_task_results(
677+
def _process_threadworker_updates(
678678
self,
679679
worker_pool: _JobManagerWorkerThreadPool,
680680
*,
@@ -756,6 +756,9 @@ def on_job_done(self, job: BatchJob, row):
756756
download_dir=job_dir,
757757
)
758758
_log.info(f"Submitting download task {task} to download thread pool")
759+
760+
if self._download_pool is None:
761+
self._download_pool = _JobManagerWorkerThreadPool()
759762
self._download_pool.submit_task(task)
760763

761764
def on_job_error(self, job: BatchJob, row):

openeo/extra/job_management/_thread_worker.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,18 @@ def execute(self) -> _TaskResult:
142142
stats_update={"start_job error": 1},
143143
)
144144

145-
@dataclass(frozen=True)
146145
class _JobDownloadTask(ConnectedTask):
147146
"""
148147
Task for downloading job results and metadata.
149148
150149
:param download_dir:
151150
Root directory where job results and metadata will be downloaded.
152151
"""
153-
download_dir: Path
152+
def __init__(self, download_dir: Path, **kwargs):
153+
super().__init__(**kwargs)
154+
object.__setattr__(self, 'download_dir', download_dir)
154155

155156
def execute(self) -> _TaskResult:
156-
"""
157-
Download job results and metadata.
158-
"""
159157
try:
160158
job = self.get_connection(retry=True).job(self.job_id)
161159

@@ -186,7 +184,7 @@ def execute(self) -> _TaskResult:
186184
db_update={},
187185
stats_update={"job download error": 1},
188186
)
189-
187+
190188
class _JobManagerWorkerThreadPool:
191189
"""
192190
Thread pool-based worker that manages the execution of asynchronous tasks.

tests/extra/job_management/test_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,10 @@ def test_on_job_done_boolean_download(
956956

957957
# Call on_job_done
958958
manager_with_download.on_job_done(job=job, row=row)
959+
960+
# Wait for download task to complete
961+
if manager_with_download._download_pool is not None:
962+
manager_with_download._download_pool.shutdown()
959963

960964
# Verify files were downloaded and directory was created
961965
assert job_dir.exists(), "Job directory should exist when auto_download_results=True"

tests/extra/job_management/test_thread_worker.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
_JobManagerWorkerThreadPool,
1212
_JobStartTask,
1313
_TaskResult,
14+
_JobDownloadTask
1415
)
1516
from openeo.rest._testing import DummyBackend
1617

@@ -288,3 +289,66 @@ def test_job_start_task_failure(self, worker_pool, dummy_backend, caplog):
288289
assert caplog.messages == [
289290
"Failed to start job 'job-000': OpenEoApiError('[500] Internal: No job starting for you, buddy')"
290291
]
292+
293+
def test_download_task_in_pool(self, worker_pool, tmp_path):
294+
# Test that download tasks can be submitted to the thread pool
295+
# without needing actual backend functionality
296+
task = _JobDownloadTask(
297+
job_id="pool-job-123",
298+
df_idx=42,
299+
root_url="https://example.com",
300+
bearer_token="test-token",
301+
download_dir=tmp_path
302+
)
303+
304+
worker_pool.submit_task(task)
305+
results, remaining = worker_pool.process_futures(timeout=1)
306+
307+
# We can't test the actual download result without a backend,
308+
# but we can verify the task was processed
309+
assert len(results) == 1
310+
result = results[0]
311+
assert result.job_id == "pool-job-123"
312+
assert result.df_idx == 42
313+
assert remaining == 0
314+
315+
class TestJobDownloadTask:
316+
def test_download_success(self, tmp_path, caplog):
317+
caplog.set_level(logging.INFO)
318+
319+
# Test the basic functionality without complex backend setup
320+
download_dir = tmp_path / "downloads"
321+
task = _JobDownloadTask(
322+
job_id="test-job-123",
323+
df_idx=0,
324+
root_url="https://example.com",
325+
bearer_token="test-token",
326+
download_dir=download_dir
327+
)
328+
329+
# Since we can't test actual downloads without a real backend,
330+
# we'll test that the task is properly constructed and the directory is handled
331+
assert task.job_id == "test-job-123"
332+
assert task.df_idx == 0
333+
assert task.root_url == "https://example.com"
334+
assert task.download_dir == download_dir
335+
# Token should be hidden in repr
336+
assert "test-token" not in repr(task)
337+
338+
def test_download_failure_handling(self, tmp_path, caplog):
339+
caplog.set_level(logging.ERROR)
340+
341+
# Test that the task properly handles execution context
342+
# We can't easily test actual download failures without complex setup,
343+
# but we can verify the task structure and error handling approach
344+
download_dir = tmp_path / "downloads"
345+
task = _JobDownloadTask(
346+
job_id="failing-job",
347+
df_idx=1,
348+
root_url="https://example.com",
349+
bearer_token="test-token",
350+
download_dir=download_dir
351+
)
352+
353+
# The task should be properly constructed for error handling
354+
assert task.job_id == "failing-job"

0 commit comments

Comments
 (0)