Skip to content

Commit ca48b98

Browse files
committed
feat: added support for format
1 parent bab0e13 commit ca48b98

File tree

14 files changed

+130
-78
lines changed

14 files changed

+130
-78
lines changed

app/platforms/base.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from abc import ABC, abstractmethod
22

3-
from app.schemas.enum import ProcessingStatusEnum
3+
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum
44
from app.schemas.unit_job import ServiceDetails
55

66
from stac_pydantic import Collection
@@ -13,13 +13,16 @@ class BaseProcessingPlatform(ABC):
1313
"""
1414

1515
@abstractmethod
16-
def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
16+
def execute_job(
17+
self, title: str, details: ServiceDetails, parameters: dict, format: OutputFormatEnum
18+
) -> str:
1719
"""
1820
Execute a processing job on the platform with the given service ID and parameters.
1921
2022
:param title: The title of the job to be executed.
2123
:param details: The service details containing the service ID and application.
2224
:param parameters: The parameters required for the job execution.
25+
:param format: Format of the output result.
2326
:return: Return the ID of the job that was created
2427
"""
2528
pass

app/platforms/implementations/ogc_api_process.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from app.platforms.base import BaseProcessingPlatform
22
from app.platforms.dispatcher import register_platform
3-
from app.schemas.enum import ProcessTypeEnum, ProcessingStatusEnum
3+
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum
44
from app.schemas.unit_job import ServiceDetails
55
from stac_pydantic import Collection
66

@@ -12,7 +12,9 @@ class OGCAPIProcessPlatform(BaseProcessingPlatform):
1212
This class handles the execution of processing jobs on the OGC API Process platform.
1313
"""
1414

15-
def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
15+
def execute_job(
16+
self, title: str, details: ServiceDetails, parameters: dict, format: OutputFormatEnum
17+
) -> str:
1618
raise NotImplementedError("OGC API Process job execution not implemented yet.")
1719

1820
def get_job_status(

app/platforms/implementations/openeo.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from app.platforms.base import BaseProcessingPlatform
1313
from app.platforms.dispatcher import register_platform
14-
from app.schemas.enum import ProcessTypeEnum, ProcessingStatusEnum
14+
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum
1515
from app.schemas.unit_job import ServiceDetails
1616
from stac_pydantic import Collection
1717

@@ -143,7 +143,9 @@ def _get_process_id(self, url: str) -> str:
143143

144144
return process_id
145145

146-
def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
146+
def execute_job(
147+
self, title: str, details: ServiceDetails, parameters: dict, format: OutputFormatEnum
148+
) -> str:
147149
try:
148150
process_id = self._get_process_id(details.application)
149151

@@ -156,7 +158,7 @@ def execute_job(self, title: str, details: ServiceDetails, parameters: dict) ->
156158
service = connection.datacube_from_process(
157159
process_id=process_id, namespace=details.application, **parameters
158160
)
159-
job = service.create_job(title=title)
161+
job = service.create_job(title=title, out_format=format)
160162
job.start()
161163

162164
return job.job_id

app/routers/unit_jobs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from sqlalchemy.orm import Session
55

66
from app.database.db import get_db
7-
from app.schemas.enum import ProcessTypeEnum
7+
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum
88
from app.schemas.unit_job import (
99
BaseJobRequest,
1010
ProcessingJob,
@@ -53,6 +53,7 @@ async def create_unit_job(
5353
"/32ea3c9a6fa24fe063cb59164cd318cceb7209b0/openeo_udp/variabilitymap/"
5454
"variabilitymap.json",
5555
),
56+
format=OutputFormatEnum.GEOTIFF,
5657
parameters={
5758
"spatial_extent": {
5859
"type": "FeatureCollection",

app/routers/upscale_tasks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from sqlalchemy.orm import Session
1616

1717
from app.database.db import SessionLocal, get_db
18-
from app.schemas.enum import ProcessTypeEnum
18+
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum
1919
from app.schemas.unit_job import (
2020
ServiceDetails,
2121
)
@@ -66,6 +66,7 @@ async def create_upscale_task(
6666
"/32ea3c9a6fa24fe063cb59164cd318cceb7209b0/openeo_udp/variabilitymap/"
6767
"variabilitymap.json",
6868
),
69+
format=OutputFormatEnum.GEOTIFF,
6970
parameters={
7071
"temporal_extent": ["2025-05-01", "2025-05-01"],
7172
},

app/schemas/enum.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,9 @@ class ProcessingStatusEnum(str, Enum):
1414
CANCELED = "canceled"
1515
FAILED = "failed"
1616
UNKNOWN = "unknown"
17+
18+
19+
class OutputFormatEnum(str, Enum):
20+
GEOJSON = "geojson"
21+
GEOTIFF = "gtiff"
22+
NETCDF = "netcdf"

app/schemas/unit_job.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from pydantic import BaseModel, Field
44

5-
from app.schemas.enum import ProcessingStatusEnum, ProcessTypeEnum
5+
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum
66

77

88
class ServiceDetails(BaseModel):
@@ -77,3 +77,6 @@ class BaseJobRequest(BaseModel):
7777
...,
7878
description="JSON representing the parameters for the service execution",
7979
)
80+
format: OutputFormatEnum = Field(
81+
..., description="Expected format of the output results"
82+
)

app/services/processing.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ def create_processing_job(
4141

4242
try:
4343
job_id = platform.execute_job(
44-
title=request.title, details=request.service, parameters=request.parameters
44+
title=request.title,
45+
details=request.service,
46+
parameters=request.parameters,
47+
format=request.format,
4548
)
4649
except Exception:
4750
job_id = None
4851
logger.exception(f"Could not create processing job for user {user}")
4952

50-
print(f"JOB IS EQUAL TO {job_id}")
5153
record = ProcessingJobRecord(
5254
title=request.title,
5355
label=request.label,
@@ -58,9 +60,7 @@ def create_processing_job(
5860
service=request.service.model_dump_json(),
5961
upscaling_task_id=upscaling_task_id,
6062
)
61-
print(f"RECORD IS EQUAL TO {record.status}")
6263
record = save_job_to_db(database, record)
63-
print(f"RECORD IS EQUAL TO {record.status}")
6464
return ProcessingJobSummary(
6565
id=record.id,
6666
title=record.title,

app/services/upscaling.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def create_upscaling_processing_jobs(
5252
label=request.label,
5353
service=request.service,
5454
parameters={**request.parameters, request.dimension.name: value},
55+
format=request.format,
5556
),
5657
upscaling_task_id=upscaling_task_id,
5758
)

0 commit comments

Comments
 (0)