Skip to content

Commit 862cacc

Browse files
committed
Polish splitting of messages and setting the log level
Update test to be able to use caplog
1 parent 69ff84c commit 862cacc

File tree

3 files changed

+58
-80
lines changed

3 files changed

+58
-80
lines changed

src/apify_client/_logging.py

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -125,33 +125,18 @@ def format(self, record: logging.LogRecord) -> str:
125125

126126
def create_redirect_logger(
127127
name: str,
128-
*,
129-
default_level: int = logging.INFO,
130-
respect_original_log_level: bool = True,
131128
) -> logging.Logger:
132129
"""Create a logger for redirecting logs from another Actor.
133130
134131
Args:
135132
name: The name of the logger. It can be used to inherit from other loggers. Example: `apify.xyz` will use logger
136133
named `xyz` and make it a children of `apify` logger.
137-
default_level: All logs will be emitted on this level, regardless of the original level.
138-
respect_original_log_level: Try to use the original log level of the other actor logger and use `default_level`
139-
only if the original level is not determined.
140134
141135
Returns:
142136
The created logger.
143137
"""
144138
to_logger = logging.getLogger(name)
145139

146-
if respect_original_log_level:
147-
to_logger.addFilter(
148-
_RedirectLogLevelFilter(
149-
default_level=default_level,
150-
)
151-
)
152-
else:
153-
to_logger.addFilter(_FixedLevelFilter(default_level=default_level))
154-
155140
to_logger.propagate = False
156141
handler = logging.StreamHandler()
157142
handler.setFormatter(RedirectLogFormatter())
@@ -160,40 +145,6 @@ def create_redirect_logger(
160145
return to_logger
161146

162147

163-
class _FixedLevelFilter(logging.Filter):
164-
def __init__(self, name: str = '', default_level: int = logging.INFO) -> None:
165-
super().__init__(name)
166-
self._default_level = default_level
167-
168-
def filter(self, record: logging.LogRecord) -> bool:
169-
"""Try to extract log level from the message content and set it accordingly."""
170-
record.levelno = self._default_level
171-
record.levelname = logging.getLevelName(record.levelno)
172-
return True
173-
174-
175-
class _RedirectLogLevelFilter(logging.Filter):
176-
def __init__(self, name: str = '', default_level: int = logging.INFO) -> None:
177-
super().__init__(name)
178-
self._default_level = default_level
179-
180-
def _guess_log_level_from_message(self, message: str) -> int:
181-
"""Guess the log level from the message."""
182-
# Using only levels explicitly mentioned in logging module
183-
known_levels = ('CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG', 'NOTSET')
184-
for level in known_levels:
185-
if level in message:
186-
# `getLevelName` returns an `int` when string is passed as input.
187-
return cast('int', logging.getLevelName(level))
188-
# Unknown log level. Fall back to the default.
189-
return self._default_level
190-
191-
def filter(self, record: logging.LogRecord) -> bool:
192-
"""Try to extract log level from the message content and set it accordingly."""
193-
record.levelno = self._guess_log_level_from_message(record.msg)
194-
record.levelname = logging.getLevelName(record.levelno)
195-
return True
196-
197148
class RedirectLogFormatter:
198149
"""Formater applied to default redirect logger."""
199150

src/apify_client/clients/resource_clients/log.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import asyncio
44
import logging
5+
import re
56
from asyncio import Task
67
from contextlib import asynccontextmanager, contextmanager
7-
from typing import TYPE_CHECKING, Any
8+
from typing import TYPE_CHECKING, Any, cast
89

910
from apify_shared.utils import ignore_docs
1011

@@ -195,7 +196,7 @@ def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> Non
195196
self._streaming_task: Task | None = None
196197

197198
def __call__(self) -> Task:
198-
"""Start the streaming task. The called has to handle any cleanup."""
199+
"""Start the streaming task. The caller has to handle any cleanup."""
199200
return asyncio.create_task(self._stream_log(self._to_logger))
200201

201202
async def __aenter__(self) -> Self:
@@ -214,21 +215,33 @@ async def __aexit__(
214215
raise RuntimeError('Streaming task is not active')
215216

216217
self._streaming_task.cancel()
217-
218218
self._streaming_task = None
219219

220220
async def _stream_log(self, to_logger: logging.Logger) -> None:
221221
async with self._log_client.stream() as log_stream:
222222
if not log_stream:
223223
return
224224
async for data in log_stream.aiter_bytes():
225-
log_level = logging.INFO # The Original log level is not known unless the message is inspected.
226-
# Adjust the log level in custom logger filter if needed.
227-
228-
# Split by lines for each line that does start with standard format, try to guess the log level
229-
# example split marker: \n2025-05-12T15:35:59.429Z
230-
231-
to_logger.log(level=log_level, msg=data.decode('utf-8'))
232-
#logging.getLogger("apify_client").info(data)
233-
# Cleanup in the end
234-
#log_stream.close()
225+
# Example split marker: \n2025-05-12T15:35:59.429Z
226+
date_time_marker_pattern = r"(\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)"
227+
splits = re.split(date_time_marker_pattern, data.decode('utf-8'))
228+
messages=splits[:1]
229+
230+
for split_marker, message_without_split_marker in zip(splits[1:-1:2],splits[2::2]):
231+
messages.append(split_marker+message_without_split_marker)
232+
233+
for message in messages:
234+
to_logger.log(level=self._guess_log_level_from_message(message), msg=message.strip())
235+
log_stream.close()
236+
237+
@staticmethod
238+
def _guess_log_level_from_message(message: str) -> int:
239+
"""Guess the log level from the message."""
240+
# Using only levels explicitly mentioned in the logging module
241+
known_levels = ('CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG', 'NOTSET')
242+
for level in known_levels:
243+
if level in message:
244+
# `getLevelName` returns an `int` when string is passed as input.
245+
return cast('int', logging.getLevelName(level))
246+
# Unknown log level. Fall back to the default.
247+
return logging.INFO

tests/unit/test_logging.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,24 @@
1515
@respx.mock
1616
async def test_redirected_logs(caplog) -> None:
1717
"""Test that redirected logs are formatted correctly."""
18-
mocked_actor_logs_logs = (b"INFO a", b"WARNING b", b"DEBUG c")
18+
mocked_actor_logs_logs = (
19+
b"2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.\n"
20+
b"2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.\n"
21+
b"2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.", # Several logs merged into one message
22+
b"2025-05-13T07:24:14.132Z [apify] INFO multiline \n log",
23+
b"2025-05-13T07:25:14.132Z [apify] WARNING some warning",
24+
b"2025-05-13T07:26:14.132Z [apify] DEBUG c")
1925
mocked_actor_name = "mocked_actor"
26+
mocked_run_id = "mocked_run_id"
27+
28+
expected_logs_and_levels = [
29+
("2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.", logging.INFO),
30+
("2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.", logging.INFO),
31+
("2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.", logging.INFO),
32+
("2025-05-13T07:24:14.132Z [apify] INFO multiline \n log", logging.INFO),
33+
("2025-05-13T07:25:14.132Z [apify] WARNING some warning", logging.WARNING),
34+
("2025-05-13T07:26:14.132Z [apify] DEBUG c", logging.DEBUG),
35+
]
2036

2137
class AsyncByteStream:
2238
async def __aiter__(self) -> AsyncIterator[bytes]:
@@ -27,27 +43,25 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
2743
async def aclose(self) -> None:
2844
pass
2945

30-
run_client = ApifyClientAsync(token="mocked_token", api_url='https://example.com').run(run_id="run_is_mocked")
31-
respx.get(url='https://example.com/v2/actor-runs/run_is_mocked').mock(
32-
return_value=httpx.Response(content=json.dumps({"data":{'actId': 'SbjD4JEucMevUdQAH'}}),status_code=200))
33-
respx.get(url='https://example.com/v2/actor-runs/run_is_mocked/log?stream=1').mock(
46+
respx.get(url=f'https://example.com/v2/actor-runs/{mocked_run_id}').mock(
47+
return_value=httpx.Response(content=json.dumps({"data":{'id': mocked_run_id}}),status_code=200))
48+
respx.get(url=f'https://example.com/v2/actor-runs/{mocked_run_id}/log?stream=1').mock(
3449
return_value=httpx.Response(stream=AsyncByteStream(), status_code=200))
35-
# {'http_version': b'HTTP/1.1', 'network_stream': <httpcore._backends.anyio.AnyIOStream object at 0x7fc82543db70>, 'reason_phrase': b'OK'}
36-
# [(b'Date', b'Mon, 12 May 2025 13:24:41 GMT'), (b'Content-Type', b'application/json; charset=utf-8'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'Cache-Control', b'no-cache, no-store, must-revalidate'), (b'Pragma', b'no-cache'), (b'Expires', b'0'), (b'Access-Control-Allow-Origin', b'*'), (b'Access-Control-Allow-Headers', b'User-Agent, Content-Type, Authorization, X-Apify-Request-Origin, openai-conversation-id, openai-ephemeral-user-id'), (b'Access-Control-Allow-Methods', b'GET, POST'), (b'Access-Control-Expose-Headers', b'X-Apify-Pagination-Total, X-Apify-Pagination-Offset, X-Apify-Pagination-Desc, X-Apify-Pagination-Count, X-Apify-Pagination-Limit'), (b'Referrer-Policy', b'no-referrer'), (b'X-Robots-Tag', b'none'), (b'X-RateLimit-Limit', b'200'), (b'Location', b'https://api.apify.com/v2/actor-runs/ywNUnFFbOksQLa4mH'), (b'Vary', b'Accept-Encoding'), (b'Content-Encoding', b'gzip')]
50+
51+
run_client = ApifyClientAsync(token="mocked_token", api_url='https://example.com').run(run_id=mocked_run_id)
3752
streamed_log = await run_client.get_streamed_log(actor_name=mocked_actor_name)
3853

39-
with caplog.at_level(logging.DEBUG):
54+
# Set `propagate=True` during the tests, so that caplog can see the logs.
55+
logger_name = f"apify.{mocked_actor_name}-{mocked_run_id}"
56+
logging.getLogger(logger_name).propagate = True
57+
58+
with caplog.at_level(logging.DEBUG, logger=logger_name):
4059
async with streamed_log:
60+
# Do stuff while the log from the other actor is being redirected to the logs.
4161
await asyncio.sleep(1)
42-
# do some stuff
43-
pass
4462

4563
records = caplog.records
46-
assert len(records) == 2
47-
48-
49-
50-
"""
51-
52-
{'actId': 'SbjD4JEucMevUdQAH', 'buildId': 'Jv7iIjo1JV0gEXQEm', 'buildNumber': '0.0.5', 'containerUrl': 'https://tlo2axp6qbc7.runs.apify.net', 'defaultDatasetId': 'DZq6uDwZ4gSXev8h2', 'defaultKeyValueStoreId': '7UswAGyvNKFGlddHS', 'defaultRequestQueueId': 'Gk4ye89GRCoqFNdsM', 'finishedAt': None, 'generalAccess': 'FOLLOW_USER_SETTING', 'id': 'u6Q52apBHWO09NjDP', 'meta': {'origin': 'API', 'userAgent': 'ApifyClient/1.9.0 (linux; Python/3.10.12); isAtHome/False'}, 'options': {'build': 'latest', 'diskMbytes': 2048, 'memoryMbytes': 1024, 'timeoutSecs': 3600}, 'startedAt': '2025-05-12T13:54:23.028Z', 'stats': {'computeUnits': 0, 'inputBodyLen': 15, 'migrationCount': 0, 'rebootCount': 0, 'restartCount': 0, 'resurrectCount': 0}, 'status': 'READY', 'usage': {'ACTOR_COMPUTE_UNITS': 0, 'DATASET_READS': 0, 'DATASET_WRITES': 0, 'DATA_TRANSFER_EXTERNAL_GBYTES': 0, 'DATA_TRANSFER_INTERNAL_GBYTES': 0, 'KEY_VALUE_STORE_LISTS': 0, 'KEY_VALUE_STORE_READS': 0, 'KEY_VALUE_STORE_WRITES': 1, 'PROXY_RESIDENTIAL_TRANSFER_GBYTES': 0, 'PROXY_SERPS': 0, 'REQUEST_QUEUE_READS': 0, 'REQUEST_QUEUE_WRITES': 0}, 'usageTotalUsd': 5e-05, 'usageUsd': {'ACTOR_COMPUTE_UNITS': 0, 'DATASET_READS': 0, 'DATASET_WRITES': 0, 'DATA_TRANSFER_EXTERNAL_GBYTES': 0, 'DATA_TRANSFER_INTERNAL_GBYTES': 0, 'KEY_VALUE_STORE_LISTS': 0, 'KEY_VALUE_STORE_READS': 0, 'KEY_VALUE_STORE_WRITES': 5e-05, 'PROXY_RESIDENTIAL_TRANSFER_GBYTES': 0, 'PROXY_SERPS': 0, 'REQUEST_QUEUE_READS': 0, 'REQUEST_QUEUE_WRITES': 0}, 'userId': 'LjAzEG1CadliECnrn'}
53-
"""
64+
assert len(records) == 6
65+
for expected_log_and_level, record in zip(expected_logs_and_levels, records):
66+
assert expected_log_and_level[0] == record.message
67+
assert expected_log_and_level[1] == record.levelno

0 commit comments

Comments
 (0)