Skip to content

Commit 4bc5c91

Browse files
committed
Add deduplication and test
1 parent ae3044e commit 4bc5c91

File tree

2 files changed

+80
-10
lines changed

2 files changed

+80
-10
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,17 +238,48 @@ async def add_batch_of_requests(
238238
Returns:
239239
Response containing information about the added requests.
240240
"""
241-
# Prepare requests for API by converting to dictionaries.
242-
requests_dict = [
243-
request.model_dump(
244-
by_alias=True,
245-
exclude={'id'}, # Exclude ID fields from requests since the API doesn't accept them.
246-
)
247-
for request in requests
248-
]
241+
# Do not try to add previously added requests, to avoid pointless, expensive calls to the API.
242+
new_requests: list[Request] = []
243+
already_present_requests: list[ProcessedRequest] = []
244+
for request in requests:
245+
if self._requests_cache.get(request.id):
246+
# We are not sure whether it was already handled at this point, and it is not worth calling the API for it.
247+
already_present_requests.append(
248+
{
249+
'id': request.id,
250+
'uniqueKey': request.unique_key,
251+
'wasAlreadyPresent': True,
252+
'wasAlreadyHandled': request.was_already_handled,
253+
}
254+
)
249255

250-
# Send requests to API.
251-
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
256+
else:
257+
new_requests.append(request)
258+
259+
if new_requests:
260+
# Prepare requests for API by converting to dictionaries.
261+
requests_dict = [
262+
request.model_dump(
263+
by_alias=True,
264+
exclude={'id'}, # Exclude ID fields from requests since the API doesn't accept them.
265+
)
266+
for request in new_requests
267+
]
268+
269+
# Send requests to API.
270+
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
271+
# Add new requests to the cache.
272+
for processed_request_raw in response['processedRequests']:
273+
processed_request = ProcessedRequest.model_validate(processed_request_raw)
274+
self._cache_request(
275+
unique_key_to_request_id(processed_request.unique_key),
276+
processed_request,
277+
forefront=False,
278+
)
279+
# Add the locally known already present processed requests based on the local cache.
280+
response['processedRequests'].extend(already_present_requests)
281+
else:
282+
response = {'unprocessedRequests': [], 'processedRequests': already_present_requests}
252283

253284
# Update assumed total count for newly added requests.
254285
api_response = AddRequestsResponse.model_validate(response)

tests/integration/test_actor_request_queue.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,42 @@ async def test_request_queue_is_finished(
110110

111111
await request_queue.mark_request_as_handled(request)
112112
assert await request_queue.is_finished()
113+
114+
115+
async def test_request_queue_deduplication(
116+
make_actor: MakeActorFunction,
117+
run_actor: RunActorFunction,
118+
) -> None:
119+
"""Test that deduplication works correctly. Adding the same request twice should call the API only once.
120+
121+
This tests an internal optimization that does not change behavior for the user.
122+
The function's input/output behave the same way; it only uses fewer API calls.
123+
"""
124+
125+
async def main() -> None:
126+
import asyncio
127+
128+
from apify import Actor, Request
129+
130+
async with Actor:
131+
request = Request.from_url('http://example.com')
132+
rq = await Actor.open_request_queue(name='test-deduplication', force_cloud=True)
133+
134+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
135+
stats_before = (await Actor.apify_client.request_queue(request_queue_id=rq.id).get()).get('stats', {})
136+
Actor.log.info(stats_before)
137+
138+
# Add same request twice
139+
await rq.add_request(request)
140+
await rq.add_request(request)
141+
142+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
143+
stats_after = (await Actor.apify_client.request_queue(request_queue_id=rq.id).get()).get('stats', {})
144+
Actor.log.info(stats_after)
145+
146+
assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
147+
148+
actor = await make_actor(label='rq-deduplication', main_func=main)
149+
run_result = await run_actor(actor)
150+
151+
assert run_result.status == 'SUCCEEDED'

0 commit comments

Comments
 (0)