|
7 | 7 | from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
|
8 | 8 |
|
9 | 9 | from lazy_object_proxy import Proxy
|
| 10 | +from more_itertools import flatten |
10 | 11 | from pydantic import AliasChoices
|
11 | 12 |
|
12 | 13 | from apify_client import ApifyClientAsync
|
13 | 14 | from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
|
14 | 15 | from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
|
15 | 16 | from crawlee import service_container
|
16 |
| -from crawlee.events._types import Event, EventPersistStateData |
| 17 | +from crawlee.events._types import Event, EventMigratingData, EventPersistStateData |
17 | 18 |
|
18 | 19 | from apify._configuration import Configuration
|
19 | 20 | from apify._consts import EVENT_LISTENERS_TIMEOUT
|
@@ -48,6 +49,7 @@ class _ActorType:
|
48 | 49 | _apify_client: ApifyClientAsync
|
49 | 50 | _configuration: Configuration
|
50 | 51 | _is_exiting = False
|
| 52 | + _is_rebooting = False |
51 | 53 |
|
52 | 54 | def __init__(
|
53 | 55 | self,
|
@@ -839,12 +841,32 @@ async def reboot(
|
839 | 841 | self.log.error('Actor.reboot() is only supported when running on the Apify platform.')
|
840 | 842 | return
|
841 | 843 |
|
| 844 | + if self._is_rebooting: |
| 845 | + self.log.debug('Actor is already rebooting, skipping the additional reboot call.') |
| 846 | + return |
| 847 | + |
| 848 | + self._is_rebooting = True |
| 849 | + |
842 | 850 | if not custom_after_sleep:
|
843 | 851 | custom_after_sleep = self._configuration.metamorph_after_sleep
|
844 | 852 |
|
845 |
| - self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True)) |
| 853 | + # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. |
| 854 | + # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot. |
| 855 | + # MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot. |
| 856 | + # Typically, crawlers are listening for the MIIGRATING event to stop processing new requests. |
| 857 | + # We can't just emit the events and wait for all listeners to finish, |
| 858 | + # because this method might be called from an event listener itself, and we would deadlock. |
| 859 | + persist_state_listeners = flatten( |
| 860 | + (self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 |
| 861 | + ) |
| 862 | + migrating_listeners = flatten( |
| 863 | + (self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 |
| 864 | + ) |
846 | 865 |
|
847 |
| - await self._event_manager.__aexit__(None, None, None) |
| 866 | + await asyncio.gather( |
| 867 | + *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], |
| 868 | + *[listener(EventMigratingData()) for listener in migrating_listeners], |
| 869 | + ) |
848 | 870 |
|
849 | 871 | if not self._configuration.actor_run_id:
|
850 | 872 | raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')
|
|
0 commit comments