Skip to content

Commit a74592e

Browse files
committed
[DOP-22532] change job type filter to list
1 parent: 39bdd6e · commit: a74592e

File tree

6 files changed

+61
-123
lines changed

6 files changed

+61
-123
lines changed

data_rentgen/db/repositories/job.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async def paginate(
7777
job_ids: Collection[int],
7878
search_query: str | None,
7979
location_id: int | None,
80-
job_type: str | None,
80+
job_type: Collection[str],
8181
) -> PaginationDTO[Job]:
8282
where = []
8383
if job_ids:
@@ -86,7 +86,7 @@ async def paginate(
8686
query: Select | CompoundSelect
8787
order_by: list[ColumnElement | SQLColumnExpression]
8888
if job_type:
89-
where.append(Job.type == job_type) # type: ignore[arg-type]
89+
where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type]
9090
if location_id:
9191
where.append(Job.location_id == location_id) # type: ignore[arg-type]
9292

data_rentgen/server/api/v1/router/job.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
router = APIRouter(
2323
prefix="/jobs",
2424
tags=["Jobs"],
25-
responses=get_error_responses(include={NotAuthorizedSchema, NotAuthorizedRedirectSchema, InvalidRequestSchema}),
25+
responses=get_error_responses(
26+
include={NotAuthorizedSchema, NotAuthorizedRedirectSchema, InvalidRequestSchema},
27+
),
2628
)
2729

2830

