Skip to content

Commit 3989df1

Browse files
committed
feat: integration of endpoint for retrieving results
1 parent eb72cfa commit 3989df1

File tree

17 files changed

+242
-92
lines changed

17 files changed

+242
-92
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""update of result of processing job
2+
3+
Revision ID: 8a69e1ee3fef
4+
Revises: 046ada00e750
5+
Create Date: 2025-08-25 09:40:54.675672
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = '8a69e1ee3fef'
16+
down_revision: Union[str, Sequence[str], None] = '046ada00e750'
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Upgrade schema.

    Drops the ``processing_jobs.result_link`` column; the result is no
    longer persisted as a plain URL on the job record.

    NOTE(review): elsewhere in this commit ``update_job_result_by_id``
    now writes ``job.result`` (a JSON string), but no ``result`` column
    is added by this migration or visible on the model — confirm that
    result persistence is covered by another migration/column, otherwise
    the assignment is a transient attribute and nothing is stored.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column('processing_jobs', 'result_link')
    # ### end Alembic commands ###
26+
27+
28+
def downgrade() -> None:
    """Downgrade schema.

    Re-creates ``processing_jobs.result_link`` as a nullable VARCHAR.
    Values removed by the corresponding upgrade are NOT restored — after
    a downgrade the column is NULL for every existing row.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('processing_jobs', sa.Column('result_link', sa.VARCHAR(), autoincrement=False, nullable=True))
    # ### end Alembic commands ###

app/database/models/processing_job.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime
2+
import json
23
from typing import List, Optional
34

45
from loguru import logger
@@ -8,6 +9,8 @@
89
from app.database.db import Base
910
from app.schemas.unit_job import ProcessingStatusEnum, ProcessTypeEnum
1011

12+
from stac_pydantic import Collection
13+
1114

1215
class ProcessingJobRecord(Base):
1316
__tablename__ = "processing_jobs"
@@ -23,7 +26,6 @@ class ProcessingJobRecord(Base):
2326
user_id: Mapped[str] = mapped_column(String, index=True)
2427
platform_job_id: Mapped[str] = mapped_column(String, index=True)
2528
parameters: Mapped[str] = mapped_column(String, index=False)
26-
result_link: Mapped[Optional[str]] = mapped_column(String, index=False)
2729
service: Mapped[str] = mapped_column(String, index=True)
2830
created: Mapped[datetime.datetime] = mapped_column(
2931
DateTime, default=datetime.datetime.utcnow, index=True
@@ -116,14 +118,12 @@ def update_job_status_by_id(
116118
)
117119

118120

119-
def update_job_result_by_id(database: Session, job_id: int, result_link: str):
120-
logger.info(
121-
f"Updating the result link of processing job with ID {job_id} to {result_link}"
122-
)
121+
def update_job_result_by_id(database: Session, job_id: int, result: Collection):
122+
logger.info(f"Updating the result link of processing job with ID {job_id}")
123123
job = get_job_by_id(database, job_id)
124124

125125
if job:
126-
job.result_link = result_link
126+
job.result = json.dumps(result)
127127
database.commit()
128128
database.refresh(job)
129129
else:

app/platforms/base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from app.schemas.enum import ProcessingStatusEnum
44
from app.schemas.unit_job import ServiceDetails
55

6+
from stac_pydantic import Collection
7+
68

79
class BaseProcessingPlatform(ABC):
810
"""
@@ -36,12 +38,12 @@ def get_job_status(
3638
pass
3739

3840
@abstractmethod
39-
def get_job_result_url(self, job_id: str, details: ServiceDetails) -> str:
41+
def get_job_results(self, job_id: str, details: ServiceDetails) -> Collection:
4042
"""
4143
Retrieve the job results of a processing job that is running on the platform.
4244
4345
:param job_id: The ID of the job on the platform
4446
:param details: The service details containing the service ID and application.
45-
:return: URL where the job results are described
47+
:return: STAC collection representing the results.
4648
"""
4749
pass

app/platforms/implementations/ogc_api_process.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from app.platforms.dispatcher import register_platform
33
from app.schemas.enum import ProcessTypeEnum, ProcessingStatusEnum
44
from app.schemas.unit_job import ServiceDetails
5+
from stac_pydantic import Collection
56

67

78
@register_platform(ProcessTypeEnum.OGC_API_PROCESS)
@@ -21,7 +22,7 @@ def get_job_status(
2122
"OGC API Process job status retrieval not implemented yet."
2223
)
2324

24-
def get_job_result_url(self, job_id: str, details: ServiceDetails) -> str:
25+
def get_job_results(self, job_id: str, details: ServiceDetails) -> Collection:
2526
raise NotImplementedError(
2627
"OGC API Process job result retrieval not implemented yet."
2728
)

app/platforms/implementations/openeo.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from app.platforms.dispatcher import register_platform
1414
from app.schemas.enum import ProcessTypeEnum, ProcessingStatusEnum
1515
from app.schemas.unit_job import ServiceDetails
16+
from stac_pydantic import Collection
1617

1718
load_dotenv()
1819

@@ -201,12 +202,12 @@ def get_job_status(
201202
f"Failed to fetch status openEO job with ID {job_id}"
202203
) from e
203204

204-
def get_job_result_url(self, job_id: str, details: ServiceDetails) -> str:
205+
def get_job_results(self, job_id: str, details: ServiceDetails) -> Collection:
205206
try:
206207
logger.debug(f"Fetching job result for openEO job with ID {job_id}")
207208
connection = self._setup_connection(details.endpoint)
208209
job = connection.job(job_id)
209-
return job.get_results_metadata_url(full=True)
210+
return Collection(**job.get_results().get_metadata())
210211
except Exception as e:
211212
logger.exception(
212213
f"Failed to fetch result url for for openEO job with ID {job_id}"

app/routers/unit_jobs.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@
1111
ProcessingJobSummary,
1212
ServiceDetails,
1313
)
14-
from app.services.processing import create_processing_job, get_processing_job_by_user_id
14+
from app.services.processing import (
15+
create_processing_job,
16+
get_processing_job_by_user_id,
17+
get_processing_job_results,
18+
)
19+
20+
from stac_pydantic import Collection
1521

1622
# from app.auth import get_current_user
1723

@@ -121,3 +127,32 @@ async def get_job(
121127
detail=f"Processing job {job_id} not found",
122128
)
123129
return job
130+
131+
132+
@router.get(
    "/unit_jobs/{job_id}/results",
    tags=["Unit Jobs"],
    responses={404: {"description": "Processing job not found"}},
)
async def get_job_results(
    job_id: int, db: Session = Depends(get_db), user: str = "foobar"
) -> Collection | None:
    """Return the results of a processing job as a STAC collection.

    :param job_id: Database ID of the processing job.
    :param db: Database session (injected dependency).
    :param user: User identifier; placeholder value until auth is wired up.
    :return: STAC collection describing the job results.
    :raises HTTPException: 404 when the job (or its result) does not exist
        for this user, 500 on any other failure while fetching results.
    """
    try:
        result = get_processing_job_results(db, job_id, user)
        if not result:
            logger.error(
                f"Result for processing job {job_id} not found for user {user}"
            )
            raise HTTPException(
                # Use the named constant for consistency with the 500 below.
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Result for processing job {job_id} not found",
            )
        return result
    except HTTPException:
        # Propagate deliberate HTTP errors (e.g. the 404 above) unchanged;
        # bare `raise` keeps the original traceback.
        raise
    except Exception as e:
        # FIX: the log/detail messages previously said "creating processing
        # job" — copy-pasted from the job-creation endpoint — although this
        # endpoint retrieves results.
        logger.exception(
            f"Error retrieving results of processing job {job_id} "
            f"for user {user}: {e}"
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An error occurred while retrieving the results "
            f"of processing job {job_id}: {e}",
        ) from e

app/schemas/unit_job.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from datetime import datetime
2-
from typing import Optional
32

43
from pydantic import BaseModel, Field
54

@@ -48,14 +47,6 @@ class ProcessingJobSummary(BaseModel):
4847
description="JSON representing the parameters for the service execution",
4948
examples=[{"param1": "value1", "param2": "value2"}],
5049
)
51-
result_link: Optional[str] = Field(
52-
...,
53-
description="URL to the results of the processing job",
54-
examples=[
55-
"https://openeofed.dataspace.copernicus.eu/jobs/"
56-
"cdse-j-25082106161041f1a151bd539f614130/results"
57-
],
58-
)
5950

