77from typing import TYPE_CHECKING , Any , Callable , Literal , TypeVar , cast , overload
88
99from lazy_object_proxy import Proxy
10+ from more_itertools import flatten
1011from pydantic import AliasChoices
1112
1213from apify_client import ApifyClientAsync
1314from apify_shared .consts import ActorEnvVars , ActorExitCodes , ApifyEnvVars
1415from apify_shared .utils import ignore_docs , maybe_extract_enum_member_value
15- from crawlee import service_container
16+ from crawlee import service_locator
1617from crawlee .events import (
1718 Event ,
1819 EventAbortingData ,
4142 from typing_extensions import Self
4243
4344 from crawlee .proxy_configuration import _NewUrlFunction
45+ from crawlee .storage_clients import BaseStorageClient
4446
4547 from apify ._models import Webhook
4648
@@ -56,6 +58,7 @@ class _ActorType:
5658 _apify_client : ApifyClientAsync
5759 _configuration : Configuration
5860 _is_exiting = False
61+ _is_rebooting = False
5962
6063 def __init__ (
6164 self ,
@@ -77,17 +80,22 @@ def __init__(
7780 self ._configure_logging = configure_logging
7881 self ._apify_client = self .new_client ()
7982
80- self ._event_manager : EventManager
81- if self ._configuration .is_at_home :
82- self ._event_manager = PlatformEventManager (
83+ # Create an instance of the cloud storage client; the local storage client is obtained
84+ # from the service locator.
85+ self ._cloud_storage_client = ApifyStorageClient .from_config (config = self ._configuration )
86+
87+ # Set the event manager based on whether the Actor is running on the platform or locally.
88+ self ._event_manager = (
89+ PlatformEventManager (
8390 config = self ._configuration ,
8491 persist_state_interval = self ._configuration .persist_state_interval ,
8592 )
86- else :
87- self . _event_manager = LocalEventManager (
93+ if self . is_at_home ()
94+ else LocalEventManager (
8895 system_info_interval = self ._configuration .system_info_interval ,
8996 persist_state_interval = self ._configuration .persist_state_interval ,
9097 )
98+ )
9199
92100 self ._is_initialized = False
93101
@@ -100,9 +108,6 @@ async def __aenter__(self) -> Self:
100108 When you exit the `async with` block, the `Actor.exit()` method is called, and if any exception happens while
101109 executing the block code, the `Actor.fail` method is called.
102110 """
103- if self ._configure_logging :
104- _configure_logging (self ._configuration )
105-
106111 await self .init ()
107112 return self
108113
@@ -162,10 +167,25 @@ def log(self) -> logging.Logger:
162167 """The logging.Logger instance the Actor uses."""
163168 return logger
164169
170+ @property
171+ def _local_storage_client (self ) -> BaseStorageClient :
172+ """The local storage client the Actor instance uses."""
173+ return service_locator .get_storage_client ()
174+
165175 def _raise_if_not_initialized (self ) -> None :
166176 if not self ._is_initialized :
167177 raise RuntimeError ('The Actor was not initialized!' )
168178
179+ def _raise_if_cloud_requested_but_not_configured (self , * , force_cloud : bool ) -> None :
180+ if not force_cloud :
181+ return
182+
183+ if not self .is_at_home () and self .config .token is None :
184+ raise RuntimeError (
185+ 'In order to use the Apify cloud storage from your computer, '
186+ 'you need to provide an Apify token using the APIFY_TOKEN environment variable.'
187+ )
188+
169189 async def init (self ) -> None :
170190 """Initialize the Actor instance.
171191
@@ -180,18 +200,19 @@ async def init(self) -> None:
180200 if self ._is_initialized :
181201 raise RuntimeError ('The Actor was already initialized!' )
182202
183- if self ._configuration . token :
184- service_container . set_cloud_storage_client ( ApifyStorageClient ( configuration = self ._configuration ))
203+ self ._is_exiting = False
204+ self ._was_final_persist_state_emitted = False
185205
186- if self ._configuration .is_at_home :
187- service_container .set_default_storage_client_type ('cloud' )
188- else :
189- service_container .set_default_storage_client_type ('local' )
206+ # If the Actor is running on the Apify platform, we set the cloud storage client.
207+ if self .is_at_home ():
208+ service_locator .set_storage_client (self ._cloud_storage_client )
190209
191- service_container .set_event_manager (self ._event_manager )
210+ service_locator .set_event_manager (self .event_manager )
211+ service_locator .set_configuration (self .configuration )
192212
193- self ._is_exiting = False
194- self ._was_final_persist_state_emitted = False
213+ # The logging configuration has to be called after all service_locator set methods.
214+ if self ._configure_logging :
215+ _configure_logging ()
195216
196217 self .log .info ('Initializing Actor...' )
197218 self .log .info ('System info' , extra = get_system_info ())
@@ -241,7 +262,6 @@ async def finalize() -> None:
241262 await self ._event_manager .wait_for_all_listeners_to_complete (timeout = event_listeners_timeout )
242263
243264 await self ._event_manager .__aexit__ (None , None , None )
244- cast (dict , service_container ._services ).clear () # noqa: SLF001
245265
246266 await asyncio .wait_for (finalize (), cleanup_timeout .total_seconds ())
247267 self ._is_initialized = False
@@ -343,12 +363,15 @@ async def open_dataset(
343363 An instance of the `Dataset` class for the given ID or name.
344364 """
345365 self ._raise_if_not_initialized ()
366+ self ._raise_if_cloud_requested_but_not_configured (force_cloud = force_cloud )
367+
368+ storage_client = self ._cloud_storage_client if force_cloud else self ._local_storage_client
346369
347370 return await Dataset .open (
348371 id = id ,
349372 name = name ,
350373 configuration = self ._configuration ,
351- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
374+ storage_client = storage_client ,
352375 )
353376
354377 async def open_key_value_store (
@@ -375,12 +398,14 @@ async def open_key_value_store(
375398 An instance of the `KeyValueStore` class for the given ID or name.
376399 """
377400 self ._raise_if_not_initialized ()
401+ self ._raise_if_cloud_requested_but_not_configured (force_cloud = force_cloud )
402+ storage_client = self ._cloud_storage_client if force_cloud else self ._local_storage_client
378403
379404 return await KeyValueStore .open (
380405 id = id ,
381406 name = name ,
382407 configuration = self ._configuration ,
383- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
408+ storage_client = storage_client ,
384409 )
385410
386411 async def open_request_queue (
@@ -409,12 +434,15 @@ async def open_request_queue(
409434 An instance of the `RequestQueue` class for the given ID or name.
410435 """
411436 self ._raise_if_not_initialized ()
437+ self ._raise_if_cloud_requested_but_not_configured (force_cloud = force_cloud )
438+
439+ storage_client = self ._cloud_storage_client if force_cloud else self ._local_storage_client
412440
413441 return await RequestQueue .open (
414442 id = id ,
415443 name = name ,
416444 configuration = self ._configuration ,
417- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
445+ storage_client = storage_client ,
418446 )
419447
420448 async def push_data (self , data : dict | list [dict ]) -> None :
@@ -870,12 +898,32 @@ async def reboot(
870898 self .log .error ('Actor.reboot() is only supported when running on the Apify platform.' )
871899 return
872900
901+ if self ._is_rebooting :
902+ self .log .debug ('Actor is already rebooting, skipping the additional reboot call.' )
903+ return
904+
905+ self ._is_rebooting = True
906+
873907 if not custom_after_sleep :
874908 custom_after_sleep = self ._configuration .metamorph_after_sleep
875909
876- self ._event_manager .emit (event = Event .PERSIST_STATE , event_data = EventPersistStateData (is_migrating = True ))
910+ # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
911+ # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot.
912+ # MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot.
913+ # Typically, crawlers are listening for the MIGRATING event to stop processing new requests.
914+ # We can't just emit the events and wait for all listeners to finish,
915+ # because this method might be called from an event listener itself, and we would deadlock.
916+ persist_state_listeners = flatten (
917+ (self ._event_manager ._listeners_to_wrappers [Event .PERSIST_STATE ] or {}).values () # noqa: SLF001
918+ )
919+ migrating_listeners = flatten (
920+ (self ._event_manager ._listeners_to_wrappers [Event .MIGRATING ] or {}).values () # noqa: SLF001
921+ )
877922
878- await self ._event_manager .__aexit__ (None , None , None )
923+ await asyncio .gather (
924+ * [listener (EventPersistStateData (is_migrating = True )) for listener in persist_state_listeners ],
925+ * [listener (EventMigratingData ()) for listener in migrating_listeners ],
926+ )
879927
880928 if not self ._configuration .actor_run_id :
881929 raise RuntimeError ('actor_run_id cannot be None when running on the Apify platform.' )
@@ -972,7 +1020,7 @@ async def create_proxy_configuration(
9721020 password : str | None = None ,
9731021 groups : list [str ] | None = None ,
9741022 country_code : str | None = None ,
975- proxy_urls : list [str ] | None = None ,
1023+ proxy_urls : list [str | None ] | None = None ,
9761024 new_url_function : _NewUrlFunction | None = None ,
9771025 ) -> ProxyConfiguration | None :
9781026 """Create a ProxyConfiguration object with the passed proxy configuration.
0 commit comments