diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py
index 519cd95a..c9e7031a 100644
--- a/src/apify/storage_clients/_apify/_request_queue_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_client.py
@@ -242,20 +242,77 @@ async def add_batch_of_requests(
 
         Returns:
             Response containing information about the added requests.
         """
-        # Prepare requests for API by converting to dictionaries.
-        requests_dict = [
-            request.model_dump(
-                by_alias=True,
-                exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
+        # Do not try to add previously added requests to avoid pointless expensive calls to API
+
+        new_requests: list[Request] = []
+        already_present_requests: list[ProcessedRequest] = []
+
+        for request in requests:
+            if self._requests_cache.get(request.id):
+                # We are not sure if it was already handled at this point, and it is not worth calling API for it.
+                # It could have been handled by another client in the meantime, so cached information about
+                # `request.was_already_handled` is not reliable.
+                already_present_requests.append(
+                    ProcessedRequest.model_validate(
+                        {
+                            'id': request.id,
+                            'uniqueKey': request.unique_key,
+                            'wasAlreadyPresent': True,
+                            'wasAlreadyHandled': request.was_already_handled,
+                        }
+                    )
+                )
+
+            else:
+                # Add new request to the cache.
+                processed_request = ProcessedRequest.model_validate(
+                    {
+                        'id': request.id,
+                        'uniqueKey': request.unique_key,
+                        'wasAlreadyPresent': True,
+                        'wasAlreadyHandled': request.was_already_handled,
+                    }
+                )
+                self._cache_request(
+                    unique_key_to_request_id(request.unique_key),
+                    processed_request,
+                )
+                new_requests.append(request)
+
+        if new_requests:
+            # Prepare requests for API by converting to dictionaries.
+            requests_dict = [
+                request.model_dump(
+                    by_alias=True,
+                    exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
+                )
+                for request in new_requests
+            ]
+
+            # Send requests to API.
+            api_response = AddRequestsResponse.model_validate(
+                await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+            )
+
+            # Add the locally known already present processed requests based on the local cache.
+            api_response.processed_requests.extend(already_present_requests)
+
+            # Remove unprocessed requests from the cache
+            for unprocessed_request in api_response.unprocessed_requests:
+                self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
+
+        else:
+            api_response = AddRequestsResponse.model_validate(
+                {'unprocessedRequests': [], 'processedRequests': already_present_requests}
             )
-            for request in requests
-        ]
 
-        # Send requests to API.
-        response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+        logger.debug(
+            f'Tried to add new requests: {len(new_requests)}, '
+            f'succeeded to add new requests: {len(api_response.processed_requests) - len(already_present_requests)}, '
+            f'skipped already present requests: {len(already_present_requests)}'
+        )
 
         # Update assumed total count for newly added requests.
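+        # At this point `api_response.processed_requests` contains both the requests newly added via the API
+        # and the locally cached, already-present requests appended above; the loop below therefore counts
+        # only requests that were neither already present nor already handled.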
-        api_response = AddRequestsResponse.model_validate(response)
         new_request_count = 0
         for processed_request in api_response.processed_requests:
             if not processed_request.was_already_present and not processed_request.was_already_handled:
diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py
index 64a846b5..a785f1ad 100644
--- a/tests/integration/test_actor_request_queue.py
+++ b/tests/integration/test_actor_request_queue.py
@@ -1,6 +1,11 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Any
+from unittest import mock
+
+import pytest
 
 from apify_shared.consts import ApifyEnvVars
 
@@ -8,13 +13,28 @@
 from apify import Actor, Request
 
 if TYPE_CHECKING:
-    import pytest
+    from collections.abc import AsyncGenerator
 
     from apify_client import ApifyClientAsync
+    from crawlee.storages import RequestQueue
 
     from .conftest import MakeActorFunction, RunActorFunction
 
 
+@pytest.fixture
+async def apify_named_rq(
+    apify_client_async: ApifyClientAsync, monkeypatch: pytest.MonkeyPatch
+) -> AsyncGenerator[RequestQueue]:
+    assert apify_client_async.token
+    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
+    request_queue_name = generate_unique_resource_name('request_queue')
+
+    async with Actor:
+        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
+        yield request_queue
+        await request_queue.drop()
+
+
 async def test_same_references_in_default_rq(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
@@ -61,55 +81,239 @@ async def main() -> None:
 
 
 async def test_force_cloud(
     apify_client_async: ApifyClientAsync,
-    monkeypatch: pytest.MonkeyPatch,
+    apify_named_rq: RequestQueue,
 ) -> None:
-    assert apify_client_async.token is not None
-    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
+    request_queue_id = (await apify_named_rq.get_metadata()).id
+    request_info = await apify_named_rq.add_request(Request.from_url('http://example.com'))
+    request_queue_client = apify_client_async.request_queue(request_queue_id)
 
-    request_queue_name = generate_unique_resource_name('request_queue')
+    request_queue_details = await request_queue_client.get()
+    assert request_queue_details is not None
+    assert request_queue_details.get('name') == apify_named_rq.name
 
-    async with Actor:
-        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-        request_queue_id = (await request_queue.get_metadata()).id
+    request_queue_request = await request_queue_client.get_request(request_info.id)
+    assert request_queue_request is not None
+    assert request_queue_request['url'] == 'http://example.com'
 
-        request_info = await request_queue.add_request(Request.from_url('http://example.com'))
-        request_queue_client = apify_client_async.request_queue(request_queue_id)
 
+async def test_request_queue_is_finished(
+    apify_named_rq: RequestQueue,
+) -> None:
+    request_queue = await Actor.open_request_queue(name=apify_named_rq.name, force_cloud=True)
+    await request_queue.add_request(Request.from_url('http://example.com'))
+    assert not await request_queue.is_finished()
 
-        try:
-            request_queue_details = await request_queue_client.get()
-            assert request_queue_details is not None
-            assert request_queue_details.get('name') == request_queue_name
+    request = await request_queue.fetch_next_request()
+    assert request is not None
+    assert not await request_queue.is_finished(), (
+        'RequestQueue should not be finished unless the request is marked as handled.'
+    )
 
-            request_queue_request = await request_queue_client.get_request(request_info.id)
-            assert request_queue_request is not None
-            assert request_queue_request['url'] == 'http://example.com'
-        finally:
-            await request_queue_client.delete()
+    await request_queue.mark_request_as_handled(request)
+    assert await request_queue.is_finished()
 
 
-async def test_request_queue_is_finished(
-    apify_client_async: ApifyClientAsync,
-    monkeypatch: pytest.MonkeyPatch,
+async def test_request_queue_deduplication(
+    make_actor: MakeActorFunction,
+    run_actor: RunActorFunction,
 ) -> None:
-    assert apify_client_async.token is not None
-    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
+    """Test that deduplication works correctly. Add 2 similar requests; the API should be called just once.
 
-    request_queue_name = generate_unique_resource_name('request_queue')
+    Deduplication is based on the request's `unique_key` only. To include more attributes in the unique key, the
+    `use_extended_unique_key=True` argument of the `Request.from_url` method can be used.
+    This tests an internal optimization that does not change user-facing behavior.
+    The function's input/output behave the same way; it only makes fewer API calls.
+    """
 
-    async with Actor:
-        try:
-            request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-            await request_queue.add_request(Request.from_url('http://example.com'))
-            assert not await request_queue.is_finished()
-
-            request = await request_queue.fetch_next_request()
-            assert request is not None
-            assert not await request_queue.is_finished(), (
-                'RequestQueue should not be finished unless the request is marked as handled.'
-            )
-
-            await request_queue.mark_request_as_handled(request)
-            assert await request_queue.is_finished()
-        finally:
-            await request_queue.drop()
+    async def main() -> None:
+        import asyncio
+
+        from apify import Actor, Request
+
+        async with Actor:
+            request1 = Request.from_url('http://example.com', method='POST')
+            request2 = Request.from_url('http://example.com', method='GET')
+            rq = await Actor.open_request_queue()
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+
+            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
+            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
+            _rq = await rq_client.get()
+            assert _rq
+            stats_before = _rq.get('stats', {})
+            Actor.log.info(stats_before)
+
+            # Add two requests that share the same unique key
+            await rq.add_request(request1)
+            await rq.add_request(request2)
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+            _rq = await rq_client.get()
+            assert _rq
+            stats_after = _rq.get('stats', {})
+            Actor.log.info(stats_after)
+
+            assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
+
+    actor = await make_actor(label='rq-deduplication', main_func=main)
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
+
+
+async def test_request_queue_deduplication_use_extended_unique_key(
+    make_actor: MakeActorFunction,
+    run_actor: RunActorFunction,
+) -> None:
+    """Test that deduplication works correctly. Add 2 similar requests; the API should be called just twice.
+
+    Deduplication is based on the request's `unique_key` only. To include more attributes in the unique key, the
+    `use_extended_unique_key=True` argument of the `Request.from_url` method can be used.
+    This tests an internal optimization that does not change user-facing behavior.
+    The function's input/output behave the same way; it only makes fewer API calls.
+    """
+
+    async def main() -> None:
+        import asyncio
+
+        from apify import Actor, Request
+
+        async with Actor:
+            request1 = Request.from_url('http://example.com', method='POST', use_extended_unique_key=True)
+            request2 = Request.from_url('http://example.com', method='GET', use_extended_unique_key=True)
+            rq = await Actor.open_request_queue()
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+
+            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
+            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
+            _rq = await rq_client.get()
+            assert _rq
+            stats_before = _rq.get('stats', {})
+            Actor.log.info(stats_before)
+
+            # Add two requests with the same URL but different extended unique keys
+            await rq.add_request(request1)
+            await rq.add_request(request2)
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+            _rq = await rq_client.get()
+            assert _rq
+            stats_after = _rq.get('stats', {})
+            Actor.log.info(stats_after)
+
+            assert (stats_after['writeCount'] - stats_before['writeCount']) == 2
+
+    actor = await make_actor(label='rq-deduplication', main_func=main)
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
+
+
+async def test_request_queue_parallel_deduplication(
+    make_actor: MakeActorFunction,
+    run_actor: RunActorFunction,
+) -> None:
+    """Test that deduplication works correctly even with parallel attempts to add the same links.
+
+    The test is set up so that each worker has some requests that were already added to the queue and some new
+    requests. The function must correctly deduplicate the requests and add only the new ones. For example:
+    the first worker adds 10 new requests,
+    the second worker adds 10 new requests and 10 known requests,
+    the third worker adds 10 new requests and 20 known requests, and so on.
+    """
+
+    async def main() -> None:
+        import asyncio
+        import logging
+
+        from apify import Actor, Request
+
+        worker_count = 10
+        max_requests = 100
+        batch_size = iter(range(10, max_requests + 1, int(max_requests / worker_count)))
+
+        async with Actor:
+            logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
+
+            requests = [Request.from_url(f'http://example.com/{i}') for i in range(max_requests)]
+            rq = await Actor.open_request_queue()
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+
+            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
+            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
+            _rq = await rq_client.get()
+            assert _rq
+            stats_before = _rq.get('stats', {})
+            Actor.log.info(stats_before)
+
+            # Add batches of some new and some already present requests in workers
+            async def add_requests_worker() -> None:
+                await rq.add_requests(requests[: next(batch_size)])
+
+            # Start all workers
+            add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(worker_count)]
+            await asyncio.gather(*add_requests_workers)
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+            _rq = await rq_client.get()
+            assert _rq
+            stats_after = _rq.get('stats', {})
+            Actor.log.info(stats_after)
+
+            assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests)
+
+    actor = await make_actor(label='rq-parallel-deduplication', main_func=main)
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
+
+
+async def test_request_queue_deduplication_unprocessed_requests(
+    apify_named_rq: RequestQueue,
+) -> None:
+    """Test that deduplication does not add unprocessed requests to the cache.
+
+    In this test, the first call is "hardcoded" to fail, even on all retries, so it never actually sends the API
+    request and thus has no chance of increasing the `writeCount`. The second call can increase the `writeCount`
+    only if it is not cached, as cached requests do not make the call (tested in other tests). So this verifies
+    that the `unprocessedRequests` request was intentionally not cached.
+    """
+    logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
+
+    await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+
+    # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
+    rq_client = Actor.apify_client.request_queue(request_queue_id=apify_named_rq.id)
+    _rq = await rq_client.get()
+    assert _rq
+    stats_before = _rq.get('stats', {})
+    Actor.log.info(stats_before)
+
+    def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dict[str, list[dict]]:
+        """Simulate the API returning unprocessed requests."""
+        return {
+            'processedRequests': [],
+            'unprocessedRequests': [
+                {'url': request['url'], 'uniqueKey': request['uniqueKey'], 'method': request['method']}
+                for request in requests
+            ],
+        }
+
+    with mock.patch(
+        'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
+        side_effect=return_unprocessed_requests,
+    ):
+        # Simulate a failed API call for adding requests. The request was not processed and should not be cached.
+        await apify_named_rq.add_requests(['http://example.com/1'])
+
+    # This will succeed.
+    await apify_named_rq.add_requests(['http://example.com/1'])
+
+    await asyncio.sleep(10)  # Wait to be sure that metadata are updated
+    _rq = await rq_client.get()
+    assert _rq
+    stats_after = _rq.get('stats', {})
+    Actor.log.info(stats_after)
+
+    assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
diff --git a/uv.lock b/uv.lock
index aaaab0fb..0e5526c3 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.10"
 
 [[package]]
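For reference, below is a minimal, self-contained sketch of the caching pattern this patch applies in `add_batch_of_requests`: check a local cache before calling the batch-add API, report cached requests as already present, and evict anything the API leaves unprocessed so it can be retried. The names here (`DedupBatchAdder`, `fake_batch_add_api`) are illustrative only and are not part of the Apify SDK or client.

import asyncio
from collections.abc import Awaitable, Callable

BatchAddApi = Callable[[list[str]], Awaitable[dict[str, list[str]]]]


class DedupBatchAdder:
    """Illustrative only: splits a batch into cached vs. new keys before calling the API."""

    def __init__(self, batch_add_api: BatchAddApi) -> None:
        self._batch_add_api = batch_add_api
        self._cache: set[str] = set()  # Unique keys believed to already be in the queue.

    async def add_batch(self, unique_keys: list[str]) -> dict[str, list[str]]:
        new = [key for key in unique_keys if key not in self._cache]
        already_present = [key for key in unique_keys if key in self._cache]

        # Optimistically cache the new keys so concurrent callers skip them too.
        self._cache.update(new)

        processed: list[str] = []
        unprocessed: list[str] = []
        if new:  # Only call the API when there is something genuinely new to add.
            response = await self._batch_add_api(new)
            processed = response['processed']
            unprocessed = response['unprocessed']
            # Unprocessed keys were not actually added, so forget them to allow a retry.
            self._cache.difference_update(unprocessed)

        return {'processed': processed + already_present, 'unprocessed': unprocessed}


async def fake_batch_add_api(unique_keys: list[str]) -> dict[str, list[str]]:
    """Stand-in for the real batch-add endpoint; pretends every request is accepted."""
    return {'processed': list(unique_keys), 'unprocessed': []}


async def main() -> None:
    adder = DedupBatchAdder(fake_batch_add_api)
    print(await adder.add_batch(['a', 'b']))  # Both keys hit the API.
    print(await adder.add_batch(['b', 'c']))  # 'b' is served from the cache; only 'c' hits the API.


if __name__ == '__main__':
    asyncio.run(main())

The real client keys its cache by `unique_key_to_request_id(request.unique_key)` and stores `ProcessedRequest` objects rather than bare keys, but the control flow is the same.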