Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a71ae41
TODO: Figure out how to mock response with stream
Pijukatel May 12, 2025
69ff84c
WIP
Pijukatel May 12, 2025
862cacc
Polish splitting of messages and setting the log level
Pijukatel May 13, 2025
753427a
Draft with async implementation and example tests
Pijukatel May 13, 2025
cc0d944
Add `raw=True`
Pijukatel May 14, 2025
cbcabd3
Add chunk processing
Pijukatel May 14, 2025
81577e8
Merge remote-tracking branch 'origin/master' into redirected-actor-logs
Pijukatel May 14, 2025
b9bc44d
Add sync version of the logging.
Pijukatel May 14, 2025
9720327
Finalize, update comments
Pijukatel May 14, 2025
85ead2f
Add `from_start` argument for streaming from stand-by actors
Pijukatel May 15, 2025
4ad39fa
Skip first logs based on datetime of the marker
Pijukatel May 15, 2025
74595f9
Self review.
Pijukatel May 15, 2025
cba571f
Handle bytestream edgecase of chunk containing only half of the multi…
Pijukatel May 15, 2025
02a1eb2
Review comments
Pijukatel May 15, 2025
2674cf2
Remove unnecessary `actor_name` argument
Pijukatel May 16, 2025
2a6f2ec
Update split pattern to deal with multiple times redirected log
Pijukatel May 16, 2025
1263450
Review comment
Pijukatel May 16, 2025
b1338f1
Regenerate `uv.lock` with new version of `uv`
Pijukatel May 16, 2025
669a749
Test data time alignment.
Pijukatel May 16, 2025
737cde9
Add status redirector
Pijukatel May 19, 2025
2914e50
TODO: Finalize tests
Pijukatel May 19, 2025
8fbbffa
Finalize tests.
Pijukatel May 20, 2025
8e70e59
Merge remote-tracking branch 'origin/master' into redirect-status-mes…
Pijukatel May 20, 2025
a3a629e
Update syntax to avoid https://github.com/PyCQA/redbaron/issues/212
Pijukatel May 21, 2025
18f4f51
Update client names in tests to match their type
Pijukatel May 21, 2025
268e568
Review comments
Pijukatel May 28, 2025
335b8c3
Properly set _force_propagate
Pijukatel May 28, 2025
1e5e976
Use whitespace in default redirect logger name instead of `-`
Pijukatel Jun 2, 2025
350fc67
Review comments
Pijukatel Jun 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions src/apify_client/clients/resource_clients/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ def call(
waits indefinitely.
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
will redirect logs to the provided logger.
will redirect logs to the provided logger. The logger is also used to capture status and status message
of the other Actor run.

Returns:
The run object.
Expand All @@ -336,12 +337,11 @@ def call(
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

run_client = self.root_client.run(run_id=started_run['id'])

if logger == 'default':
log_context = run_client.get_streamed_log()
else:
log_context = run_client.get_streamed_log(to_logger=logger)
logger = None

with log_context:
with run_client.get_status_message_redirector(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

def build(
Expand Down Expand Up @@ -722,7 +722,8 @@ async def call(
waits indefinitely.
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
will redirect logs to the provided logger.
will redirect logs to the provided logger. The logger is also used to capture status and status message
of the other Actor run.

Returns:
The run object.
Expand All @@ -742,12 +743,14 @@ async def call(
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

run_client = self.root_client.run(run_id=started_run['id'])

if logger == 'default':
log_context = await run_client.get_streamed_log()
else:
log_context = await run_client.get_streamed_log(to_logger=logger)
logger = None

status_redirector = await run_client.get_status_message_redirector(to_logger=logger)
streamed_log = await run_client.get_streamed_log(to_logger=logger)

async with log_context:
async with status_redirector, streamed_log:
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

async def build(
Expand Down
162 changes: 161 additions & 1 deletion src/apify_client/clients/resource_clients/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import logging
import re
import threading
import time
from asyncio import Task
from contextlib import asynccontextmanager, contextmanager
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from threading import Thread
from typing import TYPE_CHECKING, Any, cast

from apify_shared.utils import ignore_docs
from typing_extensions import Self

from apify_client._errors import ApifyApiError
from apify_client._utils import catch_not_found_or_throw
Expand All @@ -23,6 +25,8 @@
import httpx
from typing_extensions import Self

from apify_client.clients import RunClient, RunClientAsync


class LogClient(ResourceClient):
"""Sub-client for manipulating logs."""
Expand Down Expand Up @@ -378,3 +382,159 @@ async def _stream_log(self) -> None:

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)


class StatusMessageRedirector:
"""Utility class for logging status messages from another Actor run.

Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
especially in cases of frequent status message changes.
"""

_force_propagate = False
# This is final sleep time to try to get the last status and status message of finished Actor run.
# The status and status message can get set on the Actor run with a delay. Sleep time does not guarantee that the
# final message will be captured, but increases the chances of that.
_final_sleep_time_s = 6

def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
"""Initialize `StatusMessageRedirector`.

Args:
to_logger: The logger to which the status message will be redirected.
check_period: The period with which the status message will be polled.
"""
self._to_logger = to_logger
self._to_logger.propagate = self._force_propagate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not nice to silently reconfigure a logger passed in by an unsuspecting client 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. It should have been reconfigured only if the internal _force_propagate is changed to True, which is only the test use-case for being able to use caplog.

self._check_period = check_period.total_seconds()
self._last_status_message = ''

def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
"""Get relevant run data, log them if changed and return `True` if more data is expected.

Args:
run_data: The dictionary that contains the run data.

Returns:
`True` if more data is expected, `False` otherwise.
"""
if run_data is not None:
status = run_data.get('status', 'Unknown status')
status_message = run_data.get('statusMessage', '')
new_status_message = f'Status: {status}, Message: {status_message}'

if new_status_message != self._last_status_message:
self._last_status_message = new_status_message
self._to_logger.info(new_status_message)

return not (run_data.get('isStatusMessageTerminal', False))
return True


class StatusMessageRedirectorAsync(StatusMessageRedirector):
"""Async variant of `StatusMessageRedirector` that is logging in task."""

def __init__(
    self,
    *,
    run_client: RunClientAsync,
    to_logger: logging.Logger,
    check_period: timedelta = timedelta(seconds=1),
) -> None:
    """Create a `StatusMessageRedirectorAsync` bound to one run.

    Args:
        run_client: Client of the run whose status and status message are fetched.
        to_logger: Logger that receives the redirected status messages.
        check_period: Interval between two consecutive polls of the run.
    """
    super().__init__(check_period=check_period, to_logger=to_logger)
    # No task exists until `start` is called.
    self._logging_task: Task | None = None
    self._run_client = run_client

def start(self) -> Task:
    """Spawn the logging task and return it.

    Cleanup is the caller's responsibility: the `stop` method has to be called manually.

    Raises:
        RuntimeError: If a logging task is already registered on this redirector.
    """
    if self._logging_task:
        raise RuntimeError('Logging task already active')
    task = asyncio.create_task(self._log_changed_status_message())
    self._logging_task = task
    return task

def stop(self) -> None:
"""Stop the logging task."""
if not self._logging_task:
raise RuntimeError('Logging task is not active')

self._logging_task.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid there might be GC-related warnings if you don't await the task (docs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, added awaits

self._logging_task = None

async def __aenter__(self) -> Self:
    """Enter the async context: launch the logging task and hand back this redirector.

    The task is cancelled again when the context is exited.
    """
    self.start()
    return self

async def __aexit__(
    self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
    """Give the logging task a grace period to finish, then cancel it and wait for it to end.

    The grace period (`_final_sleep_time_s`) exists so the last status and status message of a finished run
    can still be captured. It ends early when the task has already finished on its own.

    Args:
        exc_type: Type of the exception raised inside the context, if any.
        exc_val: The exception raised inside the context, if any.
        exc_tb: Traceback of the exception raised inside the context, if any.

    Raises:
        RuntimeError: If no logging task is active (raised by `stop`).
    """
    task = self._logging_task
    try:
        if task is not None:
            # `asyncio.wait` neither cancels the task nor raises on timeout; it returns as soon as the task is
            # done, so an already finished task does not cost the whole grace period.
            await asyncio.wait({task}, timeout=self._final_sleep_time_s)
    finally:
        # Always cancel the task, even when waiting was interrupted (e.g. by cancellation of the caller).
        self.stop()

    if task is not None:
        # Await the cancelled task so that it is fully finalized and its outcome is retrieved. This avoids
        # "Task was destroyed but it is pending" / "exception was never retrieved" warnings. The outcome is
        # intentionally not re-raised, status logging is best-effort.
        await asyncio.gather(task, return_exceptions=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not to remove start/stop and implement __aenter__ / __aexit__ directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__aenter__ / __aexit__ are there as well for convenience, but start and stop is exposed for flexibility to make it possible for the users to call these methods outside of sometimes limiting context manager and without the need to call double-underscored methods directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'm not a fan of exposing additional start/stop methods just to provide an alternative way to manually control what the context manager takes care of. If a user really wants to call enter / exit methods directly, I think he can just do it directly, rather than introducing extra methods that just duplicate that logic. Not gonna block the merge though, up to you.


async def _log_changed_status_message(self) -> None:
    """Poll the run and log changed status messages until `_log_run_data` reports that no more are expected."""
    keep_polling = self._log_run_data(await self._run_client.get())
    while keep_polling:
        # Pause for one check period between two consecutive polls.
        await asyncio.sleep(self._check_period)
        keep_polling = self._log_run_data(await self._run_client.get())


class StatusMessageRedirectorSync(StatusMessageRedirector):
    """Sync variant of `StatusMessageRedirector` that is logging in thread."""

    def __init__(
        self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageRedirectorSync`.

        Args:
            run_client: The client for run that will be used to get a status and message.
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_thread: Thread | None = None
        # An event (instead of a plain flag) lets the polling thread be woken up in the middle of its wait.
        self._stop_event = threading.Event()

    def start(self) -> Thread:
        """Start the logging thread. The caller has to handle any cleanup by manually calling the `stop` method.

        Returns:
            The started logging thread.

        Raises:
            RuntimeError: If a logging thread is already registered on this redirector.
        """
        if self._logging_thread:
            raise RuntimeError('Logging thread already active')
        self._stop_event.clear()
        self._logging_thread = threading.Thread(target=self._log_changed_status_message)
        self._logging_thread.start()
        return self._logging_thread

    def stop(self) -> None:
        """Signal the logging thread to stop and wait for it to finish.

        The thread first gets a grace period of `_final_sleep_time_s` seconds to capture the last status and
        status message of a finished run. The grace period ends early when the thread has already finished.

        Raises:
            RuntimeError: If no logging thread is active.
        """
        if not self._logging_thread:
            raise RuntimeError('Logging thread is not active')
        # `join` with a timeout returns as soon as the thread exits, so an already finished thread does not
        # cost the whole grace period.
        self._logging_thread.join(timeout=self._final_sleep_time_s)
        # Wake the thread up from its wait. It performs one last poll and then exits.
        self._stop_event.set()
        self._logging_thread.join()
        self._logging_thread = None
        self._stop_event.clear()

    def __enter__(self) -> Self:
        """Start the logging thread within the context. Exiting the context will stop the logging thread."""
        self.start()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Stop the logging thread."""
        self.stop()

    def _log_changed_status_message(self) -> None:
        """Poll the run and log changed status messages until a terminal message is seen or stop is requested."""
        while True:
            if not self._log_run_data(self._run_client.get()):
                break
            # Checked after logging, so one final poll still happens once the stop was requested.
            if self._stop_event.is_set():
                break
            # Interruptible pause: returns immediately when `stop` sets the event.
            self._stop_event.wait(self._check_period)
62 changes: 62 additions & 0 deletions src/apify_client/clients/resource_clients/run.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import json
import logging
import random
import string
import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any

from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
Expand All @@ -16,6 +18,8 @@
from apify_client.clients.resource_clients.log import (
LogClient,
LogClientAsync,
StatusMessageRedirectorAsync,
StatusMessageRedirectorSync,
StreamedLogAsync,
StreamedLogSync,
)
Expand Down Expand Up @@ -318,6 +322,34 @@ def charge(
),
)

def get_status_message_redirector(
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1)
) -> StatusMessageRedirectorSync:
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.

`StatusMessageRedirector` can be directly called or used as a context manager.

Args:
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
created.
check_period: The period with which the status message will be polled.

Returns:
`StatusMessageRedirector` instance for redirected logs.
"""
run_data = self.get()
run_id = run_data.get('id', '') if run_data else ''

actor_id = run_data.get('actId', '') if run_data else ''
actor_data = self.root_client.actor(actor_id=actor_id).get() or {}
actor_name = actor_data.get('name', '') if run_data else ''

if not to_logger:
name = '-'.join(part for part in (actor_name, run_id) if part)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's a good idea to include the run id by default? The user won't know what it is and it makes the logline kinda messy, especially with nested invocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actor name is for readability and run id is for uniqueness and ability to track the log to its origin. Keeping only the actor name would make it harder to find the origin of the log.
Depending on the use-case this might seem less or more relevant. Maybe it could be something like f"{actor_name}-runId:{run_id}" to make it more explicit and clear what the id refers to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in which case do you actually need to track the log to its origin? Isn't the most frequent case calling a single Actor and showing the status so that there's some activity in the log?

Also, a - may not be an ideal separator, those appear in Actor names quite often.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Any scenario where one actor is calling two different runs of another actor. There it would be crucial, but it is not a common use case.
  2. In normal "one-to-one" scenario it is useful to easily navigate to the relevant run of the called actor. That I do not think is uncommon scenario. Is there any other convenient way how to navigate to the called actor run that would make this information redundant in the logs?

About separator I am fine with anything. In logger it can be even white space I guess. Do you have any preference there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case 1, it would be nice if the user could opt into logging the run id.

In case 2, I'd prefer to just log the run id when starting the Actor. But don't take it as an authoritative answer, I'm just concerned about printing the run id a zillion times on each line.

As a separator, a space sounds like a good idea. Maybe with some arrow or >> for nested Actor runs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I do not want to add any arguments for customization of the default logger is that it will be part of functions that already have many arguments, for example actor.call . So my idea was, either use default without any customization options, or create your own logger and nothing in between. This gives you all the freedom and does not add too many new arguments to functions that already have too many of them.

What about following idea to reduce the run id spam in the logs. Each redirected logger will first print some full identification message, like: "Redirect logger for actor XYZ and run id ABCDEF... will be shown as {some alias}"

Then we can talk about this alias. Which can be for example "apify.{actor_name} {redirect_logger_counter}" or "apify.redirect {n}" or something else ...?

So it will be shorter and you will still be able to uniquely identify the origin of the redirected log with the usage of the first log message.

BUT!!! Going on with such approach you will not be able to identify deeply nested redirected loggers if you choose an option to not redirect from the start of the actor run (in case of long running standby actors) as you will miss the first redirected logger identification message.

Having that in mind, I still feel that full info in each message is the safest for the default logger, even though it is quite verbose. I will add documentation once it is also integrated into SDK that will show how easy it is to create logger that allows the customization.

to_logger = create_redirect_logger(f'apify.{name}')

return StatusMessageRedirectorSync(run_client=self, to_logger=to_logger, check_period=check_period)


class RunClientAsync(ActorJobBaseClientAsync):
"""Async sub-client for manipulating a single Actor run."""
Expand Down Expand Up @@ -612,3 +644,33 @@ async def charge(
}
),
)

async def get_status_message_redirector(
    self,
    to_logger: logging.Logger | None = None,
    check_period: timedelta = timedelta(seconds=1),
) -> StatusMessageRedirectorAsync:
    """Get `StatusMessageRedirector` instance that can be used to redirect the status and status message of the run.

    `StatusMessageRedirector` can be controlled manually through its `start` and `stop` methods or used as
    a context manager.

    Args:
        to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
            created and named after the Actor name and the run id.
        check_period: The period with which the status message will be polled.

    Returns:
        `StatusMessageRedirector` instance for the redirected status messages.
    """
    if to_logger is None:
        # The run and its Actor are looked up only to build the name of the default logger, so these API
        # calls are skipped when the caller supplies a logger.
        run_data = await self.get() or {}
        run_id = run_data.get('id', '')
        actor_id = run_data.get('actId', '')

        # Without a known Actor id there is nothing to look up.
        actor_data = (await self.root_client.actor(actor_id=actor_id).get() or {}) if actor_id else {}
        actor_name = actor_data.get('name', '')

        name = '-'.join(part for part in (actor_name, run_id) if part)
        to_logger = create_redirect_logger(f'apify.{name}')

    return StatusMessageRedirectorAsync(run_client=self, to_logger=to_logger, check_period=check_period)
Loading