Skip to content

Commit 2914e50

Browse files
committed
TODO: Finalize tests
1 parent 737cde9 commit 2914e50

File tree

3 files changed

+83
-29
lines changed

3 files changed

+83
-29
lines changed

src/apify_client/clients/resource_clients/log.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ class StatusMessageRedirector:
390390
Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
391391
especially in cases of frequent status message changes.
392392
"""
393+
_force_propagate = False
393394

394395
def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
395396
"""Initialize `StatusMessageRedirector`.
@@ -399,9 +400,11 @@ def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timed
399400
check_period: The period with which the status message will be polled.
400401
"""
401402
self._to_logger = to_logger
403+
self._to_logger.propagate = self._force_propagate
402404
self._check_period = check_period.total_seconds()
403405
self._last_status_message = ''
404406

407+
405408
def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
406409
"""Get relevant run data, log them if changed and return `True` if more data is expected.
407410
@@ -420,15 +423,15 @@ def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
420423
self._last_status_message = new_status_message
421424
self._to_logger.info(new_status_message)
422425

423-
return run_data.get('isStatusMessageTerminal', True)
426+
return not(run_data.get('isStatusMessageTerminal', False))
424427
return True
425428

426429

427430
class StatusMessageRedirectorAsync(StatusMessageRedirector):
428431
"""Async variant of `StatusMessageRedirector` that is logging in task."""
429432

430433
def __init__(
431-
self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)
434+
self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
432435
) -> None:
433436
"""Initialize `StatusMessageRedirectorAsync`.
434437
@@ -478,7 +481,7 @@ class StatusMessageRedirectorSync(StatusMessageRedirector):
478481
"""Sync variant of `StatusMessageRedirector` that is logging in thread."""
479482

480483
def __init__(
481-
self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)
484+
self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
482485
) -> None:
483486
"""Initialize `StatusMessageRedirectorSync`.
484487

src/apify_client/clients/resource_clients/run.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66
import string
77
import time
8+
from datetime import timedelta
89
from typing import TYPE_CHECKING, Any
910

1011
from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
@@ -20,7 +21,7 @@
2021
StatusMessageRedirectorAsync,
2122
StatusMessageRedirectorSync,
2223
StreamedLogAsync,
23-
StreamedLogSync,
24+
StreamedLogSync, StatusMessageRedirector,
2425
)
2526
from apify_client.clients.resource_clients.request_queue import RequestQueueClient, RequestQueueClientAsync
2627

@@ -321,14 +322,18 @@ def charge(
321322
),
322323
)
323324

324-
def get_status_message_logger(self, to_logger: logging.Logger | None = None) -> StatusMessageRedirectorSync:
325+
326+
def get_status_message_redirector(
327+
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1)
328+
) -> StatusMessageRedirectorSync:
325329
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.
326330
327-
`StatusMessageRedirector` can be directly called or used as a context manager.
331+
`StatusMessageRedirector` can be directly called or used as a context manager.
328332
329333
Args:
330334
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
331335
created.
336+
check_period: The period with which the status message will be polled.
332337
333338
Returns:
334339
`StatusMessageRedirector` instance for redirected logs.
@@ -344,7 +349,8 @@ def get_status_message_logger(self, to_logger: logging.Logger | None = None) ->
344349
name = '-'.join(part for part in (actor_name, run_id) if part)
345350
to_logger = create_redirect_logger(f'apify.{name}')
346351

347-
return StatusMessageRedirectorSync(run_client=self, to_logger=to_logger)
352+
return StatusMessageRedirectorSync(run_client=self, to_logger=to_logger, check_period=check_period)
353+
348354

349355

350356
class RunClientAsync(ActorJobBaseClientAsync):
@@ -642,7 +648,7 @@ async def charge(
642648
)
643649

