1
1
from __future__ import annotations
2
2
3
3
import asyncio
4
+ import contextlib
4
5
from typing import TYPE_CHECKING , Annotated
5
6
6
7
import websockets .asyncio .client
29
30
30
31
@docs_group ('Event managers' )
31
32
class ApifyEventManager (EventManager ):
32
- """A class for managing Actor events .
33
+ """Event manager for the Apify platform .
33
34
34
- You shouldn't use this class directly,
35
- but instead use it via the `Actor.on()` and `Actor.off()` methods.
36
- """
35
+ This class extends Crawlee's `EventManager` to provide Apify-specific functionality, including websocket
36
+ connectivity to the Apify platform for receiving platform events.
37
+
38
+ The event manager handles:
39
+ - Registration and emission of events and their listeners.
40
+ - Websocket connection to Apify platform events.
41
+ - Processing and validation of platform messages.
42
+ - Automatic event forwarding from the platform to local event listeners.
37
43
38
- _platform_events_websocket : websockets .asyncio .client .ClientConnection | None = None
39
- _process_platform_messages_task : asyncio .Task | None = None
40
- _send_system_info_interval_task : asyncio .Task | None = None
41
- _connected_to_platform_websocket : asyncio .Future = asyncio .Future ()
44
+ This class should not be used directly. Use the `Actor.on` and `Actor.off` methods to interact
45
+ with the event system.
46
+ """
42
47
43
def __init__(self, configuration: Configuration, **kwargs: Unpack[EventManagerOptions]) -> None:
    """Initialize a new instance.

    All platform-connection state starts out empty; nothing is connected or
    scheduled by the constructor itself.

    Args:
        configuration: The Actor configuration for the event manager.
        **kwargs: Additional event manager options passed to the parent class.
    """
    super().__init__(**kwargs)

    self._configuration = configuration
    """The Actor configuration for the event manager."""

    self._platform_events_websocket: websockets.asyncio.client.ClientConnection | None = None
    """WebSocket connection to the platform events."""

    self._process_platform_messages_task: asyncio.Task | None = None
    """Task for processing messages from the platform websocket."""

    # Stays `None` until `__aenter__` assigns a fresh `asyncio.Future()`; code that
    # resolves it must therefore check for `None` first.
    self._connected_to_platform_websocket: asyncio.Future[bool] | None = None
    """Future that resolves when the connection to the platform websocket is established."""
55
68
56
69
@override
57
70
async def __aenter__ (self ) -> Self :
58
71
await super ().__aenter__ ()
59
72
self ._connected_to_platform_websocket = asyncio .Future ()
60
73
61
74
# Run tasks but don't await them
62
- if self ._config .actor_events_ws_url :
75
+ if self ._configuration .actor_events_ws_url :
63
76
self ._process_platform_messages_task = asyncio .create_task (
64
- self ._process_platform_messages (self ._config .actor_events_ws_url )
77
+ self ._process_platform_messages (self ._configuration .actor_events_ws_url )
65
78
)
66
79
is_connected = await self ._connected_to_platform_websocket
67
80
if not is_connected :
@@ -81,16 +94,19 @@ async def __aexit__(
81
94
if self ._platform_events_websocket :
82
95
await self ._platform_events_websocket .close ()
83
96
84
- if self ._process_platform_messages_task :
85
- await self ._process_platform_messages_task
97
+ if self ._process_platform_messages_task and not self ._process_platform_messages_task .done ():
98
+ self ._process_platform_messages_task .cancel ()
99
+ with contextlib .suppress (asyncio .CancelledError ):
100
+ await self ._process_platform_messages_task
86
101
87
102
await super ().__aexit__ (exc_type , exc_value , exc_traceback )
88
103
89
104
async def _process_platform_messages (self , ws_url : str ) -> None :
90
105
try :
91
106
async with websockets .asyncio .client .connect (ws_url ) as websocket :
92
107
self ._platform_events_websocket = websocket
93
- self ._connected_to_platform_websocket .set_result (True )
108
+ if self ._connected_to_platform_websocket is not None :
109
+ self ._connected_to_platform_websocket .set_result (True )
94
110
95
111
async for message in websocket :
96
112
try :
@@ -110,7 +126,7 @@ async def _process_platform_messages(self, ws_url: str) -> None:
110
126
event = parsed_message .name ,
111
127
event_data = parsed_message .data
112
128
if not isinstance (parsed_message .data , SystemInfoEventData )
113
- else parsed_message .data .to_crawlee_format (self ._config .dedicated_cpus or 1 ),
129
+ else parsed_message .data .to_crawlee_format (self ._configuration .dedicated_cpus or 1 ),
114
130
)
115
131
116
132
if parsed_message .name == Event .MIGRATING :
@@ -120,4 +136,5 @@ async def _process_platform_messages(self, ws_url: str) -> None:
120
136
logger .exception ('Cannot parse Actor event' , extra = {'message' : message })
121
137
except Exception :
122
138
logger .exception ('Error in websocket connection' )
123
- self ._connected_to_platform_websocket .set_result (False )
139
+ if self ._connected_to_platform_websocket is not None :
140
+ self ._connected_to_platform_websocket .set_result (False )
0 commit comments