diff --git a/docs/02_concepts/09_running_webserver.mdx b/docs/02_concepts/09_running_webserver.mdx index 7d13a504..c35bf598 100644 --- a/docs/02_concepts/09_running_webserver.mdx +++ b/docs/02_concepts/09_running_webserver.mdx @@ -13,9 +13,9 @@ The URL is available in the following places: - In Apify Console, on the Actor run details page as the **Container URL** field. - In the API as the `container_url` property of the [Run object](https://docs.apify.com/api/v2#/reference/actors/run-object/get-run). -- In the Actor as the `Actor.config.container_url` property. +- In the Actor as the `Actor.configuration.container_url` property. -The web server running inside the container must listen at the port defined by the `Actor.config.container_port` property. When running Actors locally, the port defaults to `4321`, so the web server will be accessible at `http://localhost:4321`. +The web server running inside the container must listen at the port defined by the `Actor.configuration.container_port` property. When running Actors locally, the port defaults to `4321`, so the web server will be accessible at `http://localhost:4321`. ## Example diff --git a/docs/02_concepts/code/07_webhook_preventing.py b/docs/02_concepts/code/07_webhook_preventing.py index 988c531c..de5f189a 100644 --- a/docs/02_concepts/code/07_webhook_preventing.py +++ b/docs/02_concepts/code/07_webhook_preventing.py @@ -7,7 +7,7 @@ async def main() -> None: webhook = Webhook( event_types=['ACTOR.RUN.FAILED'], request_url='https://example.com/run-failed', - idempotency_key=Actor.config.actor_run_id, + idempotency_key=Actor.configuration.actor_run_id, ) # Add the webhook to the Actor. diff --git a/docs/02_concepts/code/09_webserver.py b/docs/02_concepts/code/09_webserver.py index 48a5c10d..e8b54200 100644 --- a/docs/02_concepts/code/09_webserver.py +++ b/docs/02_concepts/code/09_webserver.py @@ -22,9 +22,9 @@ def run_server() -> None: # and save a reference to the server. global http_server with ThreadingHTTPServer( - ('', Actor.config.web_server_port), RequestHandler + ('', Actor.configuration.web_server_port), RequestHandler ) as server: - Actor.log.info(f'Server running on {Actor.config.web_server_port}') + Actor.log.info(f'Server running on {Actor.configuration.web_server_port}') http_server = server server.serve_forever() diff --git a/docs/02_concepts/code/conditional_actor_charge.py b/docs/02_concepts/code/conditional_actor_charge.py index f4695cc4..12b03d96 100644 --- a/docs/02_concepts/code/conditional_actor_charge.py +++ b/docs/02_concepts/code/conditional_actor_charge.py @@ -13,6 +13,6 @@ async def main() -> None: if Actor.get_charging_manager().get_pricing_info().is_pay_per_event: # highlight-end await Actor.push_data({'hello': 'world'}, 'dataset-item') - elif charged_items < (Actor.config.max_paid_dataset_items or 0): + elif charged_items < (Actor.configuration.max_paid_dataset_items or 0): await Actor.push_data({'hello': 'world'}) charged_items += 1 diff --git a/docs/03_guides/code/03_playwright.py b/docs/03_guides/code/03_playwright.py index 14868ad8..0ecc7d45 100644 --- a/docs/03_guides/code/03_playwright.py +++ b/docs/03_guides/code/03_playwright.py @@ -40,7 +40,7 @@ async def main() -> None: async with async_playwright() as playwright: # Configure the browser to launch in headless mode as per Actor configuration. 
         browser = await playwright.chromium.launch(
-            headless=Actor.config.headless,
+            headless=Actor.configuration.headless,
             args=['--disable-gpu'],
         )
         context = await browser.new_context()
diff --git a/docs/03_guides/code/04_selenium.py b/docs/03_guides/code/04_selenium.py
index 8cffe606..0f919c71 100644
--- a/docs/03_guides/code/04_selenium.py
+++ b/docs/03_guides/code/04_selenium.py
@@ -41,7 +41,7 @@ async def main() -> None:
         Actor.log.info('Launching Chrome WebDriver...')
         chrome_options = ChromeOptions()
-        if Actor.config.headless:
+        if Actor.configuration.headless:
             chrome_options.add_argument('--headless')
         chrome_options.add_argument('--no-sandbox')
diff --git a/docs/04_upgrading/upgrading_to_v3.md b/docs/04_upgrading/upgrading_to_v3.md
index d9f179e5..df571d34 100644
--- a/docs/04_upgrading/upgrading_to_v3.md
+++ b/docs/04_upgrading/upgrading_to_v3.md
@@ -9,6 +9,42 @@ This page summarizes the breaking changes between Apify Python SDK v2.x and v3.0
 Support for Python 3.9 has been dropped. The Apify Python SDK v3.x now requires Python 3.10 or later. Make sure your environment is running a compatible version before upgrading.
 
+## Actor initialization and ServiceLocator changes
+
+`Actor` initialization and the setup of services in the global `service_locator` are now stricter and more predictable:
+- Services in `Actor` can't be changed after calling `Actor.init`, after entering the `async with Actor` context manager, or after requesting them from the `Actor`.
+- Services in `Actor` can differ from the services used by a crawler.
+
+**Now (v3.0):**
+
+```python
+from crawlee.crawlers import BasicCrawler
+from crawlee.storage_clients import MemoryStorageClient
+from crawlee.configuration import Configuration
+from crawlee.events import LocalEventManager
+from apify import Actor
+
+async def main():
+    async with Actor():
+        # This crawler will use the same services as the Actor and the global service_locator.
+        crawler_1 = BasicCrawler()
+
+        # This crawler will use custom services.
+        custom_configuration = Configuration()
+        custom_event_manager = LocalEventManager.from_config(custom_configuration)
+        custom_storage_client = MemoryStorageClient()
+        crawler_2 = BasicCrawler(
+            configuration=custom_configuration,
+            event_manager=custom_event_manager,
+            storage_client=custom_storage_client,
+        )
+```
+
+## Removed `Actor.config` property
+
+- The `Actor.config` property has been removed. Use `Actor.configuration` instead, as shown in the sketch below.
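+
+For illustration, a minimal before/after sketch of the rename (the `actor_run_id` field is just an example attribute; any configuration field works the same way):
+
+**Before (v2.x):**
+
+```python
+from apify import Actor
+
+
+async def main() -> None:
+    async with Actor:
+        Actor.log.info(f'Run ID: {Actor.config.actor_run_id}')
+```
+
+**Now (v3.0):**
+
+```python
+from apify import Actor
+
+
+async def main() -> None:
+    async with Actor:
+        Actor.log.info(f'Run ID: {Actor.configuration.actor_run_id}')
+```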
+ ## Storages diff --git a/pyproject.toml b/pyproject.toml index 4b8b1368..50e348d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ keywords = [ dependencies = [ "apify-client>=2.0.0,<3.0.0", "apify-shared>=2.0.0,<3.0.0", - "crawlee==0.6.13b37", + "crawlee==0.6.13b42", "cachetools>=5.5.0", "cryptography>=42.0.0", "impit>=0.6.1", diff --git a/src/apify/_actor.py b/src/apify/_actor.py index eafd1889..92a45dae 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -5,6 +5,7 @@ import sys from contextlib import suppress from datetime import datetime, timedelta, timezone +from functools import cached_property from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast, overload from lazy_object_proxy import Proxy @@ -14,6 +15,7 @@ from apify_client import ApifyClientAsync from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars from crawlee import service_locator +from crawlee.errors import ServiceConflictError from crawlee.events import ( Event, EventAbortingData, @@ -23,6 +25,7 @@ EventPersistStateData, EventSystemInfoData, ) +from crawlee.storage_clients import FileSystemStorageClient from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation from apify._configuration import Configuration @@ -34,6 +37,7 @@ from apify.events import ApifyEventManager, EventManager, LocalEventManager from apify.log import _configure_logging, logger from apify.storage_clients import ApifyStorageClient +from apify.storage_clients._file_system import ApifyFileSystemStorageClient from apify.storages import Dataset, KeyValueStore, RequestQueue if TYPE_CHECKING: @@ -119,28 +123,15 @@ def __init__( self._exit_process = self._get_default_exit_process() if exit_process is None else exit_process self._is_exiting = False - self._configuration = configuration or Configuration.get_global_configuration() - self._configure_logging = configure_logging - self._apify_client = self.new_client() + # Actor state when this method is being executed is unpredictable. + # Actor can be initialized by lazy object proxy or by user directly, or by both. + # Until `init` method is run, this state of uncertainty remains. This is the reason why any setting done here in + # `__init__` method should not be considered final. - # Create an instance of the cloud storage client, the local storage client is obtained - # from the service locator. - self._cloud_storage_client = ApifyStorageClient() - - # Set the event manager based on whether the Actor is running on the platform or locally. 
-        self._event_manager = (
-            ApifyEventManager(
-                configuration=self._configuration,
-                persist_state_interval=self._configuration.persist_state_interval,
-            )
-            if self.is_at_home()
-            else LocalEventManager(
-                system_info_interval=self._configuration.system_info_interval,
-                persist_state_interval=self._configuration.persist_state_interval,
-            )
-        )
-
-        self._charging_manager = ChargingManagerImplementation(self._configuration, self._apify_client)
+        self._configuration = configuration
+        self._configure_logging = configure_logging
+        self._apify_client: ApifyClientAsync | None = None
+        self._local_storage_client: StorageClient | None = None
 
         self._is_initialized = False
 
@@ -198,32 +189,76 @@ def __call__(
     @property
     def apify_client(self) -> ApifyClientAsync:
         """The ApifyClientAsync instance the Actor instance uses."""
+        if not self._apify_client:
+            self._apify_client = self.new_client()
         return self._apify_client
 
-    @property
+    @cached_property
    def configuration(self) -> Configuration:
         """The Configuration instance the Actor instance uses."""
-        return self._configuration
+        if self._configuration:
+            return self._configuration
+
+        try:
+            # Set an implicit default Apify configuration, unless a configuration was already set.
+            implicit_configuration = Configuration()
+            service_locator.set_configuration(implicit_configuration)
+            self._configuration = implicit_configuration
+        except ServiceConflictError:
+            self.log.debug(
+                'Configuration in service locator was set explicitly before Actor.init was called. '
+                'Using the existing configuration as implicit configuration for the Actor.'
+            )
 
-    @property
-    def config(self) -> Configuration:
-        """The Configuration instance the Actor instance uses."""
+        # Use the configuration from the service locator.
+        self._configuration = Configuration.get_global_configuration()
         return self._configuration
 
-    @property
+    @cached_property
     def event_manager(self) -> EventManager:
         """The EventManager instance the Actor instance uses."""
-        return self._event_manager
+        return (
+            ApifyEventManager(
+                configuration=self.configuration,
+                persist_state_interval=self.configuration.persist_state_interval,
+            )
+            if self.is_at_home()
+            else LocalEventManager(
+                system_info_interval=self.configuration.system_info_interval,
+                persist_state_interval=self.configuration.persist_state_interval,
+            )
+        )
 
     @property
     def log(self) -> logging.Logger:
         """The logging.Logger instance the Actor uses."""
         return logger
 
-    @property
-    def _local_storage_client(self) -> StorageClient:
-        """The local storage client the Actor instance uses."""
-        return service_locator.get_storage_client()
+    def _get_local_storage_client(self) -> StorageClient:
+        """Get the local storage client the Actor instance uses."""
+        if self._local_storage_client:
+            return self._local_storage_client
+
+        try:
+            # Set an implicit default local storage client, unless a local storage client was already set.
+            implicit_storage_client = ApifyFileSystemStorageClient()
+            service_locator.set_storage_client(implicit_storage_client)
+            self._local_storage_client = implicit_storage_client
+        except ServiceConflictError:
+            self.log.debug(
+                'Storage client in service locator was set explicitly before Actor.init was called. '
+                'Using the existing storage client as implicit storage client for the Actor.'
+ ) + + self._local_storage_client = service_locator.get_storage_client() + if type(self._local_storage_client) is FileSystemStorageClient: + self.log.warning( + f'Using {FileSystemStorageClient.__module__}.{FileSystemStorageClient.__name__} in Actor context is not' + f' recommended and can lead to problems with reading the input file. Use ' + f'`apify.storage_clients.FileSystemStorageClient` instead.' + ) + + return self._local_storage_client def _raise_if_not_initialized(self) -> None: if not self._is_initialized: @@ -233,7 +268,7 @@ def _raise_if_cloud_requested_but_not_configured(self, *, force_cloud: bool) -> if not force_cloud: return - if not self.is_at_home() and self.config.token is None: + if not self.is_at_home() and self.configuration.token is None: raise RuntimeError( 'In order to use the Apify cloud storage from your computer, ' 'you need to provide an Apify token using the APIFY_TOKEN environment variable.' @@ -250,12 +285,23 @@ async def init(self) -> None: This method should be called immediately before performing any additional Actor actions, and it should be called only once. """ + if self._configuration: + # Set explicitly the configuration in the service locator + service_locator.set_configuration(self.configuration) + else: + # Ensure that the configuration (cached property) is set + _ = self.configuration + if self._is_initialized: raise RuntimeError('The Actor was already initialized!') if _ActorType._is_any_instance_initialized: self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care') + # Create an instance of the cloud storage client, the local storage client is obtained + # from the service locator + self._cloud_storage_client = ApifyStorageClient() + # Make sure that the currently initialized instance is also available through the global `Actor` proxy cast('Proxy', Actor).__wrapped__ = self @@ -265,9 +311,11 @@ async def init(self) -> None: # If the Actor is running on the Apify platform, we set the cloud storage client. if self.is_at_home(): service_locator.set_storage_client(self._cloud_storage_client) + self._local_storage_client = self._cloud_storage_client + else: + self._get_local_storage_client() service_locator.set_event_manager(self.event_manager) - service_locator.set_configuration(self.configuration) # The logging configuration has to be called after all service_locator set methods. 
if self._configure_logging: @@ -279,10 +327,10 @@ async def init(self) -> None: # TODO: Print outdated SDK version warning (we need a new env var for this) # https://github.com/apify/apify-sdk-python/issues/146 - await self._event_manager.__aenter__() + await self.event_manager.__aenter__() self.log.debug('Event manager initialized') - await self._charging_manager.__aenter__() + await self._charging_manager_implementation.__aenter__() self.log.debug('Charging manager initialized') self._is_initialized = True @@ -323,10 +371,10 @@ async def finalize() -> None: await asyncio.sleep(0.1) if event_listeners_timeout: - await self._event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout) + await self.event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout) - await self._event_manager.__aexit__(None, None, None) - await self._charging_manager.__aexit__(None, None, None) + await self.event_manager.__aexit__(None, None, None) + await self._charging_manager_implementation.__aexit__(None, None, None) await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds()) self._is_initialized = False @@ -385,8 +433,8 @@ def new_client( (increases exponentially from this value). timeout: The socket timeout of the HTTP requests sent to the Apify API. """ - token = token or self._configuration.token - api_url = api_url or self._configuration.api_base_url + token = token or self.configuration.token + api_url = api_url or self.configuration.api_base_url return ApifyClientAsync( token=token, api_url=api_url, @@ -427,13 +475,13 @@ async def open_dataset( self._raise_if_not_initialized() self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) - storage_client = self._cloud_storage_client if force_cloud else self._local_storage_client + storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client() return await Dataset.open( id=id, alias=alias, name=name, - configuration=self._configuration, + configuration=self.configuration, storage_client=storage_client, ) @@ -465,13 +513,14 @@ async def open_key_value_store( """ self._raise_if_not_initialized() self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) - storage_client = self._cloud_storage_client if force_cloud else self._local_storage_client + + storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client() return await KeyValueStore.open( id=id, alias=alias, name=name, - configuration=self._configuration, + configuration=self.configuration, storage_client=storage_client, ) @@ -506,13 +555,13 @@ async def open_request_queue( self._raise_if_not_initialized() self._raise_if_cloud_requested_but_not_configured(force_cloud=force_cloud) - storage_client = self._cloud_storage_client if force_cloud else self._local_storage_client + storage_client = self._cloud_storage_client if force_cloud else self._get_local_storage_client() return await RequestQueue.open( id=id, alias=alias, name=name, - configuration=self._configuration, + configuration=self.configuration, storage_client=storage_client, ) @@ -536,7 +585,7 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non data = data if isinstance(data, list) else [data] max_charged_count = ( - self._charging_manager.calculate_max_event_charge_count_within_limit(charged_event_name) + self.get_charging_manager().calculate_max_event_charge_count_within_limit(charged_event_name) if charged_event_name is not None else None ) @@ -550,7 +599,7 @@ async 
def push_data(self, data: dict | list[dict], charged_event_name: str | Non await dataset.push_data(data) if charged_event_name: - return await self._charging_manager.charge( + return await self.get_charging_manager().charge( event_name=charged_event_name, count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data), ) @@ -561,9 +610,9 @@ async def get_input(self) -> Any: """Get the Actor input value from the default key-value store associated with the current Actor run.""" self._raise_if_not_initialized() - input_value = await self.get_value(self._configuration.input_key) - input_secrets_private_key = self._configuration.input_secrets_private_key_file - input_secrets_key_passphrase = self._configuration.input_secrets_private_key_passphrase + input_value = await self.get_value(self.configuration.input_key) + input_secrets_private_key = self.configuration.input_secrets_private_key_file + input_secrets_key_passphrase = self.configuration.input_secrets_private_key_passphrase if input_secrets_private_key and input_secrets_key_passphrase: private_key = load_private_key( input_secrets_private_key, @@ -607,7 +656,11 @@ async def set_value( def get_charging_manager(self) -> ChargingManager: """Retrieve the charging manager to access granular pricing information.""" self._raise_if_not_initialized() - return self._charging_manager + return self._charging_manager_implementation + + @cached_property + def _charging_manager_implementation(self) -> ChargingManagerImplementation: + return ChargingManagerImplementation(self.configuration, self.apify_client) async def charge(self, event_name: str, count: int = 1) -> ChargeResult: """Charge for a specified number of events - sub-operations of the Actor. @@ -619,7 +672,7 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult: count: Number of events to charge for. """ self._raise_if_not_initialized() - return await self._charging_manager.charge(event_name, count) + return await self.get_charging_manager().charge(event_name, count) @overload def on( @@ -670,7 +723,7 @@ def on(self, event_name: Event, listener: EventListener[Any]) -> EventListener[A """ self._raise_if_not_initialized() - self._event_manager.on(event=event_name, listener=listener) + self.event_manager.on(event=event_name, listener=listener) return listener @overload @@ -696,11 +749,11 @@ def off(self, event_name: Event, listener: Callable | None = None) -> None: """ self._raise_if_not_initialized() - self._event_manager.off(event=event_name, listener=listener) + self.event_manager.off(event=event_name, listener=listener) def is_at_home(self) -> bool: """Return `True` when the Actor is running on the Apify platform, and `False` otherwise (e.g. local run).""" - return self._configuration.is_at_home + return self.configuration.is_at_home def get_env(self) -> dict: """Return a dictionary with information parsed from all the `APIFY_XXX` environment variables. 
@@ -726,7 +779,7 @@ def get_env(self) -> dict: aliases = [field_name] for alias in aliases: - config[alias] = getattr(self._configuration, field_name) + config[alias] = getattr(self.configuration, field_name) env_vars = {env_var.value.lower(): env_var.name.lower() for env_var in [*ActorEnvVars, *ApifyEnvVars]} return {option_name: config[env_var] for env_var, option_name in env_vars.items() if env_var in config} @@ -771,7 +824,7 @@ async def start( """ self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client + client = self.new_client(token=token) if token else self.apify_client if webhooks: serialized_webhooks = [ @@ -802,7 +855,7 @@ async def start( return ActorRun.model_validate(api_result) def _get_remaining_time(self) -> timedelta | None: - """Get time remaining from the actor timeout. Returns `None` if not on an Apify platform.""" + """Get time remaining from the Actor timeout. Returns `None` if not on an Apify platform.""" if self.is_at_home() and self.configuration.timeout_at: return self.configuration.timeout_at - datetime.now(tz=timezone.utc) @@ -838,7 +891,7 @@ async def abort( """ self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client + client = self.new_client(token=token) if token else self.apify_client if status_message: await client.run(run_id).update(status_message=status_message) @@ -891,7 +944,7 @@ async def call( """ self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client + client = self.new_client(token=token) if token else self.apify_client if webhooks: serialized_webhooks = [ @@ -963,7 +1016,7 @@ async def call_task( """ self._raise_if_not_initialized() - client = self.new_client(token=token) if token else self._apify_client + client = self.new_client(token=token) if token else self.apify_client if webhooks: serialized_webhooks = [ @@ -1014,13 +1067,13 @@ async def metamorph( return if not custom_after_sleep: - custom_after_sleep = self._configuration.metamorph_after_sleep + custom_after_sleep = self.configuration.metamorph_after_sleep - # If is_at_home() is True, config.actor_run_id is always set - if not self._configuration.actor_run_id: + # If is_at_home() is True, configuration.actor_run_id is always set + if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') - await self._apify_client.run(self._configuration.actor_run_id).metamorph( + await self.apify_client.run(self.configuration.actor_run_id).metamorph( target_actor_id=target_actor_id, run_input=run_input, target_actor_build=target_actor_build, @@ -1057,7 +1110,7 @@ async def reboot( _ActorType._is_rebooting = True if not custom_after_sleep: - custom_after_sleep = self._configuration.metamorph_after_sleep + custom_after_sleep = self.configuration.metamorph_after_sleep # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot. @@ -1066,10 +1119,10 @@ async def reboot( # We can't just emit the events and wait for all listeners to finish, # because this method might be called from an event listener itself, and we would deadlock. 
persist_state_listeners = flatten( - (self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 + (self.event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 ) migrating_listeners = flatten( - (self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 + (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 ) await asyncio.gather( @@ -1077,10 +1130,10 @@ async def reboot( *[listener(EventMigratingData()) for listener in migrating_listeners], ) - if not self._configuration.actor_run_id: + if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') - await self._apify_client.run(self._configuration.actor_run_id).reboot() + await self.apify_client.run(self.configuration.actor_run_id).reboot() if custom_after_sleep: await asyncio.sleep(custom_after_sleep.total_seconds()) @@ -1119,11 +1172,11 @@ async def add_webhook( return # If is_at_home() is True, config.actor_run_id is always set - if not self._configuration.actor_run_id: + if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') - await self._apify_client.webhooks().create( - actor_run_id=self._configuration.actor_run_id, + await self.apify_client.webhooks().create( + actor_run_id=self.configuration.actor_run_id, event_types=webhook.event_types, request_url=webhook.request_url, payload_template=webhook.payload_template, @@ -1155,10 +1208,10 @@ async def set_status_message( return None # If is_at_home() is True, config.actor_run_id is always set - if not self._configuration.actor_run_id: + if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') - api_result = await self._apify_client.run(self._configuration.actor_run_id).update( + api_result = await self.apify_client.run(self.configuration.actor_run_id).update( status_message=status_message, is_status_message_terminal=is_terminal ) @@ -1168,7 +1221,7 @@ async def create_proxy_configuration( self, *, actor_proxy_input: dict - | None = None, # this is the raw proxy input from the actor run input, it is not spread or snake_cased in here + | None = None, # this is the raw proxy input from the Actor run input, it is not spread or snake_cased in here password: str | None = None, groups: list[str] | None = None, country_code: str | None = None, @@ -1213,7 +1266,7 @@ async def create_proxy_configuration( country_code=country_code, proxy_urls=proxy_urls, new_url_function=new_url_function, - _actor_config=self._configuration, + _actor_config=self.configuration, _apify_client=self._apify_client, ) diff --git a/src/apify/_configuration.py b/src/apify/_configuration.py index f7e4f028..aba566b9 100644 --- a/src/apify/_configuration.py +++ b/src/apify/_configuration.py @@ -8,6 +8,7 @@ from pydantic import AliasChoices, BeforeValidator, Field, model_validator from typing_extensions import Self, deprecated +from crawlee import service_locator from crawlee._utils.models import timedelta_ms from crawlee._utils.urls import validate_http_url from crawlee.configuration import Configuration as CrawleeConfiguration @@ -424,11 +425,41 @@ def disable_browser_sandbox_on_platform(self) -> Self: def get_global_configuration(cls) -> Configuration: """Retrieve the global instance of the configuration. - Mostly for the backwards compatibility. 
It is recommended to use the `service_locator.get_configuration()`
-        instead.
+        This method ensures that an ApifyConfiguration is returned, even if a CrawleeConfiguration was set in the
+        service locator.
         """
-        return cls()
+        global_configuration = service_locator.get_configuration()
+        if isinstance(global_configuration, Configuration):
+            # If an Apify configuration was already stored in the service locator, return it.
+            return global_configuration
 
-# Monkey-patch the base class so that it works with the extended configuration
-CrawleeConfiguration.get_global_configuration = Configuration.get_global_configuration  # type: ignore[method-assign]
+        logger.warning(
+            'A non-Apify Configuration is set in the `service_locator` in the SDK context. '
+            'It is recommended to set `apify.Configuration` explicitly as early as possible by using '
+            '`service_locator.set_configuration`.'
+        )
+
+        return cls.from_configuration(global_configuration)
+
+    @classmethod
+    def from_configuration(cls, configuration: CrawleeConfiguration) -> Configuration:
+        """Create an Apify Configuration from an existing Crawlee Configuration.
+
+        Args:
+            configuration: The existing Crawlee Configuration.
+
+        Returns:
+            The created Apify Configuration.
+        """
+        apify_configuration = cls()
+
+        # Ensure the returned configuration is of type Apify Configuration.
+        # Most likely a Crawlee configuration was already set. Create an Apify configuration from it.
+        # Due to the known Pydantic issue https://github.com/pydantic/pydantic/issues/9516, creating a new instance
+        # of Configuration from an existing one in a situation where the environment can have some fields set by
+        # alias is very unpredictable. Use the stable workaround instead.
+        for name in configuration.model_fields:
+            setattr(apify_configuration, name, getattr(configuration, name))
+
+        return apify_configuration
diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py
index bb13299a..e5ec91d0 100644
--- a/src/apify/storage_clients/_apify/_dataset_client.py
+++ b/src/apify/storage_clients/_apify/_dataset_client.py
@@ -11,8 +11,9 @@
 from crawlee._utils.file import json_dumps
 from crawlee.storage_clients._base import DatasetClient
 from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
+from crawlee.storages import Dataset
 
-from ._utils import resolve_alias_to_id, store_alias_mapping
+from ._utils import AliasResolver
 
 if TYPE_CHECKING:
     from collections.abc import AsyncIterator
@@ -126,19 +127,19 @@ async def open(
         # Normalize 'default' alias to None
         alias = None if alias == 'default' else alias
 
-        # Handle alias resolution
         if alias:
-            # Try to resolve alias to existing storage ID
-            resolved_id = await resolve_alias_to_id(alias, 'dataset', configuration)
-            if resolved_id:
-                id = resolved_id
-            else:
-                # Create a new storage and store the alias mapping
-                new_storage_metadata = DatasetMetadata.model_validate(
-                    await apify_datasets_client.get_or_create(),
-                )
-                id = new_storage_metadata.id
-                await store_alias_mapping(alias, 'dataset', id, configuration)
+            # Check if there is a pre-existing alias mapping in the default KVS.
+            async with AliasResolver(storage_type=Dataset, alias=alias, configuration=configuration) as _alias:
+                id = await _alias.resolve_id()
+
+                # There was no pre-existing alias in the mapping.
+                # Create a new unnamed storage and store the mapping.
+ if id is None: + new_storage_metadata = DatasetMetadata.model_validate( + await apify_datasets_client.get_or_create(), + ) + id = new_storage_metadata.id + await _alias.store_mapping(storage_id=id) # If name is provided, get or create the storage by name. elif name: diff --git a/src/apify/storage_clients/_apify/_key_value_store_client.py b/src/apify/storage_clients/_apify/_key_value_store_client.py index 0237a338..9011d834 100644 --- a/src/apify/storage_clients/_apify/_key_value_store_client.py +++ b/src/apify/storage_clients/_apify/_key_value_store_client.py @@ -10,9 +10,10 @@ from apify_client import ApifyClientAsync from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata +from crawlee.storages import KeyValueStore from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage -from ._utils import resolve_alias_to_id, store_alias_mapping +from ._utils import AliasResolver from apify._crypto import create_hmac_signature if TYPE_CHECKING: @@ -117,19 +118,20 @@ async def open( # Normalize 'default' alias to None alias = None if alias == 'default' else alias - # Handle alias resolution if alias: - # Try to resolve alias to existing storage ID - resolved_id = await resolve_alias_to_id(alias, 'kvs', configuration) - if resolved_id: - id = resolved_id - else: - # Create a new storage and store the alias mapping - new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate( - await apify_kvss_client.get_or_create(), - ) - id = new_storage_metadata.id - await store_alias_mapping(alias, 'kvs', id, configuration) + # Check if there is pre-existing alias mapping in the default KVS. + async with AliasResolver(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias: + id = await _alias.resolve_id() + + # There was no pre-existing alias in the mapping. + # Create a new unnamed storage and store the mapping. + if id is None: + # Create a new storage and store the alias mapping + new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate( + await apify_kvss_client.get_or_create(), + ) + id = new_storage_metadata.id + await _alias.store_mapping(storage_id=id) # If name is provided, get or create the storage by name. 
elif name: diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 315de9d1..3ffefcd0 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -16,9 +16,10 @@ from crawlee._utils.crypto import crypto_random_object_id from crawlee.storage_clients._base import RequestQueueClient from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata +from crawlee.storages import RequestQueue from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead -from ._utils import resolve_alias_to_id, store_alias_mapping +from ._utils import AliasResolver from apify import Request if TYPE_CHECKING: @@ -195,19 +196,19 @@ async def open( # Normalize 'default' alias to None alias = None if alias == 'default' else alias - # Handle alias resolution if alias: - # Try to resolve alias to existing storage ID - resolved_id = await resolve_alias_to_id(alias, 'rq', configuration) - if resolved_id: - id = resolved_id - else: - # Create a new storage and store the alias mapping - new_storage_metadata = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(), - ) - id = new_storage_metadata.id - await store_alias_mapping(alias, 'rq', id, configuration) + # Check if there is pre-existing alias mapping in the default KVS. + async with AliasResolver(storage_type=RequestQueue, alias=alias, configuration=configuration) as _alias: + id = await _alias.resolve_id() + + # There was no pre-existing alias in the mapping. + # Create a new unnamed storage and store the mapping. + if id is None: + new_storage_metadata = RequestQueueMetadata.model_validate( + await apify_rqs_client.get_or_create(), + ) + id = new_storage_metadata.id + await _alias.store_mapping(storage_id=id) # If name is provided, get or create the storage by name. elif name: diff --git a/src/apify/storage_clients/_apify/_storage_client.py b/src/apify/storage_clients/_apify/_storage_client.py index dbd958f4..2ff3fad9 100644 --- a/src/apify/storage_clients/_apify/_storage_client.py +++ b/src/apify/storage_clients/_apify/_storage_client.py @@ -9,16 +9,35 @@ from ._dataset_client import ApifyDatasetClient from ._key_value_store_client import ApifyKeyValueStoreClient from ._request_queue_client import ApifyRequestQueueClient +from ._utils import hash_api_base_url_and_token +from apify._configuration import Configuration as ApifyConfiguration from apify._utils import docs_group if TYPE_CHECKING: - from crawlee.configuration import Configuration + from collections.abc import Hashable + + from crawlee.configuration import Configuration as CrawleeConfiguration @docs_group('Storage clients') class ApifyStorageClient(StorageClient): """Apify storage client.""" + # This class breaches Liskov Substitution Principle. It requires specialized Configuration compared to its parent. + _lsp_violation_error_message_template = ( + 'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.' 
+ ) + + @override + def get_additional_cache_key(self, configuration: CrawleeConfiguration) -> Hashable: + if isinstance(configuration, ApifyConfiguration): + return hash_api_base_url_and_token(configuration) + + config_class = type(configuration) + raise TypeError( + self._lsp_violation_error_message_template.format(f'{config_class.__module__}.{config_class.__name__}') + ) + @override async def create_dataset_client( self, @@ -26,19 +45,13 @@ async def create_dataset_client( id: str | None = None, name: str | None = None, alias: str | None = None, - configuration: Configuration | None = None, + configuration: CrawleeConfiguration | None = None, ) -> ApifyDatasetClient: - # Import here to avoid circular imports. - from apify import Configuration as ApifyConfiguration # noqa: PLC0415 - configuration = configuration or ApifyConfiguration.get_global_configuration() if isinstance(configuration, ApifyConfiguration): return await ApifyDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration) - raise TypeError( - f'Expected "configuration" to be an instance of "apify.Configuration", ' - f'but got {type(configuration).__name__} instead.' - ) + raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__)) @override async def create_kvs_client( @@ -47,19 +60,13 @@ async def create_kvs_client( id: str | None = None, name: str | None = None, alias: str | None = None, - configuration: Configuration | None = None, + configuration: CrawleeConfiguration | None = None, ) -> ApifyKeyValueStoreClient: - # Import here to avoid circular imports. - from apify import Configuration as ApifyConfiguration # noqa: PLC0415 - configuration = configuration or ApifyConfiguration.get_global_configuration() if isinstance(configuration, ApifyConfiguration): return await ApifyKeyValueStoreClient.open(id=id, name=name, alias=alias, configuration=configuration) - raise TypeError( - f'Expected "configuration" to be an instance of "apify.Configuration", ' - f'but got {type(configuration).__name__} instead.' - ) + raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__)) @override async def create_rq_client( @@ -68,16 +75,10 @@ async def create_rq_client( id: str | None = None, name: str | None = None, alias: str | None = None, - configuration: Configuration | None = None, + configuration: CrawleeConfiguration | None = None, ) -> ApifyRequestQueueClient: - # Import here to avoid circular imports. - from apify import Configuration as ApifyConfiguration # noqa: PLC0415 - configuration = configuration or ApifyConfiguration.get_global_configuration() if isinstance(configuration, ApifyConfiguration): return await ApifyRequestQueueClient.open(id=id, name=name, alias=alias, configuration=configuration) - raise TypeError( - f'Expected "configuration" to be an instance of "apify.Configuration", ' - f'but got {type(configuration).__name__} instead.' 
-        )
+        raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py
index 25bd4816..6d05bff3 100644
--- a/src/apify/storage_clients/_apify/_utils.py
+++ b/src/apify/storage_clients/_apify/_utils.py
@@ -1,117 +1,167 @@
 from __future__ import annotations
 
+import logging
+from asyncio import Lock
 from logging import getLogger
-from typing import TYPE_CHECKING, Literal
+from typing import TYPE_CHECKING, ClassVar
 
 from apify_client import ApifyClientAsync
+from crawlee._utils.crypto import compute_short_hash
+
+from apify._configuration import Configuration
 
 if TYPE_CHECKING:
+    from types import TracebackType
+
     from apify_client.clients import KeyValueStoreClientAsync
+    from crawlee.storages import Dataset, KeyValueStore, RequestQueue
 
-    from apify import Configuration
 
 logger = getLogger(__name__)
 
-_ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING'
-
-
-async def resolve_alias_to_id(
-    alias: str,
-    storage_type: Literal['dataset', 'kvs', 'rq'],
-    configuration: Configuration,
-) -> str | None:
-    """Resolve a storage alias to its corresponding storage ID.
-
-    Args:
-        alias: The alias to resolve.
-        storage_type: Type of storage ('dataset', 'key_value_store', or 'request_queue').
-        configuration: The configuration object containing API credentials.
-
-    Returns:
-        The storage ID if found, None if the alias doesn't exist.
-    """
-    default_kvs_client = await _get_default_kvs_client(configuration)
-
-    # Create the dictionary key for this alias.
-    alias_key = f'alias-{storage_type}-{alias}'
-
-    try:
-        record = await default_kvs_client.get_record(_ALIAS_MAPPING_KEY)
-
-        # get_record can return {key: ..., value: ..., content_type: ...}
-        if isinstance(record, dict) and 'value' in record:
-            record = record['value']
-
-        # Extract the actual data from the KVS record
-        if isinstance(record, dict) and alias_key in record:
-            storage_id = record[alias_key]
-            return str(storage_id)
-    except Exception as exc:
-        # If there's any error accessing the record, treat it as not found.
-        logger.warning(f'Error accessing alias mapping for {alias}: {exc}')
+class AliasResolver:
+    """Class for handling aliases.
 
-    return None
-
-
-async def store_alias_mapping(
-    alias: str,
-    storage_type: Literal['dataset', 'kvs', 'rq'],
-    storage_id: str,
-    configuration: Configuration,
-) -> None:
-    """Store a mapping from alias to storage ID in the default key-value store.
-
-    Args:
-        alias: The alias to store.
-        storage_type: Type of storage ('dataset', 'key_value_store', or 'request_queue').
-        storage_id: The storage ID to map the alias to.
-        configuration: The configuration object containing API credentials.
+    The purpose of this class is to ensure that aliased storages are created with the correct id. This is achieved by
+    using the default kvs as a storage for a global mapping of aliases to storage ids. The same mapping is also kept
+    in memory to avoid unnecessary calls to the API and to provide limited support for alias storages when not
+    running on the Apify platform. When on the Apify platform, storages created with an alias are accessible by the
+    same alias even after a migration or reboot.
     """
-    default_kvs_client = await _get_default_kvs_client(configuration)
-
-    # Create the dictionary key for this alias.
-    alias_key = f'alias-{storage_type}-{alias}'
-
-    try:
-        record = await default_kvs_client.get_record(_ALIAS_MAPPING_KEY)
-
-        # get_record can return {key: ..., value: ..., content_type: ...}
-        if isinstance(record, dict) and 'value' in record:
-            record = record['value']
-
-        # Update or create the record with the new alias mapping
-        if isinstance(record, dict):
-            record[alias_key] = storage_id
-        else:
-            record = {alias_key: storage_id}
-
-        # Store the mapping back in the KVS.
-        await default_kvs_client.set_record(_ALIAS_MAPPING_KEY, record)
-    except Exception as exc:
-        logger.warning(f'Error accessing alias mapping for {alias}: {exc}')
-
-
-async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync:
+    _alias_map: ClassVar[dict[str, str]] = {}
+    """Map containing pre-existing alias storages and their ids. Global for all instances."""
+    _alias_init_lock: Lock | None = None
+    """Lock for creating alias storages. Only one alias storage can be created at a time. Global for all instances."""
+
+    _ALIAS_STORAGE_KEY_SEPARATOR = ','
+    _ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING'
+
+    def __init__(
+        self, storage_type: type[Dataset | KeyValueStore | RequestQueue], alias: str, configuration: Configuration
+    ) -> None:
+        self._storage_type = storage_type
+        self._alias = alias
+        self._additional_cache_key = hash_api_base_url_and_token(configuration)
+
+    async def __aenter__(self) -> AliasResolver:
+        """Enter the context manager that prevents a race condition in alias creation."""
+        lock = await self._get_alias_init_lock()
+        await lock.acquire()
+        return self
+
+    async def __aexit__(
+        self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_traceback: TracebackType | None
+    ) -> None:
+        lock = await self._get_alias_init_lock()
+        lock.release()
+
+    @classmethod
+    async def _get_alias_init_lock(cls) -> Lock:
+        """Get the lock that controls the creation of alias storages.
+
+        The lock is shared by all instances of the AliasResolver class.
+        It is created in an async method to ensure that an event loop is already running.
+        """
+        if cls._alias_init_lock is None:
+            cls._alias_init_lock = Lock()
+        return cls._alias_init_lock
+
+    @classmethod
+    async def _get_alias_map(cls) -> dict[str, str]:
+        """Get the mapping of aliases to storage ids from the default kvs.
+
+        The mapping is loaded from the kvs only once and is shared by all instances of the AliasResolver class.
+
+        Returns:
+            Map of aliases and storage ids.
+        """
+        if not cls._alias_map:
+            default_kvs_client = await _get_default_kvs_client()
+
+            record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)
+
+            # get_record can return {key: ..., value: ..., content_type: ...}
+            if isinstance(record, dict):
+                if 'value' in record and isinstance(record['value'], dict):
+                    cls._alias_map = record['value']
+                else:
+                    cls._alias_map = record
+            else:
+                cls._alias_map = dict[str, str]()
+
+        return cls._alias_map
+
+    async def resolve_id(self) -> str | None:
+        """Get the id of the aliased storage.
+
+        Looks the alias up in the shared in-memory mapping, which is loaded from the default kvs on first use.
+
+        Returns:
+            Storage id if it exists, None otherwise.
+ """ + return (await self._get_alias_map()).get(self._storage_key, None) + + async def store_mapping(self, storage_id: str) -> None: + """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" + # Update in-memory mapping + (await self._get_alias_map())[self._storage_key] = storage_id + if not Configuration.get_global_configuration().is_at_home: + logging.getLogger(__name__).warning( + 'AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' + ) + return + + default_kvs_client = await _get_default_kvs_client() + await default_kvs_client.get() + + try: + record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) + + # get_record can return {key: ..., value: ..., content_type: ...} + if isinstance(record, dict) and 'value' in record: + record = record['value'] + + # Update or create the record with the new alias mapping + if isinstance(record, dict): + record[self._storage_key] = storage_id + else: + record = {self._storage_key: storage_id} + + # Store the mapping back in the KVS. + await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record) + except Exception as exc: + logger.warning(f'Error storing alias mapping for {self._alias}: {exc}') + + @property + def _storage_key(self) -> str: + """Get a unique storage key used for storing the alias in the mapping.""" + return self._ALIAS_STORAGE_KEY_SEPARATOR.join( + [ + self._storage_type.__name__, + self._alias, + self._additional_cache_key, + ] + ) + + +async def _get_default_kvs_client() -> KeyValueStoreClientAsync: """Get a client for the default key-value store.""" - token = configuration.token - if not token: - raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).') + configuration = Configuration.get_global_configuration() - api_url = configuration.api_base_url - if not api_url: - raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).') - - # Create Apify client with the provided token and API URL apify_client_async = ApifyClientAsync( - token=token, - api_url=api_url, + token=configuration.token, + api_url=configuration.api_base_url, max_retries=8, min_delay_between_retries_millis=500, timeout_secs=360, ) - # Get the default key-value store ID from configuration - default_kvs_id = configuration.default_key_value_store_id + return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) + - return apify_client_async.key_value_store(key_value_store_id=default_kvs_id) +def hash_api_base_url_and_token(configuration: Configuration) -> str: + """Hash configuration.api_public_base_url and configuration.token in deterministic way.""" + if configuration.api_public_base_url is None or configuration.token is None: + raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.") + return compute_short_hash(f'{configuration.api_public_base_url}{configuration.token}'.encode()) diff --git a/tests/integration/actor_source_base/requirements.txt b/tests/integration/actor_source_base/requirements.txt index 9f6b32a2..e9bc6dad 100644 --- a/tests/integration/actor_source_base/requirements.txt +++ b/tests/integration/actor_source_base/requirements.txt @@ -1,4 +1,4 @@ # The test fixture will put the Apify SDK wheel path on the next line APIFY_SDK_WHEEL_PLACEHOLDER uvicorn[standard] -crawlee[parsel] == 0.6.13b37 +crawlee[parsel]==0.6.13b42 diff --git a/tests/integration/conftest.py 
b/tests/integration/conftest.py index 9c230acd..52bee53a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,6 +20,7 @@ from ._utils import generate_unique_resource_name from apify import Actor from apify._models import ActorRun +from apify.storage_clients._apify._utils import AliasResolver if TYPE_CHECKING: from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping @@ -58,18 +59,14 @@ def _prepare_test_env() -> None: service_locator._configuration = None service_locator._event_manager = None service_locator._storage_client = None - service_locator._storage_instance_manager = None + service_locator.storage_instance_manager.clear_cache() - # Reset the retrieval flags. - service_locator._configuration_was_retrieved = False - service_locator._event_manager_was_retrieved = False - service_locator._storage_client_was_retrieved = False + # Reset the AliasResolver class state. + AliasResolver._alias_map = {} + AliasResolver._alias_init_lock = None # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) - assert service_locator._configuration_was_retrieved is False - assert service_locator._storage_client_was_retrieved is False - assert service_locator._event_manager_was_retrieved is False return _prepare_test_env diff --git a/tests/integration/test_actor_create_proxy_configuration.py b/tests/integration/test_actor_create_proxy_configuration.py index 5861d43e..9ed60704 100644 --- a/tests/integration/test_actor_create_proxy_configuration.py +++ b/tests/integration/test_actor_create_proxy_configuration.py @@ -40,7 +40,10 @@ async def test_create_proxy_configuration_with_groups_and_country( async def main() -> None: await Actor.init() - proxy_url_suffix = f'{Actor.config.proxy_password}@{Actor.config.proxy_hostname}:{Actor.config.proxy_port}' + proxy_url_suffix = ( + f'{Actor.configuration.proxy_password}@{Actor.configuration.proxy_hostname}:' + f'{Actor.configuration.proxy_port}' + ) proxy_configuration = await Actor.create_proxy_configuration(actor_proxy_input={'useApifyProxy': True}) assert proxy_configuration is not None diff --git a/tests/integration/test_actor_key_value_store.py b/tests/integration/test_actor_key_value_store.py index ede8f885..19d63b0f 100644 --- a/tests/integration/test_actor_key_value_store.py +++ b/tests/integration/test_actor_key_value_store.py @@ -206,8 +206,8 @@ async def main() -> None: from apify.storage_clients._apify._models import ApifyKeyValueStoreMetadata async with Actor: - public_api_url = Actor.config.api_public_base_url - default_kvs_id = Actor.config.default_key_value_store_id + public_api_url = Actor.configuration.api_public_base_url + default_kvs_id = Actor.configuration.default_key_value_store_id record_key = 'public-record-key' kvs = await Actor.open_key_value_store() diff --git a/tests/integration/test_actor_lifecycle.py b/tests/integration/test_actor_lifecycle.py index 7a975c99..d0b7eb1e 100644 --- a/tests/integration/test_actor_lifecycle.py +++ b/tests/integration/test_actor_lifecycle.py @@ -117,3 +117,44 @@ async def main() -> None: assert run_result.exit_code == 91 assert run_result.status == 'FAILED' + + +async def test_actor_with_crawler_reboot(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None: + """Test that crawler in actor works as expected after reboot. + + Handle two requests. 
Reboot in between the two requests.""" + + async def main() -> None: + from crawlee._types import BasicCrawlingContext, ConcurrencySettings + from crawlee.crawlers import BasicCrawler + + from apify import Actor + + async with Actor: + crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1)) + requests = ['https://example.com/1', 'https://example.com/2'] + + run = await Actor.apify_client.run(Actor.configuration.actor_run_id or '').get() + assert run + first_run = run.get('stats', {}).get('rebootCount', 0) == 0 + + @crawler.router.default_handler + async def default_handler(context: BasicCrawlingContext) -> None: + context.log.info(f'Processing {context.request.url} ...') + + # Simulate migration through reboot + if context.request.url == requests[1] and first_run: + context.log.info(f'Reclaiming {context.request.url} ...') + rq = await crawler.get_request_manager() + await rq.reclaim_request(context.request) + await Actor.reboot() + + await crawler.run(requests) + + # Each time one request is finished. + assert crawler.statistics.state.requests_finished == 1 + + actor = await make_actor(label='migration', main_func=main) + run_result = await run_actor(actor) + + assert run_result.status == 'SUCCEEDED' diff --git a/tests/integration/test_apify_storages.py b/tests/integration/test_apify_storages.py new file mode 100644 index 00000000..0cf0c9af --- /dev/null +++ b/tests/integration/test_apify_storages.py @@ -0,0 +1,34 @@ +import asyncio + +import pytest + +from crawlee import service_locator +from crawlee.storages import Dataset, KeyValueStore, RequestQueue + +from apify import Configuration +from apify.storage_clients import ApifyStorageClient + + +@pytest.mark.parametrize( + 'storage_type', + [Dataset, KeyValueStore, RequestQueue], +) +async def test_alias_concurrent_creation_local( + storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str +) -> None: + """Test that storages created with same alias are created only once even when created concurrently.""" + service_locator.set_configuration(Configuration(token=apify_token)) + service_locator.set_storage_client(ApifyStorageClient()) + tasks = [asyncio.create_task(storage_type.open(alias='test')) for _ in range(2)] + + storages = await asyncio.gather(*tasks) + unique_storage_ids = {storage.id for storage in storages} + try: + # Only one aliased storage should be created. 
+ assert len(unique_storage_ids) == 1 + + # Clean up + await storages[0].drop() + except AssertionError: + for storage in storages: + await storage.drop() diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index fed7c192..7d04557b 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -6,6 +6,7 @@ import sys from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, cast +from unittest import mock from unittest.mock import AsyncMock, Mock import pytest @@ -179,25 +180,24 @@ async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None } ) - monkeypatch.setattr(Actor._charging_manager, '_client', mock_run_client) - - async with Actor: - Actor.on(Event.PERSIST_STATE, log_persist_state) - await asyncio.sleep(2) - - for socket in ws_server.connections: - await socket.send( - json.dumps( - { - 'name': 'migrating', - 'data': { - 'isMigrating': True, - }, - } + with mock.patch.object(Actor, 'new_client', return_value=mock_run_client): + async with Actor: + Actor.on(Event.PERSIST_STATE, log_persist_state) + await asyncio.sleep(2) + + for socket in ws_server.connections: + await socket.send( + json.dumps( + { + 'name': 'migrating', + 'data': { + 'isMigrating': True, + }, + } + ) ) - ) - await asyncio.sleep(1) + await asyncio.sleep(1) assert len(persist_state_events_data) >= 3 diff --git a/tests/unit/actor/test_actor_non_default_instance.py b/tests/unit/actor/test_actor_non_default_instance.py index 3b3b5ccd..7adfd6c5 100644 --- a/tests/unit/actor/test_actor_non_default_instance.py +++ b/tests/unit/actor/test_actor_non_default_instance.py @@ -9,7 +9,7 @@ async def test_actor_with_non_default_config() -> None: config = Configuration(internal_timeout=timedelta(minutes=111)) async with Actor(config) as actor: - assert actor.config.internal_timeout == timedelta(minutes=111) + assert actor.configuration.internal_timeout == timedelta(minutes=111) async def test_actor_global_works() -> None: diff --git a/tests/unit/actor/test_configuration.py b/tests/unit/actor/test_configuration.py index 95f19f12..7212f5e3 100644 --- a/tests/unit/actor/test_configuration.py +++ b/tests/unit/actor/test_configuration.py @@ -1,6 +1,17 @@ +from pathlib import Path + import pytest -from apify import Configuration +from crawlee import Request, service_locator +from crawlee._types import BasicCrawlingContext +from crawlee.configuration import Configuration as CrawleeConfiguration +from crawlee.crawlers import BasicCrawler +from crawlee.errors import ServiceConflictError +from crawlee.storage_clients import FileSystemStorageClient + +from apify import Actor +from apify import Configuration as ApifyConfiguration +from apify.storage_clients import FileSystemStorageClient as ApifyFileSystemStorageClient @pytest.mark.parametrize( @@ -16,6 +27,231 @@ def test_disable_browser_sandbox( *, is_at_home: bool, disable_browser_sandbox_in: bool, disable_browser_sandbox_out: bool ) -> None: assert ( - Configuration(is_at_home=is_at_home, disable_browser_sandbox=disable_browser_sandbox_in).disable_browser_sandbox + ApifyConfiguration( + is_at_home=is_at_home, disable_browser_sandbox=disable_browser_sandbox_in + ).disable_browser_sandbox == disable_browser_sandbox_out ) + + +async def test_existing_apify_config_respected_by_actor() -> None: + """Set Apify Configuration in service_locator and verify that Actor respects it.""" + max_used_cpu_ratio = 0.123456 # Some unique value to verify configuration + apify_config 
+    service_locator.set_configuration(apify_config)
+    async with Actor:
+        pass
+
+    returned_config = service_locator.get_configuration()
+    assert returned_config is apify_config
+
+
+async def test_existing_crawlee_config_respected_by_actor() -> None:
+    """Set Crawlee Configuration in service_locator and verify that Actor respects it."""
+    max_used_cpu_ratio = 0.123456  # Some unique value to verify configuration
+    crawlee_config = CrawleeConfiguration(max_used_cpu_ratio=max_used_cpu_ratio)
+    service_locator.set_configuration(crawlee_config)
+    async with Actor:
+        pass
+
+    assert Actor.configuration is not crawlee_config
+    assert isinstance(Actor.configuration, ApifyConfiguration)
+    # Make sure the Crawlee Configuration was used to create the ApifyConfiguration in Actor.
+    assert Actor.configuration.max_used_cpu_ratio == max_used_cpu_ratio
+
+
+async def test_existing_apify_config_throws_error_when_set_in_actor() -> None:
+    """Test that passing an explicit configuration to Actor raises an exception when a service locator
+    configuration was already set."""
+    service_locator.set_configuration(ApifyConfiguration())
+    with pytest.raises(ServiceConflictError):
+        async with Actor(configuration=ApifyConfiguration()):
+            pass
+
+
+async def test_setting_config_after_actor_raises_exception() -> None:
+    """Test that setting a configuration in the service locator after Actor was created raises an exception."""
+    async with Actor():
+        with pytest.raises(ServiceConflictError):
+            service_locator.set_configuration(ApifyConfiguration())
+
+
+async def test_actor_using_input_configuration() -> None:
+    """Test that a configuration passed explicitly to Actor is set in the global service locator."""
+    apify_config = ApifyConfiguration()
+    async with Actor(configuration=apify_config):
+        pass
+
+    assert service_locator.get_configuration() is apify_config
+
+
+async def test_crawler_implicit_configuration_through_actor() -> None:
+    """Test that crawler uses Actor configuration unless an explicit configuration was passed to it."""
+    apify_config = ApifyConfiguration()
+    async with Actor(configuration=apify_config):
+        crawler = BasicCrawler()
+
+    assert crawler._service_locator.get_configuration() is apify_config
+    assert service_locator.get_configuration() is apify_config
+
+
+async def test_crawler_implicit_configuration() -> None:
+    """Test that crawler and Actor use the implicit service_locator based configuration unless an explicit
+    configuration was passed to them."""
+    async with Actor():
+        assert Actor.configuration is service_locator.get_configuration()
+        crawler = BasicCrawler()
+
+    assert Actor.configuration is service_locator.get_configuration()
+    assert Actor.configuration is crawler._service_locator.get_configuration()
+
+
+async def test_crawler_implicit_local_storage() -> None:
+    """Test that crawler and Actor use the implicit ApifyFileSystemStorageClient."""
+    async with Actor():
+        crawler = BasicCrawler()
+
+    assert isinstance(service_locator.get_storage_client(), ApifyFileSystemStorageClient)
+    assert isinstance(crawler._service_locator.get_storage_client(), ApifyFileSystemStorageClient)
+
+
+async def test_crawlers_own_configuration(tmp_path: Path) -> None:
+    """Test that crawlers can use their own configurations without crashing."""
+    dir_1 = tmp_path / 'dir_1'
+    dir_2 = tmp_path / 'dir_2'
+    config_actor = ApifyConfiguration()
+    config_actor.storage_dir = str(dir_1)
+    config_crawler_1 = ApifyConfiguration()
+    config_crawler_2 = ApifyConfiguration()
+    config_crawler_2.storage_dir = str(dir_2)
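+    # Note: config_crawler_1 intentionally keeps the default storage directory.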
+
+    async with Actor(configuration=config_actor):
+
+        async def request_handler(context: BasicCrawlingContext) -> None:
+            Actor.log.info(f'Processing: {context.request.url}')
+
+        crawler_1 = BasicCrawler(configuration=config_crawler_1, request_handler=request_handler)
+        crawler_2 = BasicCrawler(configuration=config_crawler_2, request_handler=request_handler)
+        await crawler_1.add_requests([Request.from_url(url='http://example.com/1')])
+        await crawler_2.add_requests(
+            [Request.from_url(url='http://example.com/2'), Request.from_url(url='http://example.com/3')]
+        )
+
+        await crawler_1.run()
+        await crawler_2.run()
+
+    assert service_locator.get_configuration() is config_actor
+    assert crawler_1._service_locator.get_configuration() is config_crawler_1
+    assert crawler_2._service_locator.get_configuration() is config_crawler_2
+
+    assert crawler_1.statistics.state.requests_total == 1
+    assert crawler_2.statistics.state.requests_total == 2
+
+
+async def test_crawler_global_configuration() -> None:
+    """Test that crawler and Actor use the configuration set in the global service_locator unless an explicit
+    configuration was passed to them."""
+    config_global = ApifyConfiguration()
+    service_locator.set_configuration(config_global)
+
+    async with Actor():
+        crawler = BasicCrawler()
+
+    assert service_locator.get_configuration() is config_global
+    assert crawler._service_locator.get_configuration() is config_global
+
+
+async def test_crawler_uses_implicit_apify_config() -> None:
+    """Test that Actor uses an implicit ApifyConfiguration in the Actor context."""
+    async with Actor:
+        assert isinstance(Actor.configuration, ApifyConfiguration)
+
+
+async def test_storages_retrieved_is_different_with_different_config(tmp_path: Path) -> None:
+    """Test that the retrieved storage depends on the configuration used."""
+    dir_1 = tmp_path / 'dir_1'
+    dir_2 = tmp_path / 'dir_2'
+    config_actor = ApifyConfiguration()
+    config_actor.storage_dir = str(dir_1)
+    config_crawler = ApifyConfiguration()
+    config_crawler.storage_dir = str(dir_2)
+
+    async with Actor(configuration=config_actor):
+        actor_kvs = await Actor.open_key_value_store()
+        actor_dataset = await Actor.open_dataset()
+        actor_rq = await Actor.open_request_queue()
+
+        crawler = BasicCrawler(configuration=config_crawler)
+        crawler_kvs = await crawler.get_key_value_store()
+        crawler_dataset = await crawler.get_dataset()
+        crawler_rq = await crawler.get_request_manager()
+
+    assert actor_kvs is not crawler_kvs
+    assert actor_dataset is not crawler_dataset
+    assert actor_rq is not crawler_rq
+
+
+async def test_storages_retrieved_is_same_with_equivalent_config() -> None:
+    """Test that the retrieved storage depends on the configuration used. If two equivalent configurations
+    (even if they are different instances) are used, the same storage is returned."""
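+    # Two default-constructed configurations: distinct instances, but equivalent values.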
+    config_actor = ApifyConfiguration()
+    config_crawler = ApifyConfiguration()
+
+    async with Actor(configuration=config_actor):
+        actor_kvs = await Actor.open_key_value_store()
+        actor_dataset = await Actor.open_dataset()
+        actor_rq = await Actor.open_request_queue()
+
+        crawler = BasicCrawler(configuration=config_crawler)
+        crawler_kvs = await crawler.get_key_value_store()
+        crawler_dataset = await crawler.get_dataset()
+        crawler_rq = await crawler.get_request_manager()
+
+    assert actor_kvs is crawler_kvs
+    assert actor_dataset is crawler_dataset
+    assert actor_rq is crawler_rq
+
+
+async def test_storages_retrieved_is_same_with_same_config() -> None:
+    """Test that the same storage is retrieved when the same configuration is used."""
+    async with Actor():
+        actor_kvs = await Actor.open_key_value_store()
+        actor_dataset = await Actor.open_dataset()
+        actor_rq = await Actor.open_request_queue()
+
+        crawler = BasicCrawler()
+        crawler_kvs = await crawler.get_key_value_store()
+        crawler_dataset = await crawler.get_dataset()
+        crawler_rq = await crawler.get_request_manager()
+
+    assert actor_kvs is crawler_kvs
+    assert actor_dataset is crawler_dataset
+    assert actor_rq is crawler_rq
+
+
+def test_apify_configuration_is_always_used(caplog: pytest.LogCaptureFixture) -> None:
+    """Set a Crawlee Configuration in the service_locator and verify that an Apify Configuration is used
+    and a warning is logged."""
+    max_used_cpu_ratio = 0.123456  # Some unique value to verify configuration
+
+    service_locator.set_configuration(CrawleeConfiguration(max_used_cpu_ratio=max_used_cpu_ratio))
+
+    assert Actor.configuration.max_used_cpu_ratio == max_used_cpu_ratio
+    assert isinstance(Actor.configuration, ApifyConfiguration)
+    assert (
+        'Non Apify Configuration is set in the `service_locator` in the SDK context. '
+        'It is recommended to set `apify.Configuration` explicitly as early as possible by using '
+        'service_locator.set_configuration'
+    ) in caplog.messages
+
+
+async def test_file_system_storage_client_warning(caplog: pytest.LogCaptureFixture) -> None:
+    """Test that using the crawlee FileSystemStorageClient in the Actor context logs a warning."""
+    service_locator.set_storage_client(FileSystemStorageClient())
+    caplog.set_level('WARNING')
+    async with Actor():
+        ...
+
+    assert (
+        'Using crawlee.storage_clients._file_system._storage_client.FileSystemStorageClient in Actor context is not '
+        'recommended and can lead to problems with reading the input file. Use '
+        '`apify.storage_clients.FileSystemStorageClient` instead.'
+    ) in caplog.messages
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 7bff929b..bd041b50 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -17,6 +17,7 @@
 import apify._actor
 import apify.log
+from apify.storage_clients._apify._utils import AliasResolver
 
 if TYPE_CHECKING:
     from collections.abc import Callable, Iterator
@@ -70,18 +71,14 @@ def _prepare_test_env() -> None:
         service_locator._configuration = None
         service_locator._event_manager = None
         service_locator._storage_client = None
-        service_locator._storage_instance_manager = None
+        service_locator.storage_instance_manager.clear_cache()
 
-        # Reset the retrieval flags.
-        service_locator._configuration_was_retrieved = False
-        service_locator._event_manager_was_retrieved = False
-        service_locator._storage_client_was_retrieved = False
+        # Reset the AliasResolver class state.
+        AliasResolver._alias_map = {}
+        AliasResolver._alias_init_lock = None
 
         # Verify that the test environment was set up correctly.
         assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path)
-        assert service_locator._configuration_was_retrieved is False
-        assert service_locator._storage_client_was_retrieved is False
-        assert service_locator._event_manager_was_retrieved is False
 
     return _prepare_test_env
diff --git a/tests/unit/events/test_apify_event_manager.py b/tests/unit/events/test_apify_event_manager.py
index b48cae88..5d0d3059 100644
--- a/tests/unit/events/test_apify_event_manager.py
+++ b/tests/unit/events/test_apify_event_manager.py
@@ -24,9 +24,8 @@
 async def test_lifecycle_local(caplog: pytest.LogCaptureFixture) -> None:
     caplog.set_level(logging.DEBUG, logger='apify')
 
-    config = Configuration.get_global_configuration()
-
-    async with ApifyEventManager(config):
+    async with ApifyEventManager(Configuration()):
         pass
 
     assert len(caplog.records) == 1
diff --git a/tests/unit/storage_clients/test_file_system.py b/tests/unit/storage_clients/test_file_system.py
index 71bb8c53..7b938416 100644
--- a/tests/unit/storage_clients/test_file_system.py
+++ b/tests/unit/storage_clients/test_file_system.py
@@ -4,11 +4,10 @@
 import json
 from typing import TYPE_CHECKING
 
-from crawlee import service_locator
 from crawlee._consts import METADATA_FILENAME
 
 from apify import Actor, Configuration
-from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient, ApifyFileSystemStorageClient
+from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient
 
 if TYPE_CHECKING:
     from pathlib import Path
@@ -76,8 +75,5 @@
     path_to_input.mkdir(parents=True)
     (path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input))
 
-    # Remove this line after https://github.com/apify/apify-sdk-python/pull/576
-    service_locator.set_storage_client(ApifyFileSystemStorageClient())
-
     async with Actor():
         assert pre_existing_input == await Actor.get_input()
diff --git a/tests/unit/test_apify_storages.py b/tests/unit/test_apify_storages.py
new file mode 100644
index 00000000..5b392269
--- /dev/null
+++ b/tests/unit/test_apify_storages.py
@@ -0,0 +1,63 @@
+from datetime import datetime, timezone
+from unittest import mock
+from unittest.mock import AsyncMock
+
+import pytest
+
+from crawlee.storage_clients.models import StorageMetadata
+from crawlee.storages._base import Storage
+
+from apify import Configuration
+from apify.storage_clients import ApifyStorageClient
+from apify.storage_clients._apify import ApifyDatasetClient, ApifyKeyValueStoreClient, ApifyRequestQueueClient
+from apify.storages import Dataset, KeyValueStore, RequestQueue
+
+
+@pytest.mark.parametrize(
+    ('storage', '_storage_client'),
+    [
+        (Dataset, ApifyDatasetClient),
+        (KeyValueStore, ApifyKeyValueStoreClient),
+        (RequestQueue, ApifyRequestQueueClient),
+    ],
+)
+async def test_get_additional_cache_key(
+    storage: type[Storage],
+    _storage_client: type[ApifyDatasetClient | ApifyKeyValueStoreClient | ApifyRequestQueueClient],
+) -> None:
+    """Test that Storages based on `ApifyStorageClient` include `token` and `api_public_base_url` in the
+    additional cache key."""
+
+    def create_metadata(id: str) -> StorageMetadata:
+        now = datetime.now(tz=timezone.utc)
+        return StorageMetadata(id=id, name=None, accessed_at=now, created_at=now, modified_at=now)
+
+    storage_ids = iter(['1', '2', '3', '1', '3'])
+
+    apify_storage_client = ApifyStorageClient()
+
+    config_1 = Configuration(token='a')
+    config_2 = Configuration(token='b')
+    config_3 = Configuration(token='a', api_public_base_url='https://super_custom_api.com')
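+    # config_4 and config_5 mirror config_1 and config_3, so they should map to the same cache keys.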
+    config_4 = Configuration(token='a')
+    config_5 = Configuration(token='a', api_public_base_url='https://super_custom_api.com')
+
+    mocked_client = AsyncMock(spec=_storage_client)
+    mocked_client.get_metadata = AsyncMock(side_effect=lambda: create_metadata(next(storage_ids)))
+    mocked_open = AsyncMock(spec=_storage_client.open, return_value=mocked_client)
+
+    with mock.patch.object(_storage_client, 'open', mocked_open):
+        storage_1 = await storage.open(storage_client=apify_storage_client, configuration=config_1)
+        storage_2 = await storage.open(storage_client=apify_storage_client, configuration=config_2)
+        storage_3 = await storage.open(storage_client=apify_storage_client, configuration=config_3)
+        storage_4 = await storage.open(storage_client=apify_storage_client, configuration=config_4)
+        storage_5 = await storage.open(storage_client=apify_storage_client, configuration=config_5)
+
+    # Different configurations result in different storages.
+    assert storage_1 is not storage_2
+    assert storage_1 is not storage_3
+    assert storage_2 is not storage_3
+
+    # Equivalent configurations result in the same storage.
+    assert storage_1 is storage_4
+    assert storage_3 is storage_5
diff --git a/tests/unit/test_proxy_configuration.py b/tests/unit/test_proxy_configuration.py
index d69032b8..72020ad0 100644
--- a/tests/unit/test_proxy_configuration.py
+++ b/tests/unit/test_proxy_configuration.py
@@ -523,6 +523,7 @@ async def test_initialize_when_not_connected(monkeypatch: pytest.MonkeyPatch, httpserver: HTTPServer) -> None:
 async def test_initialize_when_status_page_unavailable(
     monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture, httpserver: HTTPServer
 ) -> None:
+    caplog.set_level('WARNING')
     dummy_proxy_status_url = str(httpserver.url_for('/')).removesuffix('/')
     monkeypatch.setenv(ApifyEnvVars.PROXY_STATUS_URL.value, dummy_proxy_status_url)
 
@@ -532,9 +533,10 @@
     await proxy_configuration.initialize()
 
-    assert len(caplog.records) == 1
-    assert caplog.records[0].levelname == 'WARNING'
-    assert 'Apify Proxy access check timed out' in caplog.records[0].message
+    assert (
+        'Apify Proxy access check timed out. Watch out for errors with status code 407. If you see some, it most likely'
+        ' means you do not have access to either all or some of the proxies you are trying to use.'
+    ) in caplog.messages
 
 
 async def test_initialize_with_non_apify_proxy(
diff --git a/uv.lock b/uv.lock
index 93812ef0..e5158ab3 100644
--- a/uv.lock
+++ b/uv.lock
@@ -76,7 +76,7 @@ requires-dist = [
     { name = "apify-client", specifier = ">=2.0.0,<3.0.0" },
     { name = "apify-shared", specifier = ">=2.0.0,<3.0.0" },
     { name = "cachetools", specifier = ">=5.5.0" },
-    { name = "crawlee", specifier = "==0.6.13b37" },
+    { name = "crawlee", specifier = "==0.6.13b42" },
     { name = "cryptography", specifier = ">=42.0.0" },
     { name = "impit", specifier = ">=0.6.1" },
     { name = "lazy-object-proxy", specifier = ">=1.11.0" },
@@ -477,7 +477,7 @@
 [[package]]
 name = "crawlee"
-version = "0.6.13b37"
+version = "0.6.13b42"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "cachetools" },
     { name = "typing-extensions" },
     { name = "yarl" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/aa/64/13521e97bb0dcd606c1013c1f184a943e3a6b36e2e4ccf3adda9eb474efd/crawlee-0.6.13b37.tar.gz", hash = "sha256:77f8ca0e60689c19e41ec7d608ecc2fd65531eefe79ad98cf3cd77f3c6c1e412", size = 24839556, upload-time = "2025-09-12T16:04:27.011Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/98/8e/8c5bf3cd84335aeb157f95ecaadc5cb61b9bb0f1ffa28a50f9a2485c38a6/crawlee-0.6.13b42.tar.gz", hash = "sha256:5a8c7bcf6abf77c6b7be3323e3cfa017a9717f0b5e5275bbb7ad8de589c851af", size = 24842767, upload-time = "2025-09-17T15:19:26.706Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/74/da/463751960f64e73b8388ef11b0b6f9fddc2776467440c9c841a295b5dc62/crawlee-0.6.13b37-py3-none-any.whl", hash = "sha256:ed10223e27b9c2791056110eca31f4c03b4ab4535c14307754fc7731bd59f70a", size = 278512, upload-time = "2025-09-12T16:04:23.576Z" },
+    { url = "https://files.pythonhosted.org/packages/6e/eb/6a048e5916a78c30ea1b550452a6ede24facf5cafd564bbb1bc5e8ba6fea/crawlee-0.6.13b42-py3-none-any.whl", hash = "sha256:e9c258d49c8d4269d41a1dd9babfc262d241c62c9549d4dd54d1cad0ddbf9569", size = 279764, upload-time = "2025-09-17T15:19:23.817Z" },
 ]
 
 [package.optional-dependencies]