Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ keywords = [
dependencies = [
"apify-client>=2.0.0,<3.0.0",
"apify-shared>=2.0.0,<3.0.0",
"crawlee==1.0.0rc1",
"crawlee @ git+https://github.com/apify/crawlee-python.git@storage-clients-and-configurations",
"cachetools>=5.5.0",
"cryptography>=42.0.0",
# TODO: ensure compatibility with the latest version of lazy-object-proxy
Expand Down
94 changes: 58 additions & 36 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from crawlee import service_locator
from crawlee.errors import ServiceConflictError
from crawlee.events import (
Event,
EventAbortingData,
Expand All @@ -25,7 +25,7 @@
)

from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
from apify._configuration import Configuration
from apify._configuration import Configuration, service_locator
from apify._consts import EVENT_LISTENERS_TIMEOUT
from apify._crypto import decrypt_input_secrets, load_private_key
from apify._models import ActorRun
Expand Down Expand Up @@ -119,28 +119,29 @@ def __init__(
self._exit_process = self._get_default_exit_process() if exit_process is None else exit_process
self._is_exiting = False

self._configuration = configuration or Configuration.get_global_configuration()
# Actor state when this method is being executed is unpredictable.
# Actor can be initialized by lazy object proxy or by user directly, or by both.
# Until `init` method is run, this state of uncertainty remains. This is the reason why any setting done here in
# `__init__` method should not be considered final.

self._configuration = configuration
self._configure_logging = configure_logging
self._apify_client = self.new_client()

# Create an instance of the cloud storage client, the local storage client is obtained
# from the service locator.
self._cloud_storage_client = ApifyStorageClient()

# Set the event manager based on whether the Actor is running on the platform or locally.
self._event_manager = (
ApifyEventManager(
configuration=self._configuration,
persist_state_interval=self._configuration.persist_state_interval,
configuration=self.config,
persist_state_interval=self.config.persist_state_interval,
)
if self.is_at_home()
else LocalEventManager(
system_info_interval=self._configuration.system_info_interval,
persist_state_interval=self._configuration.persist_state_interval,
system_info_interval=self.config.system_info_interval,
persist_state_interval=self.config.persist_state_interval,
)
)

self._charging_manager = ChargingManagerImplementation(self._configuration, self._apify_client)
self._charging_manager = ChargingManagerImplementation(self.config, self._apify_client)

self._is_initialized = False

Expand Down Expand Up @@ -203,12 +204,18 @@ def apify_client(self) -> ApifyClientAsync:
@property
def configuration(self) -> Configuration:
"""The Configuration instance the Actor instance uses."""
return self._configuration
return self.config

@property
def config(self) -> Configuration:
    """Return the Configuration instance the Actor instance uses.

    When a configuration was provided explicitly (or stored on the instance later), that instance is returned.
    Otherwise a debug message is logged and a newly created default `Configuration` is returned; the new
    object is not stored on the Actor, so every such call creates another instance.
    """
    if self._configuration:
        return self._configuration

    # The two literals are concatenated by the parser, so the first one must end with a space.
    self.log.debug(
        'Implicit configuration used. '
        "It's recommended to explicitly set the configuration to avoid unexpected behavior."
    )
    return Configuration()

@property
def event_manager(self) -> EventManager:
Expand Down Expand Up @@ -250,12 +257,31 @@ async def init(self) -> None:
This method should be called immediately before performing any additional Actor actions, and it should be
called only once.
"""
if self._configuration:
# Set explicitly the configuration in the service locator
service_locator.set_configuration(self.configuration)
else:
try:
# Set implicit default Apify configuration, unless configuration was already set.
service_locator.set_configuration(self.configuration)
except ServiceConflictError:
self.log.info(
'Configuration in service locator was set explicitly before Actor. '
'Using the existing configuration.'
)
# Use the configuration from the service locator
self._configuration = service_locator.get_configuration()

if self._is_initialized:
raise RuntimeError('The Actor was already initialized!')

if _ActorType._is_any_instance_initialized:
self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care')

# Create an instance of the cloud storage client, the local storage client is obtained
# from the service locator
self._cloud_storage_client = ApifyStorageClient(configuration=self.configuration)

# Make sure that the currently initialized instance is also available through the global `Actor` proxy
cast('Proxy', Actor).__wrapped__ = self

Expand All @@ -267,7 +293,6 @@ async def init(self) -> None:
service_locator.set_storage_client(self._cloud_storage_client)

service_locator.set_event_manager(self.event_manager)
service_locator.set_configuration(self.configuration)

# The logging configuration has to be called after all service_locator set methods.
if self._configure_logging:
Expand Down Expand Up @@ -385,8 +410,8 @@ def new_client(
(increases exponentially from this value).
timeout: The socket timeout of the HTTP requests sent to the Apify API.
"""
token = token or self._configuration.token
api_url = api_url or self._configuration.api_base_url
token = token or self.config.token
api_url = api_url or self.config.api_base_url
return ApifyClientAsync(
token=token,
api_url=api_url,
Expand Down Expand Up @@ -429,7 +454,6 @@ async def open_dataset(
return await Dataset.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=storage_client,
)

Expand Down Expand Up @@ -463,7 +487,6 @@ async def open_key_value_store(
return await KeyValueStore.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=storage_client,
)

Expand Down Expand Up @@ -500,7 +523,6 @@ async def open_request_queue(
return await RequestQueue.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=storage_client,
)

Expand Down Expand Up @@ -549,9 +571,9 @@ async def get_input(self) -> Any:
"""Get the Actor input value from the default key-value store associated with the current Actor run."""
self._raise_if_not_initialized()

input_value = await self.get_value(self._configuration.input_key)
input_secrets_private_key = self._configuration.input_secrets_private_key_file
input_secrets_key_passphrase = self._configuration.input_secrets_private_key_passphrase
input_value = await self.get_value(self.config.input_key)
input_secrets_private_key = self.config.input_secrets_private_key_file
input_secrets_key_passphrase = self.config.input_secrets_private_key_passphrase
if input_secrets_private_key and input_secrets_key_passphrase:
private_key = load_private_key(
input_secrets_private_key,
Expand Down Expand Up @@ -688,7 +710,7 @@ def off(self, event_name: Event, listener: Callable | None = None) -> None:

def is_at_home(self) -> bool:
"""Return `True` when the Actor is running on the Apify platform, and `False` otherwise (e.g. local run)."""
return self._configuration.is_at_home
return self.config.is_at_home

def get_env(self) -> dict:
"""Return a dictionary with information parsed from all the `APIFY_XXX` environment variables.
Expand All @@ -714,7 +736,7 @@ def get_env(self) -> dict:
aliases = [field_name]

for alias in aliases:
config[alias] = getattr(self._configuration, field_name)
config[alias] = getattr(self.config, field_name)

env_vars = {env_var.value.lower(): env_var.name.lower() for env_var in [*ActorEnvVars, *ApifyEnvVars]}
return {option_name: config[env_var] for env_var, option_name in env_vars.items() if env_var in config}
Expand Down Expand Up @@ -1002,13 +1024,13 @@ async def metamorph(
return

if not custom_after_sleep:
custom_after_sleep = self._configuration.metamorph_after_sleep
custom_after_sleep = self.config.metamorph_after_sleep

# If is_at_home() is True, config.actor_run_id is always set
if not self._configuration.actor_run_id:
if not self.config.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

await self._apify_client.run(self._configuration.actor_run_id).metamorph(
await self._apify_client.run(self.config.actor_run_id).metamorph(
target_actor_id=target_actor_id,
run_input=run_input,
target_actor_build=target_actor_build,
Expand Down Expand Up @@ -1045,7 +1067,7 @@ async def reboot(
_ActorType._is_rebooting = True

if not custom_after_sleep:
custom_after_sleep = self._configuration.metamorph_after_sleep
custom_after_sleep = self.config.metamorph_after_sleep

# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
# PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot.
Expand All @@ -1065,10 +1087,10 @@ async def reboot(
*[listener(EventMigratingData()) for listener in migrating_listeners],
)

if not self._configuration.actor_run_id:
if not self.config.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

await self._apify_client.run(self._configuration.actor_run_id).reboot()
await self._apify_client.run(self.config.actor_run_id).reboot()

if custom_after_sleep:
await asyncio.sleep(custom_after_sleep.total_seconds())
Expand Down Expand Up @@ -1107,11 +1129,11 @@ async def add_webhook(
return

# If is_at_home() is True, config.actor_run_id is always set
if not self._configuration.actor_run_id:
if not self.config.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

await self._apify_client.webhooks().create(
actor_run_id=self._configuration.actor_run_id,
actor_run_id=self.config.actor_run_id,
event_types=webhook.event_types,
request_url=webhook.request_url,
payload_template=webhook.payload_template,
Expand Down Expand Up @@ -1143,10 +1165,10 @@ async def set_status_message(
return None

# If is_at_home() is True, config.actor_run_id is always set
if not self._configuration.actor_run_id:
if not self.config.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

api_result = await self._apify_client.run(self._configuration.actor_run_id).update(
api_result = await self._apify_client.run(self.config.actor_run_id).update(
status_message=status_message, is_status_message_terminal=is_terminal
)

Expand Down Expand Up @@ -1201,7 +1223,7 @@ async def create_proxy_configuration(
country_code=country_code,
proxy_urls=proxy_urls,
new_url_function=new_url_function,
_actor_config=self._configuration,
_actor_config=self.config,
_apify_client=self._apify_client,
)

Expand Down
35 changes: 32 additions & 3 deletions src/apify/_configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import traceback
from datetime import datetime, timedelta
from decimal import Decimal
from logging import getLogger
Expand All @@ -8,6 +9,8 @@
from pydantic import AliasChoices, BeforeValidator, Field, model_validator
from typing_extensions import Self, deprecated

import crawlee
from crawlee._service_locator import ServiceLocator
from crawlee._utils.models import timedelta_ms
from crawlee._utils.urls import validate_http_url
from crawlee.configuration import Configuration as CrawleeConfiguration
Expand Down Expand Up @@ -417,6 +420,7 @@ def disable_browser_sandbox_on_platform(self) -> Self:
"""
if self.is_at_home and not self.disable_browser_sandbox:
self.disable_browser_sandbox = True
logger.info('Stack trace:\n%s', ''.join(traceback.format_stack()))
logger.warning('Actor is running on the Apify platform, `disable_browser_sandbox` was changed to True.')
return self

Expand All @@ -427,8 +431,33 @@ def get_global_configuration(cls) -> Configuration:
Mostly for the backwards compatibility. It is recommended to use the `service_locator.get_configuration()`
instead.
"""
return cls()
return service_locator.get_configuration()


# Monkey-patch the base class so that it works with the extended configuration
CrawleeConfiguration.get_global_configuration = Configuration.get_global_configuration # type: ignore[method-assign]
class ApifyServiceLocator(ServiceLocator):
    """Same as ServiceLocator from Crawlee, but `get_configuration` always returns an Apify Configuration."""

    def get_configuration(self) -> Configuration:
        """Return the stored configuration as an Apify `Configuration`.

        If the locator already holds an Apify `Configuration`, that exact instance is returned. Otherwise the
        configuration provided by the parent locator is copied field by field into a new Apify `Configuration`.
        In that second case the new object is not stored back in the locator, so each call builds a fresh one.
        """
        # The locator can store any child of the Crawlee Configuration, but in the Apify context it is desired
        # to hand out the Apify Configuration.
        if isinstance(self._configuration, Configuration):
            return self._configuration

        stored_configuration = super().get_configuration()
        apify_configuration = Configuration()

        # Most likely a Crawlee configuration was already set; build the Apify configuration from it.
        # Due to the known Pydantic issue https://github.com/pydantic/pydantic/issues/9516, creating a new
        # Configuration instance from an existing one is unpredictable when the environment sets some fields
        # by alias, so the values are copied attribute by attribute as a stable workaround.
        # `model_fields` is read from the class: accessing it on an instance is deprecated since Pydantic 2.11.
        for name in type(stored_configuration).model_fields:
            setattr(apify_configuration, name, getattr(stored_configuration, name))

        return apify_configuration


# Create the module-level service locator as an ApifyServiceLocator, so that `get_configuration()` returns the
# Apify Configuration, and install the same instance as `crawlee.service_locator`.
# NOTE(review): modules that ran `from crawlee import service_locator` before this assignment presumably keep
# a reference to the original locator - confirm the import order.
service_locator = ApifyServiceLocator()
crawlee.service_locator = service_locator
1 change: 0 additions & 1 deletion src/apify/scrapy/extensions/_httpcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async def open_kvs() -> KeyValueStore:
storage_client = ApifyStorageClient()
return await KeyValueStore.open(
name=kvs_name,
configuration=configuration,
storage_client=storage_client,
)
return await KeyValueStore.open(name=kvs_name)
Expand Down
1 change: 0 additions & 1 deletion src/apify/scrapy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ async def open_rq() -> RequestQueue:
if configuration.is_at_home:
storage_client = ApifyStorageClient()
return await RequestQueue.open(
configuration=configuration,
storage_client=storage_client,
)
return await RequestQueue.open()
Expand Down
Loading
Loading