Merged
Changes from 34 commits
35 commits
8e2f5d4
Draft for tests
Pijukatel Aug 26, 2025
1d869a4
Updated draft
Pijukatel Aug 27, 2025
08df986
Try to use list_head
Pijukatel Aug 27, 2025
6131fff
Locks not needed with in_progress
Pijukatel Aug 27, 2025
553663a
Add alternate client
Pijukatel Aug 27, 2025
eadab26
WIP
Pijukatel Aug 28, 2025
249f8f5
Find the chacing problem.
Pijukatel Aug 28, 2025
4ada123
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Aug 28, 2025
10e0652
Wip changes
Pijukatel Aug 28, 2025
359c46e
Add init cache test, update upgrading guide
Pijukatel Sep 12, 2025
ce090c0
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 18, 2025
b511011
Finalize change and add few more tests
Pijukatel Sep 19, 2025
fb32861
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 19, 2025
7ec13ef
Remove unnecesary methods from the specialized client
Pijukatel Sep 19, 2025
10bc7e2
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 19, 2025
7712410
Rename default_request_queue_apify
Pijukatel Sep 19, 2025
e63f546
Use single and shared literals and rename the RQ client classes
Pijukatel Sep 19, 2025
ffa70ff
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 19, 2025
e5bdff2
Update tests
Pijukatel Sep 22, 2025
57cd8ae
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 22, 2025
79c02f5
Update upgrading guide
Pijukatel Sep 22, 2025
d29a534
Extract storage related complexity from Actor to dedicated storage cl…
Pijukatel Sep 24, 2025
506b770
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 24, 2025
1cc80bb
Update log test
Pijukatel Sep 24, 2025
860b0ec
Rename access to request_queue_access
Pijukatel Sep 24, 2025
e6c6fc5
Update src/apify/_actor.py
Pijukatel Sep 24, 2025
da2f5df
Review comments
Pijukatel Sep 24, 2025
8861c5e
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 24, 2025
1e8a834
Review comments
Pijukatel Sep 24, 2025
de941d4
Update based on Crawlee update
Pijukatel Sep 25, 2025
b4a588d
Merge remote-tracking branch 'origin/master' into no-locking-queue
Pijukatel Sep 25, 2025
c5968bc
Use composition instead of inheritance
Pijukatel Sep 25, 2025
49c357e
Polish some docs
Pijukatel Sep 25, 2025
6edb093
More docs polishing
Pijukatel Sep 25, 2025
b17ebef
Track pending_request_count in local metadata estimation
Pijukatel Sep 26, 2025
60 changes: 49 additions & 11 deletions docs/04_upgrading/upgrading_to_v3.md
@@ -69,6 +69,13 @@ Some changes in the related model classes:
## Removed Actor.config property
- `Actor.config` property has been removed. Use `Actor.configuration` instead.

## Default storage ids in configuration changed to None
- `Configuration.default_key_value_store_id` changed from `'default'` to `None`.
- `Configuration.default_dataset_id` changed from `'default'` to `None`.
- `Configuration.default_request_queue_id` changed from `'default'` to `None`.

Previously, using the default storage without specifying its `id` in `Configuration` led to using a specific storage with the id `'default'`. Now it uses a newly created unnamed storage with an `id` assigned by the Apify platform. Consecutive calls to get the default storage return the same storage.

## Actor initialization and ServiceLocator changes

`Actor` initialization and global `service_locator` services setup is more strict and predictable.
@@ -102,20 +109,51 @@ async def main():
)
```

## Removed Actor.config property
- `Actor.config` property has been removed. Use `Actor.configuration` instead.
### Changes in storage clients

## Default storage ids in configuration changed to None
- `Configuration.default_key_value_store_id` changed from `'default'` to `None`.
- `Configuration.default_dataset_id` changed from `'default'` to `None`.
- `Configuration.default_request_queue_id` changed from `'default'` to `None`.
## Explicit control over storage clients used in Actor
- It is now possible to have full control over which storage clients are used by the `Actor`. To make development of Actors convenient, the `Actor` has two storage clients: one that is used when running on the Apify platform or when opening storages with `force_cloud=True`, and another that is used when running outside the Apify platform. The `Actor` has reasonable defaults, and for the majority of use cases there is no need to change them. However, if you need to use a different storage client, you can set it up through `service_locator` before entering the `Actor` context.

**Now (v3.0):**

```python
from crawlee import service_locator
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient, MemoryStorageClient
from apify import Actor


async def main():
    service_locator.set_storage_client(
        SmartApifyStorageClient(
            cloud_storage_client=ApifyStorageClient(request_queue_access="single"),
            local_storage_client=MemoryStorageClient()
        )
    )
    async with Actor:
        rq = await Actor.open_request_queue()
```
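
The selection between the two clients described above can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the SDK's implementation: `SketchStorageClient`, `SketchSmartClient`, and the `is_at_home` flag are hypothetical stand-ins, and only the method name `get_suitable_storage_client(force_cloud=...)` is taken from the diff.

```python
from dataclasses import dataclass


@dataclass
class SketchStorageClient:
    """Hypothetical stand-in for a real storage client."""

    name: str


@dataclass
class SketchSmartClient:
    """Hypothetical stand-in that holds one cloud client and one local client."""

    cloud_storage_client: SketchStorageClient
    local_storage_client: SketchStorageClient
    # Assumption: True when the code runs on the Apify platform.
    is_at_home: bool = False

    def get_suitable_storage_client(self, *, force_cloud: bool = False) -> SketchStorageClient:
        # The cloud client is picked on the platform or when explicitly forced.
        if self.is_at_home or force_cloud:
            return self.cloud_storage_client
        # Otherwise the local client is used.
        return self.local_storage_client


smart = SketchSmartClient(
    cloud_storage_client=SketchStorageClient('cloud'),
    local_storage_client=SketchStorageClient('local'),
)
default_choice = smart.get_suitable_storage_client()
forced_choice = smart.get_suitable_storage_client(force_cloud=True)
```

Keeping both clients behind one object means the `open_dataset`, `open_key_value_store`, and `open_request_queue` helpers only need to pass `force_cloud` through instead of repeating the decision.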

Previously using the default storage without specifying its `id` in `Configuration` would lead to using specific storage with id `'default'`. Now it will use newly created unnamed storage with `'id'` assigned by the Apify platform, consecutive calls to get the default storage will return the same storage.

## Storages
## The default use of optimized ApifyRequestQueueClient

<!-- TODO -->
- The default client for working with the Apify platform based `RequestQueue` is now an optimized and simplified client that makes significantly fewer API calls, but does not support multiple consumers working on the same queue. It is cheaper and faster, and it is suitable for the majority of use cases.
- The full client is still available, but it has to be explicitly requested via the `request_queue_access="shared"` argument when using the `ApifyStorageClient`.

## Storage clients
**Now (v3.0):**

```python
from crawlee import service_locator
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
from apify import Actor

<!-- TODO -->

async def main():
    # Full client that supports multiple consumers of the Apify Request Queue
    service_locator.set_storage_client(
        SmartApifyStorageClient(
            cloud_storage_client=ApifyStorageClient(request_queue_access="shared"),
        )
    )
    async with Actor:
        rq = await Actor.open_request_queue()
```
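
One way to picture the `"single"` / `"shared"` switch is a small factory keyed on a string literal. The sketch below is illustrative only: the class names, the `supports_multiple_consumers` attribute, and the `pick_queue_implementation` helper are hypothetical, and treating `"single"` as the default is an assumption based on the upgrading note above rather than on the SDK source.

```python
from typing import Literal, Union

# The two access modes named in the upgrading guide.
RequestQueueAccess = Literal['single', 'shared']


class SketchSingleConsumerQueue:
    """Hypothetical optimized client: fewer API calls, one consumer only."""

    supports_multiple_consumers = False


class SketchSharedQueue:
    """Hypothetical full client: more API calls, safe for several consumers."""

    supports_multiple_consumers = True


def pick_queue_implementation(
    access: RequestQueueAccess = 'single',
) -> Union[SketchSingleConsumerQueue, SketchSharedQueue]:
    # Assumption: 'single' is the default, matching the optimized client.
    if access == 'single':
        return SketchSingleConsumerQueue()
    if access == 'shared':
        return SketchSharedQueue()
    # Reject anything outside the two documented literals.
    raise ValueError(f'Unknown request queue access mode: {access!r}')


default_queue = pick_queue_implementation()
shared_queue = pick_queue_implementation('shared')
```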
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -36,7 +36,7 @@ keywords = [
dependencies = [
"apify-client>=2.0.0,<3.0.0",
"apify-shared>=2.0.0,<3.0.0",
"crawlee==0.6.13b42",
"crawlee==0.6.13b46",
"cachetools>=5.5.0",
"cryptography>=42.0.0",
"impit>=0.6.1",
103 changes: 37 additions & 66 deletions src/apify/_actor.py
@@ -25,7 +25,6 @@
EventPersistStateData,
EventSystemInfoData,
)
from crawlee.storage_clients import FileSystemStorageClient

from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
from apify._configuration import Configuration
@@ -38,6 +37,7 @@
from apify.log import _configure_logging, logger
from apify.storage_clients import ApifyStorageClient
from apify.storage_clients._file_system import ApifyFileSystemStorageClient
from apify.storage_clients._smart_apify._storage_client import SmartApifyStorageClient
from apify.storages import Dataset, KeyValueStore, RequestQueue

if TYPE_CHECKING:
@@ -48,7 +48,6 @@
from typing_extensions import Self

from crawlee.proxy_configuration import _NewUrlFunction
from crawlee.storage_clients import StorageClient

from apify._models import Webhook

@@ -131,7 +130,6 @@ def __init__(
self._configuration = configuration
self._configure_logging = configure_logging
self._apify_client: ApifyClientAsync | None = None
self._local_storage_client: StorageClient | None = None

self._is_initialized = False

@@ -234,45 +232,42 @@ def log(self) -> logging.Logger:
"""The logging.Logger instance the Actor uses."""
return logger

def _get_local_storage_client(self) -> StorageClient:
"""Get the local storage client the Actor instance uses."""
if self._local_storage_client:
return self._local_storage_client
def _raise_if_not_initialized(self) -> None:
if not self._is_initialized:
raise RuntimeError('The Actor was not initialized!')

@cached_property
def _storage_client(self) -> SmartApifyStorageClient:
"""Storage client used by the actor.

Depending on the initialization of the service locator the client can be created in different ways.
"""
try:
# Set implicit default local storage client, unless local storage client was already set.
implicit_storage_client = ApifyFileSystemStorageClient()
# Nothing was set by the user.
implicit_storage_client = SmartApifyStorageClient(
local_storage_client=ApifyFileSystemStorageClient(), cloud_storage_client=ApifyStorageClient()
)
service_locator.set_storage_client(implicit_storage_client)
self._local_storage_client = implicit_storage_client
except ServiceConflictError:
self.log.debug(
'Storage client in service locator was set explicitly before Actor.init was called. '
'Using the existing storage client as implicit storage client for the Actor.'
)

self._local_storage_client = service_locator.get_storage_client()
if type(self._local_storage_client) is FileSystemStorageClient:
self.log.warning(
f'Using {FileSystemStorageClient.__module__}.{FileSystemStorageClient.__name__} in Actor context is not'
f' recommended and can lead to problems with reading the input file. Use '
f'`apify.storage_clients.FileSystemStorageClient` instead.'
)

return self._local_storage_client

def _raise_if_not_initialized(self) -> None:
if not self._is_initialized:
raise RuntimeError('The Actor was not initialized!')

def _raise_if_cloud_requested_but_not_configured(self, *, force_cloud: bool) -> None:
if not force_cloud:
return

if not self.is_at_home() and self.configuration.token is None:
raise RuntimeError(
'In order to use the Apify cloud storage from your computer, '
'you need to provide an Apify token using the APIFY_TOKEN environment variable.'
)
else:
return implicit_storage_client

# User set something in the service locator.
explicit_storage_client = service_locator.get_storage_client()
if isinstance(explicit_storage_client, SmartApifyStorageClient):
# The client was manually set to the right type in the service locator. This is the explicit way.
return explicit_storage_client

raise RuntimeError(
'The storage client in the service locator has to be instance of SmartApifyStorageClient. If you want to '
'set the storage client manually you have to call '
'`service_locator.set_storage_client(SmartApifyStorageClient(...))` before entering Actor context or '
'awaiting `Actor.init`.'
)

async def init(self) -> None:
"""Initialize the Actor instance.
@@ -285,6 +280,7 @@ async def init(self) -> None:
This method should be called immediately before performing any additional Actor actions, and it should be
called only once.
"""
self.log.info('Initializing Actor...')
if self._configuration:
# Set explicitly the configuration in the service locator
service_locator.set_configuration(self.configuration)
@@ -298,30 +294,20 @@ async def init(self) -> None:
if _ActorType._is_any_instance_initialized:
self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care')

# Create an instance of the cloud storage client, the local storage client is obtained
# from the service locator
self._cloud_storage_client = ApifyStorageClient()

# Make sure that the currently initialized instance is also available through the global `Actor` proxy
cast('Proxy', Actor).__wrapped__ = self

self._is_exiting = False
self._was_final_persist_state_emitted = False

# If the Actor is running on the Apify platform, we set the cloud storage client.
if self.is_at_home():
service_locator.set_storage_client(self._cloud_storage_client)
self._local_storage_client = self._cloud_storage_client
else:
self._get_local_storage_client()
self.log.debug(f'Storage client set to {self._storage_client}')

service_locator.set_event_manager(self.event_manager)

# The logging configuration has to be called after all service_locator set methods.
if self._configure_logging:
_configure_logging()

self.log.info('Initializing Actor...')
self.log.info('System info', extra=get_system_info())

await self.event_manager.__aenter__()
@@ -470,16 +456,11 @@ async def open_dataset(
An instance of the `Dataset` class for the given ID or name.
"""
self._raise_if_not_initialized()
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)

storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()

return await Dataset.open(
id=id,
alias=alias,
name=name,
configuration=self.configuration,
storage_client=storage_client,
alias=alias,
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
)

async def open_key_value_store(
@@ -509,16 +490,11 @@ async def open_key_value_store(
An instance of the `KeyValueStore` class for the given ID or name.
"""
self._raise_if_not_initialized()
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)

storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()

return await KeyValueStore.open(
id=id,
alias=alias,
name=name,
configuration=self.configuration,
storage_client=storage_client,
alias=alias,
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
)

async def open_request_queue(
@@ -550,16 +526,11 @@ async def open_request_queue(
An instance of the `RequestQueue` class for the given ID or name.
"""
self._raise_if_not_initialized()
self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud)

storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client()

return await RequestQueue.open(
id=id,
alias=alias,
name=name,
configuration=self.configuration,
storage_client=storage_client,
alias=alias,
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
)

@overload
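
The new `_storage_client` property in this file follows a "set a default, or reuse and validate what is already there" pattern. The sketch below reproduces that control flow with standard-library stand-ins so it can be run in isolation. `SketchLocator`, `SketchConflictError`, `SketchSmartClient`, and `resolve_storage_client` are hypothetical names, and the locator's "raise on second set" behavior is an assumption made for the illustration.

```python
from typing import Optional


class SketchConflictError(Exception):
    """Hypothetical stand-in for the conflict error of a service locator."""


class SketchLocator:
    """Hypothetical locator that accepts a storage client only once."""

    def __init__(self) -> None:
        self._client: Optional[object] = None

    def set_storage_client(self, client: object) -> None:
        # Assumption: a second, different client is rejected.
        if self._client is not None and self._client is not client:
            raise SketchConflictError('Storage client is already set.')
        self._client = client

    def get_storage_client(self) -> Optional[object]:
        return self._client


class SketchSmartClient:
    """Hypothetical stand-in for the client type the Actor requires."""


def resolve_storage_client(locator: SketchLocator) -> SketchSmartClient:
    try:
        # Nothing was set by the user: register an implicit default.
        implicit = SketchSmartClient()
        locator.set_storage_client(implicit)
    except SketchConflictError:
        # Something was set earlier; fall through and validate it.
        pass
    else:
        return implicit

    explicit = locator.get_storage_client()
    if isinstance(explicit, SketchSmartClient):
        return explicit
    raise RuntimeError('The storage client has to be an instance of SketchSmartClient.')


fresh_locator = SketchLocator()
resolved = resolve_storage_client(fresh_locator)
```

Using `try` / `except` / `else` keeps the happy path (no prior client) separate from the validation path, so a client of the wrong type fails loudly instead of being silently replaced.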
2 changes: 2 additions & 0 deletions src/apify/storage_clients/__init__.py
@@ -2,9 +2,11 @@

from ._apify import ApifyStorageClient
from ._file_system import ApifyFileSystemStorageClient as FileSystemStorageClient
from ._smart_apify import SmartApifyStorageClient

__all__ = [
'ApifyStorageClient',
'FileSystemStorageClient',
'MemoryStorageClient',
'SmartApifyStorageClient',
]