Skip to content

Commit 85a1bac

Browse files
authored
chore: Use shared mode ApifyStorageClient for test_concurrent_processing_simulation (#662)
Use `ApifyStorageClient(request_queue_access='shared')` for `test_concurrent_processing_simulation`. This is the expected usage for concurrent access. And fix the error in the test ### Issues Closes: #529
1 parent ff55336 commit 85a1bac

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

tests/integration/actor/test_actor_request_queue.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import asyncio
44
from typing import TYPE_CHECKING
55

6-
import pytest
7-
86
from .._utils import generate_unique_resource_name
97
from apify import Actor
108
from apify._models import ActorRun
@@ -387,16 +385,23 @@ async def main() -> None:
387385
assert run_result.status == 'SUCCEEDED'
388386

389387

390-
@pytest.mark.skip(
391-
reason='The Apify RQ client is not resilient to concurrent processing, making this test flaky. See issue #529.'
392-
)
393388
async def test_concurrent_processing_simulation(
394389
make_actor: MakeActorFunction,
395390
run_actor: RunActorFunction,
396391
) -> None:
397392
"""Test simulation of concurrent request processing."""
398393

399394
async def main() -> None:
395+
from crawlee import service_locator
396+
397+
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
398+
399+
# Use `request_queue_access='shared' for concurrent access`
400+
service_locator.set_storage_client(
401+
SmartApifyStorageClient(
402+
cloud_storage_client=ApifyStorageClient(request_queue_access='shared'),
403+
)
404+
)
400405
async with Actor:
401406
rq = await Actor.open_request_queue()
402407
Actor.log.info('Request queue opened')
@@ -412,18 +417,21 @@ async def main() -> None:
412417
# Simulate concurrent workers
413418
async def worker() -> int:
414419
processed = 0
420+
request_counter = 0
415421

416422
while request := await rq.fetch_next_request():
417423
# Simulate some work
418424
await asyncio.sleep(0.01)
419425

420426
# Randomly reclaim some requests (simulate failures)
421-
if processed % 7 == 0 and processed > 0: # Reclaim every 7th request
427+
if request_counter % 5 == 0 and request_counter > 0: # Reclaim every 5th request
422428
await rq.reclaim_request(request)
423429
else:
424430
await rq.mark_request_as_handled(request)
425431
processed += 1
426432

433+
request_counter += 1
434+
427435
return processed
428436

429437
# Run multiple workers concurrently

0 commit comments

Comments
 (0)