diff --git a/docs/queries/pagination-and-rows-number.md b/docs/queries/pagination-and-rows-number.md index d29284254..5efabf6e5 100644 --- a/docs/queries/pagination-and-rows-number.md +++ b/docs/queries/pagination-and-rows-number.md @@ -73,7 +73,12 @@ tracks = await Track.objects.limit(1).all() So operations like `filter()`, `select_related()`, `limit()` and `offset()` etc. can be chained. - Something like `Track.object.select_related("album").filter(album__name="Malibu").offset(1).limit(1).all()` + Something like `Track.objects.select_related("album").filter(album__name="Malibu").offset(1).limit(1).all()` + +!!!note + You can slice the results; But note that negative indexing is not supported. + + Something like: `Track.objects[5:10].all()` ## offset @@ -107,8 +112,12 @@ tracks = await Track.objects.offset(1).limit(1).all() So operations like `filter()`, `select_related()`, `limit()` and `offset()` etc. can be chained. - Something like `Track.object.select_related("album").filter(album__name="Malibu").offset(1).limit(1).all()` + Something like `Track.objects.select_related("album").filter(album__name="Malibu").offset(1).limit(1).all()` + +!!!note + You can slice the results; But note that negative indexing is not supported. + Something like: `Track.objects[5:10].all()` ## get diff --git a/ormar/queryset/queryset.py b/ormar/queryset/queryset.py index b2aa5e78c..c5a15cfa1 100644 --- a/ormar/queryset/queryset.py +++ b/ormar/queryset/queryset.py @@ -41,6 +41,7 @@ class LegacyRow(dict): # type: ignore from ormar.queryset.queries.prefetch_query import PrefetchQuery from ormar.queryset.queries.query import Query from ormar.queryset.reverse_alias_resolver import ReverseAliasResolver +from ormar.queryset.utils import get_limit_offset if TYPE_CHECKING: # pragma no cover from ormar import Model @@ -1217,3 +1218,12 @@ async def bulk_update( # noqa: CCR001 await cast(Type["Model"], self.model_cls).Meta.signals.post_bulk_update.send( sender=self.model_cls, instances=objects # type: ignore ) + + def __getitem__(self, key: Union[int, slice]) -> "QuerySet[T]": + """Retrieve an item or slice from the set of results.""" + + if not isinstance(key, (int, slice)): + raise TypeError(f"{key} is neither an integer nor a range.") + + limit, offset = get_limit_offset(key=key) + return self.rebuild_self(offset=offset, limit_count=limit) diff --git a/ormar/queryset/utils.py b/ormar/queryset/utils.py index 4ebe07c7f..939eb1e0a 100644 --- a/ormar/queryset/utils.py +++ b/ormar/queryset/utils.py @@ -16,6 +16,11 @@ if TYPE_CHECKING: # pragma no cover from ormar import Model, BaseField +try: + from typing import Literal # type: ignore +except ImportError: # pragma: no cover + from typing_extensions import Literal # type: ignore + def check_node_not_dict_or_not_last_node( part: str, is_last: bool, current_level: Any @@ -340,3 +345,47 @@ def _process_through_field( else: relation = related_field.related_name return previous_model, relation, is_through + + +def _int_limit_offset(key: int) -> Tuple[Literal[1], int]: + """ + Returned the `Limit` & `Offset` Calculated for Integer Object. + + :param key: integer number in `__getitem__` + :type key: int + :return: limit_count, offset + :rtype: Tuple[Optional[int], Optional[int]] + """ + + if key < 0: + raise ValueError("Negative indexing is not supported.") + + return 1, key + + +def _slice_limit_offset(key: slice) -> Tuple[Optional[int], Optional[int]]: + """ + Returned the `Limit` & `Offset` Calculated for Slice Object. + + :param key: slice object in `__getitem__` + :type key: slice + :return: limit_count, offset + :rtype: Tuple[Optional[int], Optional[int]] + """ + + if key.step is not None and key.step != 1: + raise ValueError(f"{key.step} steps are not supported, only one.") + + start, stop = key.start is not None, key.stop is not None + if (start and key.start < 0) or (stop and key.stop < 0): + raise ValueError("The selected range is not valid.") + + limit_count: Optional[int] = max(key.stop - (key.start or 0), 0) if stop else None + return limit_count, key.start + + +def get_limit_offset(key: Union[int, slice]) -> Tuple[Optional[int], Optional[int]]: + """Utility to Select Limit Offset Function by `key` Type Slice or Integer""" + + func = _int_limit_offset if isinstance(key, int) else _slice_limit_offset + return func(key=key) # type: ignore diff --git a/ormar/relations/querysetproxy.py b/ormar/relations/querysetproxy.py index e27934dbf..a3c96cffc 100644 --- a/ormar/relations/querysetproxy.py +++ b/ormar/relations/querysetproxy.py @@ -874,3 +874,19 @@ def order_by(self, columns: Union[List, str, "OrderAction"]) -> "QuerysetProxy[T return self.__class__( relation=self.relation, type_=self.type_, to=self.to, qryset=queryset ) + + def __getitem__(self, key: Union[int, slice]) -> "QuerysetProxy[T]": + """ + You can slice the results to desired number of parent models. + + Actual call delegated to QuerySet. + + :param key: numbers of models to slicing + :type key: int | slice + :return: QuerysetProxy + :rtype: QuerysetProxy + """ + queryset = self.queryset[key] + return self.__class__( + relation=self.relation, type_=self.type_, to=self.to, qryset=queryset + ) diff --git a/tests/test_queries/test_pagination.py b/tests/test_queries/test_pagination.py index 9650736a0..70a9403c5 100644 --- a/tests/test_queries/test_pagination.py +++ b/tests/test_queries/test_pagination.py @@ -120,3 +120,79 @@ async def test_proxy_pagination(): await user.cars.paginate(2, page_size=10).all() assert len(user.cars) == 10 assert user.cars[0].name == "10" + + +@pytest.mark.asyncio +async def test_slice_getitem_queryset_exceptions(): + async with database: + async with database.transaction(force_rollback=True): + with pytest.raises(TypeError): + await Car.objects["foo"].all() + + with pytest.raises(ValueError): + await Car.objects[-1].all() + + with pytest.raises(ValueError): + await Car.objects[::2].all() + + with pytest.raises(ValueError): + await Car.objects[-2:-1].all() + + +@pytest.mark.asyncio +async def test_slice_getitem_queryset_on_single_model(): + async with database: + async with database.transaction(force_rollback=True): + for i in range(10): + await Car(name=f"{i}").save() + + cars_page1 = await Car.objects[2:8].all() + assert len(cars_page1) == 6 + assert cars_page1[0].name == "2" + assert cars_page1[-1].name == "7" + + cars_page2 = await Car.objects[2:].all() + assert len(cars_page2) == 8 + assert cars_page2[0].name == "2" + assert cars_page2[-1].name == "9" + + cars_page3 = await Car.objects[:8].all() + assert len(cars_page3) == 8 + assert cars_page3[0].name == "0" + assert cars_page3[-1].name == "7" + + cars_page4 = await Car.objects[5].all() + assert len(cars_page4) == 1 + assert cars_page4[0].name == "5" + + cars_page5 = await Car.objects[8:2].all() + assert len(cars_page5) == 0 + assert cars_page5 == [] + + +@pytest.mark.asyncio +async def test_slice_getitem_queryset_on_proxy(): + async with database: + async with database.transaction(force_rollback=True): + user = await User(name="Sep").save() + + for i in range(20): + c = await Car(name=f"{i}").save() + await user.cars.add(c) + + await user.cars.filter(id__gte=0)[:5].all() + assert len(user.cars) == 5 + assert user.cars[0].name == "0" + assert user.cars[4].name == "4" + + await user.cars.filter(id__gte=0)[5:10].all() + assert len(user.cars) == 5 + assert user.cars[0].name == "5" + assert user.cars[4].name == "9" + + await user.cars.filter(id__gte=0)[10].all() + assert len(user.cars) == 1 + + await user.cars.filter(id__gte=0)[10:].all() + assert len(user.cars) == 10 + assert user.cars[0].name == "10"