-
Notifications
You must be signed in to change notification settings - Fork 15
feat: Add specialized ApifyRequestQueue clients #573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
8e2f5d4
1d869a4
08df986
6131fff
553663a
eadab26
249f8f5
4ada123
10e0652
359c46e
ce090c0
b511011
fb32861
7ec13ef
10bc7e2
7712410
e63f546
ffa70ff
e5bdff2
57cd8ae
79c02f5
d29a534
506b770
1cc80bb
860b0ec
e6c6fc5
da2f5df
8861c5e
1e8a834
de941d4
b4a588d
c5968bc
49c357e
6edb093
b17ebef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ | |
from apify.log import _configure_logging, logger | ||
from apify.storage_clients import ApifyStorageClient | ||
from apify.storage_clients._file_system import ApifyFileSystemStorageClient | ||
from apify.storages import Dataset, KeyValueStore, RequestQueue | ||
from apify.storage_clients._hybrid_apify._storage_client import ApifyHybridStorageClient | ||
|
||
if TYPE_CHECKING: | ||
import logging | ||
|
@@ -48,9 +48,9 @@ | |
from typing_extensions import Self | ||
|
||
from crawlee.proxy_configuration import _NewUrlFunction | ||
from crawlee.storage_clients import StorageClient | ||
|
||
from apify._models import Webhook | ||
from apify.storages import Dataset, KeyValueStore, RequestQueue | ||
|
||
|
||
MainReturnType = TypeVar('MainReturnType') | ||
|
@@ -131,7 +131,6 @@ def __init__( | |
self._configuration = configuration | ||
self._configure_logging = configure_logging | ||
self._apify_client: ApifyClientAsync | None = None | ||
self._local_storage_client: StorageClient | None = None | ||
|
||
self._is_initialized = False | ||
|
||
|
@@ -234,45 +233,49 @@ def log(self) -> logging.Logger: | |
"""The logging.Logger instance the Actor uses.""" | ||
return logger | ||
|
||
def _get_local_storage_client(self) -> StorageClient: | ||
"""Get the local storage client the Actor instance uses.""" | ||
if self._local_storage_client: | ||
return self._local_storage_client | ||
def _raise_if_not_initialized(self) -> None: | ||
if not self._is_initialized: | ||
raise RuntimeError('The Actor was not initialized!') | ||
|
||
@cached_property | ||
def _storage_client(self) -> ApifyHybridStorageClient: | ||
"""Storage client used by the actor. | ||
|
||
Depending on the initialization of the service locator the client can be created in different ways. | ||
""" | ||
try: | ||
# Set implicit default local storage client, unless local storage client was already set. | ||
implicit_storage_client = ApifyFileSystemStorageClient() | ||
# Notning was set by the user. | ||
implicit_storage_client = ApifyHybridStorageClient( | ||
local_storage_client=ApifyFileSystemStorageClient(), cloud_storage_client=ApifyStorageClient() | ||
) | ||
service_locator.set_storage_client(implicit_storage_client) | ||
self._local_storage_client = implicit_storage_client | ||
except ServiceConflictError: | ||
self.log.debug( | ||
'Storage client in service locator was set explicitly before Actor.init was called.' | ||
'Using the existing storage client as implicit storage client for the Actor.' | ||
) | ||
else: | ||
return implicit_storage_client | ||
|
||
self._local_storage_client = service_locator.get_storage_client() | ||
if type(self._local_storage_client) is FileSystemStorageClient: | ||
# User set something in the service locator. | ||
storage_client = service_locator.get_storage_client() | ||
if isinstance(storage_client, ApifyHybridStorageClient): | ||
# The client was manually set to the right type in the service locator. This is the explicit way. | ||
return storage_client | ||
|
||
if isinstance(storage_client, ApifyStorageClient): | ||
|
||
# The cloud storage client was manually set in the service locator. | ||
return ApifyHybridStorageClient(cloud_storage_client=storage_client) | ||
|
||
# The local storage client was manually set in the service locator | ||
if type(storage_client) is FileSystemStorageClient: | ||
self.log.warning( | ||
f'Using {FileSystemStorageClient.__module__}.{FileSystemStorageClient.__name__} in Actor context is not' | ||
f' recommended and can lead to problems with reading the input file. Use ' | ||
f'`apify.storage_clients.FileSystemStorageClient` instead.' | ||
) | ||
|
||
return self._local_storage_client | ||
|
||
def _raise_if_not_initialized(self) -> None: | ||
if not self._is_initialized: | ||
raise RuntimeError('The Actor was not initialized!') | ||
|
||
def _raise_if_cloud_requested_but_not_configured(self, *, force_cloud: bool) -> None: | ||
if not force_cloud: | ||
return | ||
|
||
if not self.is_at_home() and self.configuration.token is None: | ||
raise RuntimeError( | ||
'In order to use the Apify cloud storage from your computer, ' | ||
'you need to provide an Apify token using the APIFY_TOKEN environment variable.' | ||
) | ||
return ApifyHybridStorageClient(cloud_storage_client=ApifyStorageClient(), local_storage_client=storage_client) | ||
|
||
async def init(self) -> None: | ||
"""Initialize the Actor instance. | ||
|
@@ -298,22 +301,13 @@ async def init(self) -> None: | |
if _ActorType._is_any_instance_initialized: | ||
self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care') | ||
|
||
# Create an instance of the cloud storage client, the local storage client is obtained | ||
# from the service locator | ||
self._cloud_storage_client = ApifyStorageClient() | ||
|
||
# Make sure that the currently initialized instance is also available through the global `Actor` proxy | ||
cast('Proxy', Actor).__wrapped__ = self | ||
|
||
self._is_exiting = False | ||
self._was_final_persist_state_emitted = False | ||
|
||
# If the Actor is running on the Apify platform, we set the cloud storage client. | ||
if self.is_at_home(): | ||
service_locator.set_storage_client(self._cloud_storage_client) | ||
self._local_storage_client = self._cloud_storage_client | ||
else: | ||
self._get_local_storage_client() | ||
self.log.debug(f'Storage client set to {self._storage_client}') | ||
|
||
service_locator.set_event_manager(self.event_manager) | ||
|
||
|
@@ -470,17 +464,7 @@ async def open_dataset( | |
An instance of the `Dataset` class for the given ID or name. | ||
""" | ||
self._raise_if_not_initialized() | ||
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) | ||
|
||
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client() | ||
|
||
return await Dataset.open( | ||
id=id, | ||
alias=alias, | ||
name=name, | ||
configuration=self.configuration, | ||
storage_client=storage_client, | ||
) | ||
return await self._storage_client.open_dataset(id=id, name=name, alias=alias, force_cloud=force_cloud) | ||
|
||
async def open_key_value_store( | ||
self, | ||
|
@@ -509,17 +493,7 @@ async def open_key_value_store( | |
An instance of the `KeyValueStore` class for the given ID or name. | ||
""" | ||
self._raise_if_not_initialized() | ||
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) | ||
|
||
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client() | ||
|
||
return await KeyValueStore.open( | ||
id=id, | ||
alias=alias, | ||
name=name, | ||
configuration=self.configuration, | ||
storage_client=storage_client, | ||
) | ||
return await self._storage_client.open_key_value_store(id=id, name=name, alias=alias, force_cloud=force_cloud) | ||
|
||
async def open_request_queue( | ||
self, | ||
|
@@ -550,17 +524,7 @@ async def open_request_queue( | |
An instance of the `RequestQueue` class for the given ID or name. | ||
""" | ||
self._raise_if_not_initialized() | ||
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) | ||
|
||
storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client() | ||
|
||
return await RequestQueue.open( | ||
id=id, | ||
alias=alias, | ||
name=name, | ||
configuration=self.configuration, | ||
storage_client=storage_client, | ||
) | ||
return await self._storage_client.open_request_queue(id=id, name=name, alias=alias, force_cloud=force_cloud) | ||
|
||
@overload | ||
async def push_data(self, data: dict | list[dict]) -> None: ... | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from ._storage_client import ApifyHybridStorageClient |
Uh oh!
There was an error while loading. Please reload this page.