Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/apify_client/clients/base/actor_job_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import math
import time
from datetime import datetime, timezone
from typing import Any, Protocol

from apify_shared.consts import ActorJobStatus

Expand All @@ -20,7 +21,9 @@
class ActorJobBaseClient(ResourceClient):
"""Base sub-client class for Actor runs and Actor builds."""

def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
def _wait_for_finish(
self, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
) -> dict | None:
started_at = datetime.now(timezone.utc)
should_repeat = True
job: dict | None = None
Expand All @@ -39,6 +42,9 @@ def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
)
job = parse_date_fields(pluck_data(response.json()))

if response_watcher:
response_watcher(job)

seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
if ActorJobStatus(job['status']).is_terminal or (
wait_secs is not None and seconds_elapsed >= wait_secs
Expand Down Expand Up @@ -74,7 +80,9 @@ def _abort(self, *, gracefully: bool | None = None) -> dict:
class ActorJobBaseClientAsync(ResourceClientAsync):
"""Base async sub-client class for Actor runs and Actor builds."""

async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
async def _wait_for_finish(
self, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
) -> dict | None:
started_at = datetime.now(timezone.utc)
should_repeat = True
job: dict | None = None
Expand All @@ -93,6 +101,9 @@ async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
)
job = parse_date_fields(pluck_data(response.json()))

if response_watcher:
response_watcher(job)

seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
if ActorJobStatus(job['status']).is_terminal or (
wait_secs is not None and seconds_elapsed >= wait_secs
Expand Down Expand Up @@ -123,3 +134,10 @@ async def _abort(self, *, gracefully: bool | None = None) -> dict:
params=self._params(gracefully=gracefully),
)
return parse_date_fields(pluck_data(response.json()))


class ResponseWatcher(Protocol):
"""Protocol for a callable watching parsed responses from wait_for_finish methods."""

def __call__(self, response: dict[str, Any] | None) -> Any:
"""Watch the parsed response and act if needed."""
16 changes: 11 additions & 5 deletions src/apify_client/clients/resource_clients/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,12 @@ def call(
if logger == 'default':
logger = None

with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
status_watcher = run_client.get_status_message_watcher(to_logger=logger)

with run_client.get_streamed_log(to_logger=logger):
return self.root_client.run(started_run['id']).wait_for_finish(
wait_secs=wait_secs, response_watcher=status_watcher.log_run_data
)

def build(
self,
Expand Down Expand Up @@ -779,11 +783,13 @@ async def call(
if logger == 'default':
logger = None

status_redirector = await run_client.get_status_message_watcher(to_logger=logger)
status_watcher = await run_client.get_status_message_watcher(to_logger=logger)
streamed_log = await run_client.get_streamed_log(to_logger=logger)

async with status_redirector, streamed_log:
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
async with streamed_log:
return await self.root_client.run(started_run['id']).wait_for_finish(
wait_secs=wait_secs, response_watcher=status_watcher.log_run_data
)

async def build(
self,
Expand Down
16 changes: 8 additions & 8 deletions src/apify_client/clients/resource_clients/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,25 +404,25 @@ def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timed
self._check_period = check_period.total_seconds()
self._last_status_message = ''

def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
def log_run_data(self, response: dict[str, Any] | None) -> bool:
"""Get relevant run data, log them if changed and return `True` if more data is expected.

Args:
run_data: The dictionary that contains the run data.
response: The dictionary that contains the parsed run data.

Returns:
`True` if more data is expected, `False` otherwise.
"""
if run_data is not None:
status = run_data.get('status', 'Unknown status')
status_message = run_data.get('statusMessage', '')
if response is not None:
status = response.get('status', 'Unknown status')
status_message = response.get('statusMessage', '')
new_status_message = f'Status: {status}, Message: {status_message}'

if new_status_message != self._last_status_message:
self._last_status_message = new_status_message
self._to_logger.info(new_status_message)

return not (run_data.get('isStatusMessageTerminal', False))
return not (response.get('isStatusMessageTerminal', False))
return True


Expand Down Expand Up @@ -476,7 +476,7 @@ async def __aexit__(
async def _log_changed_status_message(self) -> None:
while True:
run_data = await self._run_client.get()
if not self._log_run_data(run_data):
if not self.log_run_data(run_data):
break
await asyncio.sleep(self._check_period)

Expand Down Expand Up @@ -531,7 +531,7 @@ def __exit__(

def _log_changed_status_message(self) -> None:
while True:
if not self._log_run_data(self._run_client.get()):
if not self.log_run_data(self._run_client.get()):
break
if self._stop_logging:
break
Expand Down
16 changes: 12 additions & 4 deletions src/apify_client/clients/resource_clients/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

from apify_shared.consts import RunGeneralAccess

from apify_client.clients.base.actor_job_base_client import ResponseWatcher


class RunClient(ActorJobBaseClient):
"""Sub-client for manipulating a single Actor run."""
Expand Down Expand Up @@ -102,17 +104,20 @@ def abort(self, *, gracefully: bool | None = None) -> dict:
"""
return self._abort(gracefully=gracefully)

def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None:
def wait_for_finish(
self, *, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
) -> dict | None:
"""Wait synchronously until the run finishes or the server times out.

Args:
wait_secs: How long does the client wait for run to finish. None for indefinite.
response_watcher: A callback function that will be called with parsed Actor run data.

Returns:
The Actor run data. If the status on the object is not one of the terminal statuses (SUCEEDED, FAILED,
TIMED_OUT, ABORTED), then the run has not yet finished.
"""
return self._wait_for_finish(wait_secs=wait_secs)
return self._wait_for_finish(wait_secs=wait_secs, response_watcher=response_watcher)

def metamorph(
self,
Expand Down Expand Up @@ -417,17 +422,20 @@ async def abort(self, *, gracefully: bool | None = None) -> dict:
"""
return await self._abort(gracefully=gracefully)

async def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None:
async def wait_for_finish(
self, *, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
) -> dict | None:
"""Wait synchronously until the run finishes or the server times out.

Args:
wait_secs: How long does the client wait for run to finish. None for indefinite.
response_watcher: A callback function that will be called with parsed Actor run data.

Returns:
The Actor run data. If the status on the object is not one of the terminal statuses (SUCEEDED, FAILED,
TIMED_OUT, ABORTED), then the run has not yet finished.
"""
return await self._wait_for_finish(wait_secs=wait_secs)
return await self._wait_for_finish(wait_secs=wait_secs, response_watcher=response_watcher)

async def delete(self) -> None:
"""Delete the run.
Expand Down
Loading