diff --git a/src/apify_client/clients/base/actor_job_base_client.py b/src/apify_client/clients/base/actor_job_base_client.py index abf7e274..4e401684 100644 --- a/src/apify_client/clients/base/actor_job_base_client.py +++ b/src/apify_client/clients/base/actor_job_base_client.py @@ -4,6 +4,7 @@ import math import time from datetime import datetime, timezone +from typing import Any, Protocol from apify_shared.consts import ActorJobStatus @@ -20,7 +21,9 @@ class ActorJobBaseClient(ResourceClient): """Base sub-client class for Actor runs and Actor builds.""" - def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None: + def _wait_for_finish( + self, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None + ) -> dict | None: started_at = datetime.now(timezone.utc) should_repeat = True job: dict | None = None @@ -39,6 +42,9 @@ def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None: ) job = parse_date_fields(pluck_data(response.json())) + if response_watcher: + response_watcher(job) + seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds()) if ActorJobStatus(job['status']).is_terminal or ( wait_secs is not None and seconds_elapsed >= wait_secs @@ -74,7 +80,9 @@ def _abort(self, *, gracefully: bool | None = None) -> dict: class ActorJobBaseClientAsync(ResourceClientAsync): """Base async sub-client class for Actor runs and Actor builds.""" - async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None: + async def _wait_for_finish( + self, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None + ) -> dict | None: started_at = datetime.now(timezone.utc) should_repeat = True job: dict | None = None @@ -93,6 +101,9 @@ async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None: ) job = parse_date_fields(pluck_data(response.json())) + if response_watcher: + response_watcher(job) + seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds()) if ActorJobStatus(job['status']).is_terminal or ( wait_secs is not None and seconds_elapsed >= wait_secs @@ -123,3 +134,10 @@ async def _abort(self, *, gracefully: bool | None = None) -> dict: params=self._params(gracefully=gracefully), ) return parse_date_fields(pluck_data(response.json())) + + +class ResponseWatcher(Protocol): + """Protocol for a callable watching parsed responses from wait_for_finish methods.""" + + def __call__(self, response: dict[str, Any] | None) -> Any: + """Watch the parsed response and act if needed.""" diff --git a/src/apify_client/clients/resource_clients/actor.py b/src/apify_client/clients/resource_clients/actor.py index 28dd62db..9111d8e0 100644 --- a/src/apify_client/clients/resource_clients/actor.py +++ b/src/apify_client/clients/resource_clients/actor.py @@ -358,8 +358,12 @@ def call( if logger == 'default': logger = None - with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger): - return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs) + status_watcher = run_client.get_status_message_watcher(to_logger=logger) + + with run_client.get_streamed_log(to_logger=logger): + return self.root_client.run(started_run['id']).wait_for_finish( + wait_secs=wait_secs, response_watcher=status_watcher.log_run_data + ) def build( self, @@ -779,11 +783,13 @@ async def call( if logger == 'default': logger = None - status_redirector = await run_client.get_status_message_watcher(to_logger=logger) + status_watcher = await run_client.get_status_message_watcher(to_logger=logger) streamed_log = await run_client.get_streamed_log(to_logger=logger) - async with status_redirector, streamed_log: - return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs) + async with streamed_log: + return await self.root_client.run(started_run['id']).wait_for_finish( + wait_secs=wait_secs, response_watcher=status_watcher.log_run_data + ) async def build( self, diff --git a/src/apify_client/clients/resource_clients/log.py b/src/apify_client/clients/resource_clients/log.py index 759ae1fd..39fb6c50 100644 --- a/src/apify_client/clients/resource_clients/log.py +++ b/src/apify_client/clients/resource_clients/log.py @@ -404,25 +404,25 @@ def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timed self._check_period = check_period.total_seconds() self._last_status_message = '' - def _log_run_data(self, run_data: dict[str, Any] | None) -> bool: + def log_run_data(self, response: dict[str, Any] | None) -> bool: """Get relevant run data, log them if changed and return `True` if more data is expected. Args: - run_data: The dictionary that contains the run data. + response: The dictionary that contains the parsed run data. Returns: `True` if more data is expected, `False` otherwise. """ - if run_data is not None: - status = run_data.get('status', 'Unknown status') - status_message = run_data.get('statusMessage', '') + if response is not None: + status = response.get('status', 'Unknown status') + status_message = response.get('statusMessage', '') new_status_message = f'Status: {status}, Message: {status_message}' if new_status_message != self._last_status_message: self._last_status_message = new_status_message self._to_logger.info(new_status_message) - return not (run_data.get('isStatusMessageTerminal', False)) + return not (response.get('isStatusMessageTerminal', False)) return True @@ -476,7 +476,7 @@ async def __aexit__( async def _log_changed_status_message(self) -> None: while True: run_data = await self._run_client.get() - if not self._log_run_data(run_data): + if not self.log_run_data(run_data): break await asyncio.sleep(self._check_period) @@ -531,7 +531,7 @@ def __exit__( def _log_changed_status_message(self) -> None: while True: - if not self._log_run_data(self._run_client.get()): + if not self.log_run_data(self._run_client.get()): break if self._stop_logging: break diff --git a/src/apify_client/clients/resource_clients/run.py b/src/apify_client/clients/resource_clients/run.py index d934c01b..abbb20df 100644 --- a/src/apify_client/clients/resource_clients/run.py +++ b/src/apify_client/clients/resource_clients/run.py @@ -35,6 +35,8 @@ from apify_shared.consts import RunGeneralAccess + from apify_client.clients.base.actor_job_base_client import ResponseWatcher + class RunClient(ActorJobBaseClient): """Sub-client for manipulating a single Actor run.""" @@ -102,17 +104,20 @@ def abort(self, *, gracefully: bool | None = None) -> dict: """ return self._abort(gracefully=gracefully) - def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None: + def wait_for_finish( + self, *, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None + ) -> dict | None: """Wait synchronously until the run finishes or the server times out. Args: wait_secs: How long does the client wait for run to finish. None for indefinite. + response_watcher: A callback function that will be called with parsed Actor run data. Returns: The Actor run data. If the status on the object is not one of the terminal statuses (SUCEEDED, FAILED, TIMED_OUT, ABORTED), then the run has not yet finished. """ - return self._wait_for_finish(wait_secs=wait_secs) + return self._wait_for_finish(wait_secs=wait_secs, response_watcher=response_watcher) def metamorph( self, @@ -417,17 +422,20 @@ async def abort(self, *, gracefully: bool | None = None) -> dict: """ return await self._abort(gracefully=gracefully) - async def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None: + async def wait_for_finish( + self, *, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None + ) -> dict | None: """Wait synchronously until the run finishes or the server times out. Args: wait_secs: How long does the client wait for run to finish. None for indefinite. + response_watcher: A callback function that will be called with parsed Actor run data. Returns: The Actor run data. If the status on the object is not one of the terminal statuses (SUCEEDED, FAILED, TIMED_OUT, ABORTED), then the run has not yet finished. """ - return await self._wait_for_finish(wait_secs=wait_secs) + return await self._wait_for_finish(wait_secs=wait_secs, response_watcher=response_watcher) async def delete(self) -> None: """Delete the run.