Skip to content

Commit 932423d

Browse files
committed
[DOP-31839] Add job tags
1 parent 172315a commit 932423d

28 files changed

+513
-255
lines changed

data_rentgen/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.4.8
1+
0.5.0
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# SPDX-FileCopyrightText: 2024-present MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Add job_tag
4+
5+
Revision ID: 675ed36b7807
6+
Revises: f1c76c422433
7+
Create Date: 2026-01-21 11:30:01.406676
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "675ed36b7807"
16+
down_revision = "f1c76c422433"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
    """Create the ``job_tag`` association table and the index on its ``tag_value_id`` column."""
    job_id_column = sa.Column("job_id", sa.BigInteger(), nullable=False)
    tag_value_id_column = sa.Column("tag_value_id", sa.BigInteger(), nullable=False)

    # Both foreign keys are declared with ON DELETE CASCADE.
    job_fk = sa.ForeignKeyConstraint(
        ["job_id"],
        ["job.id"],
        name=op.f("fk__job_tag__job_id__job"),
        ondelete="CASCADE",
    )
    tag_value_fk = sa.ForeignKeyConstraint(
        ["tag_value_id"],
        ["tag_value.id"],
        name=op.f("fk__job_tag__tag_value_id__tag_value"),
        ondelete="CASCADE",
    )

    # Composite primary key over (job_id, tag_value_id).
    primary_key = sa.PrimaryKeyConstraint("job_id", "tag_value_id", name=op.f("pk__job_tag"))

    op.create_table(
        "job_tag",
        job_id_column,
        tag_value_id_column,
        job_fk,
        tag_value_fk,
        primary_key,
    )
    op.create_index("ix__job_tag__tag_value_id", "job_tag", ["tag_value_id"], unique=False)
33+
34+
35+
def downgrade() -> None:
    """Revert ``upgrade``: drop the ``tag_value_id`` index, then the ``job_tag`` table."""
    table_name = "job_tag"
    op.drop_index("ix__job_tag__tag_value_id", table_name=table_name)
    op.drop_table(table_name)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# SPDX-FileCopyrightText: 2024-present MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Rename table dataset_tags to dataset_tag
