Skip to content

Commit f830ab0

Browse files
Pijukateljanbuchar
andauthored
feat: Add specialized ApifyRequestQueue clients (#573)
### Description - `ApifyRequestQueueClient` can be created in two access modes - `single`, `shared`: - `shared` - current version that supports multiple producers/consumers and locking of requests. More Apify API calls, higher API usage -> more expensive, slower. - `single` - new constrained client for self-consumer and multiple constrained producers. (Detailed constraints in the docs). Fewer Apify API calls, lower API usage -> cheaper, faster. - Most of the `ApifyRequestQueueClient` tests were moved away from actor-based tests, so that they can be parametrized for both variants of the `ApifyRequestQueueClients` and to make local debugging easier. #### Usage: RequestQueue with `shared`: `await RequestQueue.open(storage_client=ApifyStorageClient(request_queue_access="shared"))` RequestQueue with default `single`: `await RequestQueue.open(storage_client=ApifyStorageClient())` #### Stats difference: The full client is doing significantly more API calls and regarding the API usage it is doing 50% more RequestQueue writes and also more RequestQueue reads. **Example rq related stats for crawler started with 1000 requests:** `shared`: API calls: 2123 API usage: {'readCount': 1000, 'writeCount': 3000, 'deleteCount': 0, 'headItemReadCount': 0, 'storageBytes': 104035} `single`: API calls: 1059 API usage: {'readCount': 3, 'writeCount': 2000, 'deleteCount': 0, 'headItemReadCount': 14, 'storageBytes': 103826} ### Issues - Part of: #513 --------- Co-authored-by: Jan Buchar <[email protected]>
1 parent 6ce9a98 commit f830ab0

20 files changed

+2262
-1684
lines changed

docs/04_upgrading/upgrading_to_v3.md

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ Some changes in the related model classes:
6969
## Removed Actor.config property
7070
- `Actor.config` property has been removed. Use `Actor.configuration` instead.
7171

72+
## Default storage ids in configuration changed to None
73+
- `Configuration.default_key_value_store_id` changed from `'default'` to `None`.
74+
- `Configuration.default_dataset_id` changed from `'default'` to `None`.
75+
- `Configuration.default_request_queue_id` changed from `'default'` to `None`.
76+
77+
Previously using the default storage without specifying its `id` in `Configuration` would lead to using specific storage with id `'default'`. Now it will use newly created unnamed storage with `'id'` assigned by the Apify platform, consecutive calls to get the default storage will return the same storage.
78+
7279
## Actor initialization and ServiceLocator changes
7380

7481
`Actor` initialization and global `service_locator` services setup is more strict and predictable.
@@ -102,20 +109,51 @@ async def main():
102109
)
103110
```
104111

105-
## Removed Actor.config property
106-
- `Actor.config` property has been removed. Use `Actor.configuration` instead.
112+
### Changes in storage clients
107113

108-
## Default storage ids in configuration changed to None
109-
- `Configuration.default_key_value_store_id` changed from `'default'` to `None`.
110-
- `Configuration.default_dataset_id` changed from `'default'` to `None`.
111-
- `Configuration.default_request_queue_id` changed from `'default'` to `None`.
114+
## Explicit control over storage clients used in Actor
115+
- It is now possible to have full control over which storage clients are used by the `Actor`. To make development of Actors convenient, the `Actor` has two storage clients. One that is used when running on Apify platform or when opening storages with `force_cloud=True` and the other client that is used when running outside the Apify platform. The `Actor` has reasonable defaults and for the majority of use-cases there is no need to change it. However, if you need to use a different storage client, you can set it up before entering `Actor` context through `service_locator`.
116+
117+
**Now (v3.0):**
118+
119+
```python
120+
from crawlee import service_locator
121+
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient, MemoryStorageClient
122+
from apify import Actor
123+
124+
125+
async def main():
126+
service_locator.set_storage_client(
127+
SmartApifyStorageClient(
128+
cloud_storage_client=ApifyStorageClient(request_queue_access="single"),
129+
local_storage_client=MemoryStorageClient()
130+
)
131+
)
132+
async with Actor:
133+
rq = await Actor.open_request_queue()
134+
```
112135

113-
Previously using the default storage without specifying its `id` in `Configuration` would lead to using specific storage with id `'default'`. Now it will use newly created unnamed storage with `'id'` assigned by the Apify platform, consecutive calls to get the default storage will return the same storage.
114136

115-
## Storages
137+
## The default use of optimized ApifyRequestQueueClient
116138

117-
<!-- TODO -->
139+
- The default client for working with Apify platform based `RequestQueue` is now optimized and simplified client which does significantly lower amount of API calls, but does not support multiple consumers working on the same queue. It is cheaper and faster and is suitable for the majority of the use cases.
140+
- The full client is still available, but it has to be explicitly requested via `request_queue_access="shared"` argument when using the `ApifyStorageClient`.
118141

119-
## Storage clients
142+
**Now (v3.0):**
143+
144+
```python
145+
from crawlee import service_locator
146+
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
147+
from apify import Actor
120148

