Skip to content

Commit 00e2ea6

Browse files
committed
[DOP-22530] Implement GET /v1/jobs?location_type=...
1 parent b8e039f commit 00e2ea6

File tree

10 files changed

+276
-167
lines changed

10 files changed

+276
-167
lines changed

data_rentgen/db/repositories/job.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,35 +77,40 @@ async def paginate(
7777
job_ids: Collection[int],
7878
search_query: str | None,
7979
location_id: int | None,
80+
location_type: Collection[str],
8081
job_type: Collection[str],
8182
) -> PaginationDTO[Job]:
8283
where = []
84+
location_join_clause = Location.id == Job.location_id
8385
if job_ids:
8486
where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type]
85-
86-
query: Select | CompoundSelect
87-
order_by: list[ColumnElement | SQLColumnExpression]
8887
if job_type:
8988
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
9089
if location_id:
9190
where.append(Job.location_id == location_id) # type: ignore[arg-type]
91+
if location_type:
92+
location_type_lower = [location_type.lower() for location_type in location_type]
93+
where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type]
9294

95+
query: Select | CompoundSelect
96+
order_by: list[ColumnElement | SQLColumnExpression]
9397
if search_query:
9498
tsquery = make_tsquery(search_query)
9599

96-
job_stmt = select(Job, ts_rank(Job.search_vector, tsquery).label("search_rank")).where(
97-
ts_match(Job.search_vector, tsquery),
98-
*where,
100+
job_stmt = (
101+
select(Job, ts_rank(Job.search_vector, tsquery).label("search_rank"))
102+
.join(Location, location_join_clause)
103+
.where(ts_match(Job.search_vector, tsquery), *where)
99104
)
100105
location_stmt = (
101106
select(Job, ts_rank(Location.search_vector, tsquery).label("search_rank"))
102-
.join(Job, Location.id == Job.location_id)
107+
.join(Location, location_join_clause)
103108
.where(ts_match(Location.search_vector, tsquery), *where)
104109
)
105110
address_stmt = (
106111
select(Job, func.max(ts_rank(Address.search_vector, tsquery).label("search_rank")))
107-
.join(Location, Address.location_id == Location.id)
108-
.join(Job, Location.id == Job.location_id)
112+
.join(Location, location_join_clause)
113+
.join(Address, Address.location_id == Job.location_id)
109114
.where(ts_match(Address.search_vector, tsquery), *where)
110115
.group_by(Job.id, Location.id, Address.id)
111116
)
@@ -120,7 +125,7 @@ async def paginate(
120125
).group_by(*job_columns)
121126
order_by = [desc("search_rank"), asc("name")]
122127
else:
123-
query = select(Job).where(*where)
128+
query = select(Job).join(Location, location_join_clause).where(*where)
124129
order_by = [Job.name]
125130

126131
options = [selectinload(Job.location).selectinload(Location.addresses)]

data_rentgen/server/api/v1/router/job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ async def paginate_jobs(
4444
job_ids=query_args.job_id,
4545
search_query=query_args.search_query,
4646
location_id=query_args.location_id,
47+
location_type=query_args.location_type,
4748
job_type=query_args.job_type,
4849
)
4950
return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination)

data_rentgen/server/schemas/v1/job.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ class JobDetailedResponseV1(BaseModel):
2929
class JobTypesResponseV1(BaseModel):
3030
"""Job types"""
3131

32-
job_types: list[str] = Field(description="List of distinct job types")
32+
job_types: list[str] = Field(
33+
description="List of distinct job types",
34+
examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]],
35+
)
3336

3437
model_config = ConfigDict(from_attributes=True)
3538

@@ -42,14 +45,22 @@ class JobPaginateQueryV1(PaginateQueryV1):
4245
default=None,
4346
min_length=3,
4447
description="Search query",
48+
examples=["my job"],
4549
)
4650
job_type: list[str] = Field(
4751
default_factory=list,
4852
description="Specify job types",
53+
examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]],
4954
)
5055
location_id: int | None = Field(
5156
default=None,
5257
description="The location id which jobs belong",
58+
examples=[123],
59+
)
60+
location_type: list[str] = Field(
61+
default_factory=list,
62+
description="Specify location types",
63+
examples=[["yarn"]],
5364
)
5465

