Commit 6125e1e

feat: support for upscaling endpoints (#11)

* docs: updated examples
* docs: updated tiles example
* docs: fixed wording on unit_jobs schema
* feat: setup of upscale task endpoints
* feat: added saving of upscaling tasks to db
* chore: import cleanup
* chore: cleanup imports
* feat: integrated status retrieval for upscaling tasks
* feat: integration of status retrieval
* feat: added more details to the job summary to prevent sending too many requests
* test: fixed tests
* fix: linting issues

1 parent fe082b2 commit 6125e1e

23 files changed (+1545 −120 lines)

alembic/env.py

Lines changed: 6 additions & 5 deletions

@@ -7,7 +7,10 @@
 from alembic import context

 from app.database.db import Base  # import your Base here
-from app.database.models.processing_job import ProcessingJobRecord  # import your models here
+from app.database.models.processing_job import (
+    ProcessingJobRecord,
+)
+from app.database.models.upscaling_task import UpscalingTaskRecord


 # this is the Alembic Config object, which provides
@@ -71,13 +74,11 @@ def run_migrations_online() -> None:
         config.get_section(config.config_ini_section, {}),
         prefix="sqlalchemy.",
         poolclass=pool.NullPool,
-        url=get_url()
+        url=get_url(),
     )

     with connectable.connect() as connection:
-        context.configure(
-            connection=connection, target_metadata=target_metadata
-        )
+        context.configure(connection=connection, target_metadata=target_metadata)

         with context.begin_transaction():
             context.run_migrations()
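
The model imports above are what make the new table visible to Alembic: autogenerate can only diff tables that are registered on Base's metadata at the time env.py runs. As a rough sketch (not part of this commit; the config path and revision message are assumptions), the revision could be generated through Alembic's command API:

# Hypothetical sketch using Alembic's command API; "alembic.ini" and the message are assumed.
from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")  # assumed location of the Alembic config file

# Diff the imported models (ProcessingJobRecord, UpscalingTaskRecord) against the
# database and write a new revision file under alembic/versions/.
command.revision(alembic_cfg, message="creation of upscaling tables", autogenerate=True)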

New Alembic migration (revision 046ada00e750)

Lines changed: 107 additions & 0 deletions

@@ -0,0 +1,107 @@
"""creation of upscaling tables

Revision ID: 046ada00e750
Revises: 6105be2a2b21
Create Date: 2025-08-22 13:22:24.266047

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from sqlalchemy.dialects.postgresql import ENUM as PG_ENUM


# revision identifiers, used by Alembic.
revision: str = "046ada00e750"
down_revision: Union[str, Sequence[str], None] = "6105be2a2b21"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

process_type_enum = PG_ENUM(
    "OPENEO", "OGC_API_PROCESS", name="processtypeenum", create_type=False
)

processing_status_enum = PG_ENUM(
    "CREATED",
    "QUEUED",
    "RUNNING",
    "FINISHED",
    "CANCELED",
    "FAILED",
    "UNKNOWN",
    name="processingstatusenum",
    create_type=False,
)


def upgrade() -> None:
    """Upgrade schema."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "upscaling_tasks",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("title", sa.String(), nullable=False),
        sa.Column("label", process_type_enum, nullable=False),
        sa.Column("status", processing_status_enum, nullable=False),
        sa.Column("user_id", sa.String(), nullable=False),
        sa.Column("service", sa.String(), nullable=False),
        sa.Column("created", sa.DateTime(), nullable=False),
        sa.Column("updated", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(
        op.f("ix_upscaling_tasks_created"), "upscaling_tasks", ["created"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_id"), "upscaling_tasks", ["id"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_label"), "upscaling_tasks", ["label"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_service"), "upscaling_tasks", ["service"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_status"), "upscaling_tasks", ["status"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_title"), "upscaling_tasks", ["title"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_updated"), "upscaling_tasks", ["updated"], unique=False
    )
    op.create_index(
        op.f("ix_upscaling_tasks_user_id"), "upscaling_tasks", ["user_id"], unique=False
    )
    op.add_column(
        "processing_jobs", sa.Column("upscaling_task_id", sa.Integer(), nullable=True)
    )
    op.create_foreign_key(
        None,
        "processing_jobs",
        "upscaling_tasks",
        ["upscaling_task_id"],
        ["id"],
        ondelete="SET NULL",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    """Downgrade schema."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(None, "processing_jobs", type_="foreignkey")
    op.drop_column("processing_jobs", "upscaling_task_id")
    op.drop_index(op.f("ix_upscaling_tasks_user_id"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_updated"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_title"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_status"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_service"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_label"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_id"), table_name="upscaling_tasks")
    op.drop_index(op.f("ix_upscaling_tasks_created"), table_name="upscaling_tasks")
    op.drop_table("upscaling_tasks")
    # ### end Alembic commands ###
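
Once generated, this revision would typically be applied (and, if needed, rolled back) through the Alembic CLI or command API; a minimal sketch, assuming an alembic.ini at the project root:

# Hypothetical sketch; the config path is an assumption, the revision ID comes from the file above.
from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")

# Apply all pending revisions up to and including 046ada00e750.
command.upgrade(alembic_cfg, "head")

# Step back one revision, invoking downgrade() above.
command.downgrade(alembic_cfg, "-1")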

app/database/models/processing_job.py

Lines changed: 25 additions & 7 deletions

@@ -1,11 +1,12 @@
 import datetime
 from typing import List, Optional
+
 from loguru import logger
-from sqlalchemy import DateTime, Enum, Integer, String
-from app.database.db import Base
-from sqlalchemy.orm import Session, Mapped, mapped_column
+from sqlalchemy import DateTime, Enum, ForeignKey, Integer, String
+from sqlalchemy.orm import Mapped, Session, mapped_column

-from app.schemas.unit_job import ProcessTypeEnum, ProcessingStatusEnum
+from app.database.db import Base
+from app.schemas.unit_job import ProcessingStatusEnum, ProcessTypeEnum


 class ProcessingJobRecord(Base):
@@ -34,6 +35,12 @@ class ProcessingJobRecord(Base):
         index=True,
     )

+    upscaling_task_id: Mapped[int | None] = mapped_column(
+        Integer,
+        ForeignKey("upscaling_tasks.id", ondelete="SET NULL"),
+        nullable=True,
+    )
+

 def save_job_to_db(
     db_session: Session, job: ProcessingJobRecord
@@ -51,11 +58,22 @@ def save_job_to_db(
     return job


-def get_jobs_by_user_id(database: Session, user_id: str) -> List[ProcessingJobRecord]:
-    logger.info(f"Retrieving all processing jobs for user {user_id}")
+def get_jobs_by_user_id(
+    database: Session, user_id: str, upscaling_task_id: Optional[int]
+) -> List[ProcessingJobRecord]:
+    logger.info(
+        f"Retrieving all processing jobs for user {user_id} for upscaling task {upscaling_task_id}"
+    )
     return (
         database.query(ProcessingJobRecord)
-        .filter(ProcessingJobRecord.user_id == user_id)
+        .filter(
+            ProcessingJobRecord.user_id == user_id,
+            (
+                ProcessingJobRecord.upscaling_task_id == upscaling_task_id
+                if upscaling_task_id
+                else ProcessingJobRecord.upscaling_task_id.is_(None)
+            ),
+        )
         .all()
     )

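A brief usage sketch of the updated helper (obtaining the Session is assumed here, e.g. via the project's get_db dependency; the task ID is illustrative): a truthy upscaling_task_id narrows the query to that task's jobs, while None falls back to jobs with no upscaling task attached.

# Minimal usage sketch; session handling and the task ID are assumptions, not part of this commit.
from sqlalchemy.orm import Session

from app.database.models.processing_job import get_jobs_by_user_id


def summarize_jobs(db: Session, user_id: str) -> None:
    # Jobs linked to upscaling task 42 for this user.
    task_jobs = get_jobs_by_user_id(db, user_id, upscaling_task_id=42)
    # None (or any falsy value) selects standalone jobs, i.e. upscaling_task_id IS NULL.
    standalone_jobs = get_jobs_by_user_id(db, user_id, upscaling_task_id=None)
    print(f"{len(task_jobs)} task jobs, {len(standalone_jobs)} standalone jobs")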

app/database/models/upscaling_task.py

Lines changed: 113 additions & 0 deletions

@@ -0,0 +1,113 @@
import datetime
from typing import List, Optional

from loguru import logger
from sqlalchemy import DateTime, Enum, Integer, String
from sqlalchemy.orm import Mapped, Session, mapped_column

from app.database.db import Base
from app.schemas.unit_job import ProcessingStatusEnum, ProcessTypeEnum


class UpscalingTaskRecord(Base):
    __tablename__ = "upscaling_tasks"

    id: Mapped[int] = mapped_column(
        Integer, primary_key=True, index=True, autoincrement=True
    )
    title: Mapped[str] = mapped_column(String, index=True)
    label: Mapped[ProcessTypeEnum] = mapped_column(Enum(ProcessTypeEnum), index=True)
    status: Mapped[ProcessingStatusEnum] = mapped_column(
        Enum(ProcessingStatusEnum), index=True
    )
    user_id: Mapped[str] = mapped_column(String, index=True)
    service: Mapped[str] = mapped_column(String, index=True)
    created: Mapped[datetime.datetime] = mapped_column(
        DateTime, default=datetime.datetime.utcnow, index=True
    )
    updated: Mapped[datetime.datetime] = mapped_column(
        DateTime,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
        index=True,
    )


def save_upscaling_task_to_db(
    db_session: Session, task: UpscalingTaskRecord
) -> UpscalingTaskRecord:
    """
    Save an upscaling task record to the database and update the ID of the task.

    :param db_session: The database session to use for saving the task.
    :param task: The UpscalingTaskRecord instance to save.
    """
    db_session.add(task)
    db_session.commit()
    db_session.refresh(task)  # Refresh to get the ID after commit
    logger.debug(f"Upscale task saved with ID: {task.id}")
    return task


def get_upscale_tasks_by_user_id(
    database: Session, user_id: str
) -> List[UpscalingTaskRecord]:
    logger.info(f"Retrieving all upscale tasks for user {user_id}")
    return (
        database.query(UpscalingTaskRecord)
        .filter(UpscalingTaskRecord.user_id == user_id)
        .all()
    )


def get_upscale_task_by_id(
    database: Session, task_id: int
) -> Optional[UpscalingTaskRecord]:
    logger.info(f"Retrieving upscale task with ID {task_id}")
    return (
        database.query(UpscalingTaskRecord)
        .filter(UpscalingTaskRecord.id == task_id)
        .first()
    )


def get_upscale_task_by_user_id(
    database: Session, task_id: int, user_id: str
) -> Optional[UpscalingTaskRecord]:
    logger.info(f"Retrieving upscale task with ID {task_id} for user {user_id}")
    return (
        database.query(UpscalingTaskRecord)
        .filter(
            UpscalingTaskRecord.id == task_id, UpscalingTaskRecord.user_id == user_id
        )
        .first()
    )


def update_upscale_task_status_by_id(
    database: Session, task_id: int, status: ProcessingStatusEnum
):
    logger.info(f"Updating the status of upscale task with ID {task_id} to {status}")
    task = get_upscale_task_by_id(database, task_id)

    if task:
        task.status = status
        database.commit()
        database.refresh(task)
    else:
        logger.warning(
            f"Could not update upscaling task status of task {task_id} as it could not be found "
            "in the database"
        )


# def get_jobs_by_user_id(database: Session, user_id: str) -> List[ProcessingJobRecord]:
#     logger.info(f"Retrieving all processing jobs for user {user_id}")
#     return (
#         database.query(ProcessingJobRecord)
#         .filter(
#             ProcessingJobRecord.user_id == user_id,
#             ProcessingJobRecord.upscaling_task_id is None,
#         )
#         .all()
#     )
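
A minimal sketch of how these helpers fit together (the Session, the field values, and the enum member names are assumptions inferred from the migration above, not part of this commit):

# Hypothetical usage sketch; values and enum member names are illustrative assumptions.
import datetime

from sqlalchemy.orm import Session

from app.database.models.upscaling_task import (
    UpscalingTaskRecord,
    save_upscaling_task_to_db,
    update_upscale_task_status_by_id,
)
from app.schemas.unit_job import ProcessingStatusEnum, ProcessTypeEnum


def create_and_finish_task(db: Session) -> UpscalingTaskRecord:
    now = datetime.datetime.utcnow()
    task = UpscalingTaskRecord(
        title="Sentinel-2 upscaling",          # illustrative title
        label=ProcessTypeEnum.OPENEO,          # member name inferred from the enum values
        status=ProcessingStatusEnum.CREATED,
        user_id="alice",
        service="https://openeo.example.org",  # hypothetical service URL
        created=now,
        updated=now,
    )
    task = save_upscaling_task_to_db(db, task)  # commits and assigns task.id
    update_upscale_task_status_by_id(db, task.id, ProcessingStatusEnum.FINISHED)
    return task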

app/main.py

Lines changed: 2 additions & 1 deletion

@@ -5,7 +5,7 @@
 from app.services.tiles.base import load_grids
 from .config.logger import setup_logging
 from .config.settings import settings
-from .routers import jobs_status, unit_jobs, health, tiles
+from .routers import jobs_status, unit_jobs, health, tiles, upscale_tasks

 setup_logging()

@@ -27,4 +27,5 @@
 app.include_router(tiles.router)
 app.include_router(jobs_status.router)
 app.include_router(unit_jobs.router)
+app.include_router(upscale_tasks.router)
 app.include_router(health.router)

app/routers/jobs_status.py

Lines changed: 3 additions & 1 deletion

@@ -7,6 +7,7 @@
 from app.database.db import get_db
 from app.schemas.jobs_status import JobsStatusResponse
 from app.services.processing import get_processing_jobs_by_user_id
+from app.services.upscaling import get_upscaling_tasks_by_user_id

 router = APIRouter()

@@ -25,7 +26,8 @@ async def get_jobs_status(
     """
     logger.debug(f"Fetching jobs list for user {user}")
     return JobsStatusResponse(
-        upscaling_tasks=[], processing_jobs=get_processing_jobs_by_user_id(db, user)
+        upscaling_tasks=get_upscaling_tasks_by_user_id(db, user),
+        processing_jobs=get_processing_jobs_by_user_id(db, user),
     )