6051

6152
class ProcessingJobDetails(BaseModel):

app/schemas/upscale_task.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,13 @@ class UpscalingTaskDetails(BaseModel):
4444
label=ProcessTypeEnum.OPENEO,
4545
status=ProcessingStatusEnum.FINISHED,
4646
parameters={"param1": "value1", "param2": "value2"},
47-
result_link="https://foo.bar",
4847
),
4948
ProcessingJobSummary(
5049
id=1,
5150
title="Upscaling Job 2",
5251
label=ProcessTypeEnum.OPENEO,
5352
status=ProcessingStatusEnum.RUNNING,
5453
parameters={"param1": "value1", "param2": "value2"},
55-
result_link=None,
5654
),
5755
]
5856
],

app/services/processing.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
get_job_by_user_id,
88
get_jobs_by_user_id,
99
save_job_to_db,
10-
update_job_result_by_id,
1110
update_job_status_by_id,
1211
)
1312
from app.platforms.dispatcher import get_processing_platform
@@ -21,6 +20,7 @@
2120
ServiceDetails,
2221
)
2322

23+
from stac_pydantic import Collection
2424

2525
INACTIVE_JOB_STATUSES = {
2626
ProcessingStatusEnum.CANCELED,
@@ -50,7 +50,6 @@ def create_processing_job(
5050
user_id=user,
5151
platform_job_id=job_id,
5252
parameters=json.dumps(request.parameters),
53-
result_link=None,
5453
service=request.service.model_dump_json(),
5554
upscaling_task_id=upscaling_task_id,
5655
)
@@ -61,7 +60,6 @@ def create_processing_job(
6160
label=request.label,
6261
status=record.status,
6362
parameters=request.parameters,
64-
result_link=None,
6563
)
6664

6765

@@ -74,11 +72,17 @@ def get_job_status(job: ProcessingJobRecord) -> ProcessingStatusEnum:
7472
return platform.get_job_status(job.platform_job_id, details)
7573

7674

77-
def get_job_result_url(job: ProcessingJobRecord) -> str:
78-
logger.info(f"Retrieving job result for job: {job.platform_job_id}")
79-
platform = get_processing_platform(job.label)
80-
details = ServiceDetails.model_validate_json(job.service)
81-
return platform.get_job_result_url(job.platform_job_id, details)
75+
def get_processing_job_results(
    database: Session, job_id: int, user_id: str
) -> Collection | None:
    """Look up a user's processing job and fetch its results.

    :param database: Active database session.
    :param job_id: Database ID of the processing job.
    :param user_id: Owner of the job; the lookup is scoped to this user.
    :return: STAC collection of results, or ``None`` when no such job
        exists for this user.
    """
    job = get_job_by_user_id(database, job_id, user_id)
    if not job:
        return None

    logger.info(f"Retrieving job result for job: {job.platform_job_id}")
    # Rehydrate the stored service details, then delegate to the platform
    # implementation registered for this job's label.
    details = ServiceDetails.model_validate_json(job.service)
    platform = get_processing_platform(job.label)
    return platform.get_job_results(job.platform_job_id, details)
8286

8387

8488
def _refresh_job_status(
@@ -105,19 +109,13 @@ def get_processing_jobs_by_user_id(
105109
if record.status not in INACTIVE_JOB_STATUSES:
106110
record = _refresh_job_status(database, record)
107111

108-
# Update the result if the job is finished and results weren't retrieved yet
109-
if record.status == ProcessingStatusEnum.FINISHED and not record.result_link:
110-
result_link = get_job_result_url(record)
111-
update_job_result_by_id(database, record.id, result_link)
112-
113112
jobs.append(
114113
ProcessingJobSummary(
115114
id=record.id,
116115
title=record.title,
117116
label=record.label,
118117
status=record.status,
119118
parameters=json.loads(record.parameters),
120-
result_link=record.result_link,
121119
)
122120
)
123121
return jobs
@@ -141,7 +139,6 @@ def get_processing_job_by_user_id(
141139
status=record.status,
142140
service=ServiceDetails.model_validate_json(record.service or "{}"),
143141
parameters=json.loads(record.parameters or "{}"),
144-
result_link=record.result_link,
145142
created=record.created,
146143
updated=record.updated,
147144
)

0 commit comments

Comments
 (0)