From a09519433117b020130699b3682eff543df39d2f Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 4 Sep 2024 14:24:28 +0200 Subject: [PATCH 1/3] Hotfix for batch_add_requests batch size limit --- .../_request_queue_client.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/apify/apify_storage_client/_request_queue_client.py b/src/apify/apify_storage_client/_request_queue_client.py index 2cdbe58d..211446fb 100644 --- a/src/apify/apify_storage_client/_request_queue_client.py +++ b/src/apify/apify_storage_client/_request_queue_client.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING +from more_itertools import chunked from typing_extensions import override from crawlee import Request @@ -157,8 +158,11 @@ async def batch_add_requests( *, forefront: bool = False, ) -> BatchRequestsOperationResponse: - return BatchRequestsOperationResponse.model_validate( - await self._client.batch_add_requests( + processed = [] + unprocessed = [] + + for chunk in chunked(requests, 25): + response = await self._client.batch_add_requests( requests=[ r.model_dump( by_alias=True, @@ -170,10 +174,18 @@ async def batch_add_requests( 'data', }, ) - for r in requests + for r in chunk ], forefront=forefront, ) + processed.extend(response['processedRequests']) + unprocessed.extend(response['unprocessedRequests']) + + return BatchRequestsOperationResponse.model_validate( + { + 'processedRequests': processed, + 'unprocessedRequests': unprocessed, + } ) @override From dc7298e47c98e4654ebd4150cd57622adb41d4a5 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 5 Sep 2024 13:14:05 +0200 Subject: [PATCH 2/3] Comment --- src/apify/apify_storage_client/_request_queue_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apify/apify_storage_client/_request_queue_client.py b/src/apify/apify_storage_client/_request_queue_client.py index 211446fb..03c51df1 100644 --- a/src/apify/apify_storage_client/_request_queue_client.py +++ b/src/apify/apify_storage_client/_request_queue_client.py @@ -161,7 +161,7 @@ async def batch_add_requests( processed = [] unprocessed = [] - for chunk in chunked(requests, 25): + for chunk in chunked(requests, 25): # The API endpoint won't accept more than 25 requests at once response = await self._client.batch_add_requests( requests=[ r.model_dump( From 9d48a6aaef6c41a91137dc8c3f12a9f8b83fc48a Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 5 Sep 2024 13:48:49 +0200 Subject: [PATCH 3/3] Add integration tests --- tests/integration/test_request_queue.py | 65 +++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 46afa2ab..2ec06914 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -39,3 +39,68 @@ async def main() -> None: run_result = await actor.call() assert run_result is not None assert run_result['status'] == 'SUCCEEDED' + + async def test_batch(self, make_actor: ActorFactory) -> None: + async def main() -> None: + async with Actor: + desired_request_count = 100 + print('Opening request queue...') + # I have seen it get stuck on this call + rq = await Actor.open_request_queue() + # Add some requests + await rq.add_requests_batched([f'https://example.com/{i}' for i in range(desired_request_count)]) + + handled_request_count = 0 + while next_request := await rq.fetch_next_request(): + print('Fetching next request...') + queue_operation_info = await rq.mark_request_as_handled(next_request) + assert queue_operation_info is not None + assert queue_operation_info.was_already_handled is False + handled_request_count += 1 + + assert handled_request_count == desired_request_count + print('Waiting for queue to be finished...') + is_finished = await rq.is_finished() + assert is_finished is True + + actor = await make_actor('rq-batch-test', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED' + + async def test_batch_non_unique(self, make_actor: ActorFactory) -> None: + async def main() -> None: + from crawlee import Request + + async with Actor: + desired_request_count = 100 + print('Opening request queue...') + # I have seen it get stuck on this call + rq = await Actor.open_request_queue() + # Add some requests + await rq.add_requests_batched( + [ + Request.from_url(f'https://example.com/{i}', unique_key=str(i - 1 if i % 4 == 1 else i)) + for i in range(desired_request_count) + ] + ) + + handled_request_count = 0 + while next_request := await rq.fetch_next_request(): + print('Fetching next request...') + queue_operation_info = await rq.mark_request_as_handled(next_request) + assert queue_operation_info is not None + assert queue_operation_info.was_already_handled is False + handled_request_count += 1 + + assert handled_request_count == desired_request_count * 3 / 4 + print('Waiting for queue to be finished...') + is_finished = await rq.is_finished() + assert is_finished is True + + actor = await make_actor('rq-batch-test', main_func=main) + + run_result = await actor.call() + assert run_result is not None + assert run_result['status'] == 'SUCCEEDED'