Skip to content

Commit dd03c4d

Browse files
Pijukatel and vdusek authored
feat: Add deduplication to add_batch_of_requests (#534)
### Description - Ensure that already known requests are excluded from `api_client.batch_add_requests` calls to avoid expensive and pointless API calls. - Add all new requests to the cache when calling `batch_add_requests`. - Add test with real API usage measurement. ### Issues - Closes: #514 ### Testing - Added new integration tests to verify reduced API usage. - Comparing benchmark actor based on master vs this PR. Actor is a simple ParselCrawler that crawls the whole crawlee.dev, which contains many duplicate links, as the documentation is cross-linked thoroughly. Results: - Massive reduction of cost from request queue. - Significant overall speed up due to reduced API calls. --------- Co-authored-by: Vlada Dusek <[email protected]>
1 parent 52777a7 commit dd03c4d

File tree

3 files changed

+314
-53
lines changed

3 files changed

+314
-53
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,20 +242,77 @@ async def add_batch_of_requests(
242242
Returns:
243243
Response containing information about the added requests.
244244
"""
245-
# Prepare requests for API by converting to dictionaries.
246-
requests_dict = [
247-
request.model_dump(
248-
by_alias=True,
249-
exclude={'id'}, # Exclude ID fields from requests since the API doesn't accept them.
245+
# Do not try to add previously added requests to avoid pointless expensive calls to API
246+
247+
new_requests: list[Request] = []
248+
already_present_requests: list[ProcessedRequest] = []
249+
250+
for request in requests:
251+
if self._requests_cache.get(request.id):
252+
# We are not sure if it was already handled at this point, and it is not worth calling API for it.
253+
# It could have been handled by another client in the meantime, so cached information about
254+
# `request.was_already_handled` is not reliable.
255+
already_present_requests.append(
256+
ProcessedRequest.model_validate(
257+
{
258+
'id': request.id,
259+
'uniqueKey': request.unique_key,
260+
'wasAlreadyPresent': True,
261+
'wasAlreadyHandled': request.was_already_handled,
262+
}
263+
)
264+
)
265+
266+
else:
267+
# Add new request to the cache.
268+
processed_request = ProcessedRequest.model_validate(
269+
{
270+
'id': request.id,
271+
'uniqueKey': request.unique_key,
272+
'wasAlreadyPresent': True,
273+
'wasAlreadyHandled': request.was_already_handled,
274+
}
275+
)
276+
self._cache_request(
277+
unique_key_to_request_id(request.unique_key),
278+
processed_request,
279+
)
280+
new_requests.append(request)
281+
282+
if new_requests:
283+
# Prepare requests for API by converting to dictionaries.
284+
requests_dict = [
285+
request.model_dump(
286+
by_alias=True,
287+
exclude={'id'}, # Exclude ID fields from requests since the API doesn't accept them.
288+
)
289+
for request in new_requests
290+
]
291+
292+
# Send requests to API.
293+
api_response = AddRequestsResponse.model_validate(
294+
await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
295+
)
296+
297+
# Add the locally known already present processed requests based on the local cache.
298+
api_response.processed_requests.extend(already_present_requests)
299+
300+
# Remove unprocessed requests from the cache
301+
for unprocessed_request in api_response.unprocessed_requests:
302+
self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
303+
304+
else:
305+
api_response = AddRequestsResponse.model_validate(
306+
{'unprocessedRequests': [], 'processedRequests': already_present_requests}
250307
)
251-
for request in requests
252-
]
253308

254-
# Send requests to API.
255-
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
309+
logger.debug(
310+
f'Tried to add new requests: {len(new_requests)}, '
311+
f'succeeded to add new requests: {len(api_response.processed_requests) - len(already_present_requests)}, '
312+
f'skipped already present requests: {len(already_present_requests)}'
313+
)
256314

257315
# Update assumed total count for newly added requests.
258-
api_response = AddRequestsResponse.model_validate(response)
259316
new_request_count = 0
260317
for processed_request in api_response.processed_requests:
261318
if not processed_request.was_already_present and not processed_request.was_already_handled:

0 commit comments

Comments (0)