13
13
from apify_client import ApifyClientAsync
14
14
from apify_shared .consts import ActorEnvVars , ActorExitCodes , ApifyEnvVars
15
15
from apify_shared .utils import ignore_docs , maybe_extract_enum_member_value
16
- from crawlee import service_container
16
+ from crawlee import service_locator
17
17
from crawlee .events ._types import Event , EventMigratingData , EventPersistStateData
18
+ from crawlee .storage_clients import MemoryStorageClient
18
19
19
20
from apify ._configuration import Configuration
20
21
from apify ._consts import EVENT_LISTENERS_TIMEOUT
@@ -71,17 +72,22 @@ def __init__(
71
72
self ._configure_logging = configure_logging
72
73
self ._apify_client = self .new_client ()
73
74
74
- self ._event_manager : EventManager
75
- if self ._configuration .is_at_home :
76
- self ._event_manager = PlatformEventManager (
77
- config = self ._configuration ,
78
- persist_state_interval = self ._configuration .persist_state_interval ,
75
+ # We need to keep both local & cloud storage clients because of the `force_cloud` option.
76
+ self ._local_storage_client = MemoryStorageClient .from_config (config = self .config )
77
+ self ._cloud_storage_client = ApifyStorageClient .from_config (config = self .config )
78
+
79
+ # Set the event manager based on whether the Actor is running on the platform or locally.
80
+ self ._event_manager = (
81
+ PlatformEventManager (
82
+ config = self .config ,
83
+ persist_state_interval = self .config .persist_state_interval ,
79
84
)
80
- else :
81
- self . _event_manager = LocalEventManager (
82
- system_info_interval = self ._configuration .system_info_interval ,
83
- persist_state_interval = self ._configuration .persist_state_interval ,
85
+ if self . is_at_home ()
86
+ else LocalEventManager (
87
+ system_info_interval = self .config .system_info_interval ,
88
+ persist_state_interval = self .config .persist_state_interval ,
84
89
)
90
+ )
85
91
86
92
self ._is_initialized = False
87
93
@@ -94,9 +100,6 @@ async def __aenter__(self) -> Self:
94
100
When you exit the `async with` block, the `Actor.exit()` method is called, and if any exception happens while
95
101
executing the block code, the `Actor.fail` method is called.
96
102
"""
97
- if self ._configure_logging :
98
- _configure_logging (self ._configuration )
99
-
100
103
await self .init ()
101
104
return self
102
105
@@ -184,18 +187,21 @@ async def init(self) -> None:
184
187
if self ._is_initialized :
185
188
raise RuntimeError ('The Actor was already initialized!' )
186
189
187
- if self ._configuration . token :
188
- service_container . set_cloud_storage_client ( ApifyStorageClient ( configuration = self ._configuration ))
190
+ self ._is_exiting = False
191
+ self ._was_final_persist_state_emitted = False
189
192
190
- if self ._configuration .is_at_home :
191
- service_container .set_default_storage_client_type ('cloud' )
193
+ # Register services in the service locator.
194
+ if self .is_at_home ():
195
+ service_locator .set_storage_client (self ._cloud_storage_client )
192
196
else :
193
- service_container . set_default_storage_client_type ( 'local' )
197
+ service_locator . set_storage_client ( self . _local_storage_client )
194
198
195
- service_container .set_event_manager (self ._event_manager )
199
+ service_locator .set_event_manager (self .event_manager )
200
+ service_locator .set_configuration (self .configuration )
196
201
197
- self ._is_exiting = False
198
- self ._was_final_persist_state_emitted = False
202
+ # The logging configuration has to be called after all service_locator set methods.
203
+ if self ._configure_logging :
204
+ _configure_logging ()
199
205
200
206
self .log .info ('Initializing Actor...' )
201
207
self .log .info ('System info' , extra = get_system_info ())
@@ -245,7 +251,6 @@ async def finalize() -> None:
245
251
await self ._event_manager .wait_for_all_listeners_to_complete (timeout = event_listeners_timeout )
246
252
247
253
await self ._event_manager .__aexit__ (None , None , None )
248
- cast (dict , service_container ._services ).clear () # noqa: SLF001
249
254
250
255
await asyncio .wait_for (finalize (), cleanup_timeout .total_seconds ())
251
256
self ._is_initialized = False
@@ -349,11 +354,13 @@ async def open_dataset(
349
354
self ._raise_if_not_initialized ()
350
355
self ._raise_if_cloud_requested_but_not_configured (force_cloud = force_cloud )
351
356
357
+ storage_client = self ._cloud_storage_client if force_cloud else service_locator .get_storage_client ()
358
+
352
359
return await Dataset .open (
353
360
id = id ,
354
361
name = name ,
355
362
configuration = self ._configuration ,
356
- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
363
+ storage_client = storage_client ,
357
364
)
358
365
359
366
async def open_key_value_store (
@@ -381,12 +388,13 @@ async def open_key_value_store(
381
388
"""
382
389
self ._raise_if_not_initialized ()
383
390
self ._raise_if_cloud_requested_but_not_configured (force_cloud = force_cloud )
391
+ storage_client = self ._cloud_storage_client if force_cloud else service_locator .get_storage_client ()
384
392
385
393
return await KeyValueStore .open (
386
394
id = id ,
387
395
name = name ,
388
396
configuration = self ._configuration ,
389
- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
397
+ storage_client = storage_client ,
390
398
)
391
399
392
400
async def open_request_queue (
@@ -417,11 +425,13 @@ async def open_request_queue(
417
425
self ._raise_if_not_initialized ()
418
426
self ._raise_if_cloud_requested_but_not_configured (force_cloud = force_cloud )
419
427
428
+ storage_client = self ._cloud_storage_client if force_cloud else service_locator .get_storage_client ()
429
+
420
430
return await RequestQueue .open (
421
431
id = id ,
422
432
name = name ,
423
433
configuration = self ._configuration ,
424
- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
434
+ storage_client = storage_client ,
425
435
)
426
436
427
437
async def push_data (self , data : dict | list [dict ]) -> None :
@@ -963,7 +973,7 @@ async def create_proxy_configuration(
963
973
password : str | None = None ,
964
974
groups : list [str ] | None = None ,
965
975
country_code : str | None = None ,
966
- proxy_urls : list [str ] | None = None ,
976
+ proxy_urls : list [str | None ] | None = None ,
967
977
new_url_function : _NewUrlFunction | None = None ,
968
978
) -> ProxyConfiguration | None :
969
979
"""Create a ProxyConfiguration object with the passed proxy configuration.
0 commit comments