-
Notifications
You must be signed in to change notification settings - Fork 14
feat: Add StatusMessageWatcher
#407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 28 commits
a71ae41
69ff84c
862cacc
753427a
cc0d944
cbcabd3
81577e8
b9bc44d
9720327
85ead2f
4ad39fa
74595f9
cba571f
02a1eb2
2674cf2
2a6f2ec
1263450
b1338f1
669a749
737cde9
2914e50
8fbbffa
8e70e59
a3a629e
18f4f51
268e568
335b8c3
1e5e976
350fc67
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,13 +4,15 @@ | |
| import logging | ||
| import re | ||
| import threading | ||
| import time | ||
| from asyncio import Task | ||
| from contextlib import asynccontextmanager, contextmanager | ||
| from datetime import datetime, timezone | ||
| from datetime import datetime, timedelta, timezone | ||
| from threading import Thread | ||
| from typing import TYPE_CHECKING, Any, cast | ||
|
|
||
| from apify_shared.utils import ignore_docs | ||
| from typing_extensions import Self | ||
|
|
||
| from apify_client._errors import ApifyApiError | ||
| from apify_client._utils import catch_not_found_or_throw | ||
|
|
@@ -23,6 +25,8 @@ | |
| import httpx | ||
| from typing_extensions import Self | ||
|
|
||
| from apify_client.clients import RunClient, RunClientAsync | ||
|
|
||
|
|
||
| class LogClient(ResourceClient): | ||
| """Sub-client for manipulating logs.""" | ||
|
|
@@ -228,9 +232,9 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non | |
| logs for long-running actors in stand-by. | ||
|
|
||
| """ | ||
| self._to_logger = to_logger | ||
| if self._force_propagate: | ||
| to_logger.propagate = True | ||
| self._to_logger = to_logger | ||
| self._stream_buffer = list[bytes]() | ||
| self._split_marker = re.compile(rb'(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)') | ||
| self._relevancy_time_limit: datetime | None = None if from_start else datetime.now(tz=timezone.utc) | ||
|
|
@@ -378,3 +382,160 @@ async def _stream_log(self) -> None: | |
|
|
||
| # If the stream is finished, then the last part will be also processed. | ||
| self._log_buffer_content(include_last_part=True) | ||
|
|
||
|
|
||
class StatusMessageWatcher:
    """Base helper that forwards the status and status message of another Actor run to a logger.

    The run data is inspected at fixed time intervals, so not every status message is guaranteed to be logged,
    particularly when the status message changes frequently.
    """

    # When a subclass sets this to `True`, the target logger is forced to propagate its records.
    _force_propagate = False
    # Final grace period (in seconds) used when stopping, to try to capture the last status and status message of
    # a finished Actor run. Both can be set on the run with a delay. The grace period does not guarantee that the
    # final message is captured, it only increases the chances of that.
    _final_sleep_time_s = 6

    def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
        """Initialize `StatusMessageWatcher`.

        Args:
            to_logger: The logger that receives the status messages.
            check_period: How often the status message is polled.
        """
        self._last_status_message = ''
        # Stored as a plain number of seconds.
        self._check_period = check_period.total_seconds()
        self._to_logger = to_logger
        if self._force_propagate:
            self._to_logger.propagate = True

    def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
        """Log the status line derived from `run_data` if it changed and tell whether more data is expected.

        Args:
            run_data: The dictionary that contains the run data, or `None` when no data is available.

        Returns:
            `True` if more data is expected, `False` otherwise.
        """
        # Without any run data nothing can be logged; keep expecting more.
        if run_data is None:
            return True

        run_status = run_data.get('status', 'Unknown status')
        run_message = run_data.get('statusMessage', '')
        status_line = f'Status: {run_status}, Message: {run_message}'

        # Only log when the combined status line differs from the previously logged one.
        if status_line != self._last_status_message:
            self._last_status_message = status_line
            self._to_logger.info(status_line)

        is_terminal = run_data.get('isStatusMessageTerminal', False)
        return not is_terminal
|
|
||
|
|
||
class StatusMessageWatcherAsync(StatusMessageWatcher):
    """Async variant of `StatusMessageWatcher` that is logging in task."""

    def __init__(
        self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageWatcherAsync`.

        Args:
            run_client: The client for run that will be used to get a status and message.
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_task: Task | None = None

    def start(self) -> Task:
        """Start the logging task. The caller has to handle any cleanup by manually calling the `stop` method.

        Returns:
            The task that polls the run and logs changed status messages.

        Raises:
            RuntimeError: If a logging task is already active.
        """
        if self._logging_task:
            raise RuntimeError('Logging task already active')
        self._logging_task = asyncio.create_task(self._log_changed_status_message())
        return self._logging_task

    def stop(self) -> None:
        """Stop the logging task.

        The task is only asked to cancel; this method stays synchronous and therefore cannot wait for it.
        Callers that use `start`/`stop` manually can await the task returned by `start` afterwards.

        Raises:
            RuntimeError: If no logging task is active.
        """
        if not self._logging_task:
            raise RuntimeError('Logging task is not active')

        self._logging_task.cancel()
        self._logging_task = None

    async def __aenter__(self) -> Self:
        """Start the logging task within the context. Exiting the context will cancel the logging task."""
        self.start()
        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Give the logging task a final grace period, cancel it and wait until it has finished."""
        logging_task = self._logging_task

        # Grace period for the final status message. If the task already ended (terminal status message was
        # logged), there is nothing left to wait for, so the wait returns immediately instead of always
        # sleeping for the whole `_final_sleep_time_s`.
        if logging_task is not None and not logging_task.done():
            await asyncio.wait({logging_task}, timeout=self._final_sleep_time_s)

        self.stop()

        # Await the cancelled task (raised in review: a task that is cancelled but never awaited can lead to
        # GC-related warnings). `return_exceptions=True` collects the task's `CancelledError`, or any error the
        # polling raised, as a result, so leaving the context does not raise because of the watcher.
        if logging_task is not None:
            await asyncio.gather(logging_task, return_exceptions=True)

    async def _log_changed_status_message(self) -> None:
        """Poll the run until `_log_run_data` reports that no more data is expected."""
        while True:
            run_data = await self._run_client.get()
            if not self._log_run_data(run_data):
                break
            await asyncio.sleep(self._check_period)
