diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 965725e2..3d0b194b 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -32,7 +32,7 @@ from apify._crypto import decrypt_input_secrets, load_private_key from apify._models import ActorRun from apify._proxy_configuration import ProxyConfiguration -from apify._utils import docs_group, docs_name, get_system_info, is_running_in_ipython, maybe_extract_enum_member_value +from apify._utils import docs_group, docs_name, get_system_info, is_running_in_ipython from apify.events import ApifyEventManager, EventManager, LocalEventManager from apify.log import _configure_logging, logger from apify.storage_clients import ApifyStorageClient @@ -48,10 +48,10 @@ from typing_extensions import Self from crawlee.proxy_configuration import _NewUrlFunction + from crawlee.storage_clients import StorageClient from apify._models import Webhook - MainReturnType = TypeVar('MainReturnType') @@ -98,7 +98,10 @@ async def main() -> None: """ _is_rebooting = False + """Whether the Actor is currently rebooting.""" + _is_any_instance_initialized = False + """Whether any Actor instance was initialized.""" def __init__( self, @@ -106,63 +109,142 @@ def __init__( *, configure_logging: bool = True, exit_process: bool | None = None, + exit_code: int = 0, + status_message: str | None = None, + event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, + cleanup_timeout: timedelta = timedelta(seconds=30), ) -> None: - """Create an Actor instance. - - Note that you don't have to do this, all the functionality is accessible using the default instance - (e.g. `Actor.open_dataset()`). + """Initialize a new instance. Args: - configuration: The Actor configuration to be used. If not passed, a new Configuration instance will - be created. - configure_logging: Should the default logging configuration be configured? - exit_process: Whether the Actor should call `sys.exit` when the context manager exits. The default is - True except for the IPython, Pytest and Scrapy environments. + configuration: The Actor configuration to use. If not provided, a default configuration is created. + configure_logging: Whether to set up the default logging configuration. + exit_process: Whether the Actor should call `sys.exit` when the context manager exits. + Defaults to True, except in IPython, Pytest, and Scrapy environments. + exit_code: The exit code the Actor should use when exiting. + status_message: Final status message to display upon Actor termination. + event_listeners_timeout: Maximum time to wait for Actor event listeners to complete before exiting. + cleanup_timeout: Maximum time to wait for cleanup tasks to finish. """ + self._configuration = configuration + self._configure_logging = configure_logging self._exit_process = self._get_default_exit_process() if exit_process is None else exit_process - self._is_exiting = False + self._exit_code = exit_code + self._status_message = status_message + self._event_listeners_timeout = event_listeners_timeout + self._cleanup_timeout = cleanup_timeout # Actor state when this method is being executed is unpredictable. # Actor can be initialized by lazy object proxy or by user directly, or by both. # Until `init` method is run, this state of uncertainty remains. This is the reason why any setting done here in # `__init__` method should not be considered final. - self._configuration = configuration - self._configure_logging = configure_logging self._apify_client: ApifyClientAsync | None = None + self._local_storage_client: StorageClient | None = None + self._is_exiting = False self._is_initialized = False async def __aenter__(self) -> Self: - """Initialize the Actor. + """Enter the Actor context. + + Initializes the Actor when used in an `async with` block. This method: - Automatically initializes the Actor instance when you use it in an `async with ...` statement. + - Sets up local or cloud storage clients depending on whether the Actor runs locally or on the Apify platform. + - Configures the event manager and starts periodic state persistence. + - Initializes the charging manager for handling charging events. + - Configures logging after all core services are registered. - When you exit the `async with` block, the `Actor.exit()` method is called, and if any exception happens while - executing the block code, the `Actor.fail` method is called. + This method must be called exactly once per Actor instance. Re-initializing an Actor or having multiple + active Actor instances is not standard usage and may lead to warnings or unexpected behavior. """ - await self.init() + self.log.info('Initializing Actor...') + self.log.info('System info', extra=get_system_info()) + + if self._is_initialized: + raise RuntimeError('The Actor was already initialized!') + + if _ActorType._is_any_instance_initialized: + self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care.') + + if self._configuration: + # Set explicitly the configuration in the service locator. + service_locator.set_configuration(self.configuration) + else: + # Ensure that the configuration (cached property) is set. + _ = self.configuration + + # Make sure that the currently initialized instance is also available through the global `Actor` proxy. + cast('Proxy', Actor).__wrapped__ = self + + self._is_exiting = False + self._was_final_persist_state_emitted = False + + service_locator.set_event_manager(self.event_manager) + + # Initialize storage client to ensure it's available in service locator. + _ = self._storage_client + + # The logging configuration has to be called after all service_locator set methods. + if self._configure_logging: + _configure_logging() + + await self.event_manager.__aenter__() + await self._charging_manager_implementation.__aenter__() + + self._is_initialized = True + _ActorType._is_any_instance_initialized = True return self async def __aexit__( self, - _exc_type: type[BaseException] | None, + exc_type: type[BaseException] | None, exc_value: BaseException | None, - _exc_traceback: TracebackType | None, + exc_traceback: TracebackType | None, ) -> None: - """Exit the Actor, handling any exceptions properly. + """Exit the Actor context. - When you exit the `async with` block, the `Actor.exit()` method is called, and if any exception happens while - executing the block code, the `Actor.fail` method is called. + If the block exits with an exception, the Actor fails with a non-zero exit code. + Otherwise, it exits cleanly. In both cases the Actor: + + - Cancels periodic `PERSIST_STATE` events. + - Sends a final `PERSIST_STATE` event. + - Waits for all event listeners to finish. + - Stops the event manager and the charging manager. + - Optionally terminates the process with the selected exit code. """ - if not self._is_exiting: - if exc_value: - await self.fail( - exit_code=ActorExitCodes.ERROR_USER_FUNCTION_THREW.value, - exception=exc_value, - ) - else: - await self.exit() + if self._is_exiting: + return + + self._raise_if_not_initialized() + + if exc_value and not is_running_in_ipython(): + # In IPython, we don't run `sys.exit()` during Actor exits, + # so the exception traceback will be printed on its own + self.log.exception('Actor failed with an exception', exc_info=exc_value) + self.exit_value = ActorExitCodes.ERROR_USER_FUNCTION_THREW.value + + self._is_exiting = True + self.log.info('Exiting Actor', extra={'exit_code': self.exit_code}) + + async def finalize() -> None: + if self.status_message is not None: + await self.set_status_message(self.status_message, is_terminal=True) + + # Sleep for a bit so that the listeners have a chance to trigger + await asyncio.sleep(0.1) + + if self._event_listeners_timeout: + await self.event_manager.wait_for_all_listeners_to_complete(timeout=self._event_listeners_timeout) + + await self.event_manager.__aexit__(None, None, None) + await self._charging_manager_implementation.__aexit__(None, None, None) + + await asyncio.wait_for(finalize(), self._cleanup_timeout.total_seconds()) + self._is_initialized = False + + if self._exit_process: + sys.exit(self.exit_code) def __repr__(self) -> str: if self is cast('Proxy', Actor).__wrapped__: @@ -176,24 +258,58 @@ def __call__( *, configure_logging: bool = True, exit_process: bool | None = None, + exit_code: int = 0, + event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, + status_message: str | None = None, + cleanup_timeout: timedelta = timedelta(seconds=30), ) -> Self: - """Make a new Actor instance with a non-default configuration.""" + """Make a new Actor instance with a non-default configuration. + + This is necessary due to the lazy object proxying of the global `Actor` instance. + """ return self.__class__( configuration=configuration, configure_logging=configure_logging, exit_process=exit_process, + exit_code=exit_code, + event_listeners_timeout=event_listeners_timeout, + status_message=status_message, + cleanup_timeout=cleanup_timeout, ) + @property + def log(self) -> logging.Logger: + """Logger configured for this Actor.""" + return logger + + @property + def exit_code(self) -> int: + """The exit code the Actor will use when exiting.""" + return self._exit_code + + @exit_code.setter + def exit_code(self, value: int) -> None: + self._exit_code = value + + @property + def status_message(self) -> str | None: + """The final status message that the Actor will display upon termination.""" + return self._status_message + + @status_message.setter + def status_message(self, value: str | None) -> None: + self._status_message = value + @property def apify_client(self) -> ApifyClientAsync: - """The ApifyClientAsync instance the Actor instance uses.""" + """Asynchronous Apify client for interacting with the Apify API.""" if not self._apify_client: self._apify_client = self.new_client() return self._apify_client @cached_property def configuration(self) -> Configuration: - """The Configuration instance the Actor instance uses.""" + """Actor configuration, uses the default instance if not explicitly set.""" if self._configuration: return self._configuration @@ -214,7 +330,10 @@ def configuration(self) -> Configuration: @cached_property def event_manager(self) -> EventManager: - """The EventManager instance the Actor instance uses.""" + """Manages Apify platform events. + + It uses `ApifyEventManager` on the Apify platform and `LocalEventManager` otherwise. + """ return ( ApifyEventManager( configuration=self.configuration, @@ -227,18 +346,13 @@ def event_manager(self) -> EventManager: ) ) - @property - def log(self) -> logging.Logger: - """The logging.Logger instance the Actor uses.""" - return logger - - def _raise_if_not_initialized(self) -> None: - if not self._is_initialized: - raise RuntimeError('The Actor was not initialized!') + @cached_property + def _charging_manager_implementation(self) -> ChargingManagerImplementation: + return ChargingManagerImplementation(self.configuration, self.apify_client) @cached_property def _storage_client(self) -> SmartApifyStorageClient: - """Storage client used by the actor. + """Storage client used by the Actor. Depending on the initialization of the service locator the client can be created in different ways. """ @@ -250,7 +364,7 @@ def _storage_client(self) -> SmartApifyStorageClient: service_locator.set_storage_client(implicit_storage_client) except ServiceConflictError: self.log.debug( - 'Storage client in service locator was set explicitly before Actor.init was called.' + 'Storage client in service locator was set explicitly before Actor.init was called. ' 'Using the existing storage client as implicit storage client for the Actor.' ) else: @@ -270,100 +384,35 @@ def _storage_client(self) -> SmartApifyStorageClient: ) async def init(self) -> None: - """Initialize the Actor instance. - - This initializes the Actor instance. It configures the right storage client based on whether the Actor is - running locally or on the Apify platform, it initializes the event manager for processing Actor events, - and starts an interval for regularly sending `PERSIST_STATE` events, so that the Actor can regularly persist - its state in response to these events. + """Initialize the Actor without using context-manager syntax. - This method should be called immediately before performing any additional Actor actions, and it should be - called only once. + Equivalent to `await Actor.__aenter__()`. """ - self.log.info('Initializing Actor...') - if self._configuration: - # Set explicitly the configuration in the service locator - service_locator.set_configuration(self.configuration) - else: - # Ensure that the configuration (cached property) is set - _ = self.configuration - - if self._is_initialized: - raise RuntimeError('The Actor was already initialized!') - - if _ActorType._is_any_instance_initialized: - self.log.warning('Repeated Actor initialization detected - this is non-standard usage, proceed with care') - - # Make sure that the currently initialized instance is also available through the global `Actor` proxy - cast('Proxy', Actor).__wrapped__ = self - - self._is_exiting = False - self._was_final_persist_state_emitted = False - - self.log.debug(f'Storage client set to {self._storage_client}') - - service_locator.set_event_manager(self.event_manager) - - # The logging configuration has to be called after all service_locator set methods. - if self._configure_logging: - _configure_logging() - - self.log.info('System info', extra=get_system_info()) - - await self.event_manager.__aenter__() - self.log.debug('Event manager initialized') - - await self._charging_manager_implementation.__aenter__() - self.log.debug('Charging manager initialized') - - self._is_initialized = True - _ActorType._is_any_instance_initialized = True + await self.__aenter__() async def exit( self, *, exit_code: int = 0, - event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, status_message: str | None = None, + event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, cleanup_timeout: timedelta = timedelta(seconds=30), ) -> None: - """Exit the Actor instance. + """Exit the Actor without using context-manager syntax. - This stops the Actor instance. It cancels all the intervals for regularly sending `PERSIST_STATE` events, - sends a final `PERSIST_STATE` event, waits for all the event listeners to finish, and stops the event manager. + Equivalent to `await Actor.__aexit__()`. Args: - exit_code: The exit code with which the Actor should fail (defaults to `0`). - event_listeners_timeout: How long should the Actor wait for Actor event listeners to finish before exiting. - status_message: The final status message that the Actor should display. - cleanup_timeout: How long we should wait for event listeners. + exit_code: The exit code the Actor should use when exiting. + status_message: Final status message to display upon Actor termination. + event_listeners_timeout: Maximum time to wait for Actor event listeners to complete before exiting. + cleanup_timeout: Maximum time to wait for cleanup tasks to finish. """ - self._raise_if_not_initialized() - - self._is_exiting = True - - exit_code = maybe_extract_enum_member_value(exit_code) - - self.log.info('Exiting Actor', extra={'exit_code': exit_code}) - - async def finalize() -> None: - if status_message is not None: - await self.set_status_message(status_message, is_terminal=True) - - # Sleep for a bit so that the listeners have a chance to trigger - await asyncio.sleep(0.1) - - if event_listeners_timeout: - await self.event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout) - - await self.event_manager.__aexit__(None, None, None) - await self._charging_manager_implementation.__aexit__(None, None, None) - - await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds()) - self._is_initialized = False - - if self._exit_process: - sys.exit(exit_code) + self.exit_code = exit_code + self.status_message = status_message + self._event_listeners_timeout = event_listeners_timeout + self._cleanup_timeout = cleanup_timeout + await self.__aexit__(None, None, None) async def fail( self, @@ -372,23 +421,24 @@ async def fail( exception: BaseException | None = None, status_message: str | None = None, ) -> None: - """Fail the Actor instance. + """Fail the Actor instance without using context-manager syntax. - This performs all the same steps as Actor.exit(), but it additionally sets the exit code to `1` (by default). + Equivalent to setting the `self.exit_code` and `self.status_message` properties and using + `await Actor.__aexit__()`. Args: exit_code: The exit code with which the Actor should fail (defaults to `1`). exception: The exception with which the Actor failed. status_message: The final status message that the Actor should display. """ - self._raise_if_not_initialized() - - # In IPython, we don't run `sys.exit()` during Actor exits, - # so the exception traceback will be printed on its own - if exception and not is_running_in_ipython(): - self.log.exception('Actor failed with an exception', exc_info=exception) + self.exit_code = exit_code + self.status_message = status_message - await self.exit(exit_code=exit_code, status_message=status_message) + await self.__aexit__( + exc_type=type(exception) if exception else None, + exc_value=exception, + exc_traceback=exception.__traceback__ if exception else None, + ) def new_client( self, @@ -626,10 +676,6 @@ def get_charging_manager(self) -> ChargingManager: self._raise_if_not_initialized() return self._charging_manager_implementation - @cached_property - def _charging_manager_implementation(self) -> ChargingManagerImplementation: - return ChargingManagerImplementation(self.configuration, self.apify_client) - async def charge(self, event_name: str, count: int = 1) -> ChargeResult: """Charge for a specified number of events - sub-operations of the Actor. @@ -822,18 +868,6 @@ async def start( return ActorRun.model_validate(api_result) - def _get_remaining_time(self) -> timedelta | None: - """Get time remaining from the Actor timeout. Returns `None` if not on an Apify platform.""" - if self.is_at_home() and self.configuration.timeout_at: - return self.configuration.timeout_at - datetime.now(tz=timezone.utc) - - self.log.warning( - 'Returning `None` instead of remaining time. Using `RemainingTime` argument is only possible when the Actor' - ' is running on the Apify platform and when the timeout for the Actor run is set. ' - f'{self.is_at_home()=}, {self.configuration.timeout_at=}' - ) - return None - async def abort( self, run_id: str, @@ -1242,6 +1276,10 @@ async def create_proxy_configuration( return proxy_configuration + def _raise_if_not_initialized(self) -> None: + if not self._is_initialized: + raise RuntimeError('The Actor was not initialized!') + def _get_default_exit_process(self) -> bool: """Return False for IPython, Pytest, and Scrapy environments, True otherwise.""" if is_running_in_ipython(): @@ -1262,6 +1300,18 @@ def _get_default_exit_process(self) -> bool: return True + def _get_remaining_time(self) -> timedelta | None: + """Get time remaining from the Actor timeout. Returns `None` if not on an Apify platform.""" + if self.is_at_home() and self.configuration.timeout_at: + return self.configuration.timeout_at - datetime.now(tz=timezone.utc) + + self.log.warning( + 'Returning `None` instead of remaining time. Using `RemainingTime` argument is only possible when the Actor' + ' is running on the Apify platform and when the timeout for the Actor run is set. ' + f'{self.is_at_home()=}, {self.configuration.timeout_at=}' + ) + return None + Actor = cast('_ActorType', Proxy(_ActorType)) """The entry point of the SDK, through which all the Actor operations should be done.""" diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 7d04557b..f9c9a18d 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -17,7 +17,6 @@ import apify._actor from apify import Actor -from apify._actor import _ActorType if TYPE_CHECKING: from collections.abc import Callable @@ -33,7 +32,7 @@ async def test_actor_properly_init_with_async() -> None: async def test_actor_init() -> None: - my_actor = _ActorType() + my_actor = Actor() await my_actor.init() assert my_actor._is_initialized is True @@ -57,7 +56,7 @@ async def test_double_init_raises_error(prepare_test_env: Callable) -> None: prepare_test_env() - async with _ActorType() as actor: + async with Actor() as actor: assert actor._is_initialized with pytest.raises(RuntimeError): await actor.init() @@ -78,7 +77,7 @@ def on_event(event_type: Event) -> Callable: return lambda data: on_system_info.append(data) return lambda data: print(data) - my_actor = _ActorType() + my_actor = Actor() async with my_actor: assert my_actor._is_initialized my_actor.on(Event.PERSIST_STATE, on_event(Event.PERSIST_STATE)) @@ -102,17 +101,18 @@ async def test_exit_without_init_raises_error() -> None: async def test_actor_fails_cleanly() -> None: - async with _ActorType() as my_actor: - assert my_actor._is_initialized - await my_actor.fail() - assert my_actor._is_initialized is False + async with Actor() as actor: + assert actor._is_initialized + await actor.fail() + + assert actor._is_initialized is False async def test_actor_handles_failure_gracefully() -> None: my_actor = None with contextlib.suppress(Exception): - async with _ActorType() as my_actor: + async with Actor() as my_actor: assert my_actor._is_initialized raise Exception('Failed') # noqa: TRY002 diff --git a/tests/unit/actor/test_actor_log.py b/tests/unit/actor/test_actor_log.py index ecb90ab6..73925605 100644 --- a/tests/unit/actor/test_actor_log.py +++ b/tests/unit/actor/test_actor_log.py @@ -37,7 +37,7 @@ async def test_actor_logs_messages_correctly(caplog: pytest.LogCaptureFixture) - raise RuntimeError('Dummy RuntimeError') # Updated expected number of log records (an extra record is now captured) - assert len(caplog.records) == 15 + assert len(caplog.records) == 12 # Record 0: Extra Pytest context log assert caplog.records[0].levelno == logging.DEBUG @@ -51,58 +51,46 @@ async def test_actor_logs_messages_correctly(caplog: pytest.LogCaptureFixture) - assert caplog.records[2].levelno == logging.INFO assert caplog.records[2].message == 'Initializing Actor...' - # Record 2: Initializing Actor... - assert caplog.records[3].levelno == logging.DEBUG - assert caplog.records[3].message.startswith('Storage client set to') - # Record 3: System info - assert caplog.records[4].levelno == logging.INFO - assert caplog.records[4].message == 'System info' + assert caplog.records[3].levelno == logging.INFO + assert caplog.records[3].message == 'System info' # Record 4: Event manager initialized - assert caplog.records[5].levelno == logging.DEBUG - assert caplog.records[5].message == 'Event manager initialized' - - # Record 5: Charging manager initialized - assert caplog.records[6].levelno == logging.DEBUG - assert caplog.records[6].message == 'Charging manager initialized' - - # Record 6: Debug message - assert caplog.records[7].levelno == logging.DEBUG - assert caplog.records[7].message == 'Debug message' + assert caplog.records[4].levelno == logging.DEBUG + assert caplog.records[4].message == 'Debug message' # Record 7: Info message - assert caplog.records[8].levelno == logging.INFO - assert caplog.records[8].message == 'Info message' + assert caplog.records[5].levelno == logging.INFO + assert caplog.records[5].message == 'Info message' # Record 8: Warning message - assert caplog.records[9].levelno == logging.WARNING - assert caplog.records[9].message == 'Warning message' + assert caplog.records[6].levelno == logging.WARNING + assert caplog.records[6].message == 'Warning message' # Record 9: Error message - assert caplog.records[10].levelno == logging.ERROR - assert caplog.records[10].message == 'Error message' + assert caplog.records[7].levelno == logging.ERROR + assert caplog.records[7].message == 'Error message' # Record 10: Exception message with traceback (ValueError) - assert caplog.records[11].levelno == logging.ERROR - assert caplog.records[11].message == 'Exception message' - assert caplog.records[11].exc_info is not None - assert caplog.records[11].exc_info[0] is ValueError - assert isinstance(caplog.records[11].exc_info[1], ValueError) - assert str(caplog.records[11].exc_info[1]) == 'Dummy ValueError' + assert caplog.records[8].levelno == logging.ERROR + assert caplog.records[8].message == 'Exception message' + assert caplog.records[8].exc_info is not None + assert caplog.records[8].exc_info[0] is ValueError + assert isinstance(caplog.records[8].exc_info[1], ValueError) + assert str(caplog.records[8].exc_info[1]) == 'Dummy ValueError' # Record 11: Multiline log message - assert caplog.records[12].levelno == logging.INFO - assert caplog.records[12].message == 'Multi\nline\nlog\nmessage' + assert caplog.records[9].levelno == logging.INFO + assert caplog.records[9].message == 'Multi\nline\nlog\nmessage' # Record 12: Actor failed with an exception (RuntimeError) - assert caplog.records[13].levelno == logging.ERROR - assert caplog.records[13].message == 'Actor failed with an exception' - assert caplog.records[13].exc_info is not None - assert caplog.records[13].exc_info[0] is RuntimeError - assert isinstance(caplog.records[13].exc_info[1], RuntimeError) - assert str(caplog.records[13].exc_info[1]) == 'Dummy RuntimeError' + assert caplog.records[10].levelno == logging.ERROR + assert caplog.records[10].message == 'Actor failed with an exception' + assert caplog.records[10].exc_info is not None + assert caplog.records[10].exc_info[0] is RuntimeError + assert isinstance(caplog.records[10].exc_info[1], RuntimeError) + assert str(caplog.records[10].exc_info[1]) == 'Dummy RuntimeError' # Record 13: Exiting Actor - assert caplog.records[14].levelno == logging.INFO - assert caplog.records[14].message == 'Exiting Actor' + assert caplog.records[11].levelno == logging.INFO + assert caplog.records[11].message == 'Exiting Actor'