Skip to content

Commit 8fbbffa

Browse files
committed
Finalize tests.
Add final timeout.
1 parent 2914e50 commit 8fbbffa

File tree

4 files changed

+100
-39
lines changed

4 files changed

+100
-39
lines changed

src/apify_client/clients/resource_clients/actor.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ def call(
317317
waits indefinitely.
318318
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
319319
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
320-
will redirect logs to the provided logger.
320+
will redirect logs to the provided logger. The logger is also used to capture status and status message
321+
of the other Actor run.
321322
322323
Returns:
323324
The run object.
@@ -336,12 +337,11 @@ def call(
336337
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
337338

338339
run_client = self.root_client.run(run_id=started_run['id'])
340+
339341
if logger == 'default':
340-
log_context = run_client.get_streamed_log()
341-
else:
342-
log_context = run_client.get_streamed_log(to_logger=logger)
342+
logger = None
343343

344-
with log_context:
344+
with run_client.get_status_message_redirector(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
345345
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
346346

347347
def build(
@@ -722,7 +722,8 @@ async def call(
722722
waits indefinitely.
723723
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
724724
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
725-
will redirect logs to the provided logger.
725+
will redirect logs to the provided logger. The logger is also used to capture status and status message
726+
of the other Actor run.
726727
727728
Returns:
728729
The run object.

src/apify_client/clients/resource_clients/log.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,12 @@ class StatusMessageRedirector:
390390
Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
391391
especially in cases of frequent status message changes.
392392
"""
393+
393394
_force_propagate = False
395+
# This is final sleep time to try to get the last status and status message of finished Actor run.
396+
# The status and status message can get set on the Actor run with a delay. Sleep time does not guarantee that the
397+
# final message will be captured, but increases the chances of that.
398+
_final_sleep_time_s = 6
394399

395400
def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
396401
"""Initialize `StatusMessageRedirector`.
@@ -404,7 +409,6 @@ def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timed
404409
self._check_period = check_period.total_seconds()
405410
self._last_status_message = ''
406411

407-
408412
def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
409413
"""Get relevant run data, log them if changed and return `True` if more data is expected.
410414
@@ -423,7 +427,7 @@ def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
423427
self._last_status_message = new_status_message
424428
self._to_logger.info(new_status_message)
425429

426-
return not(run_data.get('isStatusMessageTerminal', False))
430+
return not (run_data.get('isStatusMessageTerminal', False))
427431
return True
428432

429433

@@ -468,6 +472,7 @@ async def __aexit__(
468472
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
469473
) -> None:
470474
"""Cancel the logging task."""
475+
await asyncio.sleep(self._final_sleep_time_s)
471476
self.stop()
472477

473478
async def _log_changed_status_message(self) -> None:
@@ -508,6 +513,7 @@ def stop(self) -> None:
508513
"""Signal the _logging_thread thread to stop logging and wait for it to finish."""
509514
if not self._logging_thread:
510515
raise RuntimeError('Logging thread is not active')
516+
time.sleep(self._final_sleep_time_s)
511517
self._stop_logging = True
512518
self._logging_thread.join()
513519
self._logging_thread = None

src/apify_client/clients/resource_clients/run.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
StatusMessageRedirectorAsync,
2222
StatusMessageRedirectorSync,
2323
StreamedLogAsync,
24-
StreamedLogSync, StatusMessageRedirector,
24+
StreamedLogSync,
2525
)
2626
from apify_client.clients.resource_clients.request_queue import RequestQueueClient, RequestQueueClientAsync
2727

@@ -322,7 +322,6 @@ def charge(
322322
),
323323
)
324324

325-
326325
def get_status_message_redirector(
327326
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1)
328327
) -> StatusMessageRedirectorSync:
@@ -352,7 +351,6 @@ def get_status_message_redirector(
352351
return StatusMessageRedirectorSync(run_client=self, to_logger=to_logger, check_period=check_period)
353352

354353

355-
356354
class RunClientAsync(ActorJobBaseClientAsync):
357355
"""Async sub-client for manipulating a single Actor run."""
358356

@@ -648,7 +646,9 @@ async def charge(
648646
)
649647

650648
async def get_status_message_redirector(
651-
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1),
649+
self,
650+
to_logger: logging.Logger | None = None,
651+
check_period: timedelta = timedelta(seconds=1),
652652
) -> StatusMessageRedirectorAsync:
653653
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.
654654

tests/unit/test_logging.py

Lines changed: 81 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from apify_client import ApifyClient, ApifyClientAsync
1616
from apify_client._logging import RedirectLogFormatter
17-
from apify_client.clients.resource_clients.log import StreamedLog, StatusMessageRedirector
17+
from apify_client.clients.resource_clients.log import StatusMessageRedirector, StreamedLog
1818

1919
_MOCKED_API_URL = 'https://example.com'
2020
_MOCKED_RUN_ID = 'mocked_run_id'
@@ -54,24 +54,60 @@
5454
),
5555
)
5656

57+
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES = (
58+
('Status: RUNNING, Message: Initial message', logging.INFO),
59+
*_EXPECTED_MESSAGES_AND_LEVELS,
60+
('Status: RUNNING, Message: Another message', logging.INFO),
61+
('Status: SUCCEEDED, Message: Final message', logging.INFO),
62+
)
63+
5764

5865
@pytest.fixture
5966
def mock_api() -> None:
60-
61-
def create_status_responses_generator():
62-
for i in range(10):
67+
def create_status_responses_generator() -> Iterator[httpx.Response]:
68+
"""Simulate actor run that changes status 3 times."""
69+
for _ in range(5):
6370
yield httpx.Response(
6471
content=json.dumps(
65-
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.RUNNING,
66-
'statusMessage': f'Status {i}', 'isStatusMessageTerminal': False}}
72+
{
73+
'data': {
74+
'id': _MOCKED_RUN_ID,
75+
'actId': _MOCKED_ACTOR_ID,
76+
'status': ActorJobStatus.RUNNING,
77+
'statusMessage': 'Initial message',
78+
'isStatusMessageTerminal': False,
79+
}
80+
}
81+
),
82+
status_code=200,
83+
)
84+
for _ in range(5):
85+
yield httpx.Response(
86+
content=json.dumps(
87+
{
88+
'data': {
89+
'id': _MOCKED_RUN_ID,
90+
'actId': _MOCKED_ACTOR_ID,
91+
'status': ActorJobStatus.RUNNING,
92+
'statusMessage': 'Another message',
93+
'isStatusMessageTerminal': False,
94+
}
95+
}
6796
),
6897
status_code=200,
6998
)
7099
while True:
71100
yield httpx.Response(
72101
content=json.dumps(
73-
{'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.SUCCEEDED,
74-
'statusMessage': f'Status 101', 'isStatusMessageTerminal': True}}
102+
{
103+
'data': {
104+
'id': _MOCKED_RUN_ID,
105+
'actId': _MOCKED_ACTOR_ID,
106+
'status': ActorJobStatus.SUCCEEDED,
107+
'statusMessage': 'Final message',
108+
'isStatusMessageTerminal': True,
109+
}
110+
}
75111
),
76112
status_code=200,
77113
)
@@ -133,6 +169,15 @@ def propagate_stream_logs() -> None:
133169
logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG)
134170

135171

172+
@pytest.fixture
173+
def reduce_final_timeout_for_status_message_redirector() -> None:
174+
"""Reduce timeout used by the `StatusMessageRedirector`
175+
176+
This timeout makes sense on the platform, but in tests it is better to reduce it to speed up the tests.
177+
"""
178+
StatusMessageRedirector._final_sleep_time_s = 2
179+
180+
136181
@pytest.mark.parametrize(
137182
('log_from_start', 'expected_log_count'),
138183
[
@@ -215,6 +260,7 @@ async def test_actor_call_redirect_logs_to_default_logger_async(
215260
caplog: LogCaptureFixture,
216261
mock_api_async: None, # noqa: ARG001, fixture
217262
propagate_stream_logs: None, # noqa: ARG001, fixture
263+
reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture
218264
) -> None:
219265
"""Test that logs are redirected correctly to the default logger.
220266
@@ -231,8 +277,8 @@ async def test_actor_call_redirect_logs_to_default_logger_async(
231277
assert isinstance(logger.handlers[0], logging.StreamHandler)
232278

233279
# Ensure logs are propagated
234-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
235-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
280+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
281+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
236282
assert expected_message_and_level[0] == record.message
237283
assert expected_message_and_level[1] == record.levelno
238284

@@ -242,6 +288,7 @@ def test_actor_call_redirect_logs_to_default_logger_sync(
242288
caplog: LogCaptureFixture,
243289
mock_api_sync: None, # noqa: ARG001, fixture
244290
propagate_stream_logs: None, # noqa: ARG001, fixture
291+
reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture
245292
) -> None:
246293
"""Test that logs are redirected correctly to the default logger.
247294
@@ -258,8 +305,8 @@ def test_actor_call_redirect_logs_to_default_logger_sync(
258305
assert isinstance(logger.handlers[0], logging.StreamHandler)
259306

260307
# Ensure logs are propagated
261-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
262-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
308+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
309+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
263310
assert expected_message_and_level[0] == record.message
264311
assert expected_message_and_level[1] == record.levelno
265312

@@ -299,6 +346,7 @@ async def test_actor_call_redirect_logs_to_custom_logger_async(
299346
caplog: LogCaptureFixture,
300347
mock_api_async: None, # noqa: ARG001, fixture
301348
propagate_stream_logs: None, # noqa: ARG001, fixture
349+
reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture
302350
) -> None:
303351
"""Test that logs are redirected correctly to the custom logger."""
304352
logger_name = 'custom_logger'
@@ -308,8 +356,8 @@ async def test_actor_call_redirect_logs_to_custom_logger_async(
308356
with caplog.at_level(logging.DEBUG, logger=logger_name):
309357
await run_client.call(logger=logger)
310358

311-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
312-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
359+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
360+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
313361
assert expected_message_and_level[0] == record.message
314362
assert expected_message_and_level[1] == record.levelno
315363

@@ -319,6 +367,7 @@ def test_actor_call_redirect_logs_to_custom_logger_sync(
319367
caplog: LogCaptureFixture,
320368
mock_api_sync: None, # noqa: ARG001, fixture
321369
propagate_stream_logs: None, # noqa: ARG001, fixture
370+
reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture
322371
) -> None:
323372
"""Test that logs are redirected correctly to the custom logger."""
324373
logger_name = 'custom_logger'
@@ -328,51 +377,56 @@ def test_actor_call_redirect_logs_to_custom_logger_sync(
328377
with caplog.at_level(logging.DEBUG, logger=logger_name):
329378
run_client.call(logger=logger)
330379

331-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
332-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
380+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
381+
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
333382
assert expected_message_and_level[0] == record.message
334383
assert expected_message_and_level[1] == record.levelno
335384

385+
336386
@respx.mock
337387
async def test_redirect_status_message_async(
338388
*,
339389
caplog: LogCaptureFixture,
340390
mock_api: None, # noqa: ARG001, fixture
341391
propagate_stream_logs: None, # noqa: ARG001, fixture
392+
reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture
342393
) -> None:
343394
"""Test redirected status and status messages."""
344395

345396
run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
346397

347398
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
348399

349-
status_message_redirector = await run_client.get_status_message_redirector(check_period=timedelta(seconds =0.1))
400+
status_message_redirector = await run_client.get_status_message_redirector(check_period=timedelta(seconds=0))
350401
with caplog.at_level(logging.DEBUG, logger=logger_name):
351402
async with status_message_redirector:
352403
# Do stuff while the status from the other actor is being redirected to the logs.
353-
await asyncio.sleep(2)
404+
await asyncio.sleep(3)
405+
406+
assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message'
407+
assert caplog.records[1].message == 'Status: RUNNING, Message: Another message'
408+
assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message'
354409

355-
assert caplog.records[0].message == 'Status: RUNNING, Message: Status 1'
356-
assert caplog.records[1].message == 'Status: SUCCEEDED, Message: Status 2'
357410

358411
@respx.mock
359412
def test_redirect_status_message_sync(
360413
*,
361414
caplog: LogCaptureFixture,
362415
mock_api: None, # noqa: ARG001, fixture
363416
propagate_stream_logs: None, # noqa: ARG001, fixture
417+
reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture
364418
) -> None:
365419
"""Test redirected status and status messages."""
366420

367421
run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
368422

369423
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
370424

371-
status_message_redirector = run_client.get_status_message_redirector(check_period=timedelta(seconds =0.1))
372-
with caplog.at_level(logging.DEBUG, logger=logger_name):
373-
with status_message_redirector:
374-
# Do stuff while the status from the other actor is being redirected to the logs.
375-
time.sleep(2)
425+
status_message_redirector = run_client.get_status_message_redirector(check_period=timedelta(seconds=0))
426+
with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector:
427+
# Do stuff while the status from the other actor is being redirected to the logs.
428+
time.sleep(3)
376429

377-
assert caplog.records[0].message == 'Status: RUNNING, Message: Status 1'
378-
assert caplog.records[1].message == 'Status: SUCCEEDED, Message: Status 2'
430+
assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message'
431+
assert caplog.records[1].message == 'Status: RUNNING, Message: Another message'
432+
assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message'

0 commit comments

Comments
 (0)