Skip to content

Commit 359c46e

Browse files
committed
Add init cache test, update upgrading guide
1 parent 10e0652 commit 359c46e

File tree

4 files changed

+77
-27
lines changed

4 files changed

+77
-27
lines changed

docs/04_upgrading/upgrading_to_v3.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,32 @@ Support for Python 3.9 has been dropped. The Apify Python SDK v3.x now requires
1616
## Storage clients
1717

1818
<!-- TODO -->
19+
20+
## The default use of optimized ApifyRequestQueueClient
21+
22+
- The default client for working with an Apify platform-based `RequestQueue` is now an optimized and simplified client that makes significantly fewer API calls, but does not support multiple consumers working on the same queue. It is cheaper and faster, and is suitable for the majority of use cases.
23+
- The full client is still available, but it has to be explicitly requested via the `simple_request_queue=False` argument when using the `ApifyStorageClient`.
24+
25+
**Before (v2.x):**
26+
27+
```python
28+
from apify import Actor
29+
30+
async def main():
31+
async with Actor:
32+
...
33+
```
34+
35+
**Now (v3.0):**
36+
37+
```python
38+
from apify import Actor
39+
from crawlee import service_locator
40+
from apify.storage_clients import ApifyStorageClient
41+
42+
async def main():
43+
# Use the full-featured RequestQueue client only if you really need it.
44+
service_locator.set_storage_client(ApifyStorageClient(simple_request_queue=False))
45+
async with Actor:
46+
...
47+
```

src/apify/storage_clients/_apify/_request_queue_client_simple.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,9 @@ async def _init_caches(self) -> None:
388388
389389
This is mainly done to improve local deduplication capability. A list request can return up to 10k requests, but
390390
their order is an implementation detail and does not respect head order or insertion order.
391+
392+
Deduplication on the platform is expensive: it takes 1 API call per request and 1 write operation per request.
393+
Local deduplication is cheaper: it takes 1 API call for the whole cache and 1 read operation per request.
391394
"""
392395
response = await self._api_client.list_requests(limit=10_000)
393396
for request_data in response.get('items', []):

src/apify/storage_clients/_apify/_storage_client.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
class ApifyStorageClient(StorageClient):
2323
"""Apify storage client."""
2424

25-
def __init__(self, simple_request_queue: bool = True) -> None:
25+
def __init__(self, *, simple_request_queue: bool = True) -> None:
2626
"""Initialize the Apify storage client.
2727
2828
Args:
@@ -86,10 +86,9 @@ async def create_rq_client(
8686

8787
configuration = configuration or ApifyConfiguration.get_global_configuration()
8888
if isinstance(configuration, ApifyConfiguration):
89-
if not self._simple_request_queue:
89+
if self._simple_request_queue:
9090
return await ApifyRequestQueueClientSimple.open(id=id, name=name, configuration=configuration)
91-
else:
92-
return await ApifyRequestQueueClientFull.open(id=id, name=name, configuration=configuration)
91+
return await ApifyRequestQueueClientFull.open(id=id, name=name, configuration=configuration)
9392

9493
raise TypeError(
9594
f'Expected "configuration" to be an instance of "apify.Configuration", '

tests/integration/test_request_queue.py

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
from __future__ import annotations
22

33
import asyncio
4+
from datetime import datetime, timezone
45
from typing import TYPE_CHECKING
56

67
import pytest
7-
from apify_shared.consts import ApifyEnvVars
88

9-
from crawlee import Request
9+
from apify_shared.consts import ApifyEnvVars
10+
from crawlee import Request, service_locator
1011

11-
from apify import Actor
1212
from ._utils import generate_unique_resource_name
13+
from apify import Actor
1314

1415
if TYPE_CHECKING:
1516
from apify_client import ApifyClientAsync
@@ -1072,29 +1073,47 @@ async def test_request_queue_not_had_multiple_clients(
10721073
assert api_response['hadMultipleClients'] is False
10731074

10741075

1075-
async def test_cache_initialization(
1076-
apify_token: str, monkeypatch: pytest.MonkeyPatch, apify_client_async: ApifyClientAsync
1077-
) -> None:
1078-
"""Test that same `RequestQueue` created from Actor does not act as multiple clients."""
1076+
async def test_cache_initialization(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> None:
1077+
"""Test that Apify based `RequestQueue` initializes cache correctly to reduce unnecessary API calls."""
10791078

1080-
"""Create an instance of the Apify request queue on the platform and drop it when the test is finished."""
1079+
# Create an instance of the Apify request queue on the platform and drop it when the test is finished.
10811080
request_queue_name = generate_unique_resource_name('request_queue')
10821081
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)
10831082

1083+
requests = [Request.from_url(f'http://example.com/{i}', handled_at=datetime.now(timezone.utc)) for i in range(10)]
1084+
10841085
async with Actor:
10851086
rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
1086-
yield rq
1087-
await rq.drop()
1088-
1089-
1090-
await request_queue_force_cloud.fetch_next_request()
1091-
await request_queue_force_cloud.fetch_next_request()
1092-
1093-
# Check that it is correctly in the RequestQueueClient metadata
1094-
assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is False
1095-
1096-
# Check that it is correctly in the API
1097-
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id)
1098-
api_response = await api_client.get()
1099-
assert api_response
1100-
assert api_response['hadMultipleClients'] is False
1087+
try:
1088+
await rq.add_requests(requests)
1089+
1090+
# Check that it is correctly in the API
1091+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
1092+
1093+
# Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
1094+
rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
1095+
_rq = await rq_client.get()
1096+
assert _rq
1097+
stats_before = _rq.get('stats', {})
1098+
Actor.log.info(stats_before)
1099+
1100+
# Clear service locator cache to simulate creating RQ instance from scratch
1101+
service_locator.storage_instance_manager.clear_cache()
1102+
1103+
# Enqueue the same requests again. They should be deduplicated by the local cache created on initialization.
1104+
rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
1105+
await rq.add_requests(requests)
1106+
1107+
await asyncio.sleep(10) # Wait to be sure that metadata are updated
1108+
_rq = await rq_client.get()
1109+
assert _rq
1110+
stats_after = _rq.get('stats', {})
1111+
Actor.log.info(stats_after)
1112+
1113+
# Cache was actually initialized, readCount increased
1114+
assert (stats_after['readCount'] - stats_before['readCount']) == len(requests)
1115+
# Deduplication happened locally, writeCount should be the same
1116+
assert stats_after['writeCount'] == stats_before['writeCount']
1117+
1118+
finally:
1119+
await rq.drop()

0 commit comments

Comments
 (0)