121-
<!-- TODO -->
149+
150+
async def main():
151+
# Full client that supports multiple consumers of the Apify Request Queue
152+
service_locator.set_storage_client(
153+
SmartApifyStorageClient(
154+
cloud_storage_client=ApifyStorageClient(request_queue_access="shared"),
155+
)
156+
)
157+
async with Actor:
158+
rq = await Actor.open_request_queue()
159+
```

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ keywords = [
3636
dependencies = [
3737
"apify-client>=2.0.0,<3.0.0",
3838
"apify-shared>=2.0.0,<3.0.0",
39-
"crawlee==0.6.13b42",
39+
"crawlee==0.6.13b46",
4040
"cachetools>=5.5.0",
4141
"cryptography>=42.0.0",
4242
"impit>=0.6.1",

src/apify/_actor.py

Lines changed: 37 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
EventPersistStateData,
2626
EventSystemInfoData,
2727
)
28-
from crawlee.storage_clients import FileSystemStorageClient
2928

3029
from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
3130
from apify._configuration import Configuration
@@ -38,6 +37,7 @@
3837
from apify.log import _configure_logging, logger
3938
from apify.storage_clients import ApifyStorageClient
4039
from apify.storage_clients._file_system import ApifyFileSystemStorageClient
40+
from apify.storage_clients._smart_apify._storage_client import SmartApifyStorageClient
4141
from apify.storages import Dataset, KeyValueStore, RequestQueue
4242

4343
if TYPE_CHECKING:
@@ -48,7 +48,6 @@
4848
from typing_extensions import Self
4949

5050
from crawlee.proxy_configuration import _NewUrlFunction
51-
from crawlee.storage_clients import StorageClient
5251

5352
from apify._models import Webhook
5453

@@ -131,7 +130,6 @@ def __init__(
131130
self._configuration = configuration
132131
self._configure_logging = configure_logging
133132
self._apify_client: ApifyClientAsync | None = None
134-
self._local_storage_client: StorageClient | None = None
135133

136134
self._is_initialized = False
137135

@@ -234,45 +232,42 @@ def log(self) -> logging.Logger:
234232
"""The logging.Logger instance the Actor uses."""
235233
return logger
236234

237-
def _get_local_storage_client(self) -> StorageClient:
238-
"""Get the local storage client the Actor instance uses."""
239-
if self._local_storage_client:
240-
return self._local_storage_client
235+
def _raise_if_not_initialized(self) -> None:
236+
if not self._is_initialized:
237+
raise RuntimeError('The Actor was not initialized!')
238+
239+
@cached_property
240+
def _storage_client(self) -> SmartApifyStorageClient:
241+
"""Storage client used by the actor.
241242
243+
Depending on the initialization of the service locator the client can be created in different ways.
244+
"""
242245
try:
243-
# Set implicit default local storage client, unless local storage client was already set.
244-
implicit_storage_client = ApifyFileSystemStorageClient()
246+
# Nothing was set by the user.
247+
implicit_storage_client = SmartApifyStorageClient(
248+
local_storage_client=ApifyFileSystemStorageClient(), cloud_storage_client=ApifyStorageClient()
249+
)
245250
service_locator.set_storage_client(implicit_storage_client)
246-
self._local_storage_client = implicit_storage_client
247251
except ServiceConflictError:
248252
self.log.debug(
249253
'Storage client in service locator was set explicitly before Actor.init was called.'
250254
'Using the existing storage client as implicit storage client for the Actor.'
251255
)
252-
253-
self._local_storage_client = service_locator.get_storage_client()
254-
if type(self._local_storage_client) is FileSystemStorageClient:
255-
self.log.warning(
256-
f'Using {FileSystemStorageClient.__module__}.{FileSystemStorageClient.__name__} in Actor context is not'
257-
f' recommended and can lead to problems with reading the input file. Use '
258-
f'`apify.storage_clients.FileSystemStorageClient` instead.'
259-
)
260-
261-
return self._local_storage_client
262-
263-
def _raise_if_not_initialized(self) -> None:
264-
if not self._is_initialized:
265-
raise RuntimeError('The Actor was not initialized!')
266-
267-
def _raise_if_cloud_requested_but_not_configured(self, *, force_cloud: bool) -> None:
268-
if not force_cloud:
269-
return
270-
271-
if not self.is_at_home() and self.configuration.token is None:
272-
raise RuntimeError(
273-
'In order to use the Apify cloud storage from your computer, '
274-
'you need to provide an Apify token using the APIFY_TOKEN environment variable.'
275-
)
256+
else:
257+
return implicit_storage_client
258+
259+
# User set something in the service locator.
260+
explicit_storage_client = service_locator.get_storage_client()
261+
if isinstance(explicit_storage_client, SmartApifyStorageClient):
262+
# The client was manually set to the right type in the service locator. This is the explicit way.
263+
return explicit_storage_client
264+
265+
raise RuntimeError(
266+
'The storage client in the service locator has to be instance of SmartApifyStorageClient. If you want to '
267+
'set the storage client manually you have to call '
268+
'`service_locator.set_storage_client(SmartApifyStorageClient(...))` before entering Actor context or '
269+
'awaiting `Actor.init`.'
270+
)
276271

277272
async def init(self) -> None:
278273
"""Initialize the Actor instance.
@@ -285,6 +280,7 @@ async def init(self) -> None:
285280
This method should be called immediately before performing any additional Actor actions, and it should be
286281
called only once.
287282
"""
283+
self.log.info('Initializing Actor...')
288284
if self._configuration:
289285
# Set explicitly the configuration in the service locator
290286
service_locator.set_configuration(self.configuration)
@@ -298,30 +294,20 @@ async def init(self) -> None:
298294
if _ActorType._is_any_instance_initialized:
299295
self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care')
300296

301-
# Create an instance of the cloud storage client, the local storage client is obtained
302-
# from the service locator
303-
self._cloud_storage_client = ApifyStorageClient()
304-
305297
# Make sure that the currently initialized instance is also available through the global `Actor` proxy
306298
cast('Proxy', Actor).__wrapped__ = self
307299

308300
self._is_exiting = False
309301
self._was_final_persist_state_emitted = False
310302

311-
# If the Actor is running on the Apify platform, we set the cloud storage client.
312-
if self.is_at_home():
313-
service_locator.set_storage_client(self._cloud_storage_client)
314-
self._local_storage_client = self._cloud_storage_client
315-
else:
316-
self._get_local_storage_client()
303+
self.log.debug(f'Storage client set to {self._storage_client}')
317304

318305
service_locator.set_event_manager(self.event_manager)
319306

320307
# The logging configuration has to be called after all service_locator set methods.
321308
if self._configure_logging:
322309
_configure_logging()
323310

324-
self.log.info('Initializing Actor...')
325311
self.log.info('System info', extra=get_system_info())
326312

327313
await self.event_manager.__aenter__()
@@ -470,16 +456,11 @@ async def open_dataset(
470456
An instance of the `Dataset` class for the given ID or name.
471457
"""
472458
self._raise_if_not_initialized()
473-
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
474-
475-
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()
476-
477459
return await Dataset.open(
478460
id=id,
479-
alias=alias,
480461
name=name,
481-
configuration=self.configuration,
482-
storage_client=storage_client,
462+
alias=alias,
463+
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
483464
)
484465

485466
async def open_key_value_store(
@@ -509,16 +490,11 @@ async def open_key_value_store(
509490
An instance of the `KeyValueStore` class for the given ID or name.
510491
"""
511492
self._raise_if_not_initialized()
512-
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
513-
514-
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()
515-
516493
return await KeyValueStore.open(
517494
id=id,
518-
alias=alias,
519495
name=name,
520-
configuration=self.configuration,
521-
storage_client=storage_client,
496+
alias=alias,
497+
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
522498
)
523499

524500
async def open_request_queue(
@@ -550,16 +526,11 @@ async def open_request_queue(
550526
An instance of the `RequestQueue` class for the given ID or name.
551527
"""
552528
self._raise_if_not_initialized()
553-
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
554-
555-
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()
556-
557529
return await RequestQueue.open(
558530
id=id,
559-
alias=alias,
560531
name=name,
561-
configuration=self.configuration,
562-
storage_client=storage_client,
532+
alias=alias,
533+
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
563534
)
564535

565536
@overload

src/apify/storage_clients/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
from ._apify import ApifyStorageClient
44
from ._file_system import ApifyFileSystemStorageClient as FileSystemStorageClient
5+
from ._smart_apify import SmartApifyStorageClient
56

67
__all__ = [
78
'ApifyStorageClient',
89
'FileSystemStorageClient',
910
'MemoryStorageClient',
11+
'SmartApifyStorageClient',
1012
]

0 commit comments

Comments
 (0)