@@ -62,7 +64,7 @@ async def get_jobs_lineage(
6264
return build_lineage_response(lineage)
6365

6466

65-
@router.get("/job_types", summary="Get distinct types of Jobs")
67+
@router.get("/job-types", summary="Get distinct types of Jobs")
6668
async def get_job_types(
6769
job_service: Annotated[JobService, Depends()],
6870
current_user: Annotated[User, Depends(get_user())],

data_rentgen/server/schemas/v1/job.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ class JobPaginateQueryV1(PaginateQueryV1):
4343
min_length=3,
4444
description="Search query",
4545
)
46-
job_type: str | None = Field(
47-
default=None,
48-
description="Filter for searching",
46+
job_type: list[str] = Field(
47+
default_factory=list,
48+
description="Specify job types",
4949
)
5050
location_id: int | None = Field(
5151
default=None,
52-
description="",
52+
description="The location id which jobs belong",
5353
)
5454

5555
model_config = ConfigDict(extra="forbid")

data_rentgen/server/services/job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async def paginate(
3232
job_ids: Collection[int],
3333
search_query: str | None,
3434
location_id: int | None,
35-
job_type: str | None,
35+
job_type: Collection[str],
3636
) -> JobServicePaginatedResult:
3737
pagination = await self._uow.job.paginate(
3838
page=page,

tests/test_server/fixtures/factories/job.py

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ async def jobs_search(
155155
await create_location(
156156
async_session,
157157
location_kwargs=kwargs,
158-
address_kwargs={"urls": [random_string(32), random_string(32)]}, # Each location has 2 addresses
158+
# Each location has 2 addresses
159+
address_kwargs={
160+
"urls": [random_string(32), random_string(32)],
161+
},
159162
)
160163
for kwargs in location_kwargs
161164
]
@@ -170,7 +173,9 @@ async def jobs_search(
170173
await create_location(
171174
async_session,
172175
location_kwargs={"type": "random"},
173-
address_kwargs={"urls": [random_string(32), random_string(32)]},
176+
address_kwargs={
177+
"urls": [random_string(32), random_string(32)],
178+
},
174179
)
175180
for _ in range(3)
176181
]
@@ -207,9 +212,19 @@ async def jobs_search(
207212

208213
jobs_by_name = {job.name: job for job in jobs_with_names}
209214
jobs_by_location = dict(
210-
zip([location.name for location in locations_with_names], jobs_with_location_names, strict=False),
215+
zip(
216+
[location.name for location in locations_with_names],
217+
jobs_with_location_names,
218+
strict=False,
219+
),
220+
)
221+
jobs_by_address = dict(
222+
zip(
223+
addresses_url,
224+
[job for job in jobs_with_address_urls for _ in range(2)],
225+
strict=False,
226+
),
211227
)
212-
jobs_by_address = dict(zip(addresses_url, [job for job in jobs_with_address_urls for _ in range(2)], strict=False))
213228

214229
yield jobs_by_name, jobs_by_location, jobs_by_address
215230

@@ -218,36 +233,7 @@ async def jobs_search(
218233

219234

220235
@pytest_asyncio.fixture
221-
async def jobs_search_with_different_types(
222-
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
223-
) -> AsyncGenerator[tuple[Job]]:
224-
async with async_session_maker() as async_session:
225-
location = await create_location(async_session)
226-
airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
227-
airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
228-
229-
job_with_dag_type = await create_job(
230-
async_session,
231-
location_id=location.id,
232-
job_type_id=airflow_dag_type.id,
233-
job_kwargs={"name": "airflow-dag"},
234-
)
235-
job_with_task_type = await create_job(
236-
async_session,
237-
location_id=location.id,
238-
job_type_id=airflow_task_type.id,
239-
job_kwargs={"name": "airflow-task"},
240-
)
241-
async_session.expunge_all()
242-
243-
yield (job_with_dag_type, job_with_task_type)
244-
245-
async with async_session_maker() as async_session:
246-
await clean_db(async_session)
247-
248-
249-
@pytest_asyncio.fixture
250-
async def jobs_search_with_different_locations(
236+
async def jobs_with_locations_and_types(
251237
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
252238
) -> AsyncGenerator[tuple[Job]]:
253239
async with async_session_maker() as async_session:
@@ -256,23 +242,25 @@ async def jobs_search_with_different_locations(
256242
async_session,
257243
location_kwargs={"name": "airflow-host", "type": "http"},
258244
)
259-
job_type = await create_job_type(async_session)
245+
cluster_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"})
246+
airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"})
247+
airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"})
260248
cluster_job = await create_job(
261249
async_session,
262250
location_id=cluster_location.id,
263-
job_type_id=job_type.id,
251+
job_type_id=cluster_type.id,
264252
job_kwargs={"name": "my-job_cluster"},
265253
)
266254
dag_job = await create_job(
267255
async_session,
268256
location_id=airflow_location.id,
269-
job_type_id=job_type.id,
257+
job_type_id=airflow_dag_type.id,
270258
job_kwargs={"name": "my-job_dag"},
271259
)
272260
task_job = await create_job(
273261
async_session,
274262
location_id=airflow_location.id,
275-
job_type_id=job_type.id,
263+
job_type_id=airflow_task_type.id,
276264
job_kwargs={"name": "my-job_task"},
277265
)
278266

tests/test_server/test_jobs/test_search_jobs.py

Lines changed: 24 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ async def test_get_job_types(
254254
) -> None:
255255
unique_job_type = {item.type for item in job_types}
256256
response = await test_client.get(
257-
"/v1/jobs/job_types",
257+
"/v1/jobs/job-types",
258258
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
259259
)
260260

@@ -265,47 +265,21 @@ async def test_get_job_types(
265265
async def test_search_jobs_by_location_id(
266266
test_client: AsyncClient,
267267
async_session: AsyncSession,
268-
jobs_search_with_different_locations: tuple[Job],
268+
jobs_with_locations_and_types: tuple[Job],
269269
mocked_user: MockedUser,
270270
) -> None:
271-
jobs = await enrich_jobs(jobs_search_with_different_locations, async_session)
272-
response = await test_client.get(
273-
"/v1/jobs",
274-
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
275-
params={"search_query": "my-job"},
276-
)
277-
278-
assert response.status_code == HTTPStatus.OK, response.json()
279-
assert response.json() == {
280-
"meta": {
281-
"has_next": False,
282-
"has_previous": False,
283-
"next_page": None,
284-
"page": 1,
285-
"page_size": 20,
286-
"pages_count": 1,
287-
"previous_page": None,
288-
"total_count": 3,
289-
},
290-
"items": [
291-
{
292-
"id": str(job.id),
293-
"data": job_to_json(job),
294-
}
295-
for job in jobs
296-
],
297-
}
271+
jobs = await enrich_jobs(jobs_with_locations_and_types, async_session)
298272

299273
# first job in jobs has a different location unlike two others
300274
[_, dag_job, task_job] = jobs
301-
response_with_location_id_filter = await test_client.get(
275+
response = await test_client.get(
302276
"/v1/jobs",
303277
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
304-
params={"search_query": "my-job", "location_id": dag_job.location_id},
278+
params={"location_id": dag_job.location_id},
305279
)
306280

307-
assert response_with_location_id_filter.status_code == HTTPStatus.OK, response_with_location_id_filter.json()
308-
assert response_with_location_id_filter.json() == {
281+
assert response.status_code == HTTPStatus.OK, response.json()
282+
assert response.json() == {
309283
"meta": {
310284
"has_next": False,
311285
"has_previous": False,
@@ -329,17 +303,17 @@ async def test_search_jobs_by_location_id(
329303
async def test_search_jobs_by_location_id_non_existen_id(
330304
test_client: AsyncClient,
331305
async_session: AsyncSession,
332-
jobs_search_with_different_locations: tuple[Job],
306+
jobs_with_locations_and_types: tuple[Job],
333307
mocked_user: MockedUser,
334308
):
335-
response_with_location_id_filter = await test_client.get(
309+
response = await test_client.get(
336310
"/v1/jobs",
337311
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
338-
params={"search_query": "my-job", "location_id": -1},
312+
params={"location_id": -1},
339313
)
340314

341-
assert response_with_location_id_filter.status_code == HTTPStatus.OK, response_with_location_id_filter.json()
342-
assert response_with_location_id_filter.json() == {
315+
assert response.status_code == HTTPStatus.OK, response.json()
316+
assert response.json() == {
343317
"meta": {
344318
"has_next": False,
345319
"has_previous": False,
@@ -354,18 +328,18 @@ async def test_search_jobs_by_location_id_non_existen_id(
354328
}
355329

356330

357-
async def test_search_jobs_by_name_and_type(
331+
async def test_search_by_jobs_type(
358332
test_client: AsyncClient,
359333
async_session: AsyncSession,
360-
jobs_search_with_different_types: tuple[Job],
334+
jobs_with_locations_and_types: tuple[Job],
361335
mocked_user: MockedUser,
362336
) -> None:
363-
jobs = await enrich_jobs(jobs_search_with_different_types, async_session)
364-
job_with_dag_type = jobs[0]
337+
jobs = await enrich_jobs(jobs_with_locations_and_types, async_session)
338+
[_, dag_job, task_job] = jobs
365339
response = await test_client.get(
366340
"/v1/jobs",
367341
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
368-
params={"search_query": "airflow"},
342+
params={"job_type": ["AIRFLOW_DAG", "AIRFLOW_TASK"]},
369343
)
370344

371345
assert response.status_code == HTTPStatus.OK, response.json()
@@ -385,51 +359,25 @@ async def test_search_jobs_by_name_and_type(
385359
"id": str(job.id),
386360
"data": job_to_json(job),
387361
}
388-
for job in jobs
362+
for job in (dag_job, task_job)
389363
],
390364
}
391365

392-
response_with_type_filter = await test_client.get(
393-
"/v1/jobs",
394-
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
395-
params={"job_type": "AIRFLOW_DAG", "search_query": "airflow"},
396-
)
397366

398-
assert response_with_type_filter.status_code == HTTPStatus.OK, response_with_type_filter.json()
399-
assert response_with_type_filter.json() == {
400-
"meta": {
401-
"has_next": False,
402-
"has_previous": False,
403-
"next_page": None,
404-
"page": 1,
405-
"page_size": 20,
406-
"pages_count": 1,
407-
"previous_page": None,
408-
"total_count": 1,
409-
},
410-
"items": [
411-
{
412-
"id": str(job_with_dag_type.id),
413-
"data": job_to_json(job_with_dag_type),
414-
},
415-
],
416-
}
417-
418-
419-
async def test_search_jobs_by_name_and_type_non_existen_type(
367+
async def test_search_jobs_by_non_existen_type(
420368
test_client: AsyncClient,
421369
async_session: AsyncSession,
422-
jobs_search_with_different_types: tuple[Job],
370+
jobs_with_locations_and_types: tuple[Job],
423371
mocked_user: MockedUser,
424372
) -> None:
425-
response_with_type_filter = await test_client.get(
373+
response = await test_client.get(
426374
"/v1/jobs",
427375
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
428-
params={"job_type": "NO_EXISTEN_TYPE", "search_query": "airflow"},
376+
params={"job_type": "NO_EXISTEN_TYPE"},
429377
)
430378

431-
assert response_with_type_filter.status_code == HTTPStatus.OK, response_with_type_filter.json()
432-
assert response_with_type_filter.json() == {
379+
assert response.status_code == HTTPStatus.OK, response.json()
380+
assert response.json() == {
433381
"meta": {
434382
"has_next": False,
435383
"has_previous": False,

0 commit comments

Comments (0)