Skip to content

Commit 65b297a

Browse files
committed
Update to handle parallel requests with same links
1 parent 03dcb15 commit 65b297a

File tree

2 files changed

+69
-12
lines changed

2 files changed

+69
-12
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,10 @@ async def add_batch_of_requests(
239239
Response containing information about the added requests.
240240
"""
241241
# Do not try to add previously added requests to avoid pointless expensive calls to the API
242+
242243
new_requests: list[Request] = []
243244
already_present_requests: list[dict[str, str | bool]] = []
245+
244246
for request in requests:
245247
if self._requests_cache.get(request.id):
246248
# We are not sure whether it was already handled at this point, and it is not worth calling the API for it.
@@ -254,12 +256,22 @@ async def add_batch_of_requests(
254256
)
255257

256258
else:
259+
# Add new request to the cache.
260+
processed_request = ProcessedRequest.model_validate(
261+
{
262+
'id': request.id,
263+
'uniqueKey': request.unique_key,
264+
'wasAlreadyPresent': True,
265+
'wasAlreadyHandled': request.was_already_handled,
266+
}
267+
)
268+
self._cache_request(
269+
unique_key_to_request_id(request.unique_key),
270+
processed_request,
271+
forefront=False,
272+
)
257273
new_requests.append(request)
258274

259-
logger.debug(
260-
f'Adding new requests: {len(new_requests)}, '
261-
f'skipping already present requests: {len(already_present_requests)}'
262-
)
263275
if new_requests:
264276
# Prepare requests for API by converting to dictionaries.
265277
requests_dict = [
@@ -272,19 +284,16 @@ async def add_batch_of_requests(
272284

273285
# Send requests to API.
274286
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
275-
# Add new requests to the cache.
276-
for processed_request_raw in response['processedRequests']:
277-
processed_request = ProcessedRequest.model_validate(processed_request_raw)
278-
self._cache_request(
279-
unique_key_to_request_id(processed_request.unique_key),
280-
processed_request,
281-
forefront=False,
282-
)
283287
# Add the locally known already present processed requests based on the local cache.
284288
response['processedRequests'].extend(already_present_requests)
285289
else:
286290
response = {'unprocessedRequests': [], 'processedRequests': already_present_requests}
287291

292+
logger.debug(
293+
f'Added new requests: {len(new_requests)}, '
294+
f'skipped already present requests: {len(already_present_requests)}'
295+
)
296+
288297
# Update assumed total count for newly added requests.
289298
api_response = AddRequestsResponse.model_validate(response)
290299
new_request_count = 0

tests/integration/test_actor_request_queue.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,51 @@ async def main() -> None:
156156
run_result = await run_actor(actor)
157157

158158
assert run_result.status == 'SUCCEEDED'
159+
160+
161+
async def test_request_queue_parallel_deduplication(
162+
make_actor: MakeActorFunction,
163+
run_actor: RunActorFunction,
164+
) -> None:
165+
"""Test that the deduplication works correctly even with parallel attempts to add same links."""
166+
167+
async def main() -> None:
168+
import asyncio
169+
import logging
170+
171+
from apify import Actor, Request
172+
173+
async with Actor:
174+
logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
175+
176+
requests = [Request.from_url(f'http://example.com/{i}') for i in range(1000)]
177+
rq = await Actor.open_request_queue()
178+
179+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
180+
181+
# Get the raw client, because stats are not exposed in the `RequestQueue` class, but are available in the raw client
182+
rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
183+
_rq = await rq_client.get()
184+
assert _rq
185+
stats_before = _rq.get('stats', {})
186+
Actor.log.info(stats_before)
187+
188+
# Add same requests in 10 parallel workers
189+
async def add_requests_worker() -> None:
190+
await rq.add_requests(requests)
191+
192+
add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
193+
await asyncio.gather(*add_requests_workers)
194+
195+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
196+
_rq = await rq_client.get()
197+
assert _rq
198+
stats_after = _rq.get('stats', {})
199+
Actor.log.info(stats_after)
200+
201+
assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests)
202+
203+
actor = await make_actor(label='rq-parallel-deduplication', main_func=main)
204+
run_result = await run_actor(actor)
205+
206+
assert run_result.status == 'SUCCEEDED'

0 commit comments

Comments
 (0)