Skip to content

Commit a71ae41

Browse files
committed
TODO: Figure out how to mock a response with a stream
1 parent dac26eb commit a71ae41

File tree

5 files changed

+737
-557
lines changed

5 files changed

+737
-557
lines changed

src/apify_client/_logging.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import json
66
import logging
77
from contextvars import ContextVar
8-
from typing import TYPE_CHECKING, Any, Callable, NamedTuple
8+
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast
99

1010
# Conditional import only executed when type checking, otherwise we'd get circular dependency issues
1111
if TYPE_CHECKING:
@@ -120,3 +120,70 @@ def format(self, record: logging.LogRecord) -> str:
120120
if extra:
121121
log_string = f'{log_string} ({json.dumps(extra)})'
122122
return log_string
123+
124+
125+
def create_redirect_logger(
    name: str,
    *,
    default_level: int = logging.INFO,
    respect_original_log_level: bool = True,
) -> logging.Logger:
    """Create a logger for redirecting logs from another Actor.

    Args:
        name: The name of the logger. It can be used to inherit from other loggers. Example: `apify.xyz` will use
            a logger named `xyz` and make it a child of the `apify` logger.
        default_level: All logs will be emitted on this level, regardless of the original level.
        respect_original_log_level: Try to use the original log level of the other Actor's logger and use
            `default_level` only if the original level can not be determined.

    Returns:
        The created logger.
    """
    to_logger = logging.getLogger(name)
    # Propagate records up to the parent (e.g. `apify`) logger so its handlers emit them.
    to_logger.propagate = True
    if respect_original_log_level:
        # Guess the level from each message's content; fall back to `default_level`.
        to_logger.addFilter(
            _RedirectLogLevelFilter(
                default_level=default_level,
            )
        )
    else:
        # Force every record to `default_level`.
        to_logger.addFilter(_FixedLevelFilter(default_level=default_level))

    return to_logger
155+
156+
157+
class _FixedLevelFilter(logging.Filter):
158+
def __init__(self, name: str = '', default_level: int = logging.INFO) -> None:
159+
super().__init__(name)
160+
self._default_level = default_level
161+
162+
def filter(self, record: logging.LogRecord) -> bool:
163+
"""Try to extract log level from the message content and set it accordingly."""
164+
record.levelno = self._default_level
165+
record.levelname = logging.getLevelName(record.levelno)
166+
return True
167+
168+
169+
class _RedirectLogLevelFilter(logging.Filter):
170+
def __init__(self, name: str = '', default_level: int = logging.INFO) -> None:
171+
super().__init__(name)
172+
self._default_level = default_level
173+
174+
def _guess_log_level_from_message(self, message: str) -> int:
175+
"""Guess the log level from the message."""
176+
# Using only levels explicitly mentioned in logging module
177+
known_levels = ('CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG', 'NOTSET')
178+
for level in known_levels:
179+
if level in message:
180+
# `getLevelName` returns an `int` when string is passed as input.
181+
return cast('int', logging.getLevelName(level))
182+
# Unknown log level. Fall back to the default.
183+
return self._default_level
184+
185+
def filter(self, record: logging.LogRecord) -> bool:
186+
"""Try to extract log level from the message content and set it accordingly."""
187+
record.levelno = self._guess_log_level_from_message(record.msg)
188+
record.levelname = logging.getLevelName(record.levelno)
189+
return True

src/apify_client/clients/resource_clients/log.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from __future__ import annotations
22

3+
import asyncio
4+
import logging
5+
from asyncio import Task
36
from contextlib import asynccontextmanager, contextmanager
47
from typing import TYPE_CHECKING, Any
58

@@ -11,8 +14,10 @@
1114

1215
if TYPE_CHECKING:
1316
from collections.abc import AsyncIterator, Iterator
17+
from types import TracebackType
1418

1519
import httpx
20+
from mypy.types import Self
1621

1722

1823
class LogClient(ResourceClient):
@@ -175,3 +180,50 @@ async def stream(self) -> AsyncIterator[httpx.Response | None]:
175180
finally:
176181
if response:
177182
await response.aclose()
183+
184+
185+
class StreamedLogSync:
    """Utility class for streaming logs from another Actor.

    NOTE(review): only the class shell exists so far — the synchronous streaming
    implementation is not written yet.
    """
