Skip to content

Commit 2a0c83e

Browse files
committed
feat: added sync endpoint
1 parent 4fdf668 commit 2a0c83e

File tree

13 files changed

+442
-11
lines changed

13 files changed

+442
-11
lines changed

app/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from app.services.tiles.base import load_grids
77
from app.config.logger import setup_logging
88
from app.config.settings import settings
9-
from app.routers import jobs_status, unit_jobs, health, tiles, upscale_tasks
9+
from app.routers import jobs_status, unit_jobs, health, tiles, upscale_tasks, sync_jobs
1010

1111
setup_logging()
1212

@@ -33,5 +33,6 @@
3333
app.include_router(tiles.router)
3434
app.include_router(jobs_status.router)
3535
app.include_router(unit_jobs.router)
36+
app.include_router(sync_jobs.router)
3637
app.include_router(upscale_tasks.router)
3738
app.include_router(health.router)

app/platforms/base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from abc import ABC, abstractmethod
22

3+
from fastapi import Response
4+
35
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum
46
from app.schemas.unit_job import ServiceDetails
57

@@ -33,6 +35,27 @@ async def execute_job(
3335
"""
3436
pass
3537

38+
@abstractmethod
39+
async def execute_synchronous_job(
40+
self,
41+
user_token: str,
42+
title: str,
43+
details: ServiceDetails,
44+
parameters: dict,
45+
format: OutputFormatEnum,
46+
) -> Response:
47+
"""
48+
Execute a processing job synchronously on the platform with the given service ID
49+
and parameters.
50+
51+
:param title: The title of the job to be executed.
52+
:param details: The service details containing the service ID and application.
53+
:param parameters: The parameters required for the job execution.
54+
:param format: Format of the output result.
55+
:return: Return the result of the job.
56+
"""
57+
pass
58+
3659
@abstractmethod
3760
async def get_job_status(
3861
self, user_token: str, job_id: str, details: ServiceDetails

app/platforms/implementations/ogc_api_process.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from fastapi import Response
12
from app.platforms.base import BaseProcessingPlatform
23
from app.platforms.dispatcher import register_platform
34
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum
@@ -22,6 +23,16 @@ async def execute_job(
2223
) -> str:
2324
raise NotImplementedError("OGC API Process job execution not implemented yet.")
2425

26+
async def execute_synchronous_job(
27+
self,
28+
user_token: str,
29+
title: str,
30+
details: ServiceDetails,
31+
parameters: dict,
32+
format: OutputFormatEnum,
33+
) -> Response:
34+
raise NotImplementedError("OGC API Process job execution not implemented yet.")
35+
2536
async def get_job_status(
2637
self, user_token: str, job_id: str, details: ServiceDetails
2738
) -> ProcessingStatusEnum:

app/platforms/implementations/openeo.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22

3+
from fastapi import Response
34
import jwt
45
import openeo
56
import requests
@@ -179,14 +180,9 @@ def _get_process_id(self, url: str) -> str:
179180

180181
return process_id
181182

182-
async def execute_job(
183-
self,
184-
user_token: str,
185-
title: str,
186-
details: ServiceDetails,
187-
parameters: dict,
188-
format: OutputFormatEnum,
189-
) -> str:
183+
async def _build_datacube(
184+
self, user_token: str, title: str, details: ServiceDetails, parameters: dict
185+
) -> openeo.DataCube:
190186
process_id = self._get_process_id(details.application)
191187

192188
logger.debug(
@@ -195,14 +191,42 @@ async def execute_job(
195191
)
196192

197193
connection = await self._setup_connection(user_token, details.endpoint)
198-
service = connection.datacube_from_process(
194+
return connection.datacube_from_process(
199195
process_id=process_id, namespace=details.application, **parameters
200196
)
197+
198+
async def execute_job(
199+
self,
200+
user_token: str,
201+
title: str,
202+
details: ServiceDetails,
203+
parameters: dict,
204+
format: OutputFormatEnum,
205+
) -> str:
206+
service = await self._build_datacube(user_token, title, details, parameters)
201207
job = service.create_job(title=title, out_format=format)
208+
logger.info(f"Executing OpenEO batch job with title={title}")
202209
job.start()
203210

204211
return job.job_id
205212

213+
async def execute_synchronous_job(
214+
self,
215+
user_token: str,
216+
title: str,
217+
details: ServiceDetails,
218+
parameters: dict,
219+
format: OutputFormatEnum,
220+
) -> Response:
221+
service = await self._build_datacube(user_token, title, details, parameters)
222+
logger.info("Executing synchronous OpenEO job")
223+
response = service.execute(auto_decode=False)
224+
return Response(
225+
content=response.content,
226+
status_code=response.status_code,
227+
media_type=response.headers.get("Content-Type"),
228+
)
229+
206230
def _map_openeo_status(self, status: str) -> ProcessingStatusEnum:
207231
"""
208232
Map the status returned by openEO to a status known within the API.

app/routers/sync_jobs.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
from typing import Annotated
2+
from fastapi import Body, APIRouter, Depends, HTTPException, Response, status
3+
from loguru import logger
4+
5+
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum
6+
from app.schemas.unit_job import (
7+
BaseJobRequest,
8+
ServiceDetails,
9+
)
10+
from app.auth import oauth2_scheme
11+
from app.services.processing import (
12+
create_synchronous_job,
13+
)
14+
15+
16+
# from app.auth import get_current_user
17+
18+
router = APIRouter()
19+
20+
21+
@router.post(
22+
"/sync_jobs",
23+
status_code=status.HTTP_201_CREATED,
24+
tags=["Unit Jobs"],
25+
summary="Create a new processing job",
26+
)
27+
async def create_sync_job(
28+
payload: Annotated[
29+
BaseJobRequest,
30+
Body(
31+
openapi_examples={
32+
"openEO Example": {
33+
"summary": "Valid openEO job request",
34+
"description": "The following example demonstrates how to create a processing "
35+
"job using an openEO-based service. This example triggers the "
36+
"[`variability map`](https://github.com/ESA-APEx/apex_algorithms/blob/main/algo"
37+
"rithm_catalog/vito/variabilitymap/records/variabilitymap.json) "
38+
"process using the CDSE openEO Federation. In this case the `endpoint`"
39+
"represents the URL of the openEO backend and the `application` refers to the "
40+
"User Defined Process (UDP) that is being executed on the backend.",
41+
"value": BaseJobRequest(
42+
label=ProcessTypeEnum.OPENEO,
43+
title="Example openEO Job",
44+
service=ServiceDetails(
45+
endpoint="https://openeofed.dataspace.copernicus.eu",
46+
application="https://raw.githubusercontent.com/ESA-APEx/apex_algorithms"
47+
"/32ea3c9a6fa24fe063cb59164cd318cceb7209b0/openeo_udp/variabilitymap/"
48+
"variabilitymap.json",
49+
),
50+
format=OutputFormatEnum.GEOTIFF,
51+
parameters={
52+
"spatial_extent": {
53+
"type": "FeatureCollection",
54+
"features": [
55+
{
56+
"type": "Feature",
57+
"properties": {},
58+
"geometry": {
59+
"coordinates": [
60+
[
61+
[
62+
5.170043941798298,
63+
51.25050990858725,
64+
],
65+
[
66+
5.171035037521989,
67+
51.24865722468999,
68+
],
69+
[
70+
5.178521828188366,
71+
51.24674578027137,
72+
],
73+
[
74+
5.179084341977159,
75+
51.24984764553983,
76+
],
77+
[
78+
5.170043941798298,
79+
51.25050990858725,
80+
],
81+
]
82+
],
83+
"type": "Polygon",
84+
},
85+
}
86+
],
87+
},
88+
"temporal_extent": ["2025-05-01", "2025-05-01"],
89+
},
90+
).model_dump(),
91+
}
92+
},
93+
),
94+
],
95+
token: str = Depends(oauth2_scheme),
96+
) -> Response:
97+
"""Initiate a synchronous processing job with the provided data and return the result."""
98+
try:
99+
return await create_synchronous_job(token, payload)
100+
except HTTPException as e:
101+
raise e
102+
except Exception as e:
103+
logger.exception(f"Error creating synchronous job: {e}")
104+
raise HTTPException(
105+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
106+
detail=f"An error occurred while creating the synchronous job: {e}",
107+
)

app/schemas/enum.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ class OutputFormatEnum(str, Enum):
2020
GEOJSON = "geojson"
2121
GEOTIFF = "gtiff"
2222
NETCDF = "netcdf"
23+
JSON = "json"

app/services/processing.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
from typing import List, Optional
33

4+
from fastapi import Response
45
from loguru import logger
56
from app.auth import get_current_user_id
67
from app.database.models.processing_job import (
@@ -192,3 +193,20 @@ async def get_processing_job_by_user_id(
192193
created=record.created,
193194
updated=record.updated,
194195
)
196+
197+
198+
async def create_synchronous_job(
199+
user_token: str,
200+
request: BaseJobRequest,
201+
) -> Response:
202+
logger.info(f"Creating synchronous job with summary: {request}")
203+
204+
platform = get_processing_platform(request.label)
205+
206+
return await platform.execute_synchronous_job(
207+
user_token=user_token,
208+
title=request.title,
209+
details=request.service,
210+
parameters=request.parameters,
211+
format=request.format,
212+
)

0 commit comments

Comments
 (0)