diff --git a/src/apify/apify_storage_client/_request_queue_client.py b/src/apify/apify_storage_client/_request_queue_client.py
index 2cdbe58d..03c51df1 100644
--- a/src/apify/apify_storage_client/_request_queue_client.py
+++ b/src/apify/apify_storage_client/_request_queue_client.py
@@ -2,6 +2,7 @@
 
 from typing import TYPE_CHECKING
 
+from more_itertools import chunked
 from typing_extensions import override
 
 from crawlee import Request
@@ -157,8 +158,11 @@ async def batch_add_requests(
         *,
         forefront: bool = False,
     ) -> BatchRequestsOperationResponse:
-        return BatchRequestsOperationResponse.model_validate(
-            await self._client.batch_add_requests(
+        processed = []
+        unprocessed = []
+
+        for chunk in chunked(requests, 25):  # The API endpoint won't accept more than 25 requests at once
+            response = await self._client.batch_add_requests(
                 requests=[
                     r.model_dump(
                         by_alias=True,
@@ -170,10 +174,18 @@
                             'data',
                         },
                     )
-                    for r in requests
+                    for r in chunk
                 ],
                 forefront=forefront,
             )
+            processed.extend(response['processedRequests'])
+            unprocessed.extend(response['unprocessedRequests'])
+
+        return BatchRequestsOperationResponse.model_validate(
+            {
+                'processedRequests': processed,
+                'unprocessedRequests': unprocessed,
+            }
         )
 
     @override
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index 46afa2ab..2ec06914 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -39,3 +39,68 @@ async def main() -> None:
         run_result = await actor.call()
         assert run_result is not None
         assert run_result['status'] == 'SUCCEEDED'
+
+    async def test_batch(self, make_actor: ActorFactory) -> None:
+        async def main() -> None:
+            async with Actor:
+                desired_request_count = 100
+                print('Opening request queue...')
+                # I have seen it get stuck on this call
+                rq = await Actor.open_request_queue()
+                # Add some requests
+                await rq.add_requests_batched([f'https://example.com/{i}' for i in range(desired_request_count)])
+
+                handled_request_count = 0
+                while next_request := await rq.fetch_next_request():
+                    print('Fetching next request...')
+                    queue_operation_info = await rq.mark_request_as_handled(next_request)
+                    assert queue_operation_info is not None
+                    assert queue_operation_info.was_already_handled is False
+                    handled_request_count += 1
+
+                assert handled_request_count == desired_request_count
+                print('Waiting for queue to be finished...')
+                is_finished = await rq.is_finished()
+                assert is_finished is True
+
+        actor = await make_actor('rq-batch-test', main_func=main)
+
+        run_result = await actor.call()
+        assert run_result is not None
+        assert run_result['status'] == 'SUCCEEDED'
+
+    async def test_batch_non_unique(self, make_actor: ActorFactory) -> None:
+        async def main() -> None:
+            from crawlee import Request
+
+            async with Actor:
+                desired_request_count = 100
+                print('Opening request queue...')
+                # I have seen it get stuck on this call
+                rq = await Actor.open_request_queue()
+                # Add some requests
+                await rq.add_requests_batched(
+                    [
+                        Request.from_url(f'https://example.com/{i}', unique_key=str(i - 1 if i % 4 == 1 else i))
+                        for i in range(desired_request_count)
+                    ]
+                )
+
+                handled_request_count = 0
+                while next_request := await rq.fetch_next_request():
+                    print('Fetching next request...')
+                    queue_operation_info = await rq.mark_request_as_handled(next_request)
+                    assert queue_operation_info is not None
+                    assert queue_operation_info.was_already_handled is False
+                    handled_request_count += 1
+
+                assert handled_request_count == desired_request_count * 3 / 4
+                print('Waiting for queue to be finished...')
+                is_finished = await rq.is_finished()
+                assert is_finished is True
+
+        actor = await make_actor('rq-batch-test', main_func=main)
+
+        run_result = await actor.call()
+        assert run_result is not None
+        assert run_result['status'] == 'SUCCEEDED'
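
Note (reviewer sketch, not part of the diff): the chunk size of 25 and the `desired_request_count * 3 / 4` assertion in `test_batch_non_unique` both come down to simple arithmetic. The snippet below spells it out; it assumes only `more_itertools` and standard Python, and does not touch the Apify API or the SDK.

```python
# Standalone sketch illustrating the two behaviours the change and the new tests rely on.
from more_itertools import chunked

# 1) Client-side batching: the API endpoint accepts at most 25 requests per call,
#    so 100 requests are sent in 4 calls of 25 each.
urls = [f'https://example.com/{i}' for i in range(100)]
batches = list(chunked(urls, 25))
assert len(batches) == 4
assert all(len(batch) == 25 for batch in batches)

# 2) Deduplication expected by test_batch_non_unique: every request with i % 4 == 1
#    reuses the unique_key of request i - 1, so a quarter of the 100 requests collapse
#    onto an existing key and only 75 (= 100 * 3 / 4) distinct requests are handled.
unique_keys = {str(i - 1 if i % 4 == 1 else i) for i in range(100)}
assert len(unique_keys) == 100 * 3 // 4  # 75
```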