
Commit 820eb04

feat: integration of job status update flow (#4)
1 parent b8a34e8 commit 820eb04

14 files changed: +387 -72 lines changed

New file (Alembic migration)

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+"""Update of processing jobs
+
+Revision ID: dbc9c2bd9a57
+Revises: b3ba8db2adef
+Create Date: 2025-08-13 18:00:40.408757
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'dbc9c2bd9a57'
+down_revision: Union[str, Sequence[str], None] = 'b3ba8db2adef'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Upgrade schema."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.alter_column('processing_jobs', 'platform_job_id',
+                    existing_type=sa.VARCHAR(),
+                    nullable=False)
+    op.alter_column('processing_jobs', 'parameters',
+                    existing_type=sa.VARCHAR(),
+                    nullable=False)
+    op.alter_column('processing_jobs', 'service_record',
+                    existing_type=sa.VARCHAR(),
+                    nullable=False)
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    """Downgrade schema."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.alter_column('processing_jobs', 'service_record',
+                    existing_type=sa.VARCHAR(),
+                    nullable=True)
+    op.alter_column('processing_jobs', 'parameters',
+                    existing_type=sa.VARCHAR(),
+                    nullable=True)
+    op.alter_column('processing_jobs', 'platform_job_id',
+                    existing_type=sa.VARCHAR(),
+                    nullable=True)
+    # ### end Alembic commands ###
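
The upgrade tightens platform_job_id, parameters, and service_record to NOT NULL; on most backends this will fail if existing rows still contain NULLs in those columns. A hedged sketch of a backfill that could precede the alter_column calls in upgrade() — the placeholder values are illustrative, not part of this commit:

    # Hypothetical backfill: clear out NULLs before tightening nullability.
    op.execute("UPDATE processing_jobs SET platform_job_id = '' WHERE platform_job_id IS NULL")
    op.execute("UPDATE processing_jobs SET parameters = '{}' WHERE parameters IS NULL")
    op.execute("UPDATE processing_jobs SET service_record = '{}' WHERE service_record IS NULL")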

app/database/models/processing_job.py

Lines changed: 28 additions & 3 deletions
@@ -23,10 +23,10 @@ class ProcessingJobRecord(Base):
         Enum(ProcessingStatusEnum), index=True
     )
     user_id: Mapped[str] = mapped_column(String, index=True)
-    platform_job_id: Mapped[Optional[str]] = mapped_column(String, index=True)
-    parameters: Mapped[Optional[str]] = mapped_column(String, index=False)
+    platform_job_id: Mapped[str] = mapped_column(String, index=True)
+    parameters: Mapped[str] = mapped_column(String, index=False)
     result_link: Mapped[Optional[str]] = mapped_column(String, index=False)
-    service_record: Mapped[Optional[str]] = mapped_column(String, index=True)
+    service_record: Mapped[str] = mapped_column(String, index=True)
     created: Mapped[datetime.datetime] = mapped_column(
         DateTime, default=datetime.datetime.utcnow, index=True
     )
@@ -63,6 +63,15 @@ def get_jobs_by_user_id(database: Session, user_id: str) -> List[ProcessingJobRecord]:
     )
 
 
+def get_job_by_id(database: Session, job_id: int) -> Optional[ProcessingJobRecord]:
+    logger.info(f"Retrieving processing job with ID {job_id}")
+    return (
+        database.query(ProcessingJobRecord)
+        .filter(ProcessingJobRecord.id == job_id)
+        .first()
+    )
+
+
 def get_job_by_user_id(
     database: Session, job_id: int, user_id: str
 ) -> Optional[ProcessingJobRecord]:
@@ -74,3 +83,19 @@ def get_job_by_user_id(
         )
         .first()
     )
+
+
+def update_job_status_by_id(
+    database: Session, job_id: int, status: ProcessingStatusEnum
+):
+    logger.info(f"Updating the status of processing job with ID {job_id} to {status}")
+    job = get_job_by_id(database, job_id)
+
+    if job:
+        job.status = status
+        database.commit()
+        database.refresh(job)
+    else:
+        logger.warning(
+            f"Could not update job status of job {job_id} as it could not be found in the database"
+        )
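
A minimal sketch of how the new helpers might be called outside the request cycle — SessionLocal is an assumed session factory, and job ID 42 is purely illustrative:

    from app.database.db import SessionLocal  # assumed session factory
    from app.database.models.processing_job import get_job_by_id, update_job_status_by_id
    from app.schemas import ProcessingStatusEnum

    db = SessionLocal()
    try:
        job = get_job_by_id(db, job_id=42)  # returns None if the ID is unknown
        if job is not None:
            update_job_status_by_id(db, job.id, ProcessingStatusEnum.RUNNING)
    finally:
        db.close()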

app/platforms/base.py

Lines changed: 14 additions & 1 deletion
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 
-from app.schemas import ServiceDetails
+from app.schemas import ProcessingStatusEnum, ServiceDetails
 
 
 class BaseProcessingPlatform(ABC):
@@ -20,3 +20,16 @@ def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
         :return: Return the ID of the job that was created
         """
         pass
+
+    @abstractmethod
+    def get_job_status(
+        self, job_id: str, details: ServiceDetails
+    ) -> ProcessingStatusEnum:
+        """
+        Retrieve the job status of a processing job that is running on the platform.
+
+        :param job_id: The ID of the job on the platform
+        :param details: The service details containing the service ID and application.
+        :return: Return the processing status
+        """
+        pass
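
For orientation, a sketch of what a concrete platform now has to provide — a hypothetical in-memory stand-in useful for tests, not something defined in this commit:

    from app.platforms.base import BaseProcessingPlatform
    from app.schemas import ProcessingStatusEnum, ServiceDetails


    class DummyPlatform(BaseProcessingPlatform):
        """Hypothetical test double implementing both abstract methods."""

        def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
            # Pretend a job was submitted and hand back a fake platform job ID.
            return "dummy-job-1"

        def get_job_status(
            self, job_id: str, details: ServiceDetails
        ) -> ProcessingStatusEnum:
            # Always report completion; a real platform would query its remote API.
            return ProcessingStatusEnum.FINISHED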

app/platforms/implementations/ogc_api_process.py

Lines changed: 8 additions & 10 deletions
@@ -2,7 +2,7 @@
 
 from app.platforms.base import BaseProcessingPlatform
 from app.platforms.dispatcher import register_processing_platform
-from app.schemas import ProcessTypeEnum, ServiceDetails
+from app.schemas import ProcessTypeEnum, ProcessingStatusEnum, ServiceDetails
 
 logger = logging.getLogger(__name__)
 
@@ -14,16 +14,14 @@ class OGCAPIProcessPlatform(BaseProcessingPlatform):
     """
 
     def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
-        """
-        Execute a processing job on the platform with the given service ID and parameters.
-
-        :param title: The title of the job to be executed.
-        :param details: The service details containing the service ID and application.
-        :param parameters: The parameters required for the job execution.
-        :return: Return the ID of the job that was created
-        """
-
         raise NotImplementedError("OGC API Process job execution not implemented yet.")
 
+    def get_job_status(
+        self, job_id: str, details: ServiceDetails
+    ) -> ProcessingStatusEnum:
+        raise NotImplementedError(
+            "OGC API Process job status retrieval not implemented yet."
+        )
+
 
 register_processing_platform(ProcessTypeEnum.OGC_API_PROCESS, OGCAPIProcessPlatform)

app/platforms/implementations/openeo.py

Lines changed: 42 additions & 11 deletions
@@ -11,6 +11,7 @@
 from app.platforms.dispatcher import register_processing_platform
 from app.schemas import (
     ProcessTypeEnum,
+    ProcessingStatusEnum,
     ServiceDetails,
 )
 
@@ -104,15 +105,6 @@ def _get_process_id(self, url: str) -> str:
         return process_id
 
     def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
-        """
-        Execute a processing job on the platform with the given service ID and parameters.
-
-        :param title: The title of the job to be executed.
-        :param details: The service details containing the service ID and application.
-        :param parameters: The parameters required for the job execution.
-        :return: Return the ID of the job that was created
-        """
-
         try:
             process_id = self._get_process_id(details.application)
 
@@ -130,8 +122,47 @@ def execute_job(self, title: str, details: ServiceDetails, parameters: dict) -> str:
 
             return job.job_id
         except Exception as e:
-            logger.exception(f"Failed to execute openEO job: {e}")
-            raise SystemError("Failed to execute openEO job")
+            logger.exception("Failed to execute openEO job")
+            raise SystemError("Failed to execute openEO job") from e
+
+    def _map_openeo_status(self, status: str) -> ProcessingStatusEnum:
+        """
+        Map the status returned by openEO to a status known within the API.
+
+        :param status: Status text returned by openEO.
+        :return: ProcessingStatusEnum corresponding to the input.
+        """
+
+        logger.debug("Mapping openEO status %r to ProcessingStatusEnum", status)
+
+        mapping = {
+            "created": ProcessingStatusEnum.CREATED,
+            "queued": ProcessingStatusEnum.QUEUED,
+            "running": ProcessingStatusEnum.RUNNING,
+            "cancelled": ProcessingStatusEnum.CANCELED,
+            "finished": ProcessingStatusEnum.FINISHED,
+            "error": ProcessingStatusEnum.FAILED,
+        }
+
+        try:
+            return mapping[status.lower()]
+        except (AttributeError, KeyError):
+            logger.warning("Mapping of unknown openEO status: %r", status)
+            return ProcessingStatusEnum.UNKNOWN
+
+    def get_job_status(
+        self, job_id: str, details: ServiceDetails
+    ) -> ProcessingStatusEnum:
+        try:
+            logger.debug(f"Fetching job status for openEO job with ID {job_id}")
+            connection = self._setup_connection(details.service)
+            job = connection.job(job_id)
+            return self._map_openeo_status(job.status())
+        except Exception as e:
+            logger.exception(f"Failed to fetch status for openEO job with ID {job_id}")
+            raise SystemError(
+                f"Failed to fetch status for openEO job with ID {job_id}"
+            ) from e
 
 
 register_processing_platform(ProcessTypeEnum.OPENEO, OpenEOPlatform)
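
The mapping above lower-cases the openEO status and falls back to UNKNOWN for anything unrecognised. A hedged test sketch (the test name is hypothetical; _map_openeo_status does not touch self, so it can be exercised without constructing an OpenEOPlatform instance):

    import pytest

    from app.platforms.implementations.openeo import OpenEOPlatform
    from app.schemas import ProcessingStatusEnum


    @pytest.mark.parametrize(
        "raw,expected",
        [
            ("finished", ProcessingStatusEnum.FINISHED),
            ("cancelled", ProcessingStatusEnum.CANCELED),  # British spelling on the openEO side
            ("error", ProcessingStatusEnum.FAILED),
            ("RUNNING", ProcessingStatusEnum.RUNNING),  # lower-cased before lookup
            ("paused", ProcessingStatusEnum.UNKNOWN),  # unmapped values fall back to UNKNOWN
            (None, ProcessingStatusEnum.UNKNOWN),  # AttributeError on .lower() is caught
        ],
    )
    def test_map_openeo_status(raw, expected):
        assert OpenEOPlatform._map_openeo_status(None, raw) == expected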

app/routers/jobs_status.py

Lines changed: 33 additions & 2 deletions
@@ -1,6 +1,7 @@
+import asyncio
 import logging
 
-from fastapi import APIRouter, Depends
+from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
 from sqlalchemy.orm import Session
 
 from app.database.db import get_db
@@ -16,7 +17,7 @@
     tags=["Upscale Tasks", "Unit Jobs"],
     summary="Get a list of all upscaling tasks & processing jobs for the authenticated user",
 )
-async def jobs_status(
+async def get_jobs_status(
     db: Session = Depends(get_db),
     user: str = "foobar",
 ) -> JobsStatusResponse:
@@ -29,3 +30,33 @@ async def jobs_status(
     return JobsStatusResponse(
         upscaling_tasks=[], processing_jobs=get_processing_jobs_by_user_id(db, user)
     )
+
+
+@router.websocket(
+    "/ws/jobs_status",
+)
+async def ws_jobs_status(
+    websocket: WebSocket, user: str = "foobar", interval: int = 10
+):
+    """
+    Periodically send the combined list of upscaling tasks and processing jobs for the authenticated user.
+    """
+
+    await websocket.accept()
+    logger.debug(f"WebSocket connected for user {user}")
+
+    db = next(get_db())
+    try:
+        while True:
+            status = await get_jobs_status(db, user)
+            await websocket.send_json(status.model_dump())
+
+            await asyncio.sleep(interval)
+
+    except WebSocketDisconnect:
+        logger.info(f"WebSocket disconnected for user {user}")
+    except Exception as e:
+        logger.exception(f"Error in ws_jobs_status: {e}")
+        await websocket.close(code=1011, reason=f"Error in job status websocket: {e}")
+    finally:
+        db.close()
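
A hedged sketch of exercising the new WebSocket route with FastAPI's test client — app.main.app is an assumed location of the application instance, the path assumes the router is mounted without a prefix, and interval is lowered so the client is not left waiting the default 10 seconds between pushes:

    from fastapi.testclient import TestClient

    from app.main import app  # assumed application entry point

    client = TestClient(app)
    with client.websocket_connect("/ws/jobs_status?user=foobar&interval=1") as ws:
        payload = ws.receive_json()  # one JobsStatusResponse dict per interval
        print(payload["processing_jobs"])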

app/schemas.py

Lines changed: 3 additions & 0 deletions
@@ -11,9 +11,12 @@ class ProcessTypeEnum(str, Enum):
 
 class ProcessingStatusEnum(str, Enum):
     CREATED = "created"
+    QUEUED = "queued"
     RUNNING = "running"
     FINISHED = "finished"
+    CANCELED = "canceled"
     FAILED = "failed"
+    UNKNOWN = "unknown"
 
 
 # class TileRequest(BaseModel):

app/services/processing.py

Lines changed: 33 additions & 10 deletions
@@ -6,6 +6,7 @@
     get_job_by_user_id,
     get_jobs_by_user_id,
     save_job_to_db,
+    update_job_status_by_id,
 )
 from app.platforms.dispatcher import get_processing_platform
 from app.schemas import (
@@ -40,28 +41,50 @@ def create_processing_job(
         platform_job_id=job_id,
         parameters=json.dumps(summary.parameters),
         result_link=None,
-        service_record=json.dumps(
-            summary.service.model_dump_json()
-        ),  # Assuming service is a dict
+        service_record=summary.service.model_dump_json(),
     )
     record = save_job_to_db(database, record)
     return ProcessingJobSummary(
         id=record.id, title=record.title, label=summary.label, status=record.status
    )
 
 
+def get_job_status(job: ProcessingJobRecord) -> ProcessingStatusEnum:
+    logger.info(f"Retrieving job status for job: {job.platform_job_id}")
+    platform = get_processing_platform(job.label)
+    details = ServiceDetails.model_validate_json(job.service_record)
+    return platform.get_job_status(job.platform_job_id, details)
+
+
 def get_processing_jobs_by_user_id(
     database: Session, user_id: str
 ) -> List[ProcessingJobSummary]:
     logger.info(f"Retrieving processing jobs for user {user_id}")
-    return list(
-        map(
-            lambda x: ProcessingJobSummary(
-                id=x.id, title=x.title, label=x.label, status=x.status
-            ),
-            get_jobs_by_user_id(database, user_id),
+
+    jobs: List[ProcessingJobSummary] = []
+    records = get_jobs_by_user_id(database, user_id)
+
+    inactive_statuses = {
+        ProcessingStatusEnum.CANCELED,
+        ProcessingStatusEnum.FAILED,
+        ProcessingStatusEnum.FINISHED,
+    }
+
+    for record in records:
+        if record.status not in inactive_statuses:
+            latest_status = get_job_status(record)
+            if latest_status != record.status:
+                update_job_status_by_id(database, record.id, latest_status)
+            status = latest_status
+        else:
+            status = record.status
+
+        jobs.append(
+            ProcessingJobSummary(
+                id=record.id, title=record.title, label=record.label, status=status
+            )
         )
-    )
+    return jobs
 
 
 def get_processing_job_by_user_id(

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ pydantic
 pydantic-settings
 pydantic_core
 pytest
+pytest-asyncio
 pytest-cov
 python-dotenv
 requests
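
pytest-asyncio is presumably added so the async route handlers can be tested directly. A hedged sketch of such a test — the test name is hypothetical, and SessionLocal plus an initialised test database are assumptions:

    import pytest

    from app.database.db import SessionLocal  # assumed session factory
    from app.routers.jobs_status import get_jobs_status


    @pytest.mark.asyncio
    async def test_get_jobs_status_returns_response():
        db = SessionLocal()
        try:
            response = await get_jobs_status(db, user="foobar")
            assert response.upscaling_tasks == []  # upscaling tasks are not populated yet
        finally:
            db.close()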

0 commit comments
