Skip to content

Commit c736d30

Browse files
committed
feat: added support for token-based auth and token exchange in openeo
1 parent d141f6b commit c736d30

File tree

19 files changed

+1322
-311
lines changed

19 files changed

+1322
-311
lines changed

app/auth.py

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Any, Dict
2+
import httpx
13
import jwt
24
from fastapi import Depends, HTTPException, WebSocket, status
35
from fastapi.security import OAuth2AuthorizationCodeBearer
@@ -49,7 +51,7 @@ def get_current_user_id(token: str = Depends(oauth2_scheme)):
4951
async def websocket_authenticate(websocket: WebSocket) -> str | None:
5052
"""
5153
Authenticate a WebSocket connection using a JWT token from query params.
52-
Returns the ID of the authenticated user payload if valid, otherwise closes the connection.
54+
Returns the token of the authenticated user payload if valid, otherwise closes the connection.
5355
"""
5456
logger.debug("Authenticating websocket")
5557
token = websocket.query_params.get("token")
@@ -59,10 +61,82 @@ async def websocket_authenticate(websocket: WebSocket) -> str | None:
5961
return None
6062

6163
try:
62-
user_id = get_current_user_id(token)
6364
await websocket.accept()
64-
return user_id
65+
return token
6566
except Exception as e:
6667
logger.error(f"Invalid token in websocket authentication: {e}")
6768
await websocket.close(code=1008, reason="Invalid token")
6869
return None
70+
71+
72+
async def exchange_token_for_provider(
73+
initial_token: str, provider: str
74+
) -> Dict[str, Any]:
75+
"""
76+
Exchange a Keycloak access token for a token/audience targeted at `provider`
77+
using the Keycloak Token Exchange (grant_type=urn:ietf:params:oauth:grant-type:token-exchange).
78+
79+
:param initial_token: token obtained from the client (Bearer token)
80+
:param provider: target provider name or client_id.
81+
82+
:return: The token response (dict) on success.
83+
84+
:raise: Raises HTTPException with an appropriate status and message on error.
85+
"""
86+
token_url = f"{KEYCLOAK_BASE_URL}/protocol/openid-connect/token"
87+
88+
# Check if the necessary settings are in place
89+
if not settings.keycloak_client_id or not settings.keycloak_client_secret:
90+
raise HTTPException(
91+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
92+
detail="Token exchange not configured on the server (missing client credentials).",
93+
)
94+
95+
payload = {
96+
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
97+
"client_id": settings.keycloak_client_id,
98+
"client_secret": settings.keycloak_client_secret,
99+
"subject_token": initial_token,
100+
"requested_issuer": provider,
101+
}
102+
103+
try:
104+
async with httpx.AsyncClient(timeout=10.0) as client:
105+
resp = await client.post(token_url, data=payload)
106+
except httpx.RequestError as exc:
107+
logger.error(f"Token exchange network error for provider={provider}: {exc}")
108+
raise HTTPException(
109+
status_code=status.HTTP_502_BAD_GATEWAY,
110+
detail="Failed to contact the identity provider for token exchange.",
111+
)
112+
113+
# Parse response
114+
try:
115+
body = resp.json()
116+
except ValueError:
117+
logger.error(
118+
f"Token exchange invalid JSON response (status={resp.status_code})"
119+
)
120+
raise HTTPException(
121+
status_code=status.HTTP_502_BAD_GATEWAY,
122+
detail="Invalid response from identity provider during token exchange.",
123+
)
124+
125+
if resp.status_code != 200:
126+
# Keycloak returns error and error_description fields for token errors
127+
err = body.get("error_description") or body.get("error") or resp.text
128+
logger.error(
129+
"Token exchange failed",
130+
extra={"provider": provider, "status": resp.status_code, "error": err},
131+
)
132+
# Map common upstream statuses to meaningful client statuses
133+
client_status = (
134+
status.HTTP_401_UNAUTHORIZED
135+
if resp.status_code in (400, 401, 403)
136+
else status.HTTP_502_BAD_GATEWAY
137+
)
138+
139+
raise HTTPException(client_status, detail=body)
140+
141+
# Successful exchange, return token response (access_token, expires_in, etc.)
142+
return body

