Skip to content

Commit 268e568

Browse files
committed
Review comments
1 parent 18f4f51 commit 268e568

File tree

4 files changed

+39
-37
lines changed

4 files changed

+39
-37
lines changed

src/apify_client/clients/resource_clients/actor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def call(
341341
if logger == 'default':
342342
logger = None
343343

344-
with run_client.get_status_message_redirector(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
344+
with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
345345
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
346346

347347
def build(
@@ -747,7 +747,7 @@ async def call(
747747
if logger == 'default':
748748
logger = None
749749

750-
status_redirector = await run_client.get_status_message_redirector(to_logger=logger)
750+
status_redirector = await run_client.get_status_message_watcher(to_logger=logger)
751751
streamed_log = await run_client.get_streamed_log(to_logger=logger)
752752

753753
async with status_redirector, streamed_log:

src/apify_client/clients/resource_clients/log.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,9 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
232232
logs for long-running actors in stand-by.
233233
234234
"""
235-
self._to_logger = to_logger
236235
if self._force_propagate:
237236
to_logger.propagate = True
237+
self._to_logger = to_logger
238238
self._stream_buffer = list[bytes]()
239239
self._split_marker = re.compile(rb'(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
240240
self._relevancy_time_limit: datetime | None = None if from_start else datetime.now(tz=timezone.utc)
@@ -384,7 +384,7 @@ async def _stream_log(self) -> None:
384384
self._log_buffer_content(include_last_part=True)
385385

386386

387-
class StatusMessageRedirector:
387+
class StatusMessageWatcher:
388388
"""Utility class for logging status messages from another Actor run.
389389
390390
Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
@@ -398,12 +398,14 @@ class StatusMessageRedirector:
398398
_final_sleep_time_s = 6
399399

400400
def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
401-
"""Initialize `StatusMessageRedirector`.
401+
"""Initialize `StatusMessageWatcher`.
402402
403403
Args:
404404
to_logger: The logger to which the status message will be redirected.
405405
check_period: The period with which the status message will be polled.
406406
"""
407+
if self._force_propagate:
408+
to_logger.propagate = True
407409
self._to_logger = to_logger
408410
self._to_logger.propagate = self._force_propagate
409411
self._check_period = check_period.total_seconds()
@@ -431,13 +433,13 @@ def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
431433
return True
432434

433435

434-
class StatusMessageRedirectorAsync(StatusMessageRedirector):
435-
"""Async variant of `StatusMessageRedirector` that is logging in task."""
436+
class StatusMessageWatcherAsync(StatusMessageWatcher):
437+
"""Async variant of `StatusMessageWatcher` that is logging in task."""
436438

437439
def __init__(
438440
self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
439441
) -> None:
440-
"""Initialize `StatusMessageRedirectorAsync`.
442+
"""Initialize `StatusMessageWatcherAsync`.
441443
442444
Args:
443445
run_client: The client for run that will be used to get a status and message.
@@ -483,13 +485,13 @@ async def _log_changed_status_message(self) -> None:
483485
await asyncio.sleep(self._check_period)
484486

485487

486-
class StatusMessageRedirectorSync(StatusMessageRedirector):
487-
"""Sync variant of `StatusMessageRedirector` that is logging in thread."""
488+
class StatusMessageWatcherSync(StatusMessageWatcher):
489+
"""Sync variant of `StatusMessageWatcher` that is logging in thread."""
488490

489491
def __init__(
490492
self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
491493
) -> None:
492-
"""Initialize `StatusMessageRedirectorSync`.
494+
"""Initialize `StatusMessageWatcherSync`.
493495
494496
Args:
495497
run_client: The client for run that will be used to get a status and message.

src/apify_client/clients/resource_clients/run.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from apify_client.clients.resource_clients.log import (
1919
LogClient,
2020
LogClientAsync,
21-
StatusMessageRedirectorAsync,
22-
StatusMessageRedirectorSync,
21+
StatusMessageWatcherAsync,
22+
StatusMessageWatcherSync,
2323
StreamedLogAsync,
2424
StreamedLogSync,
2525
)
@@ -274,7 +274,7 @@ def get_streamed_log(self, to_logger: logging.Logger | None = None, *, from_star
274274
`StreamedLog` instance for redirected logs.
275275
"""
276276
run_data = self.get()
277-
run_id = run_data.get('id', '') if run_data else ''
277+
run_id = f'runId:{run_data.get("id", "")}' if run_data else ''
278278

279279
actor_id = run_data.get('actId', '') if run_data else ''
280280
actor_data = self.root_client.actor(actor_id=actor_id).get() or {}
@@ -322,23 +322,23 @@ def charge(
322322
),
323323
)
324324

325-
def get_status_message_redirector(
325+
def get_status_message_watcher(
326326
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1)
327-
) -> StatusMessageRedirectorSync:
328-
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.
327+
) -> StatusMessageWatcherSync:
328+
"""Get `StatusMessageWatcher` instance that can be used to redirect status and status messages to logs.
329329
330-
`StatusMessageRedirector` can be directly called or used as a context manager.
330+
`StatusMessageWatcher` can be directly called or used as a context manager.
331331
332332
Args:
333333
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
334334
created.
335335
check_period: The period with which the status message will be polled.
336336
337337
Returns:
338-
`StatusMessageRedirector` instance for redirected logs.
338+
`StatusMessageWatcher` instance.
339339
"""
340340
run_data = self.get()
341-
run_id = run_data.get('id', '') if run_data else ''
341+
run_id = f'runId:{run_data.get("id", "")}' if run_data else ''
342342

343343
actor_id = run_data.get('actId', '') if run_data else ''
344344
actor_data = self.root_client.actor(actor_id=actor_id).get() or {}
@@ -348,7 +348,7 @@ def get_status_message_redirector(
348348
name = '-'.join(part for part in (actor_name, run_id) if part)
349349
to_logger = create_redirect_logger(f'apify.{name}')
350350

351-
return StatusMessageRedirectorSync(run_client=self, to_logger=to_logger, check_period=check_period)
351+
return StatusMessageWatcherSync(run_client=self, to_logger=to_logger, check_period=check_period)
352352

353353

354354
class RunClientAsync(ActorJobBaseClientAsync):
@@ -598,7 +598,7 @@ async def get_streamed_log(
598598
`StreamedLog` instance for redirected logs.
599599
"""
600600
run_data = await self.get()
601-
run_id = run_data.get('id', '') if run_data else ''
601+
run_id = f'runId:{run_data.get("id", "")}' if run_data else ''
602602

603603
actor_id = run_data.get('actId', '') if run_data else ''
604604
actor_data = await self.root_client.actor(actor_id=actor_id).get() or {}
@@ -645,25 +645,25 @@ async def charge(
645645
),
646646
)
647647

648-
async def get_status_message_redirector(
648+
async def get_status_message_watcher(
649649
self,
650650
to_logger: logging.Logger | None = None,
651651
check_period: timedelta = timedelta(seconds=1),
652-
) -> StatusMessageRedirectorAsync:
653-
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.
652+
) -> StatusMessageWatcherAsync:
653+
"""Get `StatusMessageWatcher` instance that can be used to redirect status and status messages to logs.
654654
655-
`StatusMessageRedirector` can be directly called or used as a context manager.
655+
`StatusMessageWatcher` can be directly called or used as a context manager.
656656
657657
Args:
658658
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
659659
created.
660660
check_period: The period with which the status message will be polled.
661661
662662
Returns:
663-
`StatusMessageRedirector` instance for redirected logs.
663+
`StatusMessageWatcher` instance.
664664
"""
665665
run_data = await self.get()
666-
run_id = run_data.get('id', '') if run_data else ''
666+
run_id = f'runId:{run_data.get("id", "")}' if run_data else ''
667667

668668
actor_id = run_data.get('actId', '') if run_data else ''
669669
actor_data = await self.root_client.actor(actor_id=actor_id).get() or {}
@@ -673,4 +673,4 @@ async def get_status_message_redirector(
673673
name = '-'.join(part for part in (actor_name, run_id) if part)
674674
to_logger = create_redirect_logger(f'apify.{name}')
675675

676-
return StatusMessageRedirectorAsync(run_client=self, to_logger=to_logger, check_period=check_period)
676+
return StatusMessageWatcherAsync(run_client=self, to_logger=to_logger, check_period=check_period)

tests/unit/test_logging.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from apify_client import ApifyClient, ApifyClientAsync
1616
from apify_client._logging import RedirectLogFormatter
17-
from apify_client.clients.resource_clients.log import StatusMessageRedirector, StreamedLog
17+
from apify_client.clients.resource_clients.log import StatusMessageWatcher, StreamedLog
1818

1919
_MOCKED_API_URL = 'https://example.com'
2020
_MOCKED_RUN_ID = 'mocked_run_id'
@@ -165,17 +165,17 @@ def close(self) -> None:
165165
def propagate_stream_logs() -> None:
166166
# Enable propagation of logs to the caplog fixture
167167
StreamedLog._force_propagate = True
168-
StatusMessageRedirector._force_propagate = True
168+
StatusMessageWatcher._force_propagate = True
169169
logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG)
170170

171171

172172
@pytest.fixture
173173
def reduce_final_timeout_for_status_message_redirector() -> None:
174-
"""Reduce timeout used by the `StatusMessageRedirector`
174+
"""Reduce timeout used by the `StatusMessageWatcher`
175175
176176
This timeout makes sense on the platform, but in tests it is better to reduce it to speed up the tests.
177177
"""
178-
StatusMessageRedirector._final_sleep_time_s = 2
178+
StatusMessageWatcher._final_sleep_time_s = 2
179179

180180

181181
@pytest.mark.parametrize(
@@ -265,7 +265,7 @@ async def test_actor_call_redirect_logs_to_default_logger_async(
265265
"""Test that logs are redirected correctly to the default logger.
266266
267267
Caplog contains logs before formatting, so formatting is not included in the test expectations."""
268-
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
268+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-runId:{_MOCKED_RUN_ID}'
269269
logger = logging.getLogger(logger_name)
270270
actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID)
271271

@@ -293,7 +293,7 @@ def test_actor_call_redirect_logs_to_default_logger_sync(
293293
"""Test that logs are redirected correctly to the default logger.
294294
295295
Caplog contains logs before formatting, so formatting is not included in the test expectations."""
296-
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
296+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-runId:{_MOCKED_RUN_ID}'
297297
logger = logging.getLogger(logger_name)
298298
actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID)
299299

@@ -397,7 +397,7 @@ async def test_redirect_status_message_async(
397397

398398
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
399399

400-
status_message_redirector = await run_client.get_status_message_redirector(check_period=timedelta(seconds=0))
400+
status_message_redirector = await run_client.get_status_message_watcher(check_period=timedelta(seconds=0))
401401
with caplog.at_level(logging.DEBUG, logger=logger_name):
402402
async with status_message_redirector:
403403
# Do stuff while the status from the other Actor is being redirected to the logs.
@@ -422,7 +422,7 @@ def test_redirect_status_message_sync(
422422

423423
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
424424

425-
status_message_redirector = run_client.get_status_message_redirector(check_period=timedelta(seconds=0))
425+
status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0))
426426
with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector:
427427
# Do stuff while the status from the other Actor is being redirected to the logs.
428428
time.sleep(3)

0 commit comments

Comments
 (0)