Skip to content

Commit 1cc3d31

Browse files
committed
fix: prevent reboot loop, allow calling reboot from migrating handler, align reboot behavior with JS SDK
1 parent 2d4b8d0 commit 1cc3d31

File tree

1 file changed

+22
-3
lines changed

1 file changed

+22
-3
lines changed

src/apify/_actor.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import sys
66
from datetime import timedelta
7+
from itertools import chain
78
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
89

910
from lazy_object_proxy import Proxy
@@ -13,7 +14,7 @@
1314
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
1415
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
1516
from crawlee import service_container
16-
from crawlee.events._types import Event, EventPersistStateData
17+
from crawlee.events._types import Event, EventMigratingData, EventPersistStateData
1718

1819
from apify._configuration import Configuration
1920
from apify._consts import EVENT_LISTENERS_TIMEOUT
@@ -48,6 +49,7 @@ class _ActorType:
4849
_apify_client: ApifyClientAsync
4950
_configuration: Configuration
5051
_is_exiting = False
52+
_is_rebooting = False
5153

5254
def __init__(
5355
self,
@@ -839,12 +841,29 @@ async def reboot(
839841
self.log.error('Actor.reboot() is only supported when running on the Apify platform.')
840842
return
841843

844+
if self._is_rebooting:
845+
self.log.debug('Actor is already rebooting, skipping the additional reboot call.')
846+
return
847+
848+
self._is_rebooting = True
849+
842850
if not custom_after_sleep:
843851
custom_after_sleep = self._configuration.metamorph_after_sleep
844852

845-
self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True))
853+
# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
854+
# We can't just emit the events and wait for all listeners to finish,
855+
# because this method might be called from an event listener itself, and we would deadlock.
856+
persist_state_listeners = chain.from_iterable(
857+
(self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001
858+
)
859+
migrating_listeners = chain.from_iterable(
860+
(self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
861+
)
846862

847-
await self._event_manager.__aexit__(None, None, None)
863+
await asyncio.gather(
864+
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
865+
*[listener(EventMigratingData()) for listener in migrating_listeners],
866+
)
848867

849868
if not self._configuration.actor_run_id:
850869
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

0 commit comments

Comments
 (0)