|
|
||
|
|
||
class StatusMessageWatcherSync(StatusMessageWatcher):
    """Sync variant of `StatusMessageWatcher` that is logging in thread."""

    def __init__(
        self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageWatcherSync`.

        Args:
            run_client: The client for run that will be used to get a status and message.
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_thread: Thread | None = None
        # Flag read by the logging thread between polls; set by `stop` to end the polling loop.
        self._stop_logging = False

    def start(self) -> Thread:
        """Start the logging thread. The caller has to handle any cleanup by manually calling the `stop` method.

        Returns:
            The started thread that polls the run and logs changed status messages.

        Raises:
            RuntimeError: If a logging thread is already active.
        """
        if self._logging_thread:
            raise RuntimeError('Logging thread already active')
        self._stop_logging = False
        self._logging_thread = threading.Thread(target=self._log_changed_status_message)
        self._logging_thread.start()
        return self._logging_thread

    def stop(self) -> None:
        """Signal the _logging_thread thread to stop logging and wait for it to finish.

        Raises:
            RuntimeError: If no logging thread is active.
        """
        if not self._logging_thread:
            raise RuntimeError('Logging thread is not active')

        # Grace period for the final status message. Joining with a timeout keeps the thread polling for up to
        # `_final_sleep_time_s`, but returns as soon as the thread ends on its own (terminal status message
        # logged) instead of always blocking for the whole period.
        self._logging_thread.join(timeout=self._final_sleep_time_s)

        self._stop_logging = True
        self._logging_thread.join()
        self._logging_thread = None
        self._stop_logging = False

    def __enter__(self) -> Self:
        """Start the logging task within the context. Exiting the context will cancel the logging task."""
        self.start()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Cancel the logging task."""
        self.stop()

    def _log_changed_status_message(self) -> None:
        """Poll the run until no more data is expected or until `stop` requests the end of logging."""
        while True:
            if not self._log_run_data(self._run_client.get()):
                break
            if self._stop_logging:
                break
            time.sleep(self._check_period)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these new classes be exposed publicly? They seem like implementation details to me - you usually create these using helper methods on the resource client, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Helper methods on clients are convenient constructors for these classes, but the user will interact with them directly by calling either `start` or `close`, or by using them as context managers.
(From the `ActorClient` point of view this is indeed an implementation detail hidden in the `call` method, but from the `RunClient` point of view it is the actual public return value of one of its public methods.)