Skip to content

Commit ed66aed

Browse files
committed
feat: added dynamic registration of platforms
1 parent 9949663 commit ed66aed

File tree

4 files changed

+90
-37
lines changed

4 files changed

+90
-37
lines changed

app/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
from fastapi import FastAPI
2+
3+
from app.platforms.dispatcher import load_processing_platforms
24
from .config.logger import setup_logging
35
from .config.settings import settings
46
from .routers import jobs_status, unit_jobs, health
57

68
setup_logging()
79

10+
load_processing_platforms()
11+
812
app = FastAPI(
913
title=settings.app_name,
1014
description=settings.app_description,

app/platforms/dispatcher.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,29 @@
11

2+
import importlib
3+
import logging
4+
import app.platforms.implementations
5+
import pkgutil
6+
from typing import Dict, Type
27
from app.platforms.base import BaseProcessingPlatform
3-
from app.platforms.ogc_api_process import OGCAPIProcessPlatform
4-
from app.platforms.openeo import OpenEOPlatform
58
from app.schemas import ProcessType
69

10+
PROCESSING_PLATFORMS: Dict[ProcessType, Type[BaseProcessingPlatform]] = {}
11+
12+
logger = logging.getLogger(__name__)
13+
14+
def load_processing_platforms():
15+
"""Dynamically load all processing platform implementations."""
16+
for _, module_name, _ in pkgutil.iter_modules(app.platforms.implementations.__path__):
17+
importlib.import_module(f"app.platforms.implementations.{module_name}")
18+
19+
def register_processing_platform(service_type: ProcessType, cls: Type[BaseProcessingPlatform]):
20+
""""Register a new processing platform class for a specific service type.
21+
22+
:param service_type: The type of service for which to register the platform.
23+
:param cls: The class that implements BaseProcessingPlatform.
24+
"""
25+
logger.debug(f"Registering processing platform with class {cls} for service type: {service_type}")
26+
PROCESSING_PLATFORMS[service_type] = cls
727

828
def get_processing_platform(service_type: ProcessType) -> BaseProcessingPlatform:
929
"""
@@ -12,9 +32,7 @@ def get_processing_platform(service_type: ProcessType) -> BaseProcessingPlatform
1232
:param service_type: The type of service for which to get the processing platform.
1333
:return: An instance of a class that implements BaseProcessingPlatform.
1434
"""
15-
if service_type == ProcessType.OPENEO:
16-
return OpenEOPlatform()
17-
elif service_type == ProcessType.OGC_API_PROCESS:
18-
return OGCAPIProcessPlatform()
19-
else:
35+
try:
36+
return PROCESSING_PLATFORMS[service_type]()
37+
except KeyError:
2038
raise ValueError(f"Unsupported service type: {service_type}")

app/platforms/ogc_api_process.py renamed to app/platforms/implementations/ogc_api_process.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
import logging
33

44
from app.platforms.base import BaseProcessingPlatform
5-
from app.schemas import ProcessingJobSummary, ServiceDetails
5+
from app.platforms.dispatcher import register_processing_platform
6+
from app.schemas import ProcessType, ProcessingJobSummary, ServiceDetails
67

78

89
logger = logging.getLogger(__name__)
@@ -22,4 +23,6 @@ def execute_job(self, title: str, details: ServiceDetails, parameters: dict) ->
2223
:param parameters: The parameters required for the job execution.
2324
:return: A ProcessingJobSummary object containing the job details.
2425
"""
25-
raise NotImplementedError("OGC API Process job execution not implemented yet.")
26+
raise NotImplementedError("OGC API Process job execution not implemented yet.")
27+
28+
register_processing_platform(ProcessType.OGC_API_PROCESS, OGCAPIProcessPlatform)
Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,32 @@
11

22
import logging
33
import os
4-
import openeo
54
import re
6-
75
import urllib
86

7+
import openeo
98
import requests
9+
from dotenv import load_dotenv
1010

1111
from app.platforms.base import BaseProcessingPlatform
12-
from app.schemas import ProcessingJobSummary, ProcessingStatusEnum, ServiceDetails
13-
14-
from dotenv import load_dotenv
12+
from app.platforms.dispatcher import register_processing_platform
13+
from app.schemas import (
14+
ProcessingJobSummary,
15+
ProcessingStatusEnum,
16+
ProcessType,
17+
ServiceDetails,
18+
)
1519

1620
load_dotenv()
17-
1821
logger = logging.getLogger(__name__)
1922

23+
# Constants
24+
BACKEND_AUTH_ENV_MAP = {
25+
"openeo.dataspace.copernicus.eu": "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED",
26+
"openeofed.dataspace.copernicus.eu": "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED",
27+
}
28+
29+
2030
class OpenEOPlatform(BaseProcessingPlatform):
2131
"""
2232
OpenEO processing platform implementation.
@@ -32,8 +42,9 @@ def _setup_connection(self, url: str) -> None:
3242
connection = openeo.connect(url)
3343
provider_id, client_id, client_secret = self._get_client_credentials(url)
3444

35-
# connection.authenticate_oidc_device()
36-
45+
# @TODO: Remove the line below as this is only for local testing
46+
# connection.authenticate_oidc_device()
47+
3748
connection.authenticate_oidc_client_credentials(
3849
provider_id=provider_id,
3950
client_id=client_id,
@@ -49,29 +60,31 @@ def _get_client_credentials(self, url: str) -> tuple[str, str, str]:
4960
:param url: The URL of the OpenEO backend.
5061
:return: A tuple containing provider ID, client ID, and client secret.
5162
"""
52-
auth_env_var = self._get_client_credentials_env_var(url)
53-
if auth_env_var not in os.environ:
54-
raise ValueError(f"Environment variable {auth_env_var} not set.")
63+
env_var = self._get_client_credentials_env_var(url)
64+
credentials_str = os.getenv(env_var)
65+
66+
if not credentials_str:
67+
raise ValueError(f"Environment variable {env_var} not set.")
5568

56-
client_credentials = os.environ[auth_env_var]
57-
return client_credentials.split("/", 2)
69+
parts = credentials_str.split("/", 2)
70+
if len(parts) != 3:
71+
raise ValueError(
72+
f"Invalid client credentials format in {env_var}, expected 'provider_id/client_id/client_secret'."
73+
)
74+
return tuple(parts)
5875

5976
def _get_client_credentials_env_var(self, url: str) -> str:
6077
"""
6178
Get client credentials env var name for a given backend URL.
6279
"""
6380
if not re.match(r"https?://", url):
6481
url = f"https://{url}"
65-
parsed = urllib.parse.urlparse(url)
6682

67-
hostname = parsed.hostname
68-
if hostname in {
69-
"openeo.dataspace.copernicus.eu",
70-
"openeofed.dataspace.copernicus.eu",
71-
}:
72-
return "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED"
73-
else:
74-
raise ValueError(f"Unsupported backend: {url=} ({hostname=})")
83+
hostname = urllib.parse.urlparse(url).hostname
84+
if not hostname or hostname not in BACKEND_AUTH_ENV_MAP:
85+
raise ValueError(f"Unsupported backend: {url} (hostname={hostname})")
86+
87+
return BACKEND_AUTH_ENV_MAP[hostname]
7588

7689
def _get_process_id(self, url: str) -> str:
7790
"""
@@ -81,8 +94,18 @@ def _get_process_id(self, url: str) -> str:
8194
:return: The process ID extracted from the JSON file.
8295
"""
8396
logger.debug(f"Fetching process ID from {url}")
84-
response = requests.get(url).json()
85-
return response.get("id")
97+
try:
98+
response = requests.get(url)
99+
response.raise_for_status()
100+
except requests.RequestException as e:
101+
logger.error(f"Error fetching process ID from {url}: {e}")
102+
raise ValueError(f"Failed to fetch process ID from {url}")
103+
104+
process_id = response.json().get("id")
105+
if not process_id:
106+
raise ValueError(f"No 'id' field found in process definition at {url}")
107+
108+
return process_id
86109

87110
def execute_job(self, title: str, details: ServiceDetails , parameters: dict) -> ProcessingJobSummary:
88111
"""
@@ -96,10 +119,12 @@ def execute_job(self, title: str, details: ServiceDetails , parameters: dict) ->
96119

97120
try:
98121
process_id = self._get_process_id(details.application)
99-
if not process_id:
100-
raise ValueError(f"Process ID not found for service: {details.service}")
101122

102-
logger.debug(f"Executing OpenEO job with title={title}, service={details}, process_id={process_id} and parameters={parameters}")
123+
logger.debug(
124+
f"Executing OpenEO job with title={title}, service={details}, "
125+
f"process_id={process_id}, parameters={parameters}"
126+
)
127+
103128
connection = self._setup_connection(details.service)
104129
service = connection.datacube_from_process(
105130
process_id=process_id,
@@ -116,4 +141,7 @@ def execute_job(self, title: str, details: ServiceDetails , parameters: dict) ->
116141
)
117142
except Exception as e:
118143
logger.exception(f"Failed to execute openEO job: {e}")
119-
raise Exception("Failed to execute openEO job")
144+
raise SystemError("Failed to execute openEO job")
145+
146+
147+
register_processing_platform(ProcessType.OPENEO, OpenEOPlatform)

0 commit comments

Comments
 (0)