12
12
from apify_client import ApifyClientAsync
13
13
from apify_shared .consts import ActorEnvVars , ActorExitCodes , ApifyEnvVars
14
14
from apify_shared .utils import ignore_docs , maybe_extract_enum_member_value
15
- from crawlee import service_container
15
+ from crawlee import service_locator
16
16
from crawlee .events ._types import Event , EventPersistStateData
17
+ from crawlee .memory_storage_client import MemoryStorageClient
17
18
18
19
from apify ._configuration import Configuration
19
20
from apify ._consts import EVENT_LISTENERS_TIMEOUT
@@ -69,17 +70,31 @@ def __init__(
69
70
self ._configure_logging = configure_logging
70
71
self ._apify_client = self .new_client ()
71
72
72
- self ._event_manager : EventManager
73
- if self ._configuration .is_at_home :
74
- self ._event_manager = PlatformEventManager (
73
+ # We need to keep both local & cloud storage clients because of the `force_cloud` option.
74
+ self ._local_storage_client = MemoryStorageClient .from_config ()
75
+ self ._cloud_storage_client = ApifyStorageClient (configuration = self ._configuration )
76
+
77
+ # Set the event manager based on whether the Actor is running on the platform or locally.
78
+ self ._event_manager = (
79
+ PlatformEventManager (
75
80
config = self ._configuration ,
76
81
persist_state_interval = self ._configuration .persist_state_interval ,
77
82
)
78
- else :
79
- self . _event_manager = LocalEventManager (
83
+ if self . is_at_home ()
84
+ else LocalEventManager (
80
85
system_info_interval = self ._configuration .system_info_interval ,
81
86
persist_state_interval = self ._configuration .persist_state_interval ,
82
87
)
88
+ )
89
+
90
+ # Register services in the service locator.
91
+ if self .is_at_home ():
92
+ service_locator .set_storage_client (self ._cloud_storage_client )
93
+ else :
94
+ service_locator .set_storage_client (self ._local_storage_client )
95
+
96
+ service_locator .set_event_manager (self .event_manager )
97
+ service_locator .set_configuration (self .configuration )
83
98
84
99
self ._is_initialized = False
85
100
@@ -93,7 +108,7 @@ async def __aenter__(self) -> Self:
93
108
executing the block code, the `Actor.fail` method is called.
94
109
"""
95
110
if self ._configure_logging :
96
- _configure_logging (self . _configuration )
111
+ _configure_logging ()
97
112
98
113
await self .init ()
99
114
return self
@@ -172,16 +187,6 @@ async def init(self) -> None:
172
187
if self ._is_initialized :
173
188
raise RuntimeError ('The Actor was already initialized!' )
174
189
175
- if self ._configuration .token :
176
- service_container .set_cloud_storage_client (ApifyStorageClient (configuration = self ._configuration ))
177
-
178
- if self ._configuration .is_at_home :
179
- service_container .set_default_storage_client_type ('cloud' )
180
- else :
181
- service_container .set_default_storage_client_type ('local' )
182
-
183
- service_container .set_event_manager (self ._event_manager )
184
-
185
190
self ._is_exiting = False
186
191
self ._was_final_persist_state_emitted = False
187
192
@@ -233,7 +238,6 @@ async def finalize() -> None:
233
238
await self ._event_manager .wait_for_all_listeners_to_complete (timeout = event_listeners_timeout )
234
239
235
240
await self ._event_manager .__aexit__ (None , None , None )
236
- cast (dict , service_container ._services ).clear () # noqa: SLF001
237
241
238
242
await asyncio .wait_for (finalize (), cleanup_timeout .total_seconds ())
239
243
self ._is_initialized = False
@@ -335,12 +339,13 @@ async def open_dataset(
335
339
An instance of the `Dataset` class for the given ID or name.
336
340
"""
337
341
self ._raise_if_not_initialized ()
342
+ storage_client = self ._cloud_storage_client if force_cloud else service_locator .get_storage_client ()
338
343
339
344
return await Dataset .open (
340
345
id = id ,
341
346
name = name ,
342
347
configuration = self ._configuration ,
343
- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
348
+ storage_client = storage_client ,
344
349
)
345
350
346
351
async def open_key_value_store (
@@ -367,12 +372,13 @@ async def open_key_value_store(
367
372
An instance of the `KeyValueStore` class for the given ID or name.
368
373
"""
369
374
self ._raise_if_not_initialized ()
375
+ storage_client = self ._cloud_storage_client if force_cloud else service_locator .get_storage_client ()
370
376
371
377
return await KeyValueStore .open (
372
378
id = id ,
373
379
name = name ,
374
380
configuration = self ._configuration ,
375
- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
381
+ storage_client = storage_client ,
376
382
)
377
383
378
384
async def open_request_queue (
@@ -401,12 +407,13 @@ async def open_request_queue(
401
407
An instance of the `RequestQueue` class for the given ID or name.
402
408
"""
403
409
self ._raise_if_not_initialized ()
410
+ storage_client = self ._cloud_storage_client if force_cloud else service_locator .get_storage_client ()
404
411
405
412
return await RequestQueue .open (
406
413
id = id ,
407
414
name = name ,
408
415
configuration = self ._configuration ,
409
- storage_client = service_container . get_storage_client ( client_type = 'cloud' if force_cloud else None ) ,
416
+ storage_client = storage_client ,
410
417
)
411
418
412
419
async def push_data (self , data : dict | list [dict ]) -> None :
0 commit comments