diff --git a/data_rentgen/db/repositories/dataset.py b/data_rentgen/db/repositories/dataset.py index 128afe8f..ebdba92e 100644 --- a/data_rentgen/db/repositories/dataset.py +++ b/data_rentgen/db/repositories/dataset.py @@ -99,14 +99,18 @@ async def paginate( dataset_ids: Collection[int], tag_value_ids: Collection[int], location_id: int | None, + location_type: Collection[str], search_query: str | None, ) -> PaginationDTO[Dataset]: where = [] + location_join_clause = Location.id == Dataset.location_id if dataset_ids: where.append(Dataset.id == any_(list(dataset_ids))) # type: ignore[arg-type] - if location_id: where.append(Dataset.location_id == location_id) + if location_type: + location_type_lower = [location_type.lower() for location_type in location_type] + where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type] if tag_value_ids: tv_ids = list(tag_value_ids) @@ -125,19 +129,20 @@ async def paginate( if search_query: tsquery = make_tsquery(search_query) - dataset_stmt = select(Dataset, ts_rank(Dataset.search_vector, tsquery).label("search_rank")).where( - ts_match(Dataset.search_vector, tsquery), - *where, + dataset_stmt = ( + select(Dataset, ts_rank(Dataset.search_vector, tsquery).label("search_rank")) + .join(Location, location_join_clause) + .where(ts_match(Dataset.search_vector, tsquery), *where) ) location_stmt = ( select(Dataset, ts_rank(Location.search_vector, tsquery).label("search_rank")) - .join(Dataset, Location.id == Dataset.location_id) + .join(Location, location_join_clause) .where(ts_match(Location.search_vector, tsquery), *where) ) address_stmt = ( select(Dataset, func.max(ts_rank(Address.search_vector, tsquery)).label("search_rank")) - .join(Location, Address.location_id == Location.id) - .join(Dataset, Location.id == Dataset.location_id) + .join(Location, location_join_clause) + .join(Address, Address.location_id == Dataset.location_id) .where(ts_match(Address.search_vector, tsquery), *where) .group_by(Dataset.id, Location.id, Address.id) ) @@ -152,7 +157,7 @@ async def paginate( ).group_by(*dataset_columns) order_by = [desc("search_rank"), asc("name")] else: - query = select(Dataset).where(*where) + query = select(Dataset).join(Location, location_join_clause).where(*where) order_by = [Dataset.name] options = [ diff --git a/data_rentgen/db/repositories/job.py b/data_rentgen/db/repositories/job.py index 45d35eb3..cf3c1f3a 100644 --- a/data_rentgen/db/repositories/job.py +++ b/data_rentgen/db/repositories/job.py @@ -77,35 +77,40 @@ async def paginate( job_ids: Collection[int], search_query: str | None, location_id: int | None, + location_type: Collection[str], job_type: Collection[str], ) -> PaginationDTO[Job]: where = [] + location_join_clause = Location.id == Job.location_id if job_ids: where.append(Job.id == any_(list(job_ids))) # type: ignore[arg-type] - - query: Select | CompoundSelect - order_by: list[ColumnElement | SQLColumnExpression] if job_type: where.append(Job.type == any_(list(job_type))) # type: ignore[arg-type] if location_id: where.append(Job.location_id == location_id) # type: ignore[arg-type] + if location_type: + location_type_lower = [location_type.lower() for location_type in location_type] + where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type] + query: Select | CompoundSelect + order_by: list[ColumnElement | SQLColumnExpression] if search_query: tsquery = make_tsquery(search_query) - job_stmt = select(Job, ts_rank(Job.search_vector, tsquery).label("search_rank")).where( - ts_match(Job.search_vector, tsquery), - *where, + job_stmt = ( + select(Job, ts_rank(Job.search_vector, tsquery).label("search_rank")) + .join(Location, location_join_clause) + .where(ts_match(Job.search_vector, tsquery), *where) ) location_stmt = ( select(Job, ts_rank(Location.search_vector, tsquery).label("search_rank")) - .join(Job, Location.id == Job.location_id) + .join(Location, location_join_clause) .where(ts_match(Location.search_vector, tsquery), *where) ) address_stmt = ( select(Job, func.max(ts_rank(Address.search_vector, tsquery).label("search_rank"))) - .join(Location, Address.location_id == Location.id) - .join(Job, Location.id == Job.location_id) + .join(Location, location_join_clause) + .join(Address, Address.location_id == Job.location_id) .where(ts_match(Address.search_vector, tsquery), *where) .group_by(Job.id, Location.id, Address.id) ) @@ -120,7 +125,7 @@ async def paginate( ).group_by(*job_columns) order_by = [desc("search_rank"), asc("name")] else: - query = select(Job).where(*where) + query = select(Job).join(Location, location_join_clause).where(*where) order_by = [Job.name] options = [selectinload(Job.location).selectinload(Location.addresses)] diff --git a/data_rentgen/db/repositories/job_type.py b/data_rentgen/db/repositories/job_type.py index 4d044f75..cdf55855 100644 --- a/data_rentgen/db/repositories/job_type.py +++ b/data_rentgen/db/repositories/job_type.py @@ -20,6 +20,8 @@ JobType.type == bindparam("type"), ) +get_distinct_query = select(JobType.type).distinct(JobType.type).order_by(JobType.type) + class JobTypeRepository(Repository[JobType]): async def fetch_bulk(self, job_types_dto: list[JobTypeDTO]) -> list[tuple[JobTypeDTO, JobType | None]]: @@ -41,8 +43,7 @@ async def create(self, job_type_dto: JobTypeDTO) -> JobType: return await self._get(job_type_dto) or await self._create(job_type_dto) async def get_job_types(self) -> Sequence[str]: - query = select(JobType.type).distinct(JobType.type) - result = await self._session.scalars(query) + result = await self._session.scalars(get_distinct_query) return result.all() async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None: diff --git a/data_rentgen/db/repositories/location.py b/data_rentgen/db/repositories/location.py index deb79e3d..0bb0ba26 100644 --- a/data_rentgen/db/repositories/location.py +++ b/data_rentgen/db/repositories/location.py @@ -42,6 +42,7 @@ ) .options(selectinload(Location.addresses)) ) +get_distinct_query = select(Location.type).distinct(Location.type).order_by(Location.type) class LocationRepository(Repository[Location]): @@ -50,16 +51,15 @@ async def paginate( page: int, page_size: int, location_ids: Collection[int], - location_type: str | None, + location_type: Collection[str], search_query: str | None, ) -> PaginationDTO[Location]: where = [] - if location_ids: where.append(Location.id == any_(list(location_ids))) # type: ignore[arg-type] - if location_type: - where.append(Location.type == location_type) + location_type_lower = [location_type.lower() for location_type in location_type] + where.append(Location.type == any_(location_type_lower)) # type: ignore[arg-type] query: Select | CompoundSelect order_by: list[ColumnElement | SQLColumnExpression] @@ -161,3 +161,7 @@ async def _update_addresses(self, existing: Location, new: LocationDTO) -> Locat existing.addresses.extend(addresses) await self._session.flush([existing]) return existing + + async def get_location_types(self): + scalars = await self._session.scalars(get_distinct_query) + return scalars.all() diff --git a/data_rentgen/server/api/v1/router/dataset.py b/data_rentgen/server/api/v1/router/dataset.py index ce9e3a7d..96869965 100644 --- a/data_rentgen/server/api/v1/router/dataset.py +++ b/data_rentgen/server/api/v1/router/dataset.py @@ -39,6 +39,7 @@ async def paginate_datasets( dataset_ids=query_args.dataset_id, tag_value_ids=query_args.tag_value_id, location_id=query_args.location_id, + location_type=query_args.location_type, search_query=query_args.search_query, ) return PageResponseV1[DatasetDetailedResponseV1].from_pagination(pagination) diff --git a/data_rentgen/server/api/v1/router/job.py b/data_rentgen/server/api/v1/router/job.py index 622ba159..7ba005e9 100644 --- a/data_rentgen/server/api/v1/router/job.py +++ b/data_rentgen/server/api/v1/router/job.py @@ -44,6 +44,7 @@ async def paginate_jobs( job_ids=query_args.job_id, search_query=query_args.search_query, location_id=query_args.location_id, + location_type=query_args.location_type, job_type=query_args.job_type, ) return PageResponseV1[JobDetailedResponseV1].from_pagination(pagination) @@ -74,4 +75,4 @@ async def get_job_types( current_user: Annotated[User, Depends(get_user())], ) -> JobTypesResponseV1: job_types = await job_service.get_job_types() - return JobTypesResponseV1(job_types=sorted(job_types)) + return JobTypesResponseV1(job_types=list(job_types)) diff --git a/data_rentgen/server/api/v1/router/location.py b/data_rentgen/server/api/v1/router/location.py index 81a20468..f0006121 100644 --- a/data_rentgen/server/api/v1/router/location.py +++ b/data_rentgen/server/api/v1/router/location.py @@ -14,6 +14,7 @@ PageResponseV1, UpdateLocationRequestV1, ) +from data_rentgen.server.schemas.v1.location import LocationTypesResponseV1 from data_rentgen.server.services import LocationService, get_user router = APIRouter( @@ -57,3 +58,12 @@ async def update_location( ) -> LocationDetailedResponseV1: location = await location_service.update_external_id(location_id, location_data.external_id) return LocationDetailedResponseV1.model_validate(location) + + +@router.get("/types", summary="Get distinct types of Locations") +async def get_location_types( + location_service: Annotated[LocationService, Depends()], + current_user: Annotated[User, Depends(get_user())], +) -> LocationTypesResponseV1: + location_types = await location_service.get_location_types() + return LocationTypesResponseV1(location_types=list(location_types)) diff --git a/data_rentgen/server/schemas/v1/dataset.py b/data_rentgen/server/schemas/v1/dataset.py index d060f339..59fbb459 100644 --- a/data_rentgen/server/schemas/v1/dataset.py +++ b/data_rentgen/server/schemas/v1/dataset.py @@ -55,13 +55,28 @@ class DatasetDetailedResponseV1(BaseModel): class DatasetPaginateQueryV1(PaginateQueryV1): """Query params for Dataset paginate request.""" - dataset_id: list[int] = Field(default_factory=list, description="Dataset id") - tag_value_id: list[int] = Field(default_factory=list, description="Tag value id") - location_id: int | None = Field(default=None, description="Location id to filter dataset") + dataset_id: list[int] = Field( + default_factory=list, + description="Get specific datasets by their ids", + ) + tag_value_id: list[int] = Field( + default_factory=list, + description="Get datasets with specific tag values (AND)", + ) + location_id: int | None = Field( + default=None, + description="Get datasets by location id", + ) + location_type: list[str] = Field( + default_factory=list, + description="Get datasets by location types", + examples=[["yarn"]], + ) search_query: str | None = Field( default=None, min_length=3, description="Search query", + examples=[["my dataset"]], ) model_config = ConfigDict(extra="forbid") diff --git a/data_rentgen/server/schemas/v1/job.py b/data_rentgen/server/schemas/v1/job.py index 21cb0625..d4f8c7bf 100644 --- a/data_rentgen/server/schemas/v1/job.py +++ b/data_rentgen/server/schemas/v1/job.py @@ -27,9 +27,12 @@ class JobDetailedResponseV1(BaseModel): class JobTypesResponseV1(BaseModel): - """JobTypes""" + """Job types""" - job_types: list[str] = Field(description="List of distinct job types") + job_types: list[str] = Field( + description="List of distinct job types", + examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]], + ) model_config = ConfigDict(from_attributes=True) @@ -42,14 +45,22 @@ class JobPaginateQueryV1(PaginateQueryV1): default=None, min_length=3, description="Search query", + examples=["my job"], ) job_type: list[str] = Field( default_factory=list, description="Specify job types", + examples=[["SPARK_APPLICATION", "AIRFLOW_DAG"]], ) location_id: int | None = Field( default=None, description="The location id which jobs belong", + examples=[123], + ) + location_type: list[str] = Field( + default_factory=list, + description="Specify location types", + examples=[["yarn"]], ) model_config = ConfigDict(extra="forbid") diff --git a/data_rentgen/server/schemas/v1/location.py b/data_rentgen/server/schemas/v1/location.py index 5d992697..b6fac597 100644 --- a/data_rentgen/server/schemas/v1/location.py +++ b/data_rentgen/server/schemas/v1/location.py @@ -51,15 +51,31 @@ class LocationDetailedResponseV1(BaseModel): model_config = ConfigDict(from_attributes=True) +class LocationTypesResponseV1(BaseModel): + """Location types""" + + location_types: list[str] = Field( + description="List of distinct location types", + examples=[["kafka", "hdfs", "yarn"]], + ) + + model_config = ConfigDict(from_attributes=True) + + class LocationPaginateQueryV1(PaginateQueryV1): """Query params for Location paginate request.""" location_id: list[int] = Field(default_factory=list, description="Location id") - location_type: str | None = Field(default=None, description="Location type") + location_type: list[str] = Field( + default_factory=list, + description="Location type", + examples=[["kafka", "hdfs"], ["yarn"]], + ) search_query: str | None = Field( default=None, min_length=3, description="Search query", + examples=[["localhost:8123"]], ) model_config = ConfigDict(extra="forbid") diff --git a/data_rentgen/server/services/dataset.py b/data_rentgen/server/services/dataset.py index 4b9a07b2..9129f497 100644 --- a/data_rentgen/server/services/dataset.py +++ b/data_rentgen/server/services/dataset.py @@ -43,6 +43,7 @@ async def paginate( dataset_ids: Collection[int], tag_value_ids: Collection[int], location_id: int | None, + location_type: Collection[str], search_query: str | None, ) -> DatasetServicePaginatedResult: pagination = await self._uow.dataset.paginate( @@ -51,6 +52,7 @@ async def paginate( dataset_ids=dataset_ids, tag_value_ids=tag_value_ids, location_id=location_id, + location_type=location_type, search_query=search_query, ) diff --git a/data_rentgen/server/services/job.py b/data_rentgen/server/services/job.py index 76db7e20..d465a4bc 100644 --- a/data_rentgen/server/services/job.py +++ b/data_rentgen/server/services/job.py @@ -32,6 +32,7 @@ async def paginate( job_ids: Collection[int], search_query: str | None, location_id: int | None, + location_type: Collection[str], job_type: Collection[str], ) -> JobServicePaginatedResult: pagination = await self._uow.job.paginate( @@ -40,6 +41,7 @@ async def paginate( job_ids=job_ids, search_query=search_query, location_id=location_id, + location_type=location_type, job_type=job_type, ) diff --git a/data_rentgen/server/services/location.py b/data_rentgen/server/services/location.py index b96b0f21..1c15db0a 100644 --- a/data_rentgen/server/services/location.py +++ b/data_rentgen/server/services/location.py @@ -1,6 +1,6 @@ # SPDX-FileCopyrightText: 2024-2025 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -from collections.abc import Collection +from collections.abc import Collection, Sequence from dataclasses import dataclass from typing import Annotated @@ -62,7 +62,7 @@ async def paginate( page: int, page_size: int, location_ids: Collection[int], - location_type: str | None, + location_type: Collection[str], search_query: str | None, ) -> LocationServicePaginatedResult: pagination = await self._uow.location.paginate( @@ -109,3 +109,6 @@ async def update_external_id( jobs=LocationServiceJobStatistics.from_row(job_stats.get(location.id)), ), ) + + async def get_location_types(self) -> Sequence[str]: + return await self._uow.location.get_location_types() diff --git a/docs/changelog/next_release/328.feature.1.rst b/docs/changelog/next_release/328.feature.1.rst new file mode 100644 index 00000000..a8b63ef1 --- /dev/null +++ b/docs/changelog/next_release/328.feature.1.rst @@ -0,0 +1 @@ +Add new ``GET /v1/locations/types`` endpoint returning list of all known location types. diff --git a/docs/changelog/next_release/328.feature.2.rst b/docs/changelog/next_release/328.feature.2.rst new file mode 100644 index 00000000..22751c84 --- /dev/null +++ b/docs/changelog/next_release/328.feature.2.rst @@ -0,0 +1,2 @@ +Add new filter to ``GET /v1/jobs``: + - location_type: ``list[str]`` diff --git a/docs/changelog/next_release/328.feature.3.rst b/docs/changelog/next_release/328.feature.3.rst new file mode 100644 index 00000000..1f2f25e5 --- /dev/null +++ b/docs/changelog/next_release/328.feature.3.rst @@ -0,0 +1,2 @@ +Add new filter to ``GET /v1/datasets``: + - location_type: ``list[str]`` diff --git a/docs/changelog/next_release/328.feature.4.rst b/docs/changelog/next_release/328.feature.4.rst new file mode 100644 index 00000000..1bcc8929 --- /dev/null +++ b/docs/changelog/next_release/328.feature.4.rst @@ -0,0 +1 @@ +Allow passing multiple ``location_type`` filters to ``GET /v1/locations``. diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index 96b60c55..b2a90500 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -235,20 +235,20 @@ async def jobs_search( @pytest_asyncio.fixture async def jobs_with_locations_and_types( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], -) -> AsyncGenerator[tuple[Job]]: +) -> AsyncGenerator[tuple[Job, ...]]: async with async_session_maker() as async_session: cluster_location = await create_location(async_session, location_kwargs={"name": "my-cluster", "type": "yarn"}) airflow_location = await create_location( async_session, location_kwargs={"name": "airflow-host", "type": "http"}, ) - cluster_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + spark_type = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) airflow_dag_type = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) airflow_task_type = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) - cluster_job = await create_job( + spark_job = await create_job( async_session, location_id=cluster_location.id, - job_type_id=cluster_type.id, + job_type_id=spark_type.id, job_kwargs={"name": "my-job_cluster"}, ) dag_job = await create_job( @@ -266,7 +266,7 @@ async def jobs_with_locations_and_types( async_session.expunge_all() - yield (cluster_job, dag_job, task_job) + yield (spark_job, dag_job, task_job) async with async_session_maker() as async_session: await clean_db(async_session) diff --git a/tests/test_server/test_datasets/test_get_datasets_by_location.py b/tests/test_server/test_datasets/test_get_datasets_by_location.py new file mode 100644 index 00000000..e2d5eb7c --- /dev/null +++ b/tests/test_server/test_datasets/test_get_datasets_by_location.py @@ -0,0 +1,156 @@ +from http import HTTPStatus + +import pytest +from httpx import AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from data_rentgen.db.models import Dataset, Location +from tests.fixtures.mocks import MockedUser +from tests.test_server.utils.convert_to_json import dataset_to_json +from tests.test_server.utils.enrich import enrich_datasets + +pytestmark = [pytest.mark.server, pytest.mark.asyncio] + + +async def test_get_datasets_by_location_id( + test_client: AsyncClient, + async_session: AsyncSession, + datasets_search: tuple[dict[str, Dataset], ...], + mocked_user: MockedUser, +) -> None: + _, _, datasets_by_address = datasets_search + datasets = await enrich_datasets([datasets_by_address["hdfs://my-cluster-namenode:2080"]], async_session) + location_id = datasets[0].location_id + + response = await test_client.get( + "/v1/datasets", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_id": location_id}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 1, + }, + "items": [ + { + "id": str(dataset.id), + "data": dataset_to_json(dataset), + "tags": [], + } + for dataset in datasets + ], + } + + +async def test_get_datasets_by_location_id_non_existent( + test_client: AsyncClient, + async_session: AsyncSession, + datasets_search: tuple[dict[str, Dataset], ...], + mocked_user: MockedUser, +) -> None: + response = await test_client.get( + "/v1/datasets", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_id": -1}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 0, + }, + "items": [], + } + + +async def test_get_datasets_by_location_type( + test_client: AsyncClient, + async_session: AsyncSession, + datasets_search: tuple[dict[str, Dataset], ...], + mocked_user: MockedUser, +) -> None: + # random locations created by datasets_search fixture can also have type=hdfs + datasets_query = ( + select(Dataset) + .join(Location, Location.id == Dataset.location_id) + .where(Location.type == "hdfs") + .order_by(Dataset.name) + ) + + dataset_scalars = await async_session.scalars(datasets_query) + async_session.expunge_all() + + datasets = await enrich_datasets(list(dataset_scalars.all()), async_session) + + response = await test_client.get( + "/v1/datasets", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_type": ["HDFS"]}, # case-insensitive + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": len(datasets), + }, + "items": [ + { + "id": str(dataset.id), + "data": dataset_to_json(dataset), + "tags": [], + } + for dataset in datasets + ], + } + + +async def test_get_datasets_by_location_type_non_existent( + test_client: AsyncClient, + async_session: AsyncSession, + datasets_search: tuple[dict[str, Dataset], ...], + mocked_user: MockedUser, +) -> None: + response = await test_client.get( + "/v1/datasets", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_type": "non_existent_location_type"}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 0, + }, + "items": [], + } diff --git a/tests/test_server/test_jobs/test_get_jobs_by_location.py b/tests/test_server/test_jobs/test_get_jobs_by_location.py new file mode 100644 index 00000000..c8d479a9 --- /dev/null +++ b/tests/test_server/test_jobs/test_get_jobs_by_location.py @@ -0,0 +1,140 @@ +from http import HTTPStatus + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from data_rentgen.db.models import Job +from tests.fixtures.mocks import MockedUser +from tests.test_server.utils.convert_to_json import job_to_json +from tests.test_server.utils.enrich import enrich_jobs + +pytestmark = [pytest.mark.server, pytest.mark.asyncio] + + +async def test_get_jobs_by_location_id( + test_client: AsyncClient, + async_session: AsyncSession, + jobs_with_locations_and_types: tuple[Job, ...], + mocked_user: MockedUser, +) -> None: + jobs = await enrich_jobs(jobs_with_locations_and_types, async_session) + + # first job in jobs has a different location unlike two others + [_, dag_job, task_job] = jobs + response = await test_client.get( + "/v1/jobs", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_id": dag_job.location_id}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 2, + }, + "items": [ + { + "id": str(job.id), + "data": job_to_json(job), + } + for job in [dag_job, task_job] + ], + } + + +async def test_get_jobs_by_location_id_non_existent( + test_client: AsyncClient, + async_session: AsyncSession, + jobs_with_locations_and_types: tuple[Job, ...], + mocked_user: MockedUser, +): + response = await test_client.get( + "/v1/jobs", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_id": -1}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 0, + }, + "items": [], + } + + +async def test_get_lobs_by_location_type( + test_client: AsyncClient, + async_session: AsyncSession, + jobs_with_locations_and_types: tuple[Job, ...], + mocked_user: MockedUser, +) -> None: + spark_job, *_ = await enrich_jobs(jobs_with_locations_and_types, async_session) + response = await test_client.get( + "/v1/jobs", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_type": ["YARN"]}, # case-insensitive + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 1, + }, + "items": [ + { + "id": str(spark_job.id), + "data": job_to_json(spark_job), + }, + ], + } + + +async def test_get_jobs_by_location_type_non_existent( + test_client: AsyncClient, + async_session: AsyncSession, + jobs_with_locations_and_types: tuple[Job, ...], + mocked_user: MockedUser, +): + response = await test_client.get( + "/v1/jobs", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"location_type": "non_existing_location_type"}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 0, + }, + "items": [], + } diff --git a/tests/test_server/test_jobs/test_get_jobs_by_type.py b/tests/test_server/test_jobs/test_get_jobs_by_type.py new file mode 100644 index 00000000..d32c4fe0 --- /dev/null +++ b/tests/test_server/test_jobs/test_get_jobs_by_type.py @@ -0,0 +1,91 @@ +from http import HTTPStatus + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from data_rentgen.db.models import Job, JobType +from tests.fixtures.mocks import MockedUser +from tests.test_server.utils.convert_to_json import job_to_json +from tests.test_server.utils.enrich import enrich_jobs + +pytestmark = [pytest.mark.server, pytest.mark.asyncio] + + +async def test_get_job_types( + test_client: AsyncClient, + job_types: list[JobType], + mocked_user: MockedUser, +) -> None: + unique_job_type = {item.type for item in job_types} + response = await test_client.get( + "/v1/jobs/types", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == {"job_types": sorted(unique_job_type)} + + +async def test_get_jobs_by_job_type( + test_client: AsyncClient, + async_session: AsyncSession, + jobs_with_locations_and_types: tuple[Job, ...], + mocked_user: MockedUser, +) -> None: + jobs = await enrich_jobs(jobs_with_locations_and_types, async_session) + [_, dag_job, task_job] = jobs + response = await test_client.get( + "/v1/jobs", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"job_type": ["AIRFLOW_DAG", "AIRFLOW_TASK"]}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 2, + }, + "items": [ + { + "id": str(job.id), + "data": job_to_json(job), + } + for job in (dag_job, task_job) + ], + } + + +async def test_get_jobs_by_non_existent_type( + test_client: AsyncClient, + async_session: AsyncSession, + jobs_with_locations_and_types: tuple[Job, ...], + mocked_user: MockedUser, +) -> None: + response = await test_client.get( + "/v1/jobs", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"job_type": "NO_EXISTENT_TYPE"}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "meta": { + "has_next": False, + "has_previous": False, + "next_page": None, + "page": 1, + "page_size": 20, + "pages_count": 1, + "previous_page": None, + "total_count": 0, + }, + "items": [], + } diff --git a/tests/test_server/test_jobs/test_search_jobs.py b/tests/test_server/test_jobs/test_search_jobs.py index d5089eb3..f6d9ce2e 100644 --- a/tests/test_server/test_jobs/test_search_jobs.py +++ b/tests/test_server/test_jobs/test_search_jobs.py @@ -4,7 +4,7 @@ from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession -from data_rentgen.db.models import Job, JobType +from data_rentgen.db.models import Job from tests.fixtures.mocks import MockedUser from tests.test_server.utils.convert_to_json import job_to_json from tests.test_server.utils.enrich import enrich_jobs @@ -245,148 +245,3 @@ async def test_search_jobs_no_results( }, "items": [], } - - -async def test_get_job_types( - test_client: AsyncClient, - job_types: list[JobType], - mocked_user: MockedUser, -) -> None: - unique_job_type = {item.type for item in job_types} - response = await test_client.get( - "/v1/jobs/types", - headers={"Authorization": f"Bearer {mocked_user.access_token}"}, - ) - - assert response.status_code == HTTPStatus.OK, response.json() - assert response.json() == {"job_types": sorted(unique_job_type)} - - -async def test_search_jobs_by_location_id( - test_client: AsyncClient, - async_session: AsyncSession, - jobs_with_locations_and_types: tuple[Job], - mocked_user: MockedUser, -) -> None: - jobs = await enrich_jobs(jobs_with_locations_and_types, async_session) - - # first job in jobs has a different location unlike two others - [_, dag_job, task_job] = jobs - response = await test_client.get( - "/v1/jobs", - headers={"Authorization": f"Bearer {mocked_user.access_token}"}, - params={"location_id": dag_job.location_id}, - ) - - assert response.status_code == HTTPStatus.OK, response.json() - assert response.json() == { - "meta": { - "has_next": False, - "has_previous": False, - "next_page": None, - "page": 1, - "page_size": 20, - "pages_count": 1, - "previous_page": None, - "total_count": 2, - }, - "items": [ - { - "id": str(job.id), - "data": job_to_json(job), - } - for job in [dag_job, task_job] - ], - } - - -async def test_search_jobs_by_location_id_non_existen_id( - test_client: AsyncClient, - async_session: AsyncSession, - jobs_with_locations_and_types: tuple[Job], - mocked_user: MockedUser, -): - response = await test_client.get( - "/v1/jobs", - headers={"Authorization": f"Bearer {mocked_user.access_token}"}, - params={"location_id": -1}, - ) - - assert response.status_code == HTTPStatus.OK, response.json() - assert response.json() == { - "meta": { - "has_next": False, - "has_previous": False, - "next_page": None, - "page": 1, - "page_size": 20, - "pages_count": 1, - "previous_page": None, - "total_count": 0, - }, - "items": [], - } - - -async def test_search_by_jobs_type( - test_client: AsyncClient, - async_session: AsyncSession, - jobs_with_locations_and_types: tuple[Job], - mocked_user: MockedUser, -) -> None: - jobs = await enrich_jobs(jobs_with_locations_and_types, async_session) - [_, dag_job, task_job] = jobs - response = await test_client.get( - "/v1/jobs", - headers={"Authorization": f"Bearer {mocked_user.access_token}"}, - params={"job_type": ["AIRFLOW_DAG", "AIRFLOW_TASK"]}, - ) - - assert response.status_code == HTTPStatus.OK, response.json() - assert response.json() == { - "meta": { - "has_next": False, - "has_previous": False, - "next_page": None, - "page": 1, - "page_size": 20, - "pages_count": 1, - "previous_page": None, - "total_count": 2, - }, - "items": [ - { - "id": str(job.id), - "data": job_to_json(job), - } - for job in (dag_job, task_job) - ], - } - - -async def test_search_jobs_by_non_existen_type( - test_client: AsyncClient, - async_session: AsyncSession, - jobs_with_locations_and_types: tuple[Job], - mocked_user: MockedUser, -) -> None: - response = await test_client.get( - "/v1/jobs", - headers={"Authorization": f"Bearer {mocked_user.access_token}"}, - params={"job_type": "NO_EXISTEN_TYPE"}, - ) - - assert response.status_code == HTTPStatus.OK, response.json() - assert response.json() == { - "meta": { - "has_next": False, - "has_previous": False, - "next_page": None, - "page": 1, - "page_size": 20, - "pages_count": 1, - "previous_page": None, - "total_count": 0, - }, - "items": [], - } diff --git a/tests/test_server/test_locations/test_get_locations.py b/tests/test_server/test_locations/test_get_locations.py index 2953b76a..eaef9fef 100644 --- a/tests/test_server/test_locations/test_get_locations.py +++ b/tests/test_server/test_locations/test_get_locations.py @@ -54,6 +54,21 @@ async def test_get_locations_no_filters( } +async def test_get_location_types( + test_client: AsyncClient, + locations: list[Location], + mocked_user: MockedUser, +) -> None: + unique_location_type = {item.type for item in locations} + response = await test_client.get( + "/v1/locations/types", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == {"location_types": sorted(unique_location_type)} + + async def test_get_locations_with_type_filter( test_client: AsyncClient, locations: list[Location], @@ -67,7 +82,7 @@ async def test_get_locations_with_type_filter( response = await test_client.get( "v1/locations", headers={"Authorization": f"Bearer {mocked_user.access_token}"}, - params={"location_type": location_type}, + params={"location_type": location_type.upper()}, # case-insensitive ) assert response.status_code == HTTPStatus.OK, response.json() diff --git a/tests/test_server/utils/enrich.py b/tests/test_server/utils/enrich.py index f03d024d..ac8a7c5b 100644 --- a/tests/test_server/utils/enrich.py +++ b/tests/test_server/utils/enrich.py @@ -1,3 +1,5 @@ +from collections.abc import Sequence + from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from sqlalchemy.sql import select @@ -6,7 +8,7 @@ from data_rentgen.db.models.tag import Tag -async def enrich_runs(runs: list[Run], async_session: AsyncSession) -> list[Run]: +async def enrich_runs(runs: Sequence[Run], async_session: AsyncSession) -> list[Run]: run_ids = [run.id for run in runs] query = ( select(Run) @@ -22,7 +24,7 @@ async def enrich_runs(runs: list[Run], async_session: AsyncSession) -> list[Run] return [runs_by_id[run_id] for run_id in run_ids] -async def enrich_datasets(datasets: list[Dataset], async_session: AsyncSession) -> list[Dataset]: +async def enrich_datasets(datasets: Sequence[Dataset], async_session: AsyncSession) -> list[Dataset]: dataset_ids = [dataset.id for dataset in datasets] query = ( select(Dataset) @@ -36,7 +38,7 @@ async def enrich_datasets(datasets: list[Dataset], async_session: AsyncSession) return [datasets_by_id[dataset_id] for dataset_id in dataset_ids] -async def enrich_jobs(jobs: list[Job], async_session: AsyncSession) -> list[Job]: +async def enrich_jobs(jobs: Sequence[Job], async_session: AsyncSession) -> list[Job]: job_ids = [job.id for job in jobs] query = select(Job).where(Job.id.in_(job_ids)).options(selectinload(Job.location).selectinload(Location.addresses)) result = await async_session.scalars(query) @@ -45,7 +47,7 @@ async def enrich_jobs(jobs: list[Job], async_session: AsyncSession) -> list[Job] return [jobs_by_id[job_id] for job_id in job_ids] -async def enrich_locations(locations: list[Location], async_session: AsyncSession) -> list[Location]: +async def enrich_locations(locations: Sequence[Location], async_session: AsyncSession) -> list[Location]: location_ids = [location.id for location in locations] query = select(Location).where(Location.id.in_(location_ids)).options(selectinload(Location.addresses)) result = await async_session.scalars(query) @@ -54,7 +56,7 @@ async def enrich_locations(locations: list[Location], async_session: AsyncSessio return [locations_by_id[location_id] for location_id in location_ids] -async def enrich_tags(tags: list[Tag], async_session: AsyncSession) -> list[Tag]: +async def enrich_tags(tags: Sequence[Tag], async_session: AsyncSession) -> list[Tag]: tag_ids = [tag.id for tag in tags] query = select(Tag).where(Tag.id.in_(tag_ids)).options(selectinload(Tag.tag_values)) result = await async_session.scalars(query)