Skip to content

Commit ef60993

Browse files
committed
openeo: added poc implementation for executing a job
1 parent 1b0594c commit ef60993

File tree

6 files changed

+116
-18
lines changed

6 files changed

+116
-18
lines changed

app/config/logger.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@
2525
"uvicorn.error": {"level": "INFO"},
2626
"uvicorn.access": {"level": "INFO"},
2727
# custom API loggers
28-
"routers": {"level": "DEBUG"}, # all your routers
29-
"services": {"level": "DEBUG"}, # all your services
28+
"app.routers": {"level": "DEBUG"}, # all your routers
29+
"app.services": {"level": "DEBUG"}, # all your services
30+
"app.platforms": {"level": "DEBUG"}, # all platform implementations
3031
},
3132
}
3233

app/platforms/base.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from abc import ABC, abstractmethod
22

3-
from app.schemas import ProcessingJobSummary
3+
from app.schemas import ProcessingJobSummary, ServiceDetails
44

55

66
class BaseProcessingPlatform(ABC):
@@ -10,11 +10,12 @@ class BaseProcessingPlatform(ABC):
1010
"""
1111

1212
@abstractmethod
13-
def execute_job(self, service_id: str, parameters: dict) -> ProcessingJobSummary:
13+
def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> ProcessingJobSummary:
1414
"""
1515
Execute a processing job on the platform with the given service ID and parameters.
1616
17-
:param service_id: The ID of the service to execute.
17+
:param title: The title of the job to be executed.
18+
:param details: The service details containing the service ID and application.
1819
:param parameters: The parameters required for the job execution.
1920
:return: A ProcessingJobSummary object containing the job details.
2021
"""

app/platforms/ogc_api_process.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33

44
from app.platforms.base import BaseProcessingPlatform
5-
from app.schemas import ProcessingJobSummary
5+
from app.schemas import ProcessingJobSummary, ServiceDetails
66

77

88
logger = logging.getLogger(__name__)
@@ -13,13 +13,13 @@ class OGCAPIProcessPlatform(BaseProcessingPlatform):
1313
This class handles the execution of processing jobs on the OGC API Process platform.
1414
"""
1515

16-
def execute_job(self, service_id: str, parameters: dict) -> ProcessingJobSummary:
16+
def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> ProcessingJobSummary:
1717
"""
1818
Execute a processing job on the OGC API Process platform with the given service ID and parameters.
1919
20-
:param service_id: The ID of the service to execute.
20+
:param title: The title of the job to be executed.
21+
:param details: The service details containing the service ID and application.
2122
:param parameters: The parameters required for the job execution.
2223
:return: A ProcessingJobSummary object containing the job details.
2324
"""
24-
logger.debug(f"Executing OGC API Process job with service_id={service_id} and parameters={parameters}")
2525
raise NotImplementedError("OGC API Process job execution not implemented yet.")

app/platforms/openeo.py

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11

22
import logging
3+
import os
4+
import openeo
5+
import re
6+
7+
import urllib
8+
9+
import requests
310

411
from app.platforms.base import BaseProcessingPlatform
5-
from app.schemas import ProcessingJobSummary
12+
from app.schemas import ProcessingJobSummary, ProcessingStatusEnum, ServiceDetails
13+
14+
from dotenv import load_dotenv
615

16+
load_dotenv()
717

818
logger = logging.getLogger(__name__)
919

