|
2 | 2 | import threading |
3 | 3 | import time |
4 | 4 | from dataclasses import dataclass |
5 | | -from typing import Iterator, Dict, Any |
| 5 | +from typing import Iterator |
6 | 6 | from pathlib import Path |
7 | | -import json |
| 7 | +import tempfile |
| 8 | +from requests_mock import Mocker |
8 | 9 |
|
9 | 10 | import pytest |
10 | 11 |
|
@@ -82,6 +83,103 @@ def test_hide_token(self, serializer): |
82 | 83 | assert "job-123" in serialized |
83 | 84 | assert secret not in serialized |
84 | 85 |
|
| 86 | +class TestJobDownloadTask: |
| 87 | + |
| 88 | + # Use a temporary directory for safe file handling |
| 89 | + @pytest.fixture |
| 90 | + def temp_dir(self): |
| 91 | + with tempfile.TemporaryDirectory() as temp_dir: |
| 92 | + yield Path(temp_dir) |
| 93 | + |
| 94 | + def test_job_download_success(self, requests_mock: Mocker, temp_dir: Path): |
| 95 | + """ |
| 96 | + Test a successful job download and verify file content and stats update. |
| 97 | + """ |
| 98 | + job_id = "job-007" |
| 99 | + df_idx = 42 |
| 100 | + |
| 101 | + # We set up a dummy backend to simulate the job results and assert the expected calls are triggered |
| 102 | + backend = DummyBackend.at_url("https://openeo.dummy.test/", requests_mock=requests_mock) |
| 103 | + backend.next_result = b"The downloaded file content." |
| 104 | + backend.batch_jobs[job_id] = {"job_id": job_id, "pg": {}, "status": "created"} |
| 105 | + |
| 106 | + backend._set_job_status(job_id=job_id, status="finished") |
| 107 | + backend.batch_jobs[job_id]["status"] = "finished" |
| 108 | + |
| 109 | + download_dir = temp_dir / job_id / "results" |
| 110 | + download_dir.mkdir(parents=True) |
| 111 | + |
| 112 | + # Create the task instance |
| 113 | + task = _JobDownloadTask( |
| 114 | + root_url="https://openeo.dummy.test/", |
| 115 | + bearer_token="dummy-token-7", |
| 116 | + job_id=job_id, |
| 117 | + df_idx=df_idx, |
| 118 | + download_dir=download_dir, |
| 119 | + ) |
| 120 | + |
| 121 | + # Execute the task |
| 122 | + result = task.execute() |
| 123 | + |
| 124 | + # Verify TaskResult structure |
| 125 | + assert isinstance(result, _TaskResult) |
| 126 | + assert result.job_id == job_id |
| 127 | + assert result.df_idx == df_idx |
| 128 | + |
| 129 | + # Verify stats update for the MultiBackendJobManager |
| 130 | + assert result.stats_update == {"job download": 1} |
| 131 | + |
| 132 | + # Verify download content (crucial part of the unit test) |
| 133 | + downloaded_file = download_dir / "result.data" |
| 134 | + assert downloaded_file.exists() |
| 135 | + assert downloaded_file.read_bytes() == b"The downloaded file content." |
| 136 | + |
| 137 | + |
| 138 | + def test_job_download_failure(self, requests_mock: Mocker, temp_dir: Path): |
| 139 | + """ |
| 140 | + Test a failed download (e.g., bad connection) and verify error reporting. |
| 141 | + """ |
| 142 | + job_id = "job-008" |
| 143 | + df_idx = 55 |
| 144 | + |
| 145 | + # Set up dummy backend to simulate failure during results listing |
| 146 | + backend = DummyBackend.at_url("https://openeo.dummy.test/", requests_mock=requests_mock) |
| 147 | + |
| 148 | + #simulate and error when downloading the results |
| 149 | + requests_mock.get( |
| 150 | + f"https://openeo.dummy.test/jobs/{job_id}/results", |
| 151 | + status_code=500, |
| 152 | + json={"code": "InternalError", "message": "Failed to list results"}) |
| 153 | + |
| 154 | + backend.batch_jobs[job_id] = {"job_id": job_id, "pg": {}, "status": "created"} |
| 155 | + backend._set_job_status(job_id=job_id, status="finished") |
| 156 | + backend.batch_jobs[job_id]["finished"] = "error" |
| 157 | + |
| 158 | + download_dir = temp_dir / job_id / "results" |
| 159 | + download_dir.mkdir(parents=True) |
| 160 | + |
| 161 | + # Create the task instance |
| 162 | + task = _JobDownloadTask( |
| 163 | + root_url="https://openeo.dummy.test/", |
| 164 | + bearer_token="dummy-token-8", |
| 165 | + job_id=job_id, |
| 166 | + df_idx=df_idx, |
| 167 | + download_dir=download_dir, |
| 168 | + ) |
| 169 | + |
| 170 | + # Execute the task |
| 171 | + result = task.execute() |
| 172 | + |
| 173 | + # Verify TaskResult structure |
| 174 | + assert isinstance(result, _TaskResult) |
| 175 | + assert result.job_id == job_id |
| 176 | + assert result.df_idx == df_idx |
| 177 | + |
| 178 | + # Verify stats update for the MultiBackendJobManager |
| 179 | + assert result.stats_update == {"job download error": 1} |
| 180 | + |
| 181 | + # Verify no file was created (or only empty/failed files) |
| 182 | + assert not any(p.is_file() for p in download_dir.glob("*")) |
85 | 183 |
|
86 | 184 | class NopTask(Task): |
87 | 185 | """Do Nothing""" |
@@ -293,117 +391,3 @@ def test_job_start_task_failure(self, worker_pool, dummy_backend, caplog): |
293 | 391 | ] |
294 | 392 |
|
295 | 393 |
|
296 | | - |
297 | | -import tempfile |
298 | | -from requests_mock import Mocker |
299 | | -OPENEO_BACKEND = "https://openeo.dummy.test/" |
300 | | - |
301 | | -class TestJobDownloadTask: |
302 | | - |
303 | | - # Use a temporary directory for safe file handling |
304 | | - @pytest.fixture |
305 | | - def temp_dir(self): |
306 | | - with tempfile.TemporaryDirectory() as temp_dir: |
307 | | - yield Path(temp_dir) |
308 | | - |
309 | | - def test_job_download_success(self, requests_mock: Mocker, temp_dir: Path): |
310 | | - """ |
311 | | - Test a successful job download and verify file content and stats update. |
312 | | - """ |
313 | | - job_id = "job-007" |
314 | | - df_idx = 42 |
315 | | - |
316 | | - # Setup Dummy Backend |
317 | | - backend = DummyBackend.at_url(OPENEO_BACKEND, requests_mock=requests_mock) |
318 | | - backend.next_result = b"The downloaded file content." |
319 | | - |
320 | | - # Pre-set job status to "finished" so the download link is available |
321 | | - backend.batch_jobs[job_id] = {"job_id": job_id, "pg": {}, "status": "created"} |
322 | | - |
323 | | - # Need to ensure job status is "finished" for download attempt to occur |
324 | | - backend._set_job_status(job_id=job_id, status="finished") |
325 | | - backend.batch_jobs[job_id]["status"] = "finished" |
326 | | - |
327 | | - download_dir = temp_dir / job_id / "results" |
328 | | - download_dir.mkdir(parents=True) |
329 | | - |
330 | | - # Create the task instance |
331 | | - task = _JobDownloadTask( |
332 | | - root_url=OPENEO_BACKEND, |
333 | | - bearer_token="dummy-token-7", |
334 | | - job_id=job_id, |
335 | | - df_idx=df_idx, |
336 | | - download_dir=download_dir, |
337 | | - ) |
338 | | - |
339 | | - # Execute the task |
340 | | - result = task.execute() |
341 | | - |
342 | | - # 4. Assertions |
343 | | - |
344 | | - # A. Verify TaskResult structure |
345 | | - assert isinstance(result, _TaskResult) |
346 | | - assert result.job_id == job_id |
347 | | - assert result.df_idx == df_idx |
348 | | - |
349 | | - # B. Verify stats update for the MultiBackendJobManager |
350 | | - assert result.stats_update == {"job download": 1} |
351 | | - |
352 | | - # C. Verify download content (crucial part of the unit test) |
353 | | - downloaded_file = download_dir / "result.data" |
354 | | - assert downloaded_file.exists() |
355 | | - assert downloaded_file.read_bytes() == b"The downloaded file content." |
356 | | - |
357 | | - # Verify backend interaction |
358 | | - get_results_calls = [c for c in requests_mock.request_history if c.method == "GET" and f"/jobs/{job_id}/results" in c.url] |
359 | | - assert len(get_results_calls) >= 1 |
360 | | - get_asset_calls = [c for c in requests_mock.request_history if c.method == "GET" and f"/jobs/{job_id}/results/result.data" in c.url] |
361 | | - assert len(get_asset_calls) == 1 |
362 | | - |
363 | | - def test_job_download_failure(self, requests_mock: Mocker, temp_dir: Path): |
364 | | - """ |
365 | | - Test a failed download (e.g., bad connection) and verify error reporting. |
366 | | - """ |
367 | | - job_id = "job-008" |
368 | | - df_idx = 55 |
369 | | - |
370 | | - # Need to ensure job status is "finished" for download attempt to occur |
371 | | - backend = DummyBackend.at_url(OPENEO_BACKEND, requests_mock=requests_mock) |
372 | | - |
373 | | - requests_mock.get( |
374 | | - f"{OPENEO_BACKEND}jobs/{job_id}/results", |
375 | | - status_code=500, |
376 | | - json={"code": "InternalError", "message": "Failed to list results"}) |
377 | | - |
378 | | - backend.batch_jobs[job_id] = {"job_id": job_id, "pg": {}, "status": "created"} |
379 | | - |
380 | | - # Need to ensure job status is "finished" for download attempt to occur |
381 | | - backend._set_job_status(job_id=job_id, status="finished") |
382 | | - backend.batch_jobs[job_id]["status"] = "finished" |
383 | | - |
384 | | - download_dir = temp_dir / job_id / "results" |
385 | | - download_dir.mkdir(parents=True) |
386 | | - |
387 | | - # Create the task instance |
388 | | - task = _JobDownloadTask( |
389 | | - root_url=OPENEO_BACKEND, |
390 | | - bearer_token="dummy-token-8", |
391 | | - job_id=job_id, |
392 | | - df_idx=df_idx, |
393 | | - download_dir=download_dir, |
394 | | - ) |
395 | | - |
396 | | - # Execute the task |
397 | | - result = task.execute() |
398 | | - |
399 | | - # Verify TaskResult structure |
400 | | - assert isinstance(result, _TaskResult) |
401 | | - assert result.job_id == job_id |
402 | | - assert result.df_idx == df_idx |
403 | | - |
404 | | - # Verify stats update for the MultiBackendJobManager |
405 | | - assert result.stats_update == {"job download error": 1} |
406 | | - |
407 | | - # Verify no file was created (or only empty/failed files) |
408 | | - assert not any(p.is_file() for p in download_dir.glob("*")) |
409 | | - |
0 commit comments