
Commit 329baed

Address review comments
1 parent 2c3d0ce commit 329baed

File tree

2 files changed: +20 −7 lines changed


src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 4 additions & 2 deletions
@@ -249,7 +249,9 @@ async def add_batch_of_requests(

         for request in requests:
             if self._requests_cache.get(request.id):
-                # We are no sure if it was already handled at this point, and it is not worth calling API for it.
+                # We are not sure if it was already handled at this point, and it is not worth calling API for it.
+                # It could have been handled by another client in the meantime, so cached information about
+                # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
                     {
                         'id': request.id,
@@ -299,7 +301,7 @@ async def add_batch_of_requests(

         logger.debug(
             f'Tried to add new requests: {len(new_requests)}, '
-            f'succeeded to add new requests: {len(response["processedRequests"])}, '
+            f'succeeded to add new requests: {len(response["processedRequests"]) - len(already_present_requests)}, '
             f'skipped already present requests: {len(already_present_requests)}'
         )

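The subtraction in the corrected log line matters because, judging from the hunk above, locally deduplicated requests are folded into the response's processedRequests, so the raw length would overcount what was actually added via the API. A minimal sketch with hypothetical numbers (an assumption based on this hunk alone, not the library's documented behavior):

# Hypothetical counts illustrating the corrected debug message: the response
# is assumed to include the locally deduplicated requests, so the number of
# genuinely new additions is the total minus the local skips.
processed_total = 25   # len(response['processedRequests']), assumed to include local dedups
already_present = 10   # len(already_present_requests), skipped without an API call
newly_added = processed_total - already_present
print(f'succeeded to add new requests: {newly_added}')  # prints 15, not 25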

tests/integration/test_actor_request_queue.py

Lines changed: 16 additions & 5 deletions
@@ -163,18 +163,28 @@ async def test_request_queue_parallel_deduplication(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
 ) -> None:
-    """Test that the deduplication works correctly even with parallel attempts to add same links."""
+    """Test that the deduplication works correctly even with parallel attempts to add same links.
+
+    The test is set up in a way for workers to have some requests that were already added to the queue and some new
+    requests. The function must correctly deduplicate the requests and add only new requests. For example:
+    First worker adding 10 new requests,
+    second worker adding 10 new requests and 10 known requests,
+    third worker adding 10 new requests and 20 known requests and so on"""

     async def main() -> None:
         import asyncio
         import logging

         from apify import Actor, Request

+        worker_count = 10
+        max_requests = 100
+        batch_size = iter(range(10, max_requests + 1, int(max_requests / worker_count)))
+
         async with Actor:
             logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)

-            requests = [Request.from_url(f'http://example.com/{i}') for i in range(100)]
+            requests = [Request.from_url(f'http://example.com/{i}') for i in range(max_requests)]
             rq = await Actor.open_request_queue()

             await asyncio.sleep(10)  # Wait to be sure that metadata are updated
@@ -186,11 +196,12 @@ async def main() -> None:
             stats_before = _rq.get('stats', {})
             Actor.log.info(stats_before)

-            # Add same requests in 10 parallel workers
+            # Add batches of some new and some already present requests in workers
             async def add_requests_worker() -> None:
-                await rq.add_requests(requests)
+                await rq.add_requests(requests[: next(batch_size)])

-            add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
+            # Start all workers
+            add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(worker_count)]
             await asyncio.gather(*add_requests_workers)

             await asyncio.sleep(10)  # Wait to be sure that metadata are updated