@@ -12,14 +22,98 @@ class OpenEOPlatform(BaseProcessingPlatform):
1222
OpenEO processing platform implementation.
1323
This class handles the execution of processing jobs on the OpenEO platform.
1424
"""
25+
26+
def _setup_connection(self, url: str) -> None:
27+
"""
28+
Setup the connection to the OpenEO backend.
29+
This method can be used to initialize any required client or session.
30+
"""
31+
logger.debug(f"Setting up OpenEO connection to {url}")
32+
connection = openeo.connect(url)
33+
provider_id, client_id, client_secret = self._get_client_credentials(url)
34+
35+
connection.authenticate_oidc_device()
36+
37+
# connection.authenticate_oidc_client_credentials(
38+
# provider_id=provider_id,
39+
# client_id=client_id,
40+
# client_secret=client_secret,
41+
# )
42+
return connection
43+
44+
def _get_client_credentials(self, url: str) -> tuple[str, str, str]:
45+
"""
46+
Get client credentials for the OpenEO backend.
47+
This method retrieves the client credentials from environment variables.
48+
49+
:param url: The URL of the OpenEO backend.
50+
:return: A tuple containing provider ID, client ID, and client secret.
51+
"""
52+
auth_env_var = self._get_client_credentials_env_var(url)
53+
if auth_env_var not in os.environ:
54+
raise ValueError(f"Environment variable {auth_env_var} not set.")
55+
56+
client_credentials = os.environ[auth_env_var]
57+
return client_credentials.split("/", 2)
58+
59+
def _get_client_credentials_env_var(self, url: str) -> str:
60+
"""
61+
Get client credentials env var name for a given backend URL.
62+
"""
63+
if not re.match(r"https?://", url):
64+
url = f"https://{url}"
65+
parsed = urllib.parse.urlparse(url)
66+
67+
hostname = parsed.hostname
68+
if hostname in {
69+
"openeo.dataspace.copernicus.eu",
70+
"openeofed.dataspace.copernicus.eu",
71+
}:
72+
return "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED"
73+
else:
74+
raise ValueError(f"Unsupported backend: {url=} ({hostname=})")
75+
76+
def _get_process_id(self, url: str) -> str:
77+
"""
78+
Get the process ID from a JSON file hosted at the given URL.
79+
80+
:param url: The URL of the JSON file.
81+
:return: The process ID extracted from the JSON file.
82+
"""
83+
logger.debug(f"Fetching process ID from {url}")
84+
response = requests.get(url).json()
85+
return response.get("id")
1586

16-
def execute_job(self, service_id: str, parameters: dict) -> ProcessingJobSummary:
87+
def execute_job(self, title: str, details: ServiceDetails , parameters: dict) -> ProcessingJobSummary:
1788
"""
1889
Execute a processing job on the OpenEO platform with the given service ID and parameters.
1990
20-
:param service_id: The ID of the service to execute.
91+
:param title: The title of the job to be executed.
92+
:param details: The service details containing the service ID and application.
2193
:param parameters: The parameters required for the job execution.
2294
:return: A ProcessingJobSummary object containing the job details.
2395
"""
24-
logger.debug(f"Executing OpenEO job with service_id={service_id} and parameters={parameters}")
25-
raise NotImplementedError("OpenEO job execution not implemented yet.")
96+
97+
try:
98+
process_id = self._get_process_id(details.application)
99+
if not process_id:
100+
raise ValueError(f"Process ID not found for service: {details.service}")
101+
102+
logger.debug(f"Executing OpenEO job with title={title}, service={details}, process_id={process_id} and parameters={parameters}")
103+
connection = self._setup_connection(details.service)
104+
service = connection.datacube_from_process(
105+
process_id=process_id,
106+
namespace=details.application,
107+
**parameters
108+
)
109+
job = service.create_job(title=title)
110+
job.start()
111+
112+
return ProcessingJobSummary(
113+
id=job.job_id,
114+
title=title,
115+
status=ProcessingStatusEnum.CREATED
116+
)
117+
except Exception as e:
118+
logger.exception(f"Failed to execute openEO job: {e}")
119+
raise Exception("Failed to execute openEO job")

app/schemas.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ class ProcessingStatusEnum(str, Enum):
2828
# tiles: List[Tile] = []
2929

3030

31-
# Service / parameters
31+
# Service parameters
3232
class ServiceDetails(BaseModel):
33-
id: str
33+
service: str
34+
application: str
3435

3536

3637

@@ -44,7 +45,7 @@ class ServiceDetails(BaseModel):
4445

4546

4647
class ProcessingJobSummary(BaseModel):
47-
id: int
48+
id: str
4849
title: str
4950
status: ProcessingStatusEnum
5051

app/services/processing.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def create_processing_job(summary: BaseJobRequest) -> ProcessingJobSummary:
1212
platform = get_processing_platform(summary.label)
1313

1414
return platform.execute_job(
15-
service_id=summary.service.id,
15+
title=summary.title,
16+
details=summary.service,
1617
parameters=summary.parameters
1718
)

0 commit comments

Comments
 (0)