app/config/settings.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,11 @@ class Settings(BaseSettings):
3535
extra="allow",
3636
)
3737

38+
# openEO
39+
openeo_enable_user_credentials: bool = Field(
40+
default=False,
41+
json_schema_extra={"env": "OPENEO_ENABLE_USER_CREDENTIALS"},
42+
)
43+
3844

3945
settings = Settings()

app/platforms/base.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@ class BaseProcessingPlatform(ABC):
1313
"""
1414

1515
@abstractmethod
16-
def execute_job(
17-
self, title: str, details: ServiceDetails, parameters: dict, format: OutputFormatEnum
16+
async def execute_job(
17+
self,
18+
user_token: str,
19+
title: str,
20+
details: ServiceDetails,
21+
parameters: dict,
22+
format: OutputFormatEnum,
1823
) -> str:
1924
"""
2025
Execute a processing job on the platform with the given service ID and parameters.
2126
27+
:param user_token: The access token of the user executing the job.
2228
:param title: The title of the job to be executed.
2329
:param details: The service details containing the service ID and application.
2430
:param parameters: The parameters required for the job execution.
@@ -28,23 +34,27 @@ def execute_job(
2834
pass
2935

3036
@abstractmethod
31-
def get_job_status(
32-
self, job_id: str, details: ServiceDetails
37+
async def get_job_status(
38+
self, user_token: str, job_id: str, details: ServiceDetails
3339
) -> ProcessingStatusEnum:
3440
"""
3541
Retrieve the job status of a processing job that is running on the platform.
3642
43+
:param user_token: The access token of the user executing the job.
3744
:param job_id: The ID of the job on the platform
3845
:param details: The service details containing the service ID and application.
3946
:return: Return the processing status
4047
"""
4148
pass
4249

4350
@abstractmethod
44-
def get_job_results(self, job_id: str, details: ServiceDetails) -> Collection:
51+
async def get_job_results(
52+
self, user_token: str, job_id: str, details: ServiceDetails
53+
) -> Collection:
4554
"""
4655
Retrieve the job results of a processing job that is running on the platform.
4756
57+
:param user_token: The access token of the user executing the job.
4858
:param job_id: The ID of the job on the platform
4959
:param details: The service details containing the service ID and application.
5060
:return: STAC collection representing the results.

app/platforms/implementations/ogc_api_process.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,26 @@ class OGCAPIProcessPlatform(BaseProcessingPlatform):
1212
This class handles the execution of processing jobs on the OGC API Process platform.
1313
"""
1414

15-
def execute_job(
16-
self, title: str, details: ServiceDetails, parameters: dict, format: OutputFormatEnum
15+
async def execute_job(
16+
self,
17+
user_token: str,
18+
title: str,
19+
details: ServiceDetails,
20+
parameters: dict,
21+
format: OutputFormatEnum,
1722
) -> str:
1823
raise NotImplementedError("OGC API Process job execution not implemented yet.")
1924

20-
def get_job_status(
21-
self, job_id: str, details: ServiceDetails
25+
async def get_job_status(
26+
self, user_token: str, job_id: str, details: ServiceDetails
2227
) -> ProcessingStatusEnum:
2328
raise NotImplementedError(
2429
"OGC API Process job status retrieval not implemented yet."
2530
)
2631

27-
def get_job_results(self, job_id: str, details: ServiceDetails) -> Collection:
32+
async def get_job_results(
33+
self, user_token: str, job_id: str, details: ServiceDetails
34+
) -> Collection:
2835
raise NotImplementedError(
2936
"OGC API Process job result retrieval not implemented yet."
3037
)

0 commit comments

Comments
 (0)