Skip to content

Commit 8954f4f

Browse files
committed
WIP do not trust total. Start for datatset
1 parent 14a8662 commit 8954f4f

File tree

2 files changed

+61
-28
lines changed

2 files changed

+61
-28
lines changed

src/apify_client/clients/base/resource_collection_client.py

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,32 +53,37 @@ def _list(self, **kwargs: Any) -> ListPage:
5353
return ListPage(parse_date_fields(pluck_data(response.json())))
5454

5555
def _list_iterable(self, **kwargs: Any) -> IterableListPage[T]:
56-
"""Return object can be awaited or iterated over."""
57-
chunk_size = kwargs.pop('chunk_size', None)
56+
"""Return object can be awaited or iterated over.
5857
59-
list_page = self._list(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)})
58+
Not using total from the API response as it can change during iteration.
59+
"""
60+
chunk_size = kwargs.pop('chunk_size', 0) or 0
61+
offset = kwargs.get('offset') or 0
62+
limit = kwargs.get('limit') or 0
63+
64+
list_page = self._list(**{**kwargs, 'limit': _min_for_limit_param(limit, chunk_size)})
6065

6166
def iterator() -> Iterator[T]:
6267
current_page = list_page
6368
for item in current_page.items:
6469
yield item
6570

66-
offset = kwargs.get('offset') or 0
67-
limit = min(kwargs.get('limit') or current_page.total, current_page.total)
68-
69-
current_offset = offset + len(current_page.items)
70-
remaining_items = min(current_page.total - offset, limit) - len(current_page.items)
71-
while current_page.items and remaining_items > 0:
71+
fetched_items = len(current_page.items)
72+
while (
73+
current_page.items # If there are any items left to fetch
74+
and (
75+
not limit or (limit > fetched_items)
76+
) # If there are is limit to fetch and have not reached it yet.
77+
):
7278
new_kwargs = {
7379
**kwargs,
74-
'offset': current_offset,
75-
'limit': _min_for_limit_param(remaining_items, chunk_size),
80+
'offset': offset + fetched_items,
81+
'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
7682
}
7783
current_page = self._list(**new_kwargs)
7884
for item in current_page.items:
7985
yield item
80-
current_offset += len(current_page.items)
81-
remaining_items -= len(current_page.items)
86+
fetched_items += len(current_page.items)
8287

8388
return IterableListPage[T](list_page, iterator())
8489

@@ -116,8 +121,13 @@ async def _list(self, **kwargs: Any) -> ListPage:
116121
return ListPage(parse_date_fields(pluck_data(response.json())))
117122

118123
def _list_iterable(self, **kwargs: Any) -> ListPageProtocolAsync[T]:
119-
"""Return object can be awaited or iterated over."""
120-
chunk_size = kwargs.pop('chunk_size', None)
124+
"""Return object can be awaited or iterated over.
125+
126+
Not using total from the API response as it can change during iteration.
127+
"""
128+
chunk_size = kwargs.pop('chunk_size', 0) or 0
129+
offset = kwargs.get('offset') or 0
130+
limit = kwargs.get('limit') or 0
121131

122132
list_page_awaitable = self._list(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)})
123133

@@ -126,22 +136,22 @@ async def async_iterator() -> AsyncIterator[T]:
126136
for item in current_page.items:
127137
yield item
128138

129-
offset = kwargs.get('offset') or 0
130-
limit = min(kwargs.get('limit') or current_page.total, current_page.total)
131-
132-
current_offset = offset + len(current_page.items)
133-
remaining_items = min(current_page.total - offset, limit) - len(current_page.items)
134-
while current_page.items and remaining_items > 0:
139+
fetched_items = len(current_page.items)
140+
while (
141+
current_page.items # If there are any items left to fetch
142+
and (
143+
not limit or (limit > fetched_items)
144+
) # If there are is limit to fetch and have not reached it yet.
145+
):
135146
new_kwargs = {
136147
**kwargs,
137-
'offset': current_offset,
138-
'limit': _min_for_limit_param(remaining_items, chunk_size),
148+
'offset': offset + fetched_items,
149+
'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
139150
}
140151
current_page = await self._list(**new_kwargs)
141152
for item in current_page.items:
142153
yield item
143-
current_offset += len(current_page.items)
144-
remaining_items -= len(current_page.items)
154+
fetched_items += len(current_page.items)
145155

146156
return IterableListPageAsync[T](list_page_awaitable, async_iterator())
147157

@@ -170,7 +180,7 @@ async def _get_or_create(
170180
return parse_date_fields(pluck_data(response.json()))
171181

172182

173-
class ListPageProtocol(Protocol[T], Iterable[T]):
183+
class ListPageProtocol(Iterable[T], Protocol[T]):
174184
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
175185

176186
items: list[T]
@@ -209,7 +219,7 @@ def __iter__(self) -> Iterator[T]:
209219
return self._iterator
210220

211221

212-
class ListPageProtocolAsync(Protocol[T], AsyncIterable[T], Awaitable[ListPage[T]]):
222+
class ListPageProtocolAsync(AsyncIterable[T], Awaitable[ListPage[T]], Protocol[T]):
213223
"""Protocol for an object that can be both awaited and asynchronously iterated over."""
214224

215225

tests/unit/test_client_pagination.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
WebhookDispatchCollectionClient,
3434
WebhookDispatchCollectionClientAsync,
3535
)
36+
from apify_client.clients.resource_clients import DatasetClientAsync
3637

3738
CollectionClientAsync: TypeAlias = (
3839
ActorCollectionClientAsync
@@ -106,7 +107,12 @@ def mocked_api_pagination_logic(*_: Any, **kwargs: Any) -> dict:
106107
'items': items[lower_index : min(upper_index, lower_index + max_items_per_page)],
107108
}
108109
}
109-
110+
response.headers = {
111+
'x-apify-pagination-total': str(total_items),
112+
'x-apify-pagination-offset': str(offset),
113+
'x-apify-pagination-limit': str(limit or count),
114+
'x-apify-pagination-desc': str(params.get('desc', False)).lower(),
115+
}
110116
return response
111117

112118

@@ -265,3 +271,20 @@ def test_client_list_iterable(client: CollectionClient, inputs: dict, expected_i
265271
assert len(returned_items) == list_response.total
266272

267273
assert returned_items == expected_items
274+
275+
276+
@pytest.mark.parametrize(
277+
('inputs', 'expected_items', 'client'), generate_test_params(client_set='dataset', async_clients=True)
278+
)
279+
async def test_dataset_items_list_iterable_async(
280+
client: DatasetClientAsync, inputs: dict, expected_items: list[dict[str, int]]
281+
) -> None:
282+
with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic):
283+
returned_items = [item async for item in client.list_items(**inputs)]
284+
285+
if inputs == {}:
286+
list_response = await client.list_items(**inputs)
287+
print(1)
288+
assert len(returned_items) == list_response.total
289+
290+
assert returned_items == expected_items

0 commit comments

Comments
 (0)