4+
5+
Revision ID: f1c76c422433
6+
Revises: 8e8891273099
7+
Create Date: 2026-01-21 11:27:20.255770
8+
9+
"""
10+
11+
from alembic import op
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "f1c76c422433"
15+
down_revision = "8e8891273099"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade() -> None:
    """Rename table ``dataset_tags`` to ``dataset_tag`` and rename its ``tag_value_id`` index to match."""
    op.rename_table(old_table_name="dataset_tags", new_table_name="dataset_tag")

    # The index name embeds the table name, so it is renamed to follow the new table name.
    # NOTE(review): only the index is renamed here; any primary/foreign key constraints whose
    # names embed ``dataset_tags`` are left untouched - confirm whether that is intended.
    rename_index_sql = "ALTER INDEX ix__dataset_tags__tag_value_id RENAME TO ix__dataset_tag__tag_value_id"
    op.execute(rename_index_sql)
23+
24+
25+
def downgrade() -> None:
    """Revert ``upgrade``: rename ``dataset_tag`` back to ``dataset_tags`` and restore the old index name."""
    op.rename_table(old_table_name="dataset_tag", new_table_name="dataset_tags")

    # Restore the index name that embeds the original table name.
    rename_index_sql = "ALTER INDEX ix__dataset_tag__tag_value_id RENAME TO ix__dataset_tags__tag_value_id"
    op.execute(rename_index_sql)

data_rentgen/db/models/dataset.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ class Dataset(Base):
3939
doc="Dataset name, e.g. table name or filesystem path",
4040
)
4141

42-
tag_values: Mapped[set[TagValue]] = relationship(secondary=lambda: dataset_tags_table)
42+
tag_values: Mapped[set[TagValue]] = relationship(
43+
secondary=lambda: dataset_tags_table,
44+
doc="Dataset tag values",
45+
)
4346

4447
search_vector: Mapped[str] = mapped_column(
4548
TSVECTOR,
@@ -63,9 +66,9 @@ class Dataset(Base):
6366

6467

6568
dataset_tags_table: Table = Table(
66-
"dataset_tags",
69+
"dataset_tag",
6770
Base.metadata,
6871
Column("dataset_id", ForeignKey("dataset.id", ondelete="CASCADE"), primary_key=True),
6972
Column("tag_value_id", ForeignKey("tag_value.id", ondelete="CASCADE"), primary_key=True),
70-
Index("ix__dataset_tags__tag_value_id", "tag_value_id"),
73+
Index("ix__dataset_tag__tag_value_id", "tag_value_id"),
7174
)

data_rentgen/db/models/job.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33

44
from __future__ import annotations
55

6-
from sqlalchemy import BigInteger, Column, Computed, ForeignKey, Index, String, column, func, select
6+
from sqlalchemy import BigInteger, Column, Computed, ForeignKey, Index, String, Table, column, func, select
77
from sqlalchemy.dialects.postgresql import TSVECTOR
88
from sqlalchemy.orm import Mapped, column_property, mapped_column, relationship
99

1010
from data_rentgen.db.models.base import Base
1111
from data_rentgen.db.models.job_type import JobType
1212
from data_rentgen.db.models.location import Location
13+
from data_rentgen.db.models.tag_value import TagValue
1314

1415

1516
class Job(Base):
@@ -47,6 +48,11 @@ class Job(Base):
4748
select(JobType.type).where(Column("type_id") == JobType.id).scalar_subquery(),
4849
)
4950

51+
tag_values: Mapped[set[TagValue]] = relationship(
52+
secondary=lambda: job_tag_table,
53+
doc="Job tag values",
54+
)
55+
5056
search_vector: Mapped[str] = mapped_column(
5157
TSVECTOR,
5258
Computed(
@@ -66,3 +72,12 @@ class Job(Base):
6672
deferred=True,
6773
doc="Full-text search vector",
6874
)
75+
76+
77+
# Association table between jobs and tag values; it is passed as the
# ``secondary`` of the ``Job.tag_values`` relationship.
job_tag_table: Table = Table(
    "job_tag",
    Base.metadata,
    # Composite primary key (job_id, tag_value_id); each column is also a
    # foreign key declared with ON DELETE CASCADE.
    Column("job_id", ForeignKey("job.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_value_id", ForeignKey("tag_value.id", ondelete="CASCADE"), primary_key=True),
    # Additional index on tag_value_id alone.
    Index("ix__job_tag__tag_value_id", "tag_value_id"),
)

data_rentgen/db/repositories/job.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
bindparam,
1717
cast,
1818
desc,
19+
distinct,
1920
func,
2021
select,
2122
tuple_,
2223
union,
2324
)
2425
from sqlalchemy.orm import selectinload
2526

26-
from data_rentgen.db.models import Address, Job, Location
27+
from data_rentgen.db.models import Address, Job, Location, TagValue
2728
from data_rentgen.db.repositories.base import Repository
2829
from data_rentgen.db.utils.search import make_tsquery, ts_match, ts_rank
2930
from data_rentgen.dto import JobDTO, PaginationDTO
@@ -76,6 +77,7 @@ async def paginate(
7677
page_size: int,
7778
job_ids: Collection[int],
7879
job_types: Collection[str],
80+
tag_value_ids: Collection[int],
7981
location_ids: Collection[int],
8082
location_types: Collection[str],
8183
search_query: str | None,
@@ -92,6 +94,18 @@ async def paginate(
9294
location_type_lower = [location_type.lower() for location_type in location_types]
9395
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]
9496

97+
if tag_value_ids:
    # De-duplicate the requested ids first. The HAVING clause below compares the
    # number of distinct matched tag values with the number of requested ids, so a
    # repeated id (e.g. ``?tag_value_id=1&tag_value_id=1``) would make that count
    # unreachable and silently filter out every job. Sorting keeps the bound
    # parameters in a deterministic order.
    tv_ids = sorted(set(tag_value_ids))
    job_ids_subq = (
        select(Job.id)
        .join(Job.tag_values)
        .where(TagValue.id.in_(tv_ids))
        .group_by(Job.id)
        # If multiple tag values are passed, job should have all of them (AND, not OR)
        .having(func.count(distinct(TagValue.id)) == len(tv_ids))
    )
    where.append(Job.id.in_(job_ids_subq))
108+
95109
query: Select | CompoundSelect
96110
order_by: list[ColumnElement | SQLColumnExpression]
97111
if search_query:
@@ -128,7 +142,10 @@ async def paginate(
128142
query = select(Job).join(Location, location_join_clause).where(*where)
129143
order_by = [Job.name]
130144

131-
options = [selectinload(Job.location).selectinload(Location.addresses)]
145+
options = [
146+
selectinload(Job.location).selectinload(Location.addresses),
147+
selectinload(Job.tag_values).selectinload(TagValue.tag),
148+
]
132149
return await self._paginate_by_query(
133150
query=query,
134151
order_by=order_by,

data_rentgen/server/api/v1/router/job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ async def paginate_jobs(
4343
page_size=query_args.page_size,
4444
job_ids=query_args.job_id,
4545
job_types=query_args.job_type,
46+
tag_value_ids=query_args.tag_value_id,
4647
location_ids=query_args.location_id,
4748
location_types=query_args.location_type,
4849
search_query=query_args.search_query,

data_rentgen/server/schemas/v1/job.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from data_rentgen.server.schemas.v1.location import LocationResponseV1
88
from data_rentgen.server.schemas.v1.pagination import PaginateQueryV1
9+
from data_rentgen.server.schemas.v1.tag import TagResponseV1
910

1011

1112
class JobResponseV1(BaseModel):
@@ -22,6 +23,7 @@ class JobResponseV1(BaseModel):
2223
class JobDetailedResponseV1(BaseModel):
2324
id: str = Field(description="Job id", coerce_numbers_to_str=True)
2425
data: JobResponseV1 = Field(description="Job data")
26+
tags: list[TagResponseV1] = Field(default_factory=list, description="Job tags")
2527

2628
model_config = ConfigDict(from_attributes=True)
2729

@@ -55,6 +57,14 @@ class JobPaginateQueryV1(PaginateQueryV1):
5557
description="Types of jobs",
5658
examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]],
5759
)
60+
tag_value_id: list[int] = Field(
61+
default_factory=list,
62+
description=(
63+
"Get jobs having specific tag values assigned. "
64+
"If multiple values are passed, job should have all of them (AND, not OR)"
65+
),
66+
examples=[[123]],
67+
)
5868
location_id: list[int] = Field(
5969
default_factory=list,
6070
description="Ids of locations the job started at",

data_rentgen/server/services/dataset.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from fastapi import Depends
99

10-
from data_rentgen.db.models.location import Location
10+
from data_rentgen.db.models import Location
1111
from data_rentgen.dto.pagination import PaginationDTO
1212
from data_rentgen.server.services.tag import TagData, TagValueData
1313
from data_rentgen.services.uow import UnitOfWork
@@ -18,7 +18,6 @@ class DatasetData:
1818
id: int
1919
name: str
2020
location: Location
21-
schema = None
2221

2322

2423
@dataclass

data_rentgen/server/services/job.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,30 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from collections.abc import Collection, Sequence
44
from dataclasses import dataclass
5+
from itertools import groupby
56
from typing import Annotated
67

78
from fastapi import Depends
89

9-
from data_rentgen.db.models.job import Job
10+
from data_rentgen.db.models import Location
1011
from data_rentgen.dto.pagination import PaginationDTO
12+
from data_rentgen.server.services.tag import TagData, TagValueData
1113
from data_rentgen.services.uow import UnitOfWork
1214

1315

16+
@dataclass
class JobData:
    """Job payload carried in ``JobServiceResult.data``.

    Built in the service's ``paginate`` from the ``id``, ``name``, ``type`` and
    ``location`` attributes of each job returned by the repository.
    """

    id: int  # taken from ``job.id``
    name: str  # taken from ``job.name``
    type: str  # taken from ``job.type``
    location: Location  # taken from ``job.location``
22+
23+
1424
@dataclass
1525
class JobServiceResult:
1626
id: int
17-
data: Job
27+
data: JobData
28+
tags: list[TagData]
1829

1930

2031
class JobServicePaginatedResult(PaginationDTO[JobServiceResult]):
@@ -31,6 +42,7 @@ async def paginate(
3142
page_size: int,
3243
job_ids: Collection[int],
3344
job_types: Collection[str],
45+
tag_value_ids: Collection[int],
3446
location_ids: Collection[int],
3547
location_types: Collection[str],
3648
search_query: str | None,
@@ -40,6 +52,7 @@ async def paginate(
4052
page_size=page_size,
4153
job_ids=job_ids,
4254
job_types=job_types,
55+
tag_value_ids=tag_value_ids,
4356
location_ids=location_ids,
4457
location_types=location_types,
4558
search_query=search_query,
@@ -49,7 +62,31 @@ async def paginate(
4962
page=pagination.page,
5063
page_size=pagination.page_size,
5164
total_count=pagination.total_count,
52-
items=[JobServiceResult(id=job.id, data=job) for job in pagination.items],
65+
items=[
66+
JobServiceResult(
67+
id=job.id,
68+
data=JobData(
69+
id=job.id,
70+
name=job.name,
71+
type=job.type,
72+
location=job.location,
73+
),
74+
tags=[
75+
TagData(
76+
id=tag.id,
77+
name=tag.name,
78+
values=[
79+
TagValueData(id=tv.id, value=tv.value) for tv in sorted(group, key=lambda tv: tv.value)
80+
],
81+
)
82+
for tag, group in groupby(
83+
sorted(job.tag_values, key=lambda tv: tv.tag.name),
84+
key=lambda tv: tv.tag,
85+
)
86+
],
87+
)
88+
for job in pagination.items
89+
],
5390
)
5491

5592
async def get_job_types(self) -> Sequence[str]:

0 commit comments

Comments
 (0)