-
Notifications
You must be signed in to change notification settings - Fork 15
feat: Add RemainingTime
option for timeout
argument of Actor.call
and Actor.start
#473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
a992dcb
b832919
74de324
b0cc5a7
c11b5d7
7202c17
05eed5f
3586632
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
import os | ||
import sys | ||
from contextlib import suppress | ||
from datetime import timedelta | ||
from datetime import datetime, timedelta, timezone | ||
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar, cast, overload | ||
|
||
from lazy_object_proxy import Proxy | ||
|
@@ -693,7 +693,7 @@ async def start( | |
content_type: str | None = None, | ||
build: str | None = None, | ||
memory_mbytes: int | None = None, | ||
timeout: timedelta | None = None, | ||
timeout: timedelta | None | Literal['RemainingTime'] = None, | ||
wait_for_finish: int | None = None, | ||
webhooks: list[Webhook] | None = None, | ||
) -> ActorRun: | ||
|
@@ -711,7 +711,8 @@ async def start( | |
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified | ||
in the default run configuration for the Actor. | ||
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in | ||
the default run configuration for the Actor. | ||
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other actor | ||
to the time remaining from this actor timeout. | ||
wait_for_finish: The maximum number of seconds the server waits for the run to finish. By default, | ||
it is 0, the maximum value is 300. | ||
webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with | ||
|
@@ -732,18 +733,35 @@ async def start( | |
else: | ||
serialized_webhooks = None | ||
|
||
if timeout == 'RemainingTime': | ||
actor_start_timeout = self._get_remaining_time() | ||
elif isinstance(timeout, str): | ||
raise ValueError( | ||
f'`timeout` can be `None`, `RemainingTime` literal or `timedelta` instance, but is {timeout=}' | ||
) | ||
else: | ||
actor_start_timeout = timeout | ||
Pijukatel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
api_result = await client.actor(actor_id).start( | ||
run_input=run_input, | ||
content_type=content_type, | ||
build=build, | ||
memory_mbytes=memory_mbytes, | ||
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None, | ||
timeout_secs=int(actor_start_timeout.total_seconds()) if actor_start_timeout is not None else None, | ||
wait_for_finish=wait_for_finish, | ||
webhooks=serialized_webhooks, | ||
) | ||
|
||
return ActorRun.model_validate(api_result) | ||
|
||
def _get_remaining_time(self) -> timedelta | None: | ||
"""Get time remaining from the actor timeout. Returns `None` if not on an Apify platform.""" | ||
if self.is_at_home() and self.configuration.timeout_at: | ||
return self.configuration.timeout_at - datetime.now(tz=timezone.utc) | ||
|
||
self.log.warning('Using `RemainingTime` argument for timeout outside of the Apify platform. Returning `None`') | ||
|
||
return None | ||
|
||
async def abort( | ||
self, | ||
run_id: str, | ||
|
@@ -787,7 +805,7 @@ async def call( | |
content_type: str | None = None, | ||
build: str | None = None, | ||
memory_mbytes: int | None = None, | ||
timeout: timedelta | None = None, | ||
timeout: timedelta | None | Literal['RemainingTime'] = None, | ||
webhooks: list[Webhook] | None = None, | ||
wait: timedelta | None = None, | ||
) -> ActorRun | None: | ||
|
@@ -805,7 +823,8 @@ async def call( | |
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified | ||
in the default run configuration for the Actor. | ||
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in | ||
the default run configuration for the Actor. | ||
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other actor | ||
to the time remaining from this actor timeout. | ||
webhooks: Optional webhooks (https://docs.apify.com/webhooks) associated with the Actor run, which can | ||
be used to receive a notification, e.g. when the Actor finished or failed. If you already have | ||
a webhook set up for the Actor, you do not have to add it again here. | ||
|
@@ -826,12 +845,21 @@ async def call( | |
else: | ||
serialized_webhooks = None | ||
|
||
if timeout == 'RemainingTime': | ||
actor_call_timeout = self._get_remaining_time() | ||
elif isinstance(timeout, str): | ||
raise ValueError( | ||
f'`timeout` can be `None`, `RemainingTime` literal or `timedelta` instance, but is {timeout=}' | ||
) | ||
else: | ||
actor_call_timeout = timeout | ||
vdusek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
api_result = await client.actor(actor_id).call( | ||
run_input=run_input, | ||
content_type=content_type, | ||
build=build, | ||
memory_mbytes=memory_mbytes, | ||
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None, | ||
timeout_secs=int(actor_call_timeout.total_seconds()) if actor_call_timeout is not None else None, | ||
webhooks=serialized_webhooks, | ||
wait_secs=int(wait.total_seconds()) if wait is not None else None, | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
from typing import TYPE_CHECKING | ||
|
||
from apify import Actor | ||
|
||
if TYPE_CHECKING: | ||
from .conftest import MakeActorFunction, RunActorFunction | ||
|
||
|
||
async def test_actor_start_remaining_timeout( | ||
make_actor: MakeActorFunction, | ||
run_actor: RunActorFunction, | ||
) -> None: | ||
async def main() -> None: | ||
from datetime import datetime, timezone | ||
|
||
async with Actor: | ||
actor_input = (await Actor.get_input()) or {} | ||
if actor_input.get('called_from_another_actor', False) is True: | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the Actor finishes immediately and you just check that the timeout was configured correctly? I agree that it's fine to trust the platform. Could you add a comment here that explains that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added more comments |
||
|
||
# Start another run of this actor with timeout set to the time remaining in this actor run | ||
other_run_data = await Actor.call( | ||
actor_id=Actor.configuration.actor_id or '', | ||
run_input={'called_from_another_actor': True}, | ||
timeout='RemainingTime', | ||
) | ||
assert other_run_data is not None | ||
try: | ||
# To make sure that the actor is started | ||
await asyncio.sleep(5) | ||
assert other_run_data.options is not None | ||
assert Actor.configuration.timeout_at is not None | ||
assert Actor.configuration.started_at is not None | ||
|
||
remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc) | ||
|
||
assert other_run_data.options.timeout > remaining_time_after_actor_start | ||
assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at | ||
finally: | ||
# Make sure the other actor run is aborted | ||
await Actor.apify_client.run(other_run_data.id).abort() | ||
|
||
actor = await make_actor(label='remaining-timeout', main_func=main) | ||
run_result = await run_actor(actor) | ||
|
||
assert run_result.status == 'SUCCEEDED' | ||
|
||
|
||
async def test_actor_call_remaining_timeout( | ||
make_actor: MakeActorFunction, | ||
run_actor: RunActorFunction, | ||
) -> None: | ||
async def main() -> None: | ||
from datetime import datetime, timezone | ||
|
||
async with Actor: | ||
actor_input = (await Actor.get_input()) or {} | ||
if actor_input.get('called_from_another_actor', False) is True: | ||
return | ||
|
||
await asyncio.sleep(1) | ||
|
||
# Start another run of this actor with timeout set to the time remaining in this actor run | ||
other_run_data = await Actor.call( | ||
actor_id=Actor.configuration.actor_id or '', | ||
run_input={'called_from_another_actor': True}, | ||
timeout='RemainingTime', | ||
) | ||
|
||
assert other_run_data is not None | ||
try: | ||
# To make sure that the actor is started | ||
await asyncio.sleep(5) | ||
|
||
assert other_run_data.options is not None | ||
assert Actor.configuration.timeout_at is not None | ||
assert Actor.configuration.started_at is not None | ||
|
||
remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc) | ||
|
||
assert other_run_data.options.timeout > remaining_time_after_actor_start | ||
assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at | ||
finally: | ||
# Make sure the other actor run is aborted | ||
await Actor.apify_client.run(other_run_data.id).abort() | ||
|
||
actor = await make_actor(label='remaining-timeout', main_func=main) | ||
run_result = await run_actor(actor) | ||
|
||
assert run_result.status == 'SUCCEEDED' |
Uh oh!
There was an error while loading. Please reload this page.