Skip to content

Commit a8a3afb

Browse files
committed
feat: poc implementation of the parameter translation
1 parent 19df780 commit a8a3afb

File tree

7 files changed

+199
-1
lines changed

7 files changed

+199
-1
lines changed

app/main.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,15 @@
77
from app.services.tiles.base import load_grids
88
from app.config.logger import setup_logging
99
from app.config.settings import settings
10-
from app.routers import jobs_status, unit_jobs, health, tiles, upscale_tasks, sync_jobs
10+
from app.routers import (
11+
jobs_status,
12+
unit_jobs,
13+
health,
14+
tiles,
15+
upscale_tasks,
16+
sync_jobs,
17+
parameters,
18+
)
1119

1220
setup_logging()
1321

@@ -38,3 +46,4 @@
3846
app.include_router(sync_jobs.router)
3947
app.include_router(upscale_tasks.router)
4048
app.include_router(health.router)
49+
app.include_router(parameters.router)

app/platforms/base.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from abc import ABC, abstractmethod
2+
from typing import List
23

34
from fastapi import Response
45

56
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum
7+
from app.schemas.parameters import Parameter
68
from app.schemas.unit_job import ServiceDetails
79

810
from stac_pydantic import Collection
@@ -83,3 +85,16 @@ async def get_job_results(
8385
:return: STAC collection representing the results.
8486
"""
8587
pass
88+
89+
@abstractmethod
90+
async def get_service_parameters(
91+
self, user_token: str, details: ServiceDetails
92+
) -> List[Parameter]:
93+
"""
94+
Retrieve the parameters required for a specific processing service.
95+
96+
:param user_token: The access token of the user executing the job.
97+
:param details: The service details containing the service ID and application.
98+
:return: Response containing the service parameters.
99+
"""
100+
pass

app/platforms/implementations/ogc_api_process.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from typing import List
12
from fastapi import Response
23
from app.platforms.base import BaseProcessingPlatform
34
from app.platforms.dispatcher import register_platform
45
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum
6+
from app.schemas.parameters import Parameter
57
from app.schemas.unit_job import ServiceDetails
68
from stac_pydantic import Collection
79

@@ -46,3 +48,10 @@ async def get_job_results(
4648
raise NotImplementedError(
4749
"OGC API Process job result retrieval not implemented yet."
4850
)
51+
52+
async def get_service_parameters(
53+
self, user_token: str, details: ServiceDetails
54+
) -> List[Parameter]:
55+
raise NotImplementedError(
56+
"OGC API Process service parameter retrieval not implemented yet."
57+
)

app/platforms/implementations/openeo.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime
2+
from typing import List
23

34
from fastapi import Response
45
import jwt
@@ -13,6 +14,7 @@
1314
from app.platforms.base import BaseProcessingPlatform
1415
from app.platforms.dispatcher import register_platform
1516
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum
17+
from app.schemas.parameters import ParamTypeEnum, Parameter
1618
from app.schemas.unit_job import ServiceDetails
1719

1820
load_dotenv()
@@ -267,3 +269,44 @@ async def get_job_results(
267269
connection = await self._setup_connection(user_token, details.endpoint)
268270
job = connection.job(job_id)
269271
return Collection(**job.get_results().get_metadata())
272+
273+
async def get_service_parameters(
274+
self, user_token: str, details: ServiceDetails
275+
) -> List[Parameter]:
276+
parameters = []
277+
logger.debug(
278+
f"Fetching service parameters for OpenEO service at {details.application}"
279+
)
280+
udp = requests.get(details.application)
281+
udp.raise_for_status()
282+
udp_params = udp.json().get("parameters", [])
283+
284+
for param in udp_params:
285+
schemas = param.get("schema", {})
286+
if not isinstance(schemas, list):
287+
schemas = [schemas]
288+
parameters.append(
289+
Parameter(
290+
name=param.get("name"),
291+
description=param.get("description"),
292+
default=param.get("default"),
293+
optional=param.get("optional", False),
294+
type=self._get_type_from_schemas(schemas),
295+
)
296+
)
297+
298+
return parameters
299+
300+
def _get_type_from_schemas(self, schemas: dict) -> ParamTypeEnum:
301+
for schema in schemas:
302+
type = schema.get("type")
303+
subtype = schema.get("subtype")
304+
if type == "array" and subtype == "temporal-interval":
305+
return ParamTypeEnum.DATE_INTERVAL
306+
elif subtype == "bounding-box":
307+
return ParamTypeEnum.BOUNDING_BOX
308+
elif type == "boolean":
309+
return ParamTypeEnum.BOOLEAN
310+
311+
# If no matching schema found, raise an error
312+
raise ValueError(f"Unsupported parameter schemas: {schemas}")

app/routers/parameters.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from typing import Annotated, List
2+
from fastapi import Body, APIRouter, Depends, HTTPException, Response, status
3+
from loguru import logger
4+
5+
from app.schemas.enum import ProcessTypeEnum
6+
from app.schemas.parameters import ParamRequest, Parameter
7+
from app.schemas.unit_job import (
8+
ServiceDetails,
9+
)
10+
from app.auth import oauth2_scheme
11+
from app.services.processing import retrieve_service_parameters
12+
13+
14+
# from app.auth import get_current_user
15+
16+
router = APIRouter()
17+
18+
19+
@router.post(
20+
"/params",
21+
status_code=status.HTTP_200_OK,
22+
tags=["Unit Jobs"],
23+
summary="Get the parameters of a specific processing service.",
24+
)
25+
async def get_job_params(
26+
payload: Annotated[
27+
ParamRequest,
28+
Body(
29+
openapi_examples={
30+
"openEO Example": {
31+
"summary": "Retrieving the parameters for an openEO-based service",
32+
"description": "The following example demonstrates how to retrieve the "
33+
"parameters for a processing job using an openEO-based service.",
34+
"value": ParamRequest(
35+
label=ProcessTypeEnum.OPENEO,
36+
service=ServiceDetails(
37+
endpoint="https://openeofed.dataspace.copernicus.eu",
38+
application="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms"
39+
"/32ea3c9a6fa24fe063cb59164cd318cceb7209b0/openeo_udp/variabilitymap/"
40+
"variabilitymap.json",
41+
),
42+
).model_dump(),
43+
}
44+
},
45+
),
46+
],
47+
token: str = Depends(oauth2_scheme),
48+
) -> List[Parameter]:
49+
"""Retrieve the parameters required for a specific processing service."""
50+
try:
51+
return await retrieve_service_parameters(token, payload)
52+
except HTTPException as e:
53+
raise e
54+
except Exception as e:
55+
logger.exception(f"Error retrieving service parameters: {e}")
56+
raise HTTPException(
57+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
58+
detail=f"An error occurred while retrieving service parameters: {e}",
59+
)

app/schemas/parameters.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from enum import Enum
2+
from typing import Any
3+
from pydantic import BaseModel, Field
4+
5+
from app.schemas.enum import ProcessTypeEnum
6+
from app.schemas.unit_job import ServiceDetails
7+
8+
9+
class ParamTypeEnum(str, Enum):
10+
DATE_INTERVAL = "date-interval"
11+
BOUNDING_BOX = "bounding-box"
12+
BOOLEAN = "boolean"
13+
14+
15+
class ParamRequest(BaseModel):
16+
label: ProcessTypeEnum = Field(
17+
...,
18+
description="Label representing the type of the service",
19+
)
20+
service: ServiceDetails = Field(
21+
..., description="Details of the service for which to retrieve the parameters"
22+
)
23+
24+
25+
class Parameter(BaseModel):
26+
name: str = Field(..., description="Name of the parameter", examples=["param1"])
27+
type: ParamTypeEnum = Field(
28+
...,
29+
description="Data type of the parameter",
30+
examples=[ParamTypeEnum.DATE_INTERVAL],
31+
)
32+
optional: bool = Field(
33+
...,
34+
description="Indicates whether the parameter is optional",
35+
examples=[False],
36+
)
37+
description: str = Field(
38+
...,
39+
description="Description of the parameter",
40+
examples=["This parameter specifies the ..."],
41+
)
42+
default: Any = Field(
43+
None,
44+
description="Default value of the parameter, if any",
45+
examples=["default_value"],
46+
)

app/services/processing.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from sqlalchemy.orm import Session
1616

1717
from app.schemas.enum import ProcessingStatusEnum
18+
from app.schemas.parameters import ParamRequest, Parameter
1819
from app.schemas.unit_job import (
1920
BaseJobRequest,
2021
ProcessingJob,
@@ -210,3 +211,19 @@ async def create_synchronous_job(
210211
parameters=request.parameters,
211212
format=request.format,
212213
)
214+
215+
216+
async def retrieve_service_parameters(
217+
user_token: str,
218+
payload: ParamRequest,
219+
) -> List[Parameter]:
220+
logger.info(
221+
f"Retrieving service parameters for service {payload.service.application} at {payload.service.endpoint}"
222+
)
223+
224+
platform = get_processing_platform(payload.label)
225+
226+
return await platform.get_service_parameters(
227+
user_token=user_token,
228+
details=payload.service,
229+
)

0 commit comments

Comments
 (0)