Skip to content

Commit 753427a

Browse files
committed
Draft with async implementation and example tests
1 parent 862cacc commit 753427a

File tree

7 files changed

+195
-51
lines changed

7 files changed

+195
-51
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ classifiers = [
2525
keywords = ["apify", "api", "client", "automation", "crawling", "scraping"]
2626
dependencies = [
2727
"apify-shared>=1.4.1",
28+
"colorama~=0.4.0",
2829
"httpx>=0.25",
2930
"more_itertools>=10.0.0",
3031
]
@@ -52,6 +53,7 @@ dev = [
5253
"respx~=0.22.0",
5354
"ruff~=0.11.0",
5455
"setuptools", # setuptools are used by pytest but not explicitly required
56+
"types-colorama~=0.4.15.20240106",
5557
]
5658

5759
[tool.hatch.build.targets.wheel]

src/apify_client/_logging.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import json
66
import logging
77
from contextvars import ContextVar
8-
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast
8+
from typing import TYPE_CHECKING, Any, Callable, NamedTuple
99

10-
from colorama import Style, Fore
10+
from colorama import Fore, Style
1111

1212
# Conditional import only executed when type checking, otherwise we'd get circular dependency issues
1313
if TYPE_CHECKING:
@@ -123,6 +123,7 @@ def format(self, record: logging.LogRecord) -> str:
123123
log_string = f'{log_string} ({json.dumps(extra)})'
124124
return log_string
125125

126+
126127
def create_redirect_logger(
127128
name: str,
128129
) -> logging.Logger:
@@ -141,11 +142,11 @@ def create_redirect_logger(
141142
handler = logging.StreamHandler()
142143
handler.setFormatter(RedirectLogFormatter())
143144
to_logger.addHandler(handler)
144-
to_logger.setLevel(logging.INFO)
145+
to_logger.setLevel(logging.DEBUG)
145146
return to_logger
146147

147148

148-
class RedirectLogFormatter:
149+
class RedirectLogFormatter(logging.Formatter):
149150
"""Formater applied to default redirect logger."""
150151

151152
def format(self, record: logging.LogRecord) -> str:

src/apify_client/clients/resource_clients/actor.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING, Any
3+
from typing import TYPE_CHECKING, Any, Literal
44

55
from apify_shared.utils import (
66
filter_out_none_values_recursively,
@@ -27,6 +27,7 @@
2727

2828
if TYPE_CHECKING:
2929
from decimal import Decimal
30+
from logging import Logger
3031

3132
from apify_shared.consts import ActorJobStatus, MetaOrigin
3233

@@ -681,6 +682,7 @@ async def call(
681682
timeout_secs: int | None = None,
682683
webhooks: list[dict] | None = None,
683684
wait_secs: int | None = None,
685+
logger: Logger | None | Literal['default'] = 'default',
684686
) -> dict | None:
685687
"""Start the Actor and wait for it to finish before returning the Run object.
686688
@@ -705,6 +707,9 @@ async def call(
705707
a webhook set up for the Actor, you do not have to add it again here.
706708
wait_secs: The maximum number of seconds the server waits for the run to finish. If not provided,
707709
waits indefinitely.
710+
logger: Loger used to redirect logs from the Actor run. By default, it is set to "default" which means that
711+
the default logger will be created and used. Setting `None` will disable any log propagation. Passing
712+
custom logger will redirect logs to the provided logger.
708713
709714
Returns:
710715
The run object.
@@ -720,7 +725,18 @@ async def call(
720725
webhooks=webhooks,
721726
)
722727

723-
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
728+
if not logger:
729+
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
730+
731+
run_client = self.root_client.run(run_id=started_run['id'])
732+
if logger == 'default':
733+
actor_name = actor_data.get('name', '') if (actor_data := await self.get()) else ''
734+
log_context = await run_client.get_streamed_log(actor_name=actor_name)
735+
else:
736+
log_context = await run_client.get_streamed_log(to_logger=logger)
737+
738+
async with log_context:
739+
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
724740

725741
async def build(
726742
self,

src/apify_client/clients/resource_clients/log.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,15 @@ class StreamedLogSync:
190190
class StreamedLogAsync:
191191
"""Utility class for streaming logs from another actor."""
192192

193+
# Test related flag to enable propagation of logs to the `caplog` fixture during tests.
194+
_force_propagate = False
195+
193196
def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> None:
194197
self._log_client = log_client
195198
self._to_logger = to_logger
196199
self._streaming_task: Task | None = None
200+
if self._force_propagate:
201+
to_logger.propagate = True
197202

198203
def __call__(self) -> Task:
199204
"""Start the streaming task. The caller has to handle any cleanup."""
@@ -223,12 +228,12 @@ async def _stream_log(self, to_logger: logging.Logger) -> None:
223228
return
224229
async for data in log_stream.aiter_bytes():
225230
# Example split marker: \n2025-05-12T15:35:59.429Z
226-
date_time_marker_pattern = r"(\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)"
231+
date_time_marker_pattern = r'(\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)'
227232
splits = re.split(date_time_marker_pattern, data.decode('utf-8'))
228-
messages=splits[:1]
233+
messages = splits[:1]
229234

230-
for split_marker, message_without_split_marker in zip(splits[1:-1:2],splits[2::2]):
231-
messages.append(split_marker+message_without_split_marker)
235+
for split_marker, message_without_split_marker in zip(splits[1:-1:2], splits[2::2]):
236+
messages.append(split_marker + message_without_split_marker)
232237

233238
for message in messages:
234239
to_logger.log(level=self._guess_log_level_from_message(message), msg=message.strip())

src/apify_client/clients/resource_clients/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ async def get_streamed_log(self, to_logger: logging.Logger | None = None, actor_
533533
run_id = run_data.get('id', '') if run_data else ''
534534

535535
if not to_logger:
536-
name = "-".join(part for part in (actor_name, run_id) if part)
536+
name = '-'.join(part for part in (actor_name, run_id) if part)
537537
to_logger = create_redirect_logger(f'apify.{name}')
538538

539539
return StreamedLogAsync(self.log(), to_logger)

tests/unit/test_logging.py

Lines changed: 147 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,174 @@
11
import asyncio
22
import json
33
import logging
4-
import time
54
from collections.abc import AsyncIterator
65

76
import httpx
7+
import pytest
88
import respx
9-
9+
from _pytest.logging import LogCaptureFixture
10+
from apify_shared.consts import ActorJobStatus
1011

1112
from apify_client import ApifyClientAsync
12-
from apify_client.clients import RunClientAsync
13+
from apify_client._logging import RedirectLogFormatter
14+
from apify_client.clients.resource_clients.log import StreamedLogAsync
1315

16+
_MOCKED_API_URL = 'https://example.com'
17+
_MOCKED_RUN_ID = 'mocked_run_id'
18+
_MOCKED_ACTOR_NAME = 'mocked_actor_name'
19+
_MOCKED_ACTOR_ID = 'mocked_actor_id'
20+
_MOCKED_ACTOR_LOGS = (
21+
b'2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.\n'
22+
b'2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.\n'
23+
b'2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', # Several logs merged into one message
24+
b'2025-05-13T07:24:14.132Z [apify] INFO multiline \n log',
25+
b'2025-05-13T07:25:14.132Z [apify] WARNING some warning',
26+
b'2025-05-13T07:26:14.132Z [apify] DEBUG c',
27+
)
1428

15-
@respx.mock
16-
async def test_redirected_logs(caplog) -> None:
17-
"""Test that redirected logs are formatted correctly."""
18-
mocked_actor_logs_logs = (
19-
b"2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.\n"
20-
b"2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.\n"
21-
b"2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.", # Several logs merged into one message
22-
b"2025-05-13T07:24:14.132Z [apify] INFO multiline \n log",
23-
b"2025-05-13T07:25:14.132Z [apify] WARNING some warning",
24-
b"2025-05-13T07:26:14.132Z [apify] DEBUG c")
25-
mocked_actor_name = "mocked_actor"
26-
mocked_run_id = "mocked_run_id"
27-
28-
expected_logs_and_levels = [
29-
("2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.", logging.INFO),
30-
("2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.", logging.INFO),
31-
("2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.", logging.INFO),
32-
("2025-05-13T07:24:14.132Z [apify] INFO multiline \n log", logging.INFO),
33-
("2025-05-13T07:25:14.132Z [apify] WARNING some warning", logging.WARNING),
34-
("2025-05-13T07:26:14.132Z [apify] DEBUG c", logging.DEBUG),
35-
]
36-
37-
class AsyncByteStream:
29+
_EXPECTED_MESSAGES_AND_LEVELS = (
30+
('2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.', logging.INFO),
31+
('2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.', logging.INFO),
32+
('2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', logging.INFO),
33+
('2025-05-13T07:24:14.132Z [apify] INFO multiline \n log', logging.INFO),
34+
('2025-05-13T07:25:14.132Z [apify] WARNING some warning', logging.WARNING),
35+
('2025-05-13T07:26:14.132Z [apify] DEBUG c', logging.DEBUG),
36+
)
37+
38+
39+
@pytest.fixture
40+
def mock_api() -> None:
41+
class AsyncByteStream(httpx._types.AsyncByteStream):
3842
async def __aiter__(self) -> AsyncIterator[bytes]:
39-
for i in mocked_actor_logs_logs:
43+
for i in _MOCKED_ACTOR_LOGS:
4044
yield i
4145
await asyncio.sleep(0.1)
4246

4347
async def aclose(self) -> None:
4448
pass
4549

46-
respx.get(url=f'https://example.com/v2/actor-runs/{mocked_run_id}').mock(
47-
return_value=httpx.Response(content=json.dumps({"data":{'id': mocked_run_id}}),status_code=200))
48-
respx.get(url=f'https://example.com/v2/actor-runs/{mocked_run_id}/log?stream=1').mock(
49-
return_value=httpx.Response(stream=AsyncByteStream(), status_code=200))
50+
actor_runs_responses = iter(
51+
(
52+
httpx.Response(
53+
content=json.dumps({'data': {'id': _MOCKED_RUN_ID, 'status': ActorJobStatus.RUNNING}}), status_code=200
54+
),
55+
httpx.Response(
56+
content=json.dumps({'data': {'id': _MOCKED_RUN_ID, 'status': ActorJobStatus.RUNNING}}), status_code=200
57+
),
58+
httpx.Response(
59+
content=json.dumps({'data': {'id': _MOCKED_RUN_ID, 'status': ActorJobStatus.SUCCEEDED}}),
60+
status_code=200,
61+
),
62+
)
63+
)
64+
65+
async def actor_runs_side_effect(_: httpx.Request) -> httpx.Response:
66+
await asyncio.sleep(0.5)
67+
return next(actor_runs_responses)
68+
69+
respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect)
70+
71+
respx.get(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}').mock(
72+
return_value=httpx.Response(content=json.dumps({'data': {'name': _MOCKED_ACTOR_NAME}}), status_code=200)
73+
)
74+
75+
respx.post(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}/runs').mock(
76+
return_value=httpx.Response(content=json.dumps({'data': {'id': _MOCKED_RUN_ID}}), status_code=200)
77+
)
5078

51-
run_client = ApifyClientAsync(token="mocked_token", api_url='https://example.com').run(run_id=mocked_run_id)
52-
streamed_log = await run_client.get_streamed_log(actor_name=mocked_actor_name)
79+
respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1').mock(
80+
return_value=httpx.Response(stream=AsyncByteStream(), status_code=200)
81+
)
82+
83+
84+
@pytest.fixture
85+
def propagate_stream_logs() -> None:
86+
StreamedLogAsync._force_propagate = True # Enable propagation of logs to the caplog fixture
87+
logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG)
88+
89+
90+
@respx.mock
91+
async def test_redirected_logs(
92+
caplog: LogCaptureFixture,
93+
mock_api: None, # noqa: ARG001, fixture
94+
propagate_stream_logs: None, # noqa: ARG001, fixture
95+
) -> None:
96+
"""Test that redirected logs are formatted correctly."""
97+
98+
run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
99+
streamed_log = await run_client.get_streamed_log(actor_name=_MOCKED_ACTOR_NAME)
53100

54101
# Set `propagate=True` during the tests, so that caplog can see the logs..
55-
logger_name = f"apify.{mocked_actor_name}-{mocked_run_id}"
56-
logging.getLogger(logger_name).propagate = True
102+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
57103

58104
with caplog.at_level(logging.DEBUG, logger=logger_name):
59105
async with streamed_log:
60106
# Do stuff while the log from the other actor is being redirected to the logs.
61107
await asyncio.sleep(1)
62108

63-
records = caplog.records
64-
assert len(records) == 6
65-
for expected_log_and_level, record in zip(expected_logs_and_levels, records):
66-
assert expected_log_and_level[0] == record.message
67-
assert expected_log_and_level[1] == record.levelno
109+
assert len(caplog.records) == 6
110+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
111+
assert expected_message_and_level[0] == record.message
112+
assert expected_message_and_level[1] == record.levelno
113+
114+
115+
@respx.mock
116+
async def test_actor_call_redirect_logs_to_default_logger(
117+
caplog: LogCaptureFixture,
118+
mock_api: None, # noqa: ARG001, fixture
119+
propagate_stream_logs: None, # noqa: ARG001, fixture
120+
) -> None:
121+
"""Test that logs are redirected correctly to the default logger.
122+
123+
Caplog contains logs before formatting, so formatting is not included in the test expectations."""
124+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
125+
logger = logging.getLogger(logger_name)
126+
run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID)
127+
128+
with caplog.at_level(logging.DEBUG, logger=logger_name):
129+
await run_client.call()
130+
131+
# Ensure expected handler and formater
132+
assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter)
133+
assert isinstance(logger.handlers[0], logging.StreamHandler)
134+
135+
# Ensure logs are propagated
136+
assert len(caplog.records) == 6
137+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
138+
assert expected_message_and_level[0] == record.message
139+
assert expected_message_and_level[1] == record.levelno
140+
141+
142+
@respx.mock
143+
async def test_actor_call_no_redirect_logs(
144+
caplog: LogCaptureFixture,
145+
mock_api: None, # noqa: ARG001, fixture
146+
propagate_stream_logs: None, # noqa: ARG001, fixture
147+
) -> None:
148+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
149+
run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID)
150+
151+
with caplog.at_level(logging.DEBUG, logger=logger_name):
152+
await run_client.call(logger=None)
153+
154+
assert len(caplog.records) == 0
155+
156+
157+
@respx.mock
158+
async def test_actor_call_redirect_logs_to_custom_logger(
159+
caplog: LogCaptureFixture,
160+
mock_api: None, # noqa: ARG001, fixture
161+
propagate_stream_logs: None, # noqa: ARG001, fixture
162+
) -> None:
163+
"""Test that logs are redirected correctly to the custom logger."""
164+
logger_name = 'custom_logger'
165+
logger = logging.getLogger(logger_name)
166+
run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID)
167+
168+
with caplog.at_level(logging.DEBUG, logger=logger_name):
169+
await run_client.call(logger=logger)
170+
171+
assert len(caplog.records) == 6
172+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
173+
assert expected_message_and_level[0] == record.message
174+
assert expected_message_and_level[1] == record.levelno

0 commit comments

Comments
 (0)