Skip to content

Commit ba7f757

Browse files
Pijukatel and vdusek authored
feat: Add RemainingTime option for timeout argument of Actor.call and Actor.start (#473)
### Description Added a convenient way to start another Actor with a reasonable timeout, to prevent lingering Actor runs after the main Actor shuts down. Added a test. The `RemainingTime` value of the `timeout` argument is not the default and has to be passed explicitly by the user. Using this value calculates the remaining time of this Actor run and passes it as the timeout to the Actor that is being started. ### Issues - Closes: #472 --------- Co-authored-by: Vlada Dusek <[email protected]>
1 parent 2d6d303 commit ba7f757

File tree

2 files changed

+144
-7
lines changed

2 files changed

+144
-7
lines changed

src/apify/_actor.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
import sys
66
from contextlib import suppress
7-
from datetime import timedelta
7+
from datetime import datetime, timedelta, timezone
88
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar, cast, overload
99

1010
from lazy_object_proxy import Proxy
@@ -693,7 +693,7 @@ async def start(
693693
content_type: str | None = None,
694694
build: str | None = None,
695695
memory_mbytes: int | None = None,
696-
timeout: timedelta | None = None,
696+
timeout: timedelta | None | Literal['RemainingTime'] = None,
697697
wait_for_finish: int | None = None,
698698
webhooks: list[Webhook] | None = None,
699699
) -> ActorRun:
@@ -711,7 +711,8 @@ async def start(
711711
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified
712712
in the default run configuration for the Actor.
713713
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in
714-
the default run configuration for the Actor.
714+
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor
715+
to the time remaining from this Actor timeout.
715716
wait_for_finish: The maximum number of seconds the server waits for the run to finish. By default,
716717
it is 0, the maximum value is 300.
717718
webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with
@@ -732,18 +733,39 @@ async def start(
732733
else:
733734
serialized_webhooks = None
734735

736+
if timeout == 'RemainingTime':
737+
actor_start_timeout = self._get_remaining_time()
738+
elif timeout is None:
739+
actor_start_timeout = None
740+
elif isinstance(timeout, timedelta):
741+
actor_start_timeout = timeout
742+
else:
743+
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.')
744+
735745
api_result = await client.actor(actor_id).start(
736746
run_input=run_input,
737747
content_type=content_type,
738748
build=build,
739749
memory_mbytes=memory_mbytes,
740-
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
750+
timeout_secs=int(actor_start_timeout.total_seconds()) if actor_start_timeout is not None else None,
741751
wait_for_finish=wait_for_finish,
742752
webhooks=serialized_webhooks,
743753
)
744754

745755
return ActorRun.model_validate(api_result)
746756

757+
def _get_remaining_time(self) -> timedelta | None:
758+
"""Get time remaining from the actor timeout. Returns `None` if not on an Apify platform."""
759+
if self.is_at_home() and self.configuration.timeout_at:
760+
return self.configuration.timeout_at - datetime.now(tz=timezone.utc)
761+
762+
self.log.warning(
763+
'Returning `None` instead of remaining time. Using `RemainingTime` argument is only possible when the Actor'
764+
' is running on the Apify platform and when the timeout for the Actor run is set. '
765+
f'{self.is_at_home()=}, {self.configuration.timeout_at=}'
766+
)
767+
return None
768+
747769
async def abort(
748770
self,
749771
run_id: str,
@@ -787,7 +809,7 @@ async def call(
787809
content_type: str | None = None,
788810
build: str | None = None,
789811
memory_mbytes: int | None = None,
790-
timeout: timedelta | None = None,
812+
timeout: timedelta | None | Literal['RemainingTime'] = None,
791813
webhooks: list[Webhook] | None = None,
792814
wait: timedelta | None = None,
793815
) -> ActorRun | None:
@@ -805,7 +827,8 @@ async def call(
805827
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified
806828
in the default run configuration for the Actor.
807829
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in
808-
the default run configuration for the Actor.
830+
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor
831+
to the time remaining from this Actor timeout.
809832
webhooks: Optional webhooks (https://docs.apify.com/webhooks) associated with the Actor run, which can
810833
be used to receive a notification, e.g. when the Actor finished or failed. If you already have
811834
a webhook set up for the Actor, you do not have to add it again here.
@@ -826,12 +849,21 @@ async def call(
826849
else:
827850
serialized_webhooks = None
828851

852+
if timeout == 'RemainingTime':
853+
actor_call_timeout = self._get_remaining_time()
854+
elif timeout is None:
855+
actor_call_timeout = None
856+
elif isinstance(timeout, timedelta):
857+
actor_call_timeout = timeout
858+
else:
859+
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.')
860+
829861
api_result = await client.actor(actor_id).call(
830862
run_input=run_input,
831863
content_type=content_type,
832864
build=build,
833865
memory_mbytes=memory_mbytes,
834-
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
866+
timeout_secs=int(actor_call_timeout.total_seconds()) if actor_call_timeout is not None else None,
835867
webhooks=serialized_webhooks,
836868
wait_secs=int(wait.total_seconds()) if wait is not None else None,
837869
)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import TYPE_CHECKING
5+
6+
from apify import Actor
7+
8+
if TYPE_CHECKING:
9+
from .conftest import MakeActorFunction, RunActorFunction
10+
11+
12+
async def test_actor_start_remaining_timeout(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that `Actor.start` sets the correct timeout when `timeout='RemainingTime'` is used.

    In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run.
    Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor start.
    """

    async def main() -> None:
        from datetime import datetime, timezone

        async with Actor:
            actor_input = (await Actor.get_input()) or {}
            if actor_input.get('called_from_another_actor', False) is True:
                # If this Actor run was started with a specific argument (the second Actor run), return immediately.
                # Asserts checking the timeout are in the first Actor run.
                return

            # Start another run of this Actor with timeout set to the time remaining in this Actor run.
            # `Actor.start` (not `Actor.call`) is used on purpose: this test covers the `start` code path.
            other_run_data = await Actor.start(
                actor_id=Actor.configuration.actor_id or '',
                run_input={'called_from_another_actor': True},
                timeout='RemainingTime',
            )
            try:
                # To make sure that the Actor is started
                await asyncio.sleep(5)
                assert other_run_data.options is not None
                assert Actor.configuration.timeout_at is not None
                assert Actor.configuration.started_at is not None

                remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc)

                # The timeout was computed before this moment, so it must exceed what remains now,
                # yet it must be shorter than the full timeout of this run.
                assert other_run_data.options.timeout > remaining_time_after_actor_start
                assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at
            finally:
                # Make sure the other Actor run is aborted
                await Actor.apify_client.run(other_run_data.id).abort()

    actor = await make_actor(label='remaining-timeout', main_func=main)
    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'
57+
58+
59+
async def test_actor_call_remaining_timeout(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Check that `Actor.call` with `timeout='RemainingTime'` hands over the caller's remaining run time.

    The Actor built here calls itself once. The parent run then verifies that the timeout recorded in the child
    run's options lies strictly between the parent's remaining time (measured after the call) and the parent's
    total timeout.
    """

    async def main() -> None:
        from datetime import datetime, timezone

        async with Actor:
            received_input = (await Actor.get_input()) or {}
            is_child_run = received_input.get('called_from_another_actor', False) is True
            if is_child_run:
                # The child run has nothing to verify; all assertions live in the parent run.
                return

            # Call this same Actor again, limiting the child to the time the parent still has left.
            child_run = await Actor.call(
                actor_id=Actor.configuration.actor_id or '',
                run_input={'called_from_another_actor': True},
                timeout='RemainingTime',
            )

            assert child_run is not None
            try:
                # Give the child run time to start.
                await asyncio.sleep(5)

                assert child_run.options is not None
                assert Actor.configuration.timeout_at is not None
                assert Actor.configuration.started_at is not None

                lower_bound = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc)
                upper_bound = Actor.configuration.timeout_at - Actor.configuration.started_at

                assert child_run.options.timeout > lower_bound
                assert child_run.options.timeout < upper_bound
            finally:
                # Never leave the child run behind, whatever the assertions did.
                await Actor.apify_client.run(child_run.id).abort()

    run_result = await run_actor(await make_actor(label='remaining-timeout', main_func=main))

    assert run_result.status == 'SUCCEEDED'

0 commit comments

Comments
 (0)