187+
188+
189+
class StreamedLogAsync:
    """Utility class for streaming logs from another Actor."""

    def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> None:
        """Initialize the streamed log.

        Args:
            log_client: Client used to open the log stream of the other Actor's run.
            to_logger: Logger that the redirected log chunks are emitted to.
        """
        self._log_client = log_client
        self._to_logger = to_logger
        # Task created by `__aenter__`; `None` while no streaming is active.
        self._streaming_task: Task | None = None

    def __call__(self) -> Task:
        """Start the streaming task. The caller has to handle any cleanup of the returned task."""
        return asyncio.create_task(self._stream_log(self._to_logger))

    async def __aenter__(self) -> Self:
        """Start the streaming task within the context. Exiting the context will cancel the streaming task."""
        if self._streaming_task:
            raise RuntimeError('Streaming task already active')
        self._streaming_task = self()

        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Cancel the streaming task and wait for the cancellation to finish."""
        if not self._streaming_task:
            raise RuntimeError('Streaming task is not active')

        task = self._streaming_task
        self._streaming_task = None
        task.cancel()
        try:
            # Await the cancelled task so the stream's cleanup (closing the response)
            # completes before we leave the context.
            await task
        except asyncio.CancelledError:
            pass

    async def _stream_log(self, to_logger: logging.Logger) -> None:
        """Read the raw log stream chunk by chunk and emit each chunk to `to_logger`."""
        async with self._log_client.stream() as log_stream:
            if not log_stream:
                return
            async for data in log_stream.aiter_bytes():
                # The original log level is not known unless the message is inspected.
                # A filter on the target logger can adjust the level if needed.
                to_logger.log(level=logging.INFO, msg=data)
        # The stream is closed by the `async with` block above; no extra cleanup needed here.

src/apify_client/clients/resource_clients/run.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@
88

99
from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
1010

11+
from apify_client._logging import create_redirect_logger
1112
from apify_client._utils import encode_key_value_store_record_value, pluck_data, to_safe_id
1213
from apify_client.clients.base import ActorJobBaseClient, ActorJobBaseClientAsync
1314
from apify_client.clients.resource_clients.dataset import DatasetClient, DatasetClientAsync
1415
from apify_client.clients.resource_clients.key_value_store import KeyValueStoreClient, KeyValueStoreClientAsync
15-
from apify_client.clients.resource_clients.log import LogClient, LogClientAsync
16+
from apify_client.clients.resource_clients.log import LogClient, LogClientAsync, StreamedLogAsync
1617
from apify_client.clients.resource_clients.request_queue import RequestQueueClient, RequestQueueClientAsync
1718

1819
if TYPE_CHECKING:
20+
import logging
1921
from decimal import Decimal
2022

2123
from apify_shared.consts import RunGeneralAccess
@@ -515,6 +517,26 @@ def log(self) -> LogClientAsync:
515517
**self._sub_resource_init_options(resource_path='log'),
516518
)
517519

520+
async def get_streamed_log(self, to_logger: logging.Logger | None = None, actor_name: str = '') -> StreamedLogAsync:
    """Get `StreamedLogAsync` instance that can be used to redirect logs.

    `StreamedLogAsync` can be directly called or used as a context manager.

    Args:
        to_logger: `Logger` used for logging the redirected messages. If not provided, a new logger is created.
        actor_name: Optional component of the default logger name.

    Returns:
        `StreamedLogAsync` instance for redirected logs.
    """
    run_data = await self.get()
    run_id = run_data.get('id', '') if run_data else ''

    if not to_logger:
        # Build the logger name only from the non-empty parts so an empty `actor_name`
        # or `run_id` does not produce names like `apify.-xyz`.
        name = '-'.join(part for part in (actor_name, run_id) if part)
        to_logger = create_redirect_logger(f'apify.{name}')

    return StreamedLogAsync(self.log(), to_logger)
539+
518540
async def charge(
519541
self,
520542
event_name: str,

tests/unit/test_logging.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import time
2+
from collections.abc import AsyncIterator
3+
4+
import httpx
5+
import respx
6+
7+
8+
from apify_client import ApifyClientAsync
9+
from apify_client.clients import RunClientAsync
10+
11+
12+
13+
14+
@respx.mock
async def test_redirected_logs(caplog) -> None:
    """Test that redirected logs are captured correctly."""
    import asyncio

    class MockedByteStream(httpx.AsyncByteStream):
        # Minimal async byte stream yielding a couple of raw log chunks.
        async def __aiter__(self) -> AsyncIterator[bytes]:
            for _ in range(2):
                yield b'Some text'
                # Never block the event loop inside an async iterator (was `time.sleep(1)`).
                await asyncio.sleep(0)

        async def aclose(self) -> None:
            pass

    run_client = ApifyClientAsync(token='mocked_token', api_url='https://example.com').run(run_id='run_is_mocked')

    # TODO: `get_streamed_log` first fetches the run detail (JSON) and only then opens the log
    # stream endpoint. Mocking the streamed log endpoint is still unresolved; this currently
    # mocks the run-detail URL with a byte stream — see the commit message.
    respx.get(url='https://example.com/v2/actor-runs/run_is_mocked').mock(
        return_value=httpx.Response(stream=MockedByteStream(), status_code=200)
    )

    streamed_log = await run_client.get_streamed_log(actor_name='mocked_actor')
    async with streamed_log:
        # Yield control so the background streaming task can consume the stream.
        await asyncio.sleep(0.1)

    # `caplog.get_records()` requires a `when` phase argument; `caplog.records` is the plain list.
    records = caplog.records

0 commit comments

Comments
 (0)