Skip to content

Commit 5309bc6

Browse files
committed
Done for dataset as well. TODO: For KVS and RQ
1 parent 8954f4f commit 5309bc6

18 files changed

+439
-369
lines changed

src/apify_client/_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ListPage(Generic[T]):
3131
desc: bool
3232
"""Whether the listing is descending or not."""
3333

34-
def __init__(self: ListPage, data: dict) -> None:
34+
def __init__(self, data: dict) -> None:
3535
"""Initialize a ListPage instance from the API response data."""
3636
self.items = data.get('items', [])
3737
self.offset = data.get('offset', 0)

src/apify_client/clients/base/base_client.py

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING, Any
3+
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Generator, Iterable, Iterator
4+
from typing import (
5+
TYPE_CHECKING,
6+
Any,
7+
Generic,
8+
Protocol,
9+
TypeVar,
10+
)
411

512
from apify_client._logging import WithLogDetailsClient
13+
from apify_client._types import ListPage
614
from apify_client._utils import to_safe_id
715

816
# Conditional import only executed when type checking, otherwise we'd get circular dependency issues
917
if TYPE_CHECKING:
1018
from apify_client import ApifyClient, ApifyClientAsync
1119
from apify_client._http_client import HTTPClient, HTTPClientAsync
20+
T = TypeVar('T')
1221

1322

1423
class _BaseBaseClient(metaclass=WithLogDetailsClient):
@@ -87,6 +96,40 @@ def __init__(
8796
self.safe_id = to_safe_id(self.resource_id)
8897
self.url = f'{self.url}/{self.safe_id}'
8998

99+
@staticmethod
100+
def _list_iterable_from_callback(callback: Callable[..., ListPage[T]], **kwargs: Any) -> ListPageProtocol[T]:
101+
"""Return object can be awaited or iterated over.
102+
103+
Not using total from the API response as it can change during iteration.
104+
"""
105+
chunk_size = kwargs.pop('chunk_size', 0) or 0
106+
offset = kwargs.get('offset') or 0
107+
limit = kwargs.get('limit') or 0
108+
109+
list_page = callback(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)})
110+
111+
def iterator() -> Iterator[T]:
112+
current_page = list_page
113+
for item in current_page.items:
114+
yield item
115+
116+
fetched_items = len(current_page.items)
117+
while (
118+
current_page.items # If there are any items left to fetch
119+
and (not limit or (limit > fetched_items)) # If there is limit to fetch, and it was not reached it yet.
120+
):
121+
new_kwargs = {
122+
**kwargs,
123+
'offset': offset + fetched_items,
124+
'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
125+
}
126+
current_page = callback(**new_kwargs)
127+
for item in current_page.items:
128+
yield item
129+
fetched_items += len(current_page.items)
130+
131+
return IterableListPage[T](list_page, iterator())
132+
90133

91134
class BaseClientAsync(_BaseBaseClient):
92135
"""Base class for async sub-clients."""
@@ -127,3 +170,112 @@ def __init__(
127170
if self.resource_id is not None:
128171
self.safe_id = to_safe_id(self.resource_id)
129172
self.url = f'{self.url}/{self.safe_id}'
173+
174+
@staticmethod
175+
def _list_iterable_from_callback(
176+
callback: Callable[..., Awaitable[ListPage[T]]], **kwargs: Any
177+
) -> ListPageProtocolAsync[T]:
178+
"""Return object can be awaited or iterated over.
179+
180+
Not using total from the API response as it can change during iteration.
181+
"""
182+
chunk_size = kwargs.pop('chunk_size', 0) or 0
183+
offset = kwargs.get('offset') or 0
184+
limit = kwargs.get('limit') or 0
185+
186+
list_page_awaitable = callback(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)})
187+
188+
async def async_iterator() -> AsyncIterator[T]:
189+
current_page = await list_page_awaitable
190+
for item in current_page.items:
191+
yield item
192+
193+
fetched_items = len(current_page.items)
194+
while (
195+
current_page.items # If there are any items left to fetch
196+
and (not limit or (limit > fetched_items)) # If there is limit to fetch, and it was not reached it yet.
197+
):
198+
new_kwargs = {
199+
**kwargs,
200+
'offset': offset + fetched_items,
201+
'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
202+
}
203+
current_page = await callback(**new_kwargs)
204+
for item in current_page.items:
205+
yield item
206+
fetched_items += len(current_page.items)
207+
208+
return IterableListPageAsync[T](list_page_awaitable, async_iterator())
209+
210+
211+
def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
212+
"""Return minimum of two limit parameters, treating None or 0 as infinity. Return None for infinity."""
213+
# API treats 0 as None for limit parameter, in this context API understands 0 as infinity.
214+
if a == 0:
215+
a = None
216+
if b == 0:
217+
b = None
218+
if a is None:
219+
return b
220+
if b is None:
221+
return a
222+
return min(a, b)
223+
224+
225+
class ListPageProtocol(Iterable[T], Protocol[T]):
226+
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
227+
228+
items: list[T]
229+
"""List of returned objects on this page."""
230+
231+
count: int
232+
"""Count of the returned objects on this page."""
233+
234+
offset: int
235+
"""The limit on the number of returned objects offset specified in the API call."""
236+
237+
limit: int
238+
"""The offset of the first object specified in the API call"""
239+
240+
total: int
241+
"""Total number of objects matching the API call criteria."""
242+
243+
desc: bool
244+
"""Whether the listing is descending or not."""
245+
246+
247+
class ListPageProtocolAsync(AsyncIterable[T], Awaitable[ListPage[T]], Protocol[T]):
248+
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
249+
250+
251+
class IterableListPage(ListPage[T], Generic[T]):
252+
"""Can be called to get ListPage with items or iterated over to get individual items."""
253+
254+
def __init__(self, list_page: ListPage[T], iterator: Iterator[T]) -> None:
255+
self.items = list_page.items
256+
self.offset = list_page.offset
257+
self.limit = list_page.limit
258+
self.count = list_page.count
259+
self.total = list_page.total
260+
self.desc = list_page.desc
261+
self._iterator = iterator
262+
263+
def __iter__(self) -> Iterator[T]:
264+
"""Return an iterator over the items from API, possibly doing multiple API calls."""
265+
return self._iterator
266+
267+
268+
class IterableListPageAsync(Generic[T]):
269+
"""Can be awaited to get ListPage with items or asynchronously iterated over to get individual items."""
270+
271+
def __init__(self, awaitable: Awaitable[ListPage[T]], async_iterator: AsyncIterator[T]) -> None:
272+
self._awaitable = awaitable
273+
self._async_iterator = async_iterator
274+
275+
def __aiter__(self) -> AsyncIterator[T]:
276+
"""Return an asynchronous iterator over the items from API, possibly doing multiple API calls."""
277+
return self._async_iterator
278+
279+
def __await__(self) -> Generator[Any, Any, ListPage[T]]:
280+
"""Return an awaitable that resolves to the ListPage doing exactly one API call."""
281+
return self._awaitable.__await__()

0 commit comments

Comments
 (0)