From bdd213befe769f2e247c3c83a9ffd724e6e5ad0b Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Wed, 20 Aug 2025 11:07:15 +0200
Subject: [PATCH] fix: Resolve DeprecationWarning in ApifyEventManager

---

Reviewer notes (text between the `---` marker and the first `diff --git`
line is commentary for humans and is not part of the change):

What the patch does
  * `ApifyEventManager.__init__` takes `configuration` instead of `config`;
    the only call site touched here (`src/apify/_actor.py`) is updated.
  * The class-level attributes are removed, including
    `_connected_to_platform_websocket = asyncio.Future()`, which created a
    Future when the class body was executed (presumably the source of the
    DeprecationWarning named in the subject - confirm). The attributes are
    now set per instance in `__init__`, and the Future is created in
    `__aenter__`.
  * `__aexit__` cancels the message-processing task if it is still running
    and suppresses the resulting `asyncio.CancelledError`, instead of
    awaiting the task unconditionally.
  * Both `set_result()` calls are guarded against the Future being `None`.

Change made during review
  * In the last hunk, the failure path now also checks
    `not self._connected_to_platform_websocket.done()` before calling
    `set_result(False)`. The same `except Exception` block also handles
    errors raised after a successful connect, when `set_result(True)` has
    already resolved the Future; a second `set_result()` would then raise
    `asyncio.InvalidStateError` from inside the handler.
  * The edit stays on one added line, so hunk line counts and the diffstat
    are unchanged. The post-image blob id on the second `index` line
    belongs to the original commit and no longer matches exactly.

Formatting
  * Line breaks and indentation were reconstructed from a copy of this
    patch whose whitespace had been collapsed; verify the context lines
    against the target tree before applying.

 src/apify/_actor.py                      |  2 +-
 src/apify/events/_apify_event_manager.py | 61 +++++++++++++++---------
 2 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/src/apify/_actor.py b/src/apify/_actor.py
index 631295cf..c9044117 100644
--- a/src/apify/_actor.py
+++ b/src/apify/_actor.py
@@ -130,7 +130,7 @@ def __init__(
         # Set the event manager based on whether the Actor is running on the platform or locally.
         self._event_manager = (
             ApifyEventManager(
-                config=self._configuration,
+                configuration=self._configuration,
                 persist_state_interval=self._configuration.persist_state_interval,
             )
             if self.is_at_home()
diff --git a/src/apify/events/_apify_event_manager.py b/src/apify/events/_apify_event_manager.py
index 5b6e6f55..e947cb50 100644
--- a/src/apify/events/_apify_event_manager.py
+++ b/src/apify/events/_apify_event_manager.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import asyncio
+import contextlib
 from typing import TYPE_CHECKING, Annotated
 
 import websockets.asyncio.client
@@ -29,29 +30,41 @@
 
 @docs_group('Event managers')
 class ApifyEventManager(EventManager):
-    """A class for managing Actor events.
+    """Event manager for the Apify platform.
 
-    You shouldn't use this class directly,
-    but instead use it via the `Actor.on()` and `Actor.off()` methods.
-    """
+    This class extends Crawlee's `EventManager` to provide Apify-specific functionality, including websocket
+    connectivity to the Apify platform for receiving platform events.
+
+    The event manager handles:
+    - Registration and emission of events and their listeners.
+    - Websocket connection to Apify platform events.
+    - Processing and validation of platform messages.
+    - Automatic event forwarding from the platform to local event listeners.
 
-    _platform_events_websocket: websockets.asyncio.client.ClientConnection | None = None
-    _process_platform_messages_task: asyncio.Task | None = None
-    _send_system_info_interval_task: asyncio.Task | None = None
-    _connected_to_platform_websocket: asyncio.Future = asyncio.Future()
+    This class should not be used directly. Use the `Actor.on` and `Actor.off` methods to interact
+    with the event system.
+    """
 
-    def __init__(self, config: Configuration, **kwargs: Unpack[EventManagerOptions]) -> None:
-        """Create an instance of the EventManager.
+    def __init__(self, configuration: Configuration, **kwargs: Unpack[EventManagerOptions]) -> None:
+        """Initialize a new instance.
 
         Args:
-            config: The Actor configuration to be used in this event manager.
-            kwargs: Event manager options - forwarded to the base class
+            configuration: The Actor configuration for the event manager.
+            **kwargs: Additional event manager options passed to the parent class.
         """
         super().__init__(**kwargs)
 
-        self._config = config
-        self._listener_tasks = set()
-        self._connected_to_platform_websocket = asyncio.Future[bool]()
+        self._configuration = configuration
+        """The Actor configuration for the event manager."""
+
+        self._platform_events_websocket: websockets.asyncio.client.ClientConnection | None = None
+        """WebSocket connection to the platform events."""
+
+        self._process_platform_messages_task: asyncio.Task | None = None
+        """Task for processing messages from the platform websocket."""
+
+        self._connected_to_platform_websocket: asyncio.Future[bool] | None = None
+        """Future that resolves when the connection to the platform websocket is established."""
 
     @override
     async def __aenter__(self) -> Self:
@@ -59,9 +72,9 @@ async def __aenter__(self) -> Self:
         self._connected_to_platform_websocket = asyncio.Future()
 
         # Run tasks but don't await them
-        if self._config.actor_events_ws_url:
+        if self._configuration.actor_events_ws_url:
             self._process_platform_messages_task = asyncio.create_task(
-                self._process_platform_messages(self._config.actor_events_ws_url)
+                self._process_platform_messages(self._configuration.actor_events_ws_url)
             )
             is_connected = await self._connected_to_platform_websocket
             if not is_connected:
@@ -81,8 +94,10 @@ async def __aexit__(
         if self._platform_events_websocket:
             await self._platform_events_websocket.close()
 
-        if self._process_platform_messages_task:
-            await self._process_platform_messages_task
+        if self._process_platform_messages_task and not self._process_platform_messages_task.done():
+            self._process_platform_messages_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._process_platform_messages_task
 
         await super().__aexit__(exc_type, exc_value, exc_traceback)
 
@@ -90,7 +105,8 @@ async def _process_platform_messages(self, ws_url: str) -> None:
         try:
             async with websockets.asyncio.client.connect(ws_url) as websocket:
                 self._platform_events_websocket = websocket
-                self._connected_to_platform_websocket.set_result(True)
+                if self._connected_to_platform_websocket is not None:
+                    self._connected_to_platform_websocket.set_result(True)
 
                 async for message in websocket:
                     try:
@@ -110,7 +126,7 @@ async def _process_platform_messages(self, ws_url: str) -> None:
                             event=parsed_message.name,
                             event_data=parsed_message.data
                             if not isinstance(parsed_message.data, SystemInfoEventData)
-                            else parsed_message.data.to_crawlee_format(self._config.dedicated_cpus or 1),
+                            else parsed_message.data.to_crawlee_format(self._configuration.dedicated_cpus or 1),
                         )
 
                         if parsed_message.name == Event.MIGRATING:
@@ -120,4 +136,5 @@ async def _process_platform_messages(self, ws_url: str) -> None:
                         logger.exception('Cannot parse Actor event', extra={'message': message})
         except Exception:
             logger.exception('Error in websocket connection')
-            self._connected_to_platform_websocket.set_result(False)
+            if self._connected_to_platform_websocket is not None and not self._connected_to_platform_websocket.done():
+                self._connected_to_platform_websocket.set_result(False)