644650
async def get_status_message_redirector(
645-
self, to_logger: logging.Logger | None = None
651+
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1),
646652
) -> StatusMessageRedirectorAsync:
647653
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.
648654
@@ -651,6 +657,7 @@ async def get_status_message_redirector(
651657
Args:
652658
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
653659
created.
660+
check_period: The period with which the status message will be polled.
654661
655662
Returns:
656663
`StatusMessageRedirector` instance for redirected logs.
@@ -666,4 +673,4 @@ async def get_status_message_redirector(
666673
name = '-'.join(part for part in (actor_name, run_id) if part)
667674
to_logger = create_redirect_logger(f'apify.{name}')
668675

669-
return StatusMessageRedirectorAsync(run_client=self, to_logger=to_logger)
676+
return StatusMessageRedirectorAsync(run_client=self, to_logger=to_logger, check_period=check_period)

tests/unit/test_logging.py

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import time
55
from collections.abc import AsyncIterator, Iterator
6-
from datetime import datetime
6+
from datetime import datetime, timedelta
77
from unittest.mock import patch
88

99
import httpx
@@ -14,7 +14,7 @@
1414

1515
from apify_client import ApifyClient, ApifyClientAsync
1616
from apify_client._logging import RedirectLogFormatter
17-
from apify_client.clients.resource_clients.log import StreamedLog
17+
from apify_client.clients.resource_clients.log import StreamedLog, StatusMessageRedirector
1818

1919
_MOCKED_API_URL = 'https://example.com'
2020
_MOCKED_RUN_ID = 'mocked_run_id'
@@ -57,32 +57,30 @@
5757

5858
@pytest.fixture
5959
def mock_api() -> None:
60-
actor_runs_responses = iter(
61-
(
62-
httpx.Response(
63-
content=json.dumps(
64-
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.RUNNING}}
65-
),
66-
status_code=200,
67-
),
68-
httpx.Response(
60+
61+
def create_status_responses_generator():
62+
for i in range(10):
63+
yield httpx.Response(
6964
content=json.dumps(
70-
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.RUNNING}}
65+
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.RUNNING,
66+
'statusMessage': f'Status {i}', 'isStatusMessageTerminal': False}}
7167
),
7268
status_code=200,
73-
),
74-
httpx.Response(
69+
)
70+
while True:
71+
yield httpx.Response(
7572
content=json.dumps(
76-
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.SUCCEEDED}}
73+
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.SUCCEEDED,
74+
'statusMessage': f'Status 101', 'isStatusMessageTerminal': True}}
7775
),
7876
status_code=200,
79-
),
80-
)
81-
)
77+
)
78+
79+
response_generator = create_status_responses_generator()
8280

8381
def actor_runs_side_effect(_: httpx.Request) -> httpx.Response:
8482
time.sleep(0.1)
85-
return next(actor_runs_responses)
83+
return next(response_generator)
8684

8785
respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect)
8886

@@ -129,7 +127,9 @@ def close(self) -> None:
129127

130128
@pytest.fixture
131129
def propagate_stream_logs() -> None:
132-
StreamedLog._force_propagate = True # Enable propagation of logs to the caplog fixture
130+
# Enable propagation of logs to the caplog fixture
131+
StreamedLog._force_propagate = True
132+
StatusMessageRedirector._force_propagate = True
133133
logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG)
134134

135135

@@ -332,3 +332,47 @@ def test_actor_call_redirect_logs_to_custom_logger_sync(
332332
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
333333
assert expected_message_and_level[0] == record.message
334334
assert expected_message_and_level[1] == record.levelno
335+
336+
@respx.mock
337+
async def test_redirect_status_message_async(
338+
*,
339+
caplog: LogCaptureFixture,
340+
mock_api: None, # noqa: ARG001, fixture
341+
propagate_stream_logs: None, # noqa: ARG001, fixture
342+
) -> None:
343+
"""Test redirected status and status messages."""
344+
345+
run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
346+
347+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
348+
349+
status_message_redirector = await run_client.get_status_message_redirector(check_period=timedelta(seconds =0.1))
350+
with caplog.at_level(logging.DEBUG, logger=logger_name):
351+
async with status_message_redirector:
352+
# Do stuff while the status from the other actor is being redirected to the logs.
353+
await asyncio.sleep(2)
354+
355+
assert caplog.records[0].message == 'Status: RUNNING, Message: Status 1'
356+
assert caplog.records[1].message == 'Status: SUCCEEDED, Message: Status 2'
357+
358+
@respx.mock
359+
def test_redirect_status_message_sync(
360+
*,
361+
caplog: LogCaptureFixture,
362+
mock_api: None, # noqa: ARG001, fixture
363+
propagate_stream_logs: None, # noqa: ARG001, fixture
364+
) -> None:
365+
"""Test redirected status and status messages."""
366+
367+
run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
368+
369+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
370+
371+
status_message_redirector = run_client.get_status_message_redirector(check_period=timedelta(seconds =0.1))
372+
with caplog.at_level(logging.DEBUG, logger=logger_name):
373+
with status_message_redirector:
374+
# Do stuff while the status from the other actor is being redirected to the logs.
375+
time.sleep(2)
376+
377+
assert caplog.records[0].message == 'Status: RUNNING, Message: Status 1'
378+
assert caplog.records[1].message == 'Status: SUCCEEDED, Message: Status 2'

0 commit comments

Comments
 (0)