Skip to content

Commit d29a534

Browse files
committed
Extract storage related complexity from Actor to dedicated storage client
1 parent 79c02f5 commit d29a534

File tree

10 files changed

+389
-88
lines changed

10 files changed

+389
-88
lines changed

docs/04_upgrading/upgrading_to_v3.md

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,28 @@ async def main():
5353

5454
<!-- TODO -->
5555

56+
## Explicit control over storage clients used in Actor
57+
- It is now possible to have full control over which storage clients are used by the `Actor`. To make development of Actors convenient, the `Actor` has two storage clients. One that is used when running on Apify platform or when opening storages with `force_cloud=True` and the other client that is used when running outside the Apify platform. The `Actor` has reasonable defaults and for the majority of use-cases there is no need to change it. However, if you need to use a different storage client, you can set it up before entering `Actor` context through `service_locator`.
58+
59+
**Now (v3.0):**
60+
```python
61+
from crawlee import service_locator
62+
from apify.storage_clients import ApifyStorageClient, ApifyHybridStorageClient, MemoryStorageClient
63+
from apify import Actor
64+
65+
66+
async def main():
67+
service_locator.set_storage_client(
68+
ApifyHybridStorageClient(
69+
cloud_storage_client=ApifyStorageClient(access="single"),
70+
local_storage_client=MemoryStorageClient()
71+
)
72+
)
73+
async with Actor:
74+
rq = await Actor.open_request_queue()
75+
```
76+
77+
5678
## The default use of optimized ApifyRequestQueueClient
5779

5880
- The default client for working with Apify platform based `RequestQueue` is now optimized and simplified client which does significantly lower amount of API calls, but does not support multiple consumers working on the same queue. It is cheaper and faster and is suitable for the majority of the use cases.
@@ -61,12 +83,13 @@ async def main():
6183
**Now (v3.0):**
6284

6385
```python
64-
from apify.storages import RequestQueue
86+
from crawlee import service_locator
6587
from apify.storage_clients import ApifyStorageClient
88+
from apify import Actor
6689

6790
async def main():
6891
# Full client that supports multiple consumers of the Apify Request Queue
69-
rq_shared = await RequestQueue.open(storage_client=ApifyStorageClient(access="shared"))
70-
# Default optimized client that expects only single consumer of the Apify Request Queue
71-
rq_single = await RequestQueue.open(storage_client=ApifyStorageClient())
92+
service_locator.set_storage_client(ApifyStorageClient(access="shared"))
93+
async with Actor:
94+
rq = await Actor.open_request_queue()
7295
```

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ keywords = [
3636
dependencies = [
3737
"apify-client>=2.0.0,<3.0.0",
3838
"apify-shared>=2.0.0,<3.0.0",
39-
"crawlee==0.6.13b42",
39+
"crawlee @ git+https://github.com/apify/crawlee-python.git@include-storag-client-in-additional-cache-key",
4040
"cachetools>=5.5.0",
4141
"cryptography>=42.0.0",
4242
"impit>=0.6.1",

src/apify/_actor.py

Lines changed: 34 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from apify.log import _configure_logging, logger
3939
from apify.storage_clients import ApifyStorageClient
4040
from apify.storage_clients._file_system import ApifyFileSystemStorageClient
41-
from apify.storages import Dataset, KeyValueStore, RequestQueue
41+
from apify.storage_clients._hybrid_apify._storage_client import ApifyHybridStorageClient
4242

4343
if TYPE_CHECKING:
4444
import logging
@@ -48,9 +48,9 @@
4848
from typing_extensions import Self
4949

5050
from crawlee.proxy_configuration import _NewUrlFunction
51-
from crawlee.storage_clients import StorageClient
5251

5352
from apify._models import Webhook
53+
from apify.storages import Dataset, KeyValueStore, RequestQueue
5454

5555

5656
MainReturnType = TypeVar('MainReturnType')
@@ -131,7 +131,6 @@ def __init__(
131131
self._configuration = configuration
132132
self._configure_logging = configure_logging
133133
self._apify_client: ApifyClientAsync | None = None
134-
self._local_storage_client: StorageClient | None = None
135134

136135
self._is_initialized = False
137136

@@ -234,45 +233,49 @@ def log(self) -> logging.Logger:
234233
"""The logging.Logger instance the Actor uses."""
235234
return logger
236235

237-
def _get_local_storage_client(self) -> StorageClient:
238-
"""Get the local storage client the Actor instance uses."""
239-
if self._local_storage_client:
240-
return self._local_storage_client
236+
def _raise_if_not_initialized(self) -> None:
237+
if not self._is_initialized:
238+
raise RuntimeError('The Actor was not initialized!')
239+
240+
@cached_property
241+
def _storage_client(self) -> ApifyHybridStorageClient:
242+
"""Storage client used by the actor.
241243
244+
Depending on the initialization of the service locator the client can be created in different ways.
245+
"""
242246
try:
243-
# Set implicit default local storage client, unless local storage client was already set.
244-
implicit_storage_client = ApifyFileSystemStorageClient()
247+
# Notning was set by the user.
248+
implicit_storage_client = ApifyHybridStorageClient(
249+
local_storage_client=ApifyFileSystemStorageClient(), cloud_storage_client=ApifyStorageClient()
250+
)
245251
service_locator.set_storage_client(implicit_storage_client)
246-
self._local_storage_client = implicit_storage_client
247252
except ServiceConflictError:
248253
self.log.debug(
249254
'Storage client in service locator was set explicitly before Actor.init was called.'
250255
'Using the existing storage client as implicit storage client for the Actor.'
251256
)
257+
else:
258+
return implicit_storage_client
252259

253-
self._local_storage_client = service_locator.get_storage_client()
254-
if type(self._local_storage_client) is FileSystemStorageClient:
260+
# User set something in the service locator.
261+
storage_client = service_locator.get_storage_client()
262+
if isinstance(storage_client, ApifyHybridStorageClient):
263+
# The client was manually set to the right type in the service locator. This is the explicit way.
264+
return storage_client
265+
266+
if isinstance(storage_client, ApifyStorageClient):
267+
# The cloud storage client was manually set in the service locator.
268+
return ApifyHybridStorageClient(cloud_storage_client=storage_client)
269+
270+
# The local storage client was manually set in the service locator
271+
if type(storage_client) is FileSystemStorageClient:
255272
self.log.warning(
256273
f'Using {FileSystemStorageClient.__module__}.{FileSystemStorageClient.__name__} in Actor context is not'
257274
f' recommended and can lead to problems with reading the input file. Use '
258275
f'`apify.storage_clients.FileSystemStorageClient` instead.'
259276
)
260277

261-
return self._local_storage_client
262-
263-
def _raise_if_not_initialized(self) -> None:
264-
if not self._is_initialized:
265-
raise RuntimeError('The Actor was not initialized!')
266-
267-
def _raise_if_cloud_requested_but_not_configured(self, *, force_cloud: bool) -> None:
268-
if not force_cloud:
269-
return
270-
271-
if not self.is_at_home() and self.configuration.token is None:
272-
raise RuntimeError(
273-
'In order to use the Apify cloud storage from your computer, '
274-
'you need to provide an Apify token using the APIFY_TOKEN environment variable.'
275-
)
278+
return ApifyHybridStorageClient(cloud_storage_client=ApifyStorageClient(), local_storage_client=storage_client)
276279

277280
async def init(self) -> None:
278281
"""Initialize the Actor instance.
@@ -298,22 +301,13 @@ async def init(self) -> None:
298301
if _ActorType._is_any_instance_initialized:
299302
self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care')
300303

301-
# Create an instance of the cloud storage client, the local storage client is obtained
302-
# from the service locator
303-
self._cloud_storage_client = ApifyStorageClient()
304-
305304
# Make sure that the currently initialized instance is also available through the global `Actor` proxy
306305
cast('Proxy', Actor).__wrapped__ = self
307306

308307
self._is_exiting = False
309308
self._was_final_persist_state_emitted = False
310309

311-
# If the Actor is running on the Apify platform, we set the cloud storage client.
312-
if self.is_at_home():
313-
service_locator.set_storage_client(self._cloud_storage_client)
314-
self._local_storage_client = self._cloud_storage_client
315-
else:
316-
self._get_local_storage_client()
310+
self.log.debug(f'Storage client set to {self._storage_client}')
317311

318312
service_locator.set_event_manager(self.event_manager)
319313

@@ -470,17 +464,7 @@ async def open_dataset(
470464
An instance of the `Dataset` class for the given ID or name.
471465
"""
472466
self._raise_if_not_initialized()
473-
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
474-
475-
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()
476-
477-
return await Dataset.open(
478-
id=id,
479-
alias=alias,
480-
name=name,
481-
configuration=self.configuration,
482-
storage_client=storage_client,
483-
)
467+
return await self._storage_client.open_dataset(id=id, name=name, alias=alias, force_cloud=force_cloud)
484468

485469
async def open_key_value_store(
486470
self,
@@ -509,17 +493,7 @@ async def open_key_value_store(
509493
An instance of the `KeyValueStore` class for the given ID or name.
510494
"""
511495
self._raise_if_not_initialized()
512-
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
513-
514-
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()
515-
516-
return await KeyValueStore.open(
517-
id=id,
518-
alias=alias,
519-
name=name,
520-
configuration=self.configuration,
521-
storage_client=storage_client,
522-
)
496+
return await self._storage_client.open_key_value_store(id=id, name=name, alias=alias, force_cloud=force_cloud)
523497

524498
async def open_request_queue(
525499
self,
@@ -550,17 +524,7 @@ async def open_request_queue(
550524
An instance of the `RequestQueue` class for the given ID or name.
551525
"""
552526
self._raise_if_not_initialized()
553-
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)
554-
555-
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()
556-
557-
return await RequestQueue.open(
558-
id=id,
559-
alias=alias,
560-
name=name,
561-
configuration=self.configuration,
562-
storage_client=storage_client,
563-
)
527+
return await self._storage_client.open_request_queue(id=id, name=name, alias=alias, force_cloud=force_cloud)
564528

565529
@overload
566530
async def push_data(self, data: dict | list[dict]) -> None: ...

src/apify/storage_clients/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
from ._apify import ApifyStorageClient
44
from ._file_system import ApifyFileSystemStorageClient as FileSystemStorageClient
5+
from ._hybrid_apify import ApifyHybridStorageClient
56

67
__all__ = [
8+
'ApifyHybridStorageClient',
79
'ApifyStorageClient',
810
'FileSystemStorageClient',
911
'MemoryStorageClient',
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from ._storage_client import ApifyHybridStorageClient

0 commit comments

Comments
 (0)