feat: Add deduplication to add_batch_of_requests (#534)
Changes from 56 commits
First changed file (the Apify request queue client, where `add_batch_of_requests` lives):
@@ -242,17 +242,66 @@ async def add_batch_of_requests(
         Returns:
             Response containing information about the added requests.
         """
-        # Prepare requests for API by converting to dictionaries.
-        requests_dict = [
-            request.model_dump(
-                by_alias=True,
-                exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
-            )
-            for request in requests
-        ]
+        # Do not try to add previously added requests to avoid pointless expensive calls to API.
+
+        new_requests: list[Request] = []
+        already_present_requests: list[dict[str, str | bool]] = []
+
+        for request in requests:
+            if self._requests_cache.get(request.id):
[Review comment on the cache lookup]
Judging by apify/crawlee#3120, a day may come when we try to limit the size of this cache.

EDIT: hold up a minute, do you use the ID here for deduplication instead of the unique key?

[Author reply]
Since there is this deterministic transformation function (`unique_key_to_request_id`), the ID is effectively a fingerprint of the unique key, so deduplicating by either gives the same result. There are two issues I created based on the discussion about this.
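For context, such a deterministic unique-key-to-ID transformation can look roughly like the sketch below. It is modeled on crawlee's helper of the same name; the exact implementation (hash, alphabet, length) may differ, so treat the details as assumptions:

    import base64
    import hashlib
    import re


    def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
        """Derive a deterministic request ID from a unique key (illustrative sketch)."""
        # Hash the unique key, base64-encode it, and keep only URL-safe characters.
        digest = hashlib.sha256(unique_key.encode('utf-8')).digest()
        request_id = re.sub(r'[+/=]', '', base64.b64encode(digest).decode('utf-8'))
        return request_id[:request_id_length]


    # The same unique key always yields the same ID, which is why caching by the
    # derived ID deduplicates exactly as well as caching by the unique key itself.
    assert unique_key_to_request_id('http://example.com') == unique_key_to_request_id('http://example.com')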
+                # We are not sure if it was already handled at this point, and it is not worth calling API for it.
+                already_present_requests.append(
+                    {
+                        'id': request.id,
+                        'uniqueKey': request.unique_key,
+                        'wasAlreadyPresent': True,
+                        'wasAlreadyHandled': request.was_already_handled,
+                    }
+                )
+            else:
+                # Add new request to the cache.
+                processed_request = ProcessedRequest.model_validate(
+                    {
+                        'id': request.id,
+                        'uniqueKey': request.unique_key,
+                        'wasAlreadyPresent': True,
+                        'wasAlreadyHandled': request.was_already_handled,
+                    }
+                )
+                self._cache_request(
+                    unique_key_to_request_id(request.unique_key),
+                    processed_request,
+                )
+                new_requests.append(request)
+
+        if new_requests:
+            # Prepare requests for API by converting to dictionaries.
+            requests_dict = [
+                request.model_dump(
+                    by_alias=True,
+                    exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
+                )
+                for request in new_requests
+            ]
-
-        # Send requests to API.
-        response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+
+            # Send requests to API.
+            response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+
+            # Add the locally known already present processed requests based on the local cache.
+            response['processedRequests'].extend(already_present_requests)
+
+            # Remove unprocessed requests from the cache.
+            for unprocessed in response['unprocessedRequests']:
+                self._requests_cache.pop(unique_key_to_request_id(unprocessed['uniqueKey']), None)
+
+        else:
+            response = {'unprocessedRequests': [], 'processedRequests': already_present_requests}
+
+        logger.debug(
+            f'Tried to add new requests: {len(new_requests)}, '
+            f'succeeded to add new requests: {len(response["processedRequests"])}, '
+            f'skipped already present requests: {len(already_present_requests)}'
+        )
+
         # Update assumed total count for newly added requests.
         api_response = AddRequestsResponse.model_validate(response)
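For illustration, here is a minimal caller-side sketch of what this optimization means. This is a hypothetical snippet, not part of the PR; it only assumes the public `Actor.open_request_queue()` / `add_request()` API used by the tests below:

    import asyncio

    from apify import Actor, Request


    async def main() -> None:
        async with Actor:
            rq = await Actor.open_request_queue()
            request = Request.from_url('http://example.com')

            # First add: the request is not in the local cache yet, so one
            # API call is made and the request gets cached.
            await rq.add_request(request)

            # Second add: the cache hit short-circuits the API call; a
            # ProcessedRequest with wasAlreadyPresent=True is synthesized locally.
            await rq.add_request(request)


    asyncio.run(main())

This is exactly the behavior the integration tests below verify by asserting that the queue's `writeCount` grows by one, not two.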
Second changed file (the request queue integration tests):

@@ -1,20 +1,40 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Any
+from unittest import mock
+
+import pytest
 
 from apify_shared.consts import ApifyEnvVars
 
 from ._utils import generate_unique_resource_name
 from apify import Actor, Request
 
 if TYPE_CHECKING:
-    import pytest
+    from collections.abc import AsyncGenerator
 
     from apify_client import ApifyClientAsync
     from crawlee.storages import RequestQueue
 
     from .conftest import MakeActorFunction, RunActorFunction
 
 
+@pytest.fixture
+async def apify_named_rq(
+    apify_client_async: ApifyClientAsync, monkeypatch: pytest.MonkeyPatch
+) -> AsyncGenerator[RequestQueue]:
+    assert apify_client_async.token
+    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
+    request_queue_name = generate_unique_resource_name('request_queue')
+
+    async with Actor:
+        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
+        yield request_queue
+        await request_queue.drop()
+
+
 async def test_same_references_in_default_rq(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
@@ -61,55 +81,171 @@ async def main() -> None:
 
 async def test_force_cloud(
     apify_client_async: ApifyClientAsync,
-    monkeypatch: pytest.MonkeyPatch,
+    apify_named_rq: RequestQueue,
 ) -> None:
-    assert apify_client_async.token is not None
-    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
-
-    request_queue_name = generate_unique_resource_name('request_queue')
-
-    async with Actor:
-        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-        request_queue_id = (await request_queue.get_metadata()).id
-
-        request_info = await request_queue.add_request(Request.from_url('http://example.com'))
-
-        request_queue_client = apify_client_async.request_queue(request_queue_id)
-
-        try:
-            request_queue_details = await request_queue_client.get()
-            assert request_queue_details is not None
-            assert request_queue_details.get('name') == request_queue_name
-
-            request_queue_request = await request_queue_client.get_request(request_info.id)
-            assert request_queue_request is not None
-            assert request_queue_request['url'] == 'http://example.com'
-        finally:
-            await request_queue_client.delete()
+    request_queue_id = (await apify_named_rq.get_metadata()).id
+    request_info = await apify_named_rq.add_request(Request.from_url('http://example.com'))
+    request_queue_client = apify_client_async.request_queue(request_queue_id)
+
+    request_queue_details = await request_queue_client.get()
+    assert request_queue_details is not None
+    assert request_queue_details.get('name') == apify_named_rq.name
+
+    request_queue_request = await request_queue_client.get_request(request_info.id)
+    assert request_queue_request is not None
+    assert request_queue_request['url'] == 'http://example.com'
 
 
 async def test_request_queue_is_finished(
-    apify_client_async: ApifyClientAsync,
-    monkeypatch: pytest.MonkeyPatch,
+    apify_named_rq: RequestQueue,
 ) -> None:
-    assert apify_client_async.token is not None
-    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)
-
-    request_queue_name = generate_unique_resource_name('request_queue')
-
-    async with Actor:
-        try:
-            request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-            await request_queue.add_request(Request.from_url('http://example.com'))
-            assert not await request_queue.is_finished()
-
-            request = await request_queue.fetch_next_request()
-            assert request is not None
-            assert not await request_queue.is_finished(), (
-                'RequestQueue should not be finished unless the request is marked as handled.'
-            )
-
-            await request_queue.mark_request_as_handled(request)
-            assert await request_queue.is_finished()
-        finally:
-            await request_queue.drop()
+    request_queue = await Actor.open_request_queue(name=apify_named_rq.name, force_cloud=True)
+    await request_queue.add_request(Request.from_url('http://example.com'))
+    assert not await request_queue.is_finished()
+
+    request = await request_queue.fetch_next_request()
+    assert request is not None
+    assert not await request_queue.is_finished(), (
+        'RequestQueue should not be finished unless the request is marked as handled.'
+    )
+
+    await request_queue.mark_request_as_handled(request)
+    assert await request_queue.is_finished()
+
+
+async def test_request_queue_deduplication(
+    make_actor: MakeActorFunction,
+    run_actor: RunActorFunction,
+) -> None:
+    """Test that the deduplication works correctly. Add the same request twice; the API should be called just once.
+
+    This tests an internal optimization that changes no behavior for the user.
+    The function's input/output behave the same way; it only makes fewer API calls.
+    """
+
+    async def main() -> None:
+        import asyncio
+
+        from apify import Actor, Request
+
+        async with Actor:
+            request = Request.from_url('http://example.com')
+            rq = await Actor.open_request_queue()
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated.
+
+            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client.
+            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
+            _rq = await rq_client.get()
+            assert _rq
+            stats_before = _rq.get('stats', {})
+            Actor.log.info(stats_before)
+
+            # Add the same request twice.
+            await rq.add_request(request)
+            await rq.add_request(request)
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated.
+            _rq = await rq_client.get()
+            assert _rq
+            stats_after = _rq.get('stats', {})
+            Actor.log.info(stats_after)
+
+            assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
+
+    actor = await make_actor(label='rq-deduplication', main_func=main)
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
+
+
+async def test_request_queue_parallel_deduplication(
+    make_actor: MakeActorFunction,
+    run_actor: RunActorFunction,
+) -> None:
+    """Test that the deduplication works correctly even with parallel attempts to add the same links."""
+
+    async def main() -> None:
+        import asyncio
+        import logging
+
+        from apify import Actor, Request
+
+        async with Actor:
+            logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
+
+            requests = [Request.from_url(f'http://example.com/{i}') for i in range(100)]
+            rq = await Actor.open_request_queue()
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated.
+
+            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client.
+            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
+            _rq = await rq_client.get()
+            assert _rq
+            stats_before = _rq.get('stats', {})
+            Actor.log.info(stats_before)
+
+            # Add the same requests in 10 parallel workers.
+            async def add_requests_worker() -> None:
+                await rq.add_requests(requests)
+
+            add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
+            await asyncio.gather(*add_requests_workers)
[Review comment on the parallel workers]
I guess you made sure that these do in fact run in parallel? To the naked eye, 100 requests doesn't seem like much; I'd expect that the event loop may run the tasks in sequence. Maybe you could add the requests in each worker in smaller batches and add some random delays? Or just add a comment saying that you verified parallel execution empirically 😁

[Author reply]
I wrote the test against an implementation that did not take parallel execution into account, and it was failing consistently, so from that perspective I consider the test sufficient. Anyway, I added some chunking to make the test slightly more challenging. The parallel execution can be verified in the logs, where the workers' calls can be seen interleaving.
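Purely as an illustration of the chunked variant the author mentions, a worker might look like the sketch below. The chunk size and the jittered sleep are assumptions for illustration, not the PR's actual code:

    import asyncio
    import random

    CHUNK_SIZE = 10  # assumed value, not taken from the PR


    async def add_requests_worker_chunked(rq, requests) -> None:
        # Add requests in small batches so each await is a separate API call.
        for i in range(0, len(requests), CHUNK_SIZE):
            await rq.add_requests(requests[i : i + CHUNK_SIZE])
            # Yield control with a small random delay to encourage the event
            # loop to interleave the workers instead of running them in sequence.
            await asyncio.sleep(random.uniform(0, 0.1))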
+
+            await asyncio.sleep(10)  # Wait to be sure that metadata are updated.
+            _rq = await rq_client.get()
+            assert _rq
+            stats_after = _rq.get('stats', {})
+            Actor.log.info(stats_after)
+
+            assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests)
+
+    actor = await make_actor(label='rq-parallel-deduplication', main_func=main)
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
+
+
+async def test_request_queue_deduplication_unprocessed_requests(
+    apify_named_rq: RequestQueue,
+) -> None:
+    """Test that the deduplication does not add unprocessed requests to the cache."""
+    logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
+
+    await asyncio.sleep(10)  # Wait to be sure that metadata are updated.
+
+    # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client.
+    rq_client = Actor.apify_client.request_queue(request_queue_id=apify_named_rq.id)
+    _rq = await rq_client.get()
+    assert _rq
+    stats_before = _rq.get('stats', {})
+    Actor.log.info(stats_before)
+
+    def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dict[str, list[dict]]:
+        """Simulate API returning unprocessed requests."""
+        return {
+            'processedRequests': [],
+            'unprocessedRequests': [
+                {'url': request['url'], 'uniqueKey': request['uniqueKey'], 'method': request['method']}
+                for request in requests
+            ],
+        }
+
+    with mock.patch(
+        'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
+        side_effect=return_unprocessed_requests,
+    ):
+        # Simulate failed API call for adding requests. Request was not processed and should not be cached.
+        await apify_named_rq.add_requests(['http://example.com/1'])
+
+    # This will succeed.
+    await apify_named_rq.add_requests(['http://example.com/1'])
[Review comment on the two add_requests calls above]
Any chance we could verify that the request was actually not cached between the two calls?

[Author reply]
This is checked implicitly by the last line, which asserts that the writeCount difference is exactly 1. The first call is hardcoded to fail, even on all retries, so it never sends the API request and thus has no chance of increasing the writeCount. The second call can make the write only if the request is not cached, since cached requests skip the call entirely (tested in the other tests). So a delta of exactly 1 proves the request was not cached in between. I could assert the state of the cache between those calls, but since it is an implementation detail, I would prefer not to.

[Reviewer]
Fair enough, can you explain this in a comment then?

[Author]
Yes, added to the test description.
+
+    await asyncio.sleep(10)  # Wait to be sure that metadata are updated.
+    _rq = await rq_client.get()
+    assert _rq
+    stats_after = _rq.get('stats', {})
+    Actor.log.info(stats_after)
+
+    assert (stats_after['writeCount'] - stats_before['writeCount']) == 1