5566
model_config = ConfigDict(extra="forbid")

data_rentgen/server/services/job.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ async def paginate(
3232
job_ids: Collection[int],
3333
search_query: str | None,
3434
location_id: int | None,
35+
location_type: Collection[str],
3536
job_type: Collection[str],
3637
) -> JobServicePaginatedResult:
3738
pagination = await self._uow.job.paginate(
@@ -40,6 +41,7 @@ async def paginate(
4041
job_ids=job_ids,
4142
search_query=search_query,
4243
location_id=location_id,
44+
location_type=location_type,
4345
job_type=job_type,
4446
)
4547

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Add a new filter to ``GET /v1/jobs``:

- ``location_type``: ``list[str]``

tests/test_server/fixtures/factories/job.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,20 +235,20 @@ async def jobs_search(
235235
@pytest_asyncio.fixture
236236
async def jobs_with_locations_and_types(
237237
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
238-
) -> AsyncGenerator[tuple[Job]]:
238+
) -> AsyncGenerator[tuple[Job, ...]]:
239239
async with async_session_maker() as async_session:
240240
cluster_location = await create_location(async_session, location_kwargs={"name": "my-cluster", "type": "yarn"})
241241
airflow_location = await create_location(
242242
async_session,
243243
location_kwargs={"name": "airflow-host", "type": "http"},
244244
)
245-
cluster_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
245+
spark_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
246246
airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
247247
airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
248-
cluster_job = await create_job(
248+
spark_job = await create_job(
249249
async_session,
250250
location_id=cluster_location.id,
251-
job_type_id=cluster_type.id,
251+
job_type_id=spark_type.id,
252252
job_kwargs={"name": "my-job_cluster"},
253253
)
254254
dag_job = await create_job(
@@ -266,7 +266,7 @@ async def jobs_with_locations_and_types(
266266

267267
async_session.expunge_all()
268268

269-
yield (cluster_job, dag_job, task_job)
269+
yield (spark_job, dag_job, task_job)
270270

271271
async with async_session_maker() as async_session:
272272
await clean_db(async_session)
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
from http import HTTPStatus

import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from data_rentgen.db.models import Job
from tests.fixtures.mocks import MockedUser
from tests.test_server.utils.convert_to_json import job_to_json
from tests.test_server.utils.enrich import enrich_jobs

pytestmark = [pytest.mark.server, pytest.mark.asyncio]


async def test_get_jobs_by_location_id(
    test_client: AsyncClient,
    async_session: AsyncSession,
    jobs_with_locations_and_types: tuple[Job, ...],
    mocked_user: MockedUser,
) -> None:
    """Filtering by ``location_id`` returns only the jobs bound to that location."""
    jobs = await enrich_jobs(jobs_with_locations_and_types, async_session)

    # first job in jobs has a different location unlike two others
    [_, dag_job, task_job] = jobs
    response = await test_client.get(
        "/v1/jobs",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={"location_id": dag_job.location_id},
    )

    assert response.status_code == HTTPStatus.OK, response.json()
    assert response.json() == {
        "meta": {
            "has_next": False,
            "has_previous": False,
            "next_page": None,
            "page": 1,
            "page_size": 20,
            "pages_count": 1,
            "previous_page": None,
            "total_count": 2,
        },
        "items": [
            {
                "id": str(job.id),
                "data": job_to_json(job),
            }
            for job in [dag_job, task_job]
        ],
    }


async def test_get_jobs_by_location_id_non_existent(
    test_client: AsyncClient,
    async_session: AsyncSession,
    jobs_with_locations_and_types: tuple[Job, ...],
    mocked_user: MockedUser,
) -> None:
    """A ``location_id`` that matches nothing yields an empty, well-formed page."""
    response = await test_client.get(
        "/v1/jobs",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={"location_id": -1},
    )

    assert response.status_code == HTTPStatus.OK, response.json()
    assert response.json() == {
        "meta": {
            "has_next": False,
            "has_previous": False,
            "next_page": None,
            "page": 1,
            "page_size": 20,
            "pages_count": 1,
            "previous_page": None,
            "total_count": 0,
        },
        "items": [],
    }


# NOTE: renamed from test_get_lobs_by_location_type — "lobs" was a typo for "jobs".
async def test_get_jobs_by_location_type(
    test_client: AsyncClient,
    async_session: AsyncSession,
    jobs_with_locations_and_types: tuple[Job, ...],
    mocked_user: MockedUser,
) -> None:
    """Filtering by ``location_type`` matches case-insensitively (repository lowercases input)."""
    spark_job, *_ = await enrich_jobs(jobs_with_locations_and_types, async_session)
    response = await test_client.get(
        "/v1/jobs",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={"location_type": ["YARN"]},  # case-insensitive
    )

    assert response.status_code == HTTPStatus.OK, response.json()
    assert response.json() == {
        "meta": {
            "has_next": False,
            "has_previous": False,
            "next_page": None,
            "page": 1,
            "page_size": 20,
            "pages_count": 1,
            "previous_page": None,
            "total_count": 1,
        },
        "items": [
            {
                "id": str(spark_job.id),
                "data": job_to_json(spark_job),
            },
        ],
    }


async def test_get_jobs_by_location_type_non_existent(
    test_client: AsyncClient,
    async_session: AsyncSession,
    jobs_with_locations_and_types: tuple[Job, ...],
    mocked_user: MockedUser,
) -> None:
    """An unknown ``location_type`` yields an empty, well-formed page."""
    response = await test_client.get(
        "/v1/jobs",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={"location_type": "non_existing_location_type"},
    )

    assert response.status_code == HTTPStatus.OK, response.json()
    assert response.json() == {
        "meta": {
            "has_next": False,
            "has_previous": False,
            "next_page": None,
            "page": 1,
            "page_size": 20,
            "pages_count": 1,
            "previous_page": None,
            "total_count": 0,
        },
        "items": [],
    }
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
from http import HTTPStatus

import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from data_rentgen.db.models import Job, JobType
from tests.fixtures.mocks import MockedUser
from tests.test_server.utils.convert_to_json import job_to_json
from tests.test_server.utils.enrich import enrich_jobs

pytestmark = [pytest.mark.server, pytest.mark.asyncio]


async def test_get_job_types(
    test_client: AsyncClient,
    job_types: list[JobType],
    mocked_user: MockedUser,
) -> None:
    """The types endpoint returns every distinct job type, sorted."""
    unique_job_type = {item.type for item in job_types}
    response = await test_client.get(
        "/v1/jobs/types",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
    )

    assert response.status_code == HTTPStatus.OK, response.json()
    assert response.json() == {"job_types": sorted(unique_job_type)}


async def test_get_jobs_by_job_type(
    test_client: AsyncClient,
    async_session: AsyncSession,
    jobs_with_locations_and_types: tuple[Job, ...],
    mocked_user: MockedUser,
) -> None:
    """Passing several ``job_type`` values returns the union of matching jobs."""
    _, dag_job, task_job = await enrich_jobs(jobs_with_locations_and_types, async_session)

    response = await test_client.get(
        "/v1/jobs",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={"job_type": ["AIRFLOW_DAG", "AIRFLOW_TASK"]},
    )
    assert response.status_code == HTTPStatus.OK, response.json()

    expected_items = [
        {
            "id": str(job.id),
            "data": job_to_json(job),
        }
        for job in (dag_job, task_job)
    ]
    assert response.json() == {
        "meta": {
            "has_next": False,
            "has_previous": False,
            "next_page": None,
            "page": 1,
            "page_size": 20,
            "pages_count": 1,
            "previous_page": None,
            "total_count": 2,
        },
        "items": expected_items,
    }


async def test_get_jobs_by_non_existent_type(
    test_client: AsyncClient,
    async_session: AsyncSession,
    jobs_with_locations_and_types: tuple[Job, ...],
    mocked_user: MockedUser,
) -> None:
    """An unknown ``job_type`` filter yields an empty, well-formed page."""
    response = await test_client.get(
        "/v1/jobs",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={"job_type": "NO_EXISTENT_TYPE"},
    )
    assert response.status_code == HTTPStatus.OK, response.json()

    assert response.json() == {
        "meta": {
            "has_next": False,
            "has_previous": False,
            "next_page": None,
            "page": 1,
            "page_size": 20,
            "pages_count": 1,
            "previous_page": None,
            "total_count": 0,
        },
        "items": [],
    }

0 commit comments

Comments
 (0)