Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __init__(
# Set the event manager based on whether the Actor is running on the platform or locally.
self._event_manager = (
ApifyEventManager(
config=self._configuration,
configuration=self._configuration,
persist_state_interval=self._configuration.persist_state_interval,
)
if self.is_at_home()
Expand Down
61 changes: 39 additions & 22 deletions src/apify/events/_apify_event_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
from typing import TYPE_CHECKING, Annotated

import websockets.asyncio.client
Expand Down Expand Up @@ -29,39 +30,51 @@

@docs_group('Event managers')
class ApifyEventManager(EventManager):
"""A class for managing Actor events.
"""Event manager for the Apify platform.

You shouldn't use this class directly,
but instead use it via the `Actor.on()` and `Actor.off()` methods.
"""
This class extends Crawlee's `EventManager` to provide Apify-specific functionality, including websocket
connectivity to the Apify platform for receiving platform events.

The event manager handles:
- Registration and emission of events and their listeners.
- Websocket connection to Apify platform events.
- Processing and validation of platform messages.
- Automatic event forwarding from the platform to local event listeners.

_platform_events_websocket: websockets.asyncio.client.ClientConnection | None = None
_process_platform_messages_task: asyncio.Task | None = None
_send_system_info_interval_task: asyncio.Task | None = None
_connected_to_platform_websocket: asyncio.Future = asyncio.Future()
This class should not be used directly. Use the `Actor.on` and `Actor.off` methods to interact
with the event system.
"""

def __init__(self, config: Configuration, **kwargs: Unpack[EventManagerOptions]) -> None:
"""Create an instance of the EventManager.
def __init__(self, configuration: Configuration, **kwargs: Unpack[EventManagerOptions]) -> None:
"""Initialize a new instance.

Args:
config: The Actor configuration to be used in this event manager.
kwargs: Event manager options - forwarded to the base class
configuration: The Actor configuration for the event manager.
**kwargs: Additional event manager options passed to the parent class.
"""
super().__init__(**kwargs)

self._config = config
self._listener_tasks = set()
self._connected_to_platform_websocket = asyncio.Future[bool]()
self._configuration = configuration
"""The Actor configuration for the event manager."""

self._platform_events_websocket: websockets.asyncio.client.ClientConnection | None = None
"""WebSocket connection to the platform events."""

self._process_platform_messages_task: asyncio.Task | None = None
"""Task for processing messages from the platform websocket."""

self._connected_to_platform_websocket: asyncio.Future[bool] | None = None
"""Future that resolves when the connection to the platform websocket is established."""

@override
async def __aenter__(self) -> Self:
await super().__aenter__()
self._connected_to_platform_websocket = asyncio.Future()

# Run tasks but don't await them
if self._config.actor_events_ws_url:
if self._configuration.actor_events_ws_url:
self._process_platform_messages_task = asyncio.create_task(
self._process_platform_messages(self._config.actor_events_ws_url)
self._process_platform_messages(self._configuration.actor_events_ws_url)
)
is_connected = await self._connected_to_platform_websocket
if not is_connected:
Expand All @@ -81,16 +94,19 @@ async def __aexit__(
if self._platform_events_websocket:
await self._platform_events_websocket.close()

if self._process_platform_messages_task:
await self._process_platform_messages_task
if self._process_platform_messages_task and not self._process_platform_messages_task.done():
self._process_platform_messages_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._process_platform_messages_task

await super().__aexit__(exc_type, exc_value, exc_traceback)

async def _process_platform_messages(self, ws_url: str) -> None:
try:
async with websockets.asyncio.client.connect(ws_url) as websocket:
self._platform_events_websocket = websocket
self._connected_to_platform_websocket.set_result(True)
if self._connected_to_platform_websocket is not None:
self._connected_to_platform_websocket.set_result(True)

async for message in websocket:
try:
Expand All @@ -110,7 +126,7 @@ async def _process_platform_messages(self, ws_url: str) -> None:
event=parsed_message.name,
event_data=parsed_message.data
if not isinstance(parsed_message.data, SystemInfoEventData)
else parsed_message.data.to_crawlee_format(self._config.dedicated_cpus or 1),
else parsed_message.data.to_crawlee_format(self._configuration.dedicated_cpus or 1),
)

if parsed_message.name == Event.MIGRATING:
Expand All @@ -120,4 +136,5 @@ async def _process_platform_messages(self, ws_url: str) -> None:
logger.exception('Cannot parse Actor event', extra={'message': message})
except Exception:
logger.exception('Error in websocket connection')
self._connected_to_platform_websocket.set_result(False)
if self._connected_to_platform_websocket is not None:
self._connected_to_platform_websocket.set_result(False)
Loading