 import asyncio
 from typing import TYPE_CHECKING

-import pytest
-
 from .._utils import generate_unique_resource_name
 from apify import Actor
 from apify._models import ActorRun
@@ -387,16 +385,23 @@ async def main() -> None:
     assert run_result.status == 'SUCCEEDED'


-@pytest.mark.skip(
-    reason='The Apify RQ client is not resilient to concurrent processing, making this test flaky. See issue #529.'
-)
 async def test_concurrent_processing_simulation(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
 ) -> None:
     """Test simulation of concurrent request processing."""

     async def main() -> None:
+        from crawlee import service_locator
+
+        from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
+
+        # Use `request_queue_access='shared'` for concurrent access
+        service_locator.set_storage_client(
+            SmartApifyStorageClient(
+                cloud_storage_client=ApifyStorageClient(request_queue_access='shared'),
+            )
+        )
         async with Actor:
             rq = await Actor.open_request_queue()
             Actor.log.info('Request queue opened')
@@ -412,18 +417,21 @@ async def main() -> None:
             # Simulate concurrent workers
             async def worker() -> int:
                 processed = 0
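+                # Track every fetched request separately from the successfully handled ones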
+                request_counter = 0

                 while request := await rq.fetch_next_request():
                     # Simulate some work
                     await asyncio.sleep(0.01)

                     # Randomly reclaim some requests (simulate failures)
-                    if processed % 7 == 0 and processed > 0:  # Reclaim every 7th request
+                    if request_counter % 5 == 0 and request_counter > 0:  # Reclaim every 5th request
                         await rq.reclaim_request(request)
                     else:
                         await rq.mark_request_as_handled(request)
                         processed += 1

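+                    # Count the fetch regardless of whether the request was reclaimed or handled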
+                    request_counter += 1
+
                 return processed

             # Run multiple workers concurrently