Skip to content

Commit c56587f

Browse files
authored
feat: integrated the retrieval of the results (#6)
1 parent d2d7260 commit c56587f

File tree

8 files changed

+151
-16
lines changed

8 files changed

+151
-16
lines changed

app/database/models/processing_job.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,20 @@ def update_job_status_by_id(
9999
logger.warning(
100100
f"Could not update job status of job {job_id} as it could not be found in the database"
101101
)
102+
103+
104+
def update_job_result_by_id(database: Session, job_id: int, result_link: str):
105+
logger.info(
106+
f"Updating the result link of processing job with ID {job_id} to {result_link}"
107+
)
108+
job = get_job_by_id(database, job_id)
109+
110+
if job:
111+
job.result_link = result_link
112+
database.commit()
113+
database.refresh(job)
114+
else:
115+
logger.warning(
116+
f"Could not update job result link of job {job_id} as it could not be found in "
117+
"the database"
118+
)

app/platforms/base.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,14 @@ def get_job_status(
3434
:return: Return the processing status
3535
"""
3636
pass
37+
38+
@abstractmethod
39+
def get_job_result_url(self, job_id: str, details: ServiceDetails) -> str:
40+
"""
41+
Retrieve the job results of a processing job that is running on the platform.
42+
43+
:param job_id: The ID of the job on the platform
44+
:param details: The service details containing the service ID and application.
45+
:return: URL where the job results are described
46+
"""
47+
pass

app/platforms/implementations/ogc_api_process.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,8 @@ def get_job_status(
2424
raise NotImplementedError(
2525
"OGC API Process job status retrieval not implemented yet."
2626
)
27+
28+
def get_job_result_url(self, job_id: str, details: ServiceDetails) -> str:
29+
raise NotImplementedError(
30+
"OGC API Process job result retrieval not implemented yet."
31+
)

app/platforms/implementations/openeo.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,17 @@ def get_job_status(
161161
raise SystemError(
162162
f"Failed to fetch status openEO job with ID {job_id}"
163163
) from e
164+
165+
def get_job_result_url(self, job_id: str, details: ServiceDetails) -> str:
166+
try:
167+
logger.debug(f"Fetching job result for openEO job with ID {job_id}")
168+
connection = self._setup_connection(details.service)
169+
job = connection.job(job_id)
170+
return f"{details.service}{job.get_results_metadata_url()}"
171+
except Exception as e:
172+
logger.exception(
173+
f"Failed to fetch result url for for openEO job with ID {job_id}"
174+
)
175+
raise SystemError(
176+
f"Failed to fetch result url for openEO job with ID {job_id}"
177+
) from e

app/services/processing.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
get_job_by_user_id,
77
get_jobs_by_user_id,
88
save_job_to_db,
9+
update_job_result_by_id,
910
update_job_status_by_id,
1011
)
1112
from app.platforms.dispatcher import get_processing_platform
@@ -57,6 +58,13 @@ def get_job_status(job: ProcessingJobRecord) -> ProcessingStatusEnum:
5758
return platform.get_job_status(job.platform_job_id, details)
5859

5960

61+
def get_job_result_url(job: ProcessingJobRecord) -> str:
62+
logger.info(f"Retrieving job result for job: {job.platform_job_id}")
63+
platform = get_processing_platform(job.label)
64+
details = ServiceDetails.model_validate_json(job.service_record)
65+
return platform.get_job_result_url(job.platform_job_id, details)
66+
67+
6068
def get_processing_jobs_by_user_id(
6169
database: Session, user_id: str
6270
) -> List[ProcessingJobSummary]:
@@ -72,6 +80,7 @@ def get_processing_jobs_by_user_id(
7280
}
7381

7482
for record in records:
83+
# Only check status for active jobs
7584
if record.status not in inactive_statuses:
7685
latest_status = get_job_status(record)
7786
if latest_status != record.status:
@@ -80,6 +89,11 @@ def get_processing_jobs_by_user_id(
8089
else:
8190
status = record.status
8291

92+
# Update the result if the job is finished and results weren't retrieved yet
93+
if status == ProcessingStatusEnum.FINISHED and not record.result_link:
94+
result_link = get_job_result_url(record)
95+
update_job_result_by_id(database, record.id, result_link)
96+
8397
jobs.append(
8498
ProcessingJobSummary(
8599
id=record.id, title=record.title, label=record.label, status=status

tests/platforms/test_dispatcher.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ def execute_job(
1717
def get_job_status(self, job_id, details):
1818
return ProcessingStatusEnum.FINISHED
1919

20+
def get_job_result_url(self, job_id, details):
21+
return "https://foo.bar"
22+
2023

2124
@pytest.fixture(autouse=True)
2225
def clear_registry():

tests/platforms/test_openeo_platform.py

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class DummyOpenEOClient:
1313
def job(self, job_id):
1414
job = MagicMock()
1515
job.status.return_value = ProcessingStatusEnum.RUNNING
16+
job.get_results_metadata_url.return_value = "/job/results"
1617
return job
1718

1819

@@ -140,24 +141,43 @@ def test_map_openeo_status(openeo_status, expected_enum):
140141
assert result == expected_enum
141142

142143

143-
def test_get_job_status_success():
144-
platform = OpenEOPlatform()
144+
@patch.object(OpenEOPlatform, "_setup_connection")
145+
def test_get_job_status_success(mock_connection, platform):
146+
mock_connection.return_value = DummyOpenEOClient()
145147

146-
with patch.object(platform, "_setup_connection", return_value=DummyOpenEOClient()):
147-
details = ServiceDetails(service="foo", application="bar")
148-
result = platform.get_job_status("job123", details)
148+
details = ServiceDetails(service="foo", application="bar")
149+
result = platform.get_job_status("job123", details)
149150

150-
assert result == ProcessingStatusEnum.RUNNING
151+
assert result == ProcessingStatusEnum.RUNNING
151152

152153

153-
def test_get_job_status_error():
154-
platform = OpenEOPlatform()
154+
@patch.object(OpenEOPlatform, "_setup_connection")
155+
def test_get_job_status_error(mock_connection, platform):
156+
mock_connection.side_effect = RuntimeError("Connection error")
155157

156-
with patch.object(
157-
platform, "_setup_connection", side_effect=RuntimeError("Connection error")
158-
):
159-
details = ServiceDetails(service="foo", application="bar")
160-
with pytest.raises(SystemError) as exc_info:
161-
platform.get_job_status("job123", details)
158+
details = ServiceDetails(service="foo", application="bar")
159+
with pytest.raises(SystemError) as exc_info:
160+
platform.get_job_status("job123", details)
162161

163162
assert "Failed to fetch status" in str(exc_info.value)
163+
164+
165+
@patch.object(OpenEOPlatform, "_setup_connection")
166+
def test_get_job_result_success(mock_connection, platform):
167+
mock_connection.return_value = DummyOpenEOClient()
168+
169+
details = ServiceDetails(service="foo", application="bar")
170+
result = platform.get_job_result_url("job123", details)
171+
172+
assert result == "foo/job/results"
173+
174+
175+
@patch.object(OpenEOPlatform, "_setup_connection")
176+
def test_get_job_url_error(mock_connection, platform):
177+
mock_connection.side_effect = RuntimeError("Connection error")
178+
179+
details = ServiceDetails(service="foo", application="bar")
180+
with pytest.raises(SystemError) as exc_info:
181+
platform.get_job_result_url("job123", details)
182+
183+
assert "Failed to fetch result url" in str(exc_info.value)

tests/services/test_processing.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
)
1414
from app.services.processing import (
1515
create_processing_job,
16+
get_job_result_url,
1617
get_job_status,
1718
get_processing_job_by_user_id,
1819
get_processing_jobs_by_user_id,
@@ -93,7 +94,7 @@ def test_get_processing_jobs_with_active_and_inactive_statuses(
9394
platform_job_id="platform456",
9495
label=ProcessTypeEnum.OGC_API_PROCESS,
9596
title="Finished Job",
96-
status=ProcessingStatusEnum.FINISHED,
97+
status=ProcessingStatusEnum.FAILED,
9798
service_record=json.dumps({"foo": "bar"}),
9899
)
99100
mock_get_jobs.return_value = [fake_processing_job_record, inactive_job]
@@ -104,7 +105,7 @@ def test_get_processing_jobs_with_active_and_inactive_statuses(
104105
assert len(results) == 2
105106
assert isinstance(results[0], ProcessingJobSummary)
106107
assert results[0].status == ProcessingStatusEnum.RUNNING
107-
assert results[1].status == ProcessingStatusEnum.FINISHED
108+
assert results[1].status == ProcessingStatusEnum.FAILED
108109

109110
# Active job should be refreshed
110111
mock_get_job_status.assert_called_once_with(fake_processing_job_record)
@@ -136,6 +137,44 @@ def test_get_processing_jobs_no_updates(
136137
mock_update_job_status.assert_not_called()
137138

138139

140+
@patch("app.services.processing.update_job_result_by_id")
141+
@patch("app.services.processing.get_job_result_url")
142+
@patch("app.services.processing.get_jobs_by_user_id")
143+
def test_get_processing_jobs_with_finished_statuses(
144+
mock_get_jobs, mock_get_jobs_results, mock_update_job_result, fake_db_session
145+
):
146+
finished_job_no_result = ProcessingJobRecord(
147+
id=2,
148+
platform_job_id="platform456",
149+
label=ProcessTypeEnum.OGC_API_PROCESS,
150+
title="Finished Job",
151+
status=ProcessingStatusEnum.FINISHED,
152+
service_record=json.dumps({"foo": "bar"}),
153+
)
154+
finished_job_result = ProcessingJobRecord(
155+
id=3,
156+
platform_job_id="platform456",
157+
label=ProcessTypeEnum.OGC_API_PROCESS,
158+
title="Finished Job",
159+
status=ProcessingStatusEnum.FINISHED,
160+
service_record=json.dumps({"foo": "bar"}),
161+
result_link="https://foo.bar",
162+
)
163+
mock_get_jobs.return_value = [finished_job_no_result, finished_job_result]
164+
mock_get_jobs_results.return_value = "https://foo2.bar"
165+
166+
results = get_processing_jobs_by_user_id(fake_db_session, "user1")
167+
168+
assert len(results) == 2
169+
assert isinstance(results[0], ProcessingJobSummary)
170+
171+
# Active job should be refreshed
172+
mock_get_jobs_results.assert_called_once_with(finished_job_no_result)
173+
mock_update_job_result.assert_called_once_with(
174+
ANY, finished_job_no_result.id, "https://foo2.bar"
175+
)
176+
177+
139178
@patch("app.services.processing.get_processing_platform")
140179
def test_get_job_status_from_platform(mock_get_platform, fake_processing_job_record):
141180

@@ -148,6 +187,18 @@ def test_get_job_status_from_platform(mock_get_platform, fake_processing_job_rec
148187
assert status == ProcessingStatusEnum.QUEUED
149188

150189

190+
@patch("app.services.processing.get_processing_platform")
191+
def test_get_job_result_from_platform(mock_get_platform, fake_processing_job_record):
192+
193+
fake_platform = MagicMock()
194+
fake_platform.get_job_result_url.return_value = "https://foo.bar"
195+
mock_get_platform.return_value = fake_platform
196+
197+
result = get_job_result_url(fake_processing_job_record)
198+
199+
assert result == "https://foo.bar"
200+
201+
151202
@patch("app.services.processing.get_job_by_user_id")
152203
def test_get_processing_job_by_user_id(mock_get_job, fake_db_session):
153204

0 commit comments

Comments
 (0)