From cc20641dedccf0a01fb277494fb3c9a225ac650c Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 00:34:49 +0000 Subject: [PATCH 1/8] change tests from respx mocks to pytest-httpserver --- pyproject.toml | 1 + tests/integration/__init__.py | 0 tests/unit/__init__.py | 0 tests/unit/conftest.py | 31 ++ tests/unit/logging_old.py | 437 ++++++++++++++++++++++++ tests/unit/test_client_errors.py | 32 +- tests/unit/test_client_request_queue.py | 46 +-- tests/unit/test_client_timeouts.py | 95 ++++-- tests/unit/test_logging.py | 280 +++++++-------- uv.lock | 28 +- 10 files changed, 732 insertions(+), 218 deletions(-) create mode 100644 tests/integration/__init__.py create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/conftest.py create mode 100644 tests/unit/logging_old.py diff --git a/pyproject.toml b/pyproject.toml index 9d429449..5309aa78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dev = [ "pytest-timeout>=2.4.0", "pytest-xdist~=3.8.0", "pytest~=8.4.0", + "pytest-httpserver>=1.1.3", "redbaron~=0.9.0", "respx~=0.22.0", "ruff~=0.12.0", diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 00000000..aea69820 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,31 @@ +from collections.abc import Iterable, Iterator +from logging import getLogger + +import pytest +from pytest_httpserver import HTTPServer + + +@pytest.fixture(scope='session') +def make_httpserver() -> Iterable[HTTPServer]: + werkzeug_logger = getLogger('werkzeug') + werkzeug_logger.disabled = True + + server = HTTPServer(threaded=True) + server.start() + yield server + server.clear() # type: ignore[no-untyped-call] + if server.is_running(): + server.stop() # type: ignore[no-untyped-call] + + +@pytest.fixture(scope='session') +def httpserver(make_httpserver: HTTPServer) -> HTTPServer: + return make_httpserver + + +@pytest.fixture +def patch_basic_url(httpserver: HTTPServer, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: + server_url = httpserver.url_for('/').removesuffix('/') + monkeypatch.setattr('apify_client.client.DEFAULT_API_URL', server_url) + yield + monkeypatch.undo() diff --git a/tests/unit/logging_old.py b/tests/unit/logging_old.py new file mode 100644 index 00000000..19dd6ede --- /dev/null +++ b/tests/unit/logging_old.py @@ -0,0 +1,437 @@ +import asyncio +import json +import logging +import threading +import time +from collections.abc import AsyncIterator, Generator, Iterator +from datetime import datetime, timedelta +from unittest.mock import patch + +import httpx +import pytest +import respx +from _pytest.logging import LogCaptureFixture +from apify_shared.consts import ActorJobStatus + +from apify_client import ApifyClient, ApifyClientAsync +from apify_client._logging import RedirectLogFormatter +from apify_client.clients.resource_clients.log import StatusMessageWatcher, StreamedLog + +_MOCKED_API_URL = '/logging' +_MOCKED_RUN_ID = 'mocked_run_id' +_MOCKED_ACTOR_NAME = 'mocked_actor_name' +_MOCKED_ACTOR_ID = 'mocked_actor_id' +_MOCKED_ACTOR_LOGS = ( + b'2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.\n' + b'2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.\n' + b'2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.\n', # Several logs merged into one chunk + b'2025-05-13T07:26:14.132Z [apify] DEBUG \xc3', # Chunked log split in the middle of the multibyte character + b'\xa1\n', # part 2 + b'2025-05-13T07:24:14.132Z [apify] INFO multiline \n log\n', + b'2025-05-13T07:25:14.132Z [apify] WARNING some warning\n', + b'2025-05-13T07:26:14.132Z [apify] DEBUG c\n', + b'2025-05-13T0', # Chunked log that got split in the marker + b'7:26:14.132Z [apify] DEBUG d\n' # part 2 + b'2025-05-13T07:27:14.132Z [apify] DEB', # Chunked log that got split outside of marker + b'UG e\n', # part 2 + # Already redirected message + b'2025-05-13T07:28:14.132Z [apify.redirect-logger runId:4U1oAnKau6jpzjUuA] -> 2025-05-13T07:27:14.132Z ACTOR:...\n', +) +_EXISTING_LOGS_BEFORE_REDIRECT_ATTACH = 3 + +_EXPECTED_MESSAGES_AND_LEVELS = ( + ('2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.', logging.INFO), + ('2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.', logging.INFO), + ('2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', logging.INFO), + ('2025-05-13T07:26:14.132Z [apify] DEBUG á', logging.DEBUG), + ('2025-05-13T07:24:14.132Z [apify] INFO multiline \n log', logging.INFO), + ('2025-05-13T07:25:14.132Z [apify] WARNING some warning', logging.WARNING), + ('2025-05-13T07:26:14.132Z [apify] DEBUG c', logging.DEBUG), + ('2025-05-13T07:26:14.132Z [apify] DEBUG d', logging.DEBUG), + ('2025-05-13T07:27:14.132Z [apify] DEBUG e', logging.DEBUG), + ( + '2025-05-13T07:28:14.132Z [apify.redirect-logger runId:4U1oAnKau6jpzjUuA] -> ' + '2025-05-13T07:27:14.132Z ACTOR:...', + logging.INFO, + ), +) + +_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES = ( + ('Status: RUNNING, Message: Initial message', logging.INFO), + *_EXPECTED_MESSAGES_AND_LEVELS, + ('Status: RUNNING, Message: Another message', logging.INFO), + ('Status: SUCCEEDED, Message: Final message', logging.INFO), +) + + +@pytest.fixture +def mock_api() -> None: + test_server_lock = threading.Lock() + + def get_responses() -> Generator[httpx.Response, None, None]: + """Simulate actor run that changes status 3 times.""" + for _ in range(5): + yield httpx.Response( + content=json.dumps( + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': ActorJobStatus.RUNNING, + 'statusMessage': 'Initial message', + 'isStatusMessageTerminal': False, + } + } + ), + status_code=200, + ) + for _ in range(5): + yield httpx.Response( + content=json.dumps( + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': ActorJobStatus.RUNNING, + 'statusMessage': 'Another message', + 'isStatusMessageTerminal': False, + } + } + ), + status_code=200, + ) + while True: + yield httpx.Response( + content=json.dumps( + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': ActorJobStatus.SUCCEEDED, + 'statusMessage': 'Final message', + 'isStatusMessageTerminal': True, + } + } + ), + status_code=200, + ) + + responses = get_responses() + + def actor_runs_side_effect(_: httpx.Request) -> httpx.Response: + test_server_lock.acquire() + # To avoid multiple threads accessing at the same time and causing `ValueError: generator already executing` + response = next(responses) + test_server_lock.release_lock() + return response + + respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect) + + respx.get(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}').mock( + return_value=httpx.Response(content=json.dumps({'data': {'name': _MOCKED_ACTOR_NAME}}), status_code=200) + ) + + respx.post(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}/runs').mock( + return_value=httpx.Response(content=json.dumps({'data': {'id': _MOCKED_RUN_ID}}), status_code=200) + ) + + +@pytest.fixture +def mock_api_async(mock_api: None) -> None: # noqa: ARG001, fixture + class AsyncByteStream(httpx._types.AsyncByteStream): + async def __aiter__(self) -> AsyncIterator[bytes]: + for i in _MOCKED_ACTOR_LOGS: + yield i + await asyncio.sleep(0.01) + + async def aclose(self) -> None: + pass + + respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1&raw=1').mock( + return_value=httpx.Response(stream=AsyncByteStream(), status_code=200) + ) + + +@pytest.fixture +def mock_api_sync(mock_api: None) -> None: # noqa: ARG001, fixture + class SyncByteStream(httpx._types.SyncByteStream): + def __iter__(self) -> Iterator[bytes]: + for i in _MOCKED_ACTOR_LOGS: + yield i + time.sleep(0.01) + + def close(self) -> None: + pass + + respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1&raw=1').mock( + return_value=httpx.Response(stream=SyncByteStream(), status_code=200) + ) + + +@pytest.fixture +def propagate_stream_logs() -> None: + # Enable propagation of logs to the caplog fixture + StreamedLog._force_propagate = True + StatusMessageWatcher._force_propagate = True + logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG) + + +@pytest.fixture +def reduce_final_timeout_for_status_message_redirector() -> None: + """Reduce timeout used by the `StatusMessageWatcher` + + This timeout makes sense on the platform, but in tests it is better to reduce it to speed up the tests. + """ + StatusMessageWatcher._final_sleep_time_s = 2 + + +@pytest.mark.parametrize( + ('log_from_start', 'expected_log_count'), + [ + (True, len(_EXPECTED_MESSAGES_AND_LEVELS)), + (False, len(_EXPECTED_MESSAGES_AND_LEVELS) - _EXISTING_LOGS_BEFORE_REDIRECT_ATTACH), + ], +) +@respx.mock +async def test_redirected_logs_async( + *, + caplog: LogCaptureFixture, + mock_api_async: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + log_from_start: bool, + expected_log_count: int, +) -> None: + """Test that redirected logs are formatted correctly.""" + + run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + + with patch('apify_client.clients.resource_clients.log.datetime') as mocked_datetime: + # Mock `now()` so that it has timestamp bigger than the first 3 logs + mocked_datetime.now.return_value = datetime.fromisoformat('2025-05-13T07:24:14.132+00:00') + streamed_log = await run_client.get_streamed_log(from_start=log_from_start) + + # Set `propagate=True` during the tests, so that caplog can see the logs.. + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + with caplog.at_level(logging.DEBUG, logger=logger_name): + async with streamed_log: + # Do stuff while the log from the other Actor is being redirected to the logs. + await asyncio.sleep(2) + + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:] + ) + + +@pytest.mark.parametrize( + ('log_from_start', 'expected_log_count'), + [ + (True, len(_EXPECTED_MESSAGES_AND_LEVELS)), + (False, len(_EXPECTED_MESSAGES_AND_LEVELS) - _EXISTING_LOGS_BEFORE_REDIRECT_ATTACH), + ], +) +@respx.mock +def test_redirected_logs_sync( + *, + caplog: LogCaptureFixture, + mock_api_sync: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + log_from_start: bool, + expected_log_count: int, +) -> None: + """Test that redirected logs are formatted correctly.""" + + run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + + with patch('apify_client.clients.resource_clients.log.datetime') as mocked_datetime: + # Mock `now()` so that it has timestamp bigger than the first 3 logs + mocked_datetime.now.return_value = datetime.fromisoformat('2025-05-13T07:24:14.132+00:00') + streamed_log = run_client.get_streamed_log(from_start=log_from_start) + + # Set `propagate=True` during the tests, so that caplog can see the logs.. + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + with caplog.at_level(logging.DEBUG, logger=logger_name), streamed_log: + # Do stuff while the log from the other Actor is being redirected to the logs. + time.sleep(2) + + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:] + ) + + +@respx.mock +async def test_actor_call_redirect_logs_to_default_logger_async( + caplog: LogCaptureFixture, + mock_api_async: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test that logs are redirected correctly to the default logger. + + Caplog contains logs before formatting, so formatting is not included in the test expectations.""" + logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' + logger = logging.getLogger(logger_name) + actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + + with caplog.at_level(logging.DEBUG, logger=logger_name): + await actor_client.call() + + # Ensure expected handler and formater + assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter) + assert isinstance(logger.handlers[0], logging.StreamHandler) + + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) + + +@respx.mock +def test_actor_call_redirect_logs_to_default_logger_sync( + caplog: LogCaptureFixture, + mock_api_sync: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test that logs are redirected correctly to the default logger. + + Caplog contains logs before formatting, so formatting is not included in the test expectations.""" + logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' + logger = logging.getLogger(logger_name) + actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + + with caplog.at_level(logging.DEBUG, logger=logger_name): + actor_client.call() + + # Ensure expected handler and formater + assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter) + assert isinstance(logger.handlers[0], logging.StreamHandler) + + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) + + +@respx.mock +async def test_actor_call_no_redirect_logs_async( + caplog: LogCaptureFixture, + mock_api_async: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture +) -> None: + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + + with caplog.at_level(logging.DEBUG, logger=logger_name): + await actor_client.call(logger=None) + + assert len(caplog.records) == 0 + + +@respx.mock +def test_actor_call_no_redirect_logs_sync( + caplog: LogCaptureFixture, + mock_api_sync: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture +) -> None: + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + + with caplog.at_level(logging.DEBUG, logger=logger_name): + actor_client.call(logger=None) + + assert len(caplog.records) == 0 + + +@respx.mock +async def test_actor_call_redirect_logs_to_custom_logger_async( + caplog: LogCaptureFixture, + mock_api_async: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test that logs are redirected correctly to the custom logger.""" + logger_name = 'custom_logger' + logger = logging.getLogger(logger_name) + actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + + with caplog.at_level(logging.DEBUG, logger=logger_name): + await actor_client.call(logger=logger) + + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) + + +@respx.mock +def test_actor_call_redirect_logs_to_custom_logger_sync( + caplog: LogCaptureFixture, + mock_api_sync: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test that logs are redirected correctly to the custom logger.""" + logger_name = 'custom_logger' + logger = logging.getLogger(logger_name) + actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + + with caplog.at_level(logging.DEBUG, logger=logger_name): + actor_client.call(logger=logger) + + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) + + +@respx.mock +async def test_redirect_status_message_async( + *, + caplog: LogCaptureFixture, + mock_api: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test redirected status and status messages.""" + + run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + status_message_redirector = await run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) + with caplog.at_level(logging.DEBUG, logger=logger_name): + async with status_message_redirector: + # Do stuff while the status from the other Actor is being redirected to the logs. + await asyncio.sleep(3) + + assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' + assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' + assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message' + + +@respx.mock +def test_redirect_status_message_sync( + *, + caplog: LogCaptureFixture, + mock_api: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test redirected status and status messages.""" + + run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) + with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector: + # Do stuff while the status from the other Actor is being redirected to the logs. + time.sleep(3) + + assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' + assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' + assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message' diff --git a/tests/unit/test_client_errors.py b/tests/unit/test_client_errors.py index 71d0e4ea..fe127509 100644 --- a/tests/unit/test_client_errors.py +++ b/tests/unit/test_client_errors.py @@ -1,14 +1,16 @@ -import json -from collections.abc import Generator +from __future__ import annotations + +from typing import TYPE_CHECKING -import httpx import pytest -import respx from apify_client._errors import ApifyApiError from apify_client._http_client import HTTPClient, HTTPClientAsync -_TEST_URL = 'http://example.com' +if TYPE_CHECKING: + from pytest_httpserver import HTTPServer + +_TEST_PATH = '/errors' _EXPECTED_MESSAGE = 'some_message' _EXPECTED_TYPE = 'some_type' _EXPECTED_DATA = { @@ -16,34 +18,32 @@ } -@pytest.fixture(autouse=True) -def mocked_response() -> Generator[respx.MockRouter]: - response_content = json.dumps( - {'error': {'message': _EXPECTED_MESSAGE, 'type': _EXPECTED_TYPE, 'data': _EXPECTED_DATA}} +@pytest.fixture +def test_endpoint(httpserver: HTTPServer) -> str: + httpserver.expect_request(_TEST_PATH).respond_with_json( + {'error': {'message': _EXPECTED_MESSAGE, 'type': _EXPECTED_TYPE, 'data': _EXPECTED_DATA}}, status=400 ) - with respx.mock() as respx_mock: - respx_mock.get(_TEST_URL).mock(return_value=httpx.Response(400, content=response_content)) - yield respx_mock + return str(httpserver.url_for(_TEST_PATH)) -def test_client_apify_api_error_with_data() -> None: +def test_client_apify_api_error_with_data(test_endpoint: str) -> None: """Test that client correctly throws ApifyApiError with error data from response.""" client = HTTPClient() with pytest.raises(ApifyApiError) as e: - client.call(method='GET', url=_TEST_URL) + client.call(method='GET', url=test_endpoint) assert e.value.message == _EXPECTED_MESSAGE assert e.value.type == _EXPECTED_TYPE assert e.value.data == _EXPECTED_DATA -async def test_async_client_apify_api_error_with_data() -> None: +async def test_async_client_apify_api_error_with_data(test_endpoint: str) -> None: """Test that async client correctly throws ApifyApiError with error data from response.""" client = HTTPClientAsync() with pytest.raises(ApifyApiError) as e: - await client.call(method='GET', url=_TEST_URL) + await client.call(method='GET', url=test_endpoint) assert e.value.message == _EXPECTED_MESSAGE assert e.value.type == _EXPECTED_TYPE diff --git a/tests/unit/test_client_request_queue.py b/tests/unit/test_client_request_queue.py index 8e339305..ec6bf606 100644 --- a/tests/unit/test_client_request_queue.py +++ b/tests/unit/test_client_request_queue.py @@ -1,9 +1,16 @@ +from __future__ import annotations + +import re +from typing import TYPE_CHECKING + import pytest -import respx import apify_client from apify_client import ApifyClient, ApifyClientAsync +if TYPE_CHECKING: + from pytest_httpserver import HTTPServer + _PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT = """{ "data": { "processedRequests": [ @@ -25,12 +32,11 @@ }""" -@respx.mock -async def test_batch_not_processed_raises_exception_async() -> None: +@pytest.mark.usefixtures('patch_basic_url') +async def test_batch_not_processed_raises_exception_async(httpserver: HTTPServer) -> None: """Test that client exceptions are not silently ignored""" - client = ApifyClientAsync(token='') - - respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401)) + client = ApifyClientAsync(token='placeholder_token') + httpserver.expect_oneshot_request(re.compile(r'.*'), method='POST').respond_with_data(status=401) requests = [ {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'}, {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'}, @@ -41,12 +47,12 @@ async def test_batch_not_processed_raises_exception_async() -> None: await rq_client.batch_add_requests(requests=requests) -@respx.mock -async def test_batch_processed_partially_async() -> None: - client = ApifyClientAsync(token='') +@pytest.mark.usefixtures('patch_basic_url') +async def test_batch_processed_partially_async(httpserver: HTTPServer) -> None: + client = ApifyClientAsync(token='placeholder_token') - respx.route(method='POST', host='api.apify.com').mock( - return_value=respx.MockResponse(200, content=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT) + httpserver.expect_oneshot_request(re.compile(r'.*'), method='POST').respond_with_data( + status=200, response_data=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT ) requests = [ {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'}, @@ -59,12 +65,12 @@ async def test_batch_processed_partially_async() -> None: assert response['unprocessedRequests'] == [requests[1]] -@respx.mock -def test_batch_not_processed_raises_exception_sync() -> None: +@pytest.mark.usefixtures('patch_basic_url') +def test_batch_not_processed_raises_exception_sync(httpserver: HTTPServer) -> None: """Test that client exceptions are not silently ignored""" - client = ApifyClient(token='') + client = ApifyClient(token='placeholder_token') - respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401)) + httpserver.expect_oneshot_request(re.compile(r'.*'), method='POST').respond_with_data(status=401) requests = [ {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'}, {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'}, @@ -75,12 +81,12 @@ def test_batch_not_processed_raises_exception_sync() -> None: rq_client.batch_add_requests(requests=requests) -@respx.mock -async def test_batch_processed_partially_sync() -> None: - client = ApifyClient(token='') +@pytest.mark.usefixtures('patch_basic_url') +async def test_batch_processed_partially_sync(httpserver: HTTPServer) -> None: + client = ApifyClient(token='placeholder_token') - respx.route(method='POST', host='api.apify.com').mock( - return_value=respx.MockResponse(200, content=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT) + httpserver.expect_oneshot_request(re.compile(r'.*'), method='POST').respond_with_data( + status=200, response_data=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT ) requests = [ {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'}, diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index 82362644..3259c977 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -1,10 +1,13 @@ from __future__ import annotations +import time from functools import partial +from typing import TYPE_CHECKING +from unittest.mock import Mock import pytest import respx -from httpx import Request, Response, TimeoutException +from werkzeug import Response as WerkzeugResponse from apify_client import ApifyClient from apify_client._http_client import HTTPClient, HTTPClientAsync @@ -13,13 +16,17 @@ from apify_client.clients.resource_clients import dataset, request_queue from apify_client.clients.resource_clients import key_value_store as kvs +if TYPE_CHECKING: + from httpx import Request, Response + from pytest_httpserver import HTTPServer + from werkzeug import Request as WerkzeugRequest + class EndOfTestError(Exception): """Custom exception that is raised after the relevant part of the code is executed to stop the test.""" -@respx.mock -async def test_dynamic_timeout_async_client() -> None: +async def test_dynamic_timeout_async_client(httpserver: HTTPServer) -> None: """Tests timeout values for request with retriable errors. Values should increase with each attempt, starting from initial call value and bounded by the client timeout value. @@ -28,27 +35,35 @@ async def test_dynamic_timeout_async_client() -> None: call_timeout = 1 client_timeout = 5 expected_timeouts = iter((call_timeout, 2, 4, client_timeout)) + retry_counter_mock = Mock() + + def slow_handler(_request: WerkzeugRequest) -> WerkzeugResponse: + timeout = next(expected_timeouts) + should_raise = next(should_raise_error) + # Counter for retries + retry_counter_mock() + + if should_raise: + # We expect longer than the client is willing to wait. This will cause a timeout on the client side. + time.sleep(timeout + 0.02) - def check_timeout(request: Request) -> Response: - expected_timeout = next(expected_timeouts) - assert request.extensions['timeout'] == { - 'connect': expected_timeout, - 'pool': expected_timeout, - 'read': expected_timeout, - 'write': expected_timeout, - } - if next(should_raise_error): - raise TimeoutException('This error can be retried') - return Response(200) - - respx.get('https://example.com').mock(side_effect=check_timeout) - await HTTPClientAsync(timeout_secs=client_timeout).call( - method='GET', url='https://example.com', timeout_secs=call_timeout + return WerkzeugResponse('200 OK') + + httpserver.expect_request('/async_timeout', method='GET').respond_with_handler(slow_handler) + + server_url = str(httpserver.url_for('/async_timeout')) + response = await HTTPClientAsync(timeout_secs=client_timeout).call( + method='GET', url=server_url, timeout_secs=call_timeout ) + # Check that the retry counter was called the expected number of times + # (4 times: 3 retries + 1 final successful call) + assert retry_counter_mock.call_count == 4 + # Check that the response is successful + assert response.status_code == 200 -@respx.mock -def test_dynamic_timeout_sync_client() -> None: + +def test_dynamic_timeout_sync_client(httpserver: HTTPServer) -> None: """Tests timeout values for request with retriable errors. Values should increase with each attempt, starting from initial call value and bounded by the client timeout value. @@ -57,21 +72,31 @@ def test_dynamic_timeout_sync_client() -> None: call_timeout = 1 client_timeout = 5 expected_timeouts = iter((call_timeout, 2, 4, client_timeout)) + retry_counter_mock = Mock() + + def slow_handler(_request: WerkzeugRequest) -> WerkzeugResponse: + timeout = next(expected_timeouts) + should_raise = next(should_raise_error) + # Counter for retries + retry_counter_mock() + + if should_raise: + # We expect longer than the client is willing to wait. This will cause a timeout on the client side. + time.sleep(timeout + 0.02) + + return WerkzeugResponse('200 OK') + + httpserver.expect_request('/sync_timeout', method='GET').respond_with_handler(slow_handler) + + server_url = str(httpserver.url_for('/sync_timeout')) - def check_timeout(request: Request) -> Response: - expected_timeout = next(expected_timeouts) - assert request.extensions['timeout'] == { - 'connect': expected_timeout, - 'pool': expected_timeout, - 'read': expected_timeout, - 'write': expected_timeout, - } - if next(should_raise_error): - raise TimeoutException('This error can be retired') - return Response(200) + response = HTTPClient(timeout_secs=client_timeout).call(method='GET', url=server_url, timeout_secs=call_timeout) - respx.get('https://example.com').mock(side_effect=check_timeout) - HTTPClient(timeout_secs=client_timeout).call(method='GET', url='https://example.com', timeout_secs=call_timeout) + # Check that the retry counter was called the expected number of times + # (4 times: 3 retries + 1 final successful call) + assert retry_counter_mock.call_count == 4 + # Check that the response is successful + assert response.status_code == 200 def assert_timeout(expected_timeout: int, request: Request) -> Response: @@ -122,6 +147,8 @@ def assert_timeout(expected_timeout: int, request: Request) -> Response: ] +# This test will probably need to be reworked or skipped when switching to `impit`. +# Without the mock library, it's difficult to reproduce, maybe with monkeypatch? @pytest.mark.parametrize( ('client_type', 'method', 'expected_timeout', 'kwargs'), _timeout_params, @@ -139,6 +166,8 @@ def test_specific_timeouts_for_specific_endpoints_sync( getattr(client, method)(**kwargs) +# This test will probably need to be reworked or skipped when switching to `impit`. +# Without the mock library, it's difficult to reproduce, maybe with monkeypatch? @pytest.mark.parametrize( ('client_type', 'method', 'expected_timeout', 'kwargs'), _timeout_params, diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 636f0fc4..52d05f06 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -1,23 +1,27 @@ +from __future__ import annotations + import asyncio import json import logging -import threading import time -from collections.abc import AsyncIterator, Generator, Iterator from datetime import datetime, timedelta +from typing import TYPE_CHECKING from unittest.mock import patch -import httpx import pytest -import respx -from _pytest.logging import LogCaptureFixture from apify_shared.consts import ActorJobStatus +from werkzeug import Request, Response from apify_client import ApifyClient, ApifyClientAsync from apify_client._logging import RedirectLogFormatter from apify_client.clients.resource_clients.log import StatusMessageWatcher, StreamedLog -_MOCKED_API_URL = 'https://example.com' +if TYPE_CHECKING: + from collections.abc import Iterator + + from _pytest.logging import LogCaptureFixture + from pytest_httpserver import HTTPServer + _MOCKED_RUN_ID = 'mocked_run_id' _MOCKED_ACTOR_NAME = 'mocked_actor_name' _MOCKED_ACTOR_ID = 'mocked_actor_id' @@ -64,113 +68,90 @@ ) -@pytest.fixture -def mock_api() -> None: - test_server_lock = threading.Lock() - - def get_responses() -> Generator[httpx.Response, None, None]: - """Simulate actor run that changes status 3 times.""" - for _ in range(5): - yield httpx.Response( - content=json.dumps( - { - 'data': { - 'id': _MOCKED_RUN_ID, - 'actId': _MOCKED_ACTOR_ID, - 'status': ActorJobStatus.RUNNING, - 'statusMessage': 'Initial message', - 'isStatusMessageTerminal': False, - } - } - ), - status_code=200, - ) - for _ in range(5): - yield httpx.Response( - content=json.dumps( - { - 'data': { - 'id': _MOCKED_RUN_ID, - 'actId': _MOCKED_ACTOR_ID, - 'status': ActorJobStatus.RUNNING, - 'statusMessage': 'Another message', - 'isStatusMessageTerminal': False, - } - } - ), - status_code=200, - ) - while True: - yield httpx.Response( - content=json.dumps( - { - 'data': { - 'id': _MOCKED_RUN_ID, - 'actId': _MOCKED_ACTOR_ID, - 'status': ActorJobStatus.SUCCEEDED, - 'statusMessage': 'Final message', - 'isStatusMessageTerminal': True, - } - } - ), - status_code=200, - ) - - responses = get_responses() - - def actor_runs_side_effect(_: httpx.Request) -> httpx.Response: - test_server_lock.acquire() - # To avoid multiple threads accessing at the same time and causing `ValueError: generator already executing` - response = next(responses) - test_server_lock.release_lock() - return response - - respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect) - - respx.get(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}').mock( - return_value=httpx.Response(content=json.dumps({'data': {'name': _MOCKED_ACTOR_NAME}}), status_code=200) - ) +class StatusResponseGenerator: + """Generator for actor run status responses to simulate changing status over time.""" - respx.post(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}/runs').mock( - return_value=httpx.Response(content=json.dumps({'data': {'id': _MOCKED_RUN_ID}}), status_code=200) - ) + def __init__(self) -> None: + self.current_status_index = 0 + self.requests_for_current_status = 0 + self.min_requests_per_status = 5 + + self.statuses = [ + ('Initial message', ActorJobStatus.RUNNING, False), + ('Another message', ActorJobStatus.RUNNING, False), + ('Final message', ActorJobStatus.SUCCEEDED, True), + ] + + def get_response(self, _request: Request) -> Response: + if self.current_status_index < len(self.statuses): + message, status, is_terminal = self.statuses[self.current_status_index] + else: + message, status, is_terminal = self.statuses[-1] + + self.requests_for_current_status += 1 + + if ( + self.requests_for_current_status >= self.min_requests_per_status + and self.current_status_index < len(self.statuses) - 1 + and not is_terminal + ): + self.current_status_index += 1 + self.requests_for_current_status = 0 + + status_data = { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': status, + 'statusMessage': message, + 'isStatusMessageTerminal': is_terminal, + } + } + + return Response(response=json.dumps(status_data), status=200, mimetype='application/json') + + +def _streaming_log_handler(_request: Request) -> Response: + """Handler for streaming log requests.""" + + def generate_logs() -> Iterator[bytes]: + for chunk in _MOCKED_ACTOR_LOGS: + yield chunk + time.sleep(0.01) + + return Response(response=generate_logs(), status=200, mimetype='application/octet-stream') @pytest.fixture -def mock_api_async(mock_api: None) -> None: # noqa: ARG001, fixture - class AsyncByteStream(httpx._types.AsyncByteStream): - async def __aiter__(self) -> AsyncIterator[bytes]: - for i in _MOCKED_ACTOR_LOGS: - yield i - await asyncio.sleep(0.01) - - async def aclose(self) -> None: - pass - - respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1&raw=1').mock( - return_value=httpx.Response(stream=AsyncByteStream(), status_code=200) +def mock_api(httpserver: HTTPServer) -> None: + """Set up HTTP server with mocked API endpoints.""" + httpserver.clear() # type: ignore[no-untyped-call] + + status_generator = StatusResponseGenerator() + + # Add actor run status endpoint + httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_handler( + status_generator.get_response ) + # Add actor info endpoint + httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json( + {'data': {'name': _MOCKED_ACTOR_NAME}} + ) -@pytest.fixture -def mock_api_sync(mock_api: None) -> None: # noqa: ARG001, fixture - class SyncByteStream(httpx._types.SyncByteStream): - def __iter__(self) -> Iterator[bytes]: - for i in _MOCKED_ACTOR_LOGS: - yield i - time.sleep(0.01) - - def close(self) -> None: - pass - - respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1&raw=1').mock( - return_value=httpx.Response(stream=SyncByteStream(), status_code=200) + # Add actor run creation endpoint + httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}/runs', method='POST').respond_with_json( + {'data': {'id': _MOCKED_RUN_ID}} ) + httpserver.expect_request( + f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=1&raw=1' + ).respond_with_handler(_streaming_log_handler) + @pytest.fixture def propagate_stream_logs() -> None: - # Enable propagation of logs to the caplog fixture + """Enable propagation of logs to the caplog fixture.""" StreamedLog._force_propagate = True StatusMessageWatcher._force_propagate = True logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG) @@ -178,7 +159,7 @@ def propagate_stream_logs() -> None: @pytest.fixture def reduce_final_timeout_for_status_message_redirector() -> None: - """Reduce timeout used by the `StatusMessageWatcher` + """Reduce timeout used by the `StatusMessageWatcher`. This timeout makes sense on the platform, but in tests it is better to reduce it to speed up the tests. """ @@ -192,18 +173,19 @@ def reduce_final_timeout_for_status_message_redirector() -> None: (False, len(_EXPECTED_MESSAGES_AND_LEVELS) - _EXISTING_LOGS_BEFORE_REDIRECT_ATTACH), ], ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs') async def test_redirected_logs_async( *, caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture log_from_start: bool, expected_log_count: int, + httpserver: HTTPServer, ) -> None: """Test that redirected logs are formatted correctly.""" - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + api_url = httpserver.url_for('/').removesuffix('/') + + run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID) with patch('apify_client.clients.resource_clients.log.datetime') as mocked_datetime: # Mock `now()` so that it has timestamp bigger than the first 3 logs @@ -231,18 +213,19 @@ async def test_redirected_logs_async( (False, len(_EXPECTED_MESSAGES_AND_LEVELS) - _EXISTING_LOGS_BEFORE_REDIRECT_ATTACH), ], ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs') def test_redirected_logs_sync( *, caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture log_from_start: bool, expected_log_count: int, + httpserver: HTTPServer, ) -> None: """Test that redirected logs are formatted correctly.""" - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + api_url = httpserver.url_for('/').removesuffix('/') + + run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID) with patch('apify_client.clients.resource_clients.log.datetime') as mocked_datetime: # Mock `now()` so that it has timestamp bigger than the first 3 logs @@ -262,19 +245,19 @@ def test_redirected_logs_sync( ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs', 'reduce_final_timeout_for_status_message_redirector') async def test_actor_call_redirect_logs_to_default_logger_async( caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: """Test that logs are redirected correctly to the default logger. Caplog contains logs before formatting, so formatting is not included in the test expectations.""" + api_url = httpserver.url_for('/').removesuffix('/') + logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' logger = logging.getLogger(logger_name) - actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClientAsync(token='mocked_token', api_url=api_url).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): await actor_client.call() @@ -289,19 +272,19 @@ async def test_actor_call_redirect_logs_to_default_logger_async( ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs', 'reduce_final_timeout_for_status_message_redirector') def test_actor_call_redirect_logs_to_default_logger_sync( caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: """Test that logs are redirected correctly to the default logger. Caplog contains logs before formatting, so formatting is not included in the test expectations.""" + api_url = httpserver.url_for('/').removesuffix('/') + logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' logger = logging.getLogger(logger_name) - actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClient(token='mocked_token', api_url=api_url).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): actor_client.call() @@ -316,14 +299,15 @@ def test_actor_call_redirect_logs_to_default_logger_sync( ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs') async def test_actor_call_no_redirect_logs_async( caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: + api_url = httpserver.url_for('/').removesuffix('/') + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClientAsync(token='mocked_token', api_url=api_url).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): await actor_client.call(logger=None) @@ -331,14 +315,15 @@ async def test_actor_call_no_redirect_logs_async( assert len(caplog.records) == 0 -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs') def test_actor_call_no_redirect_logs_sync( caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: + api_url = httpserver.url_for('/').removesuffix('/') + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClient(token='mocked_token', api_url=api_url).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): actor_client.call(logger=None) @@ -346,17 +331,17 @@ def test_actor_call_no_redirect_logs_sync( assert len(caplog.records) == 0 -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs', 'reduce_final_timeout_for_status_message_redirector') async def test_actor_call_redirect_logs_to_custom_logger_async( caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: """Test that logs are redirected correctly to the custom logger.""" + api_url = httpserver.url_for('/').removesuffix('/') + logger_name = 'custom_logger' logger = logging.getLogger(logger_name) - actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClientAsync(token='mocked_token', api_url=api_url).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): await actor_client.call(logger=logger) @@ -367,17 +352,17 @@ async def test_actor_call_redirect_logs_to_custom_logger_async( ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs', 'reduce_final_timeout_for_status_message_redirector') def test_actor_call_redirect_logs_to_custom_logger_sync( caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: """Test that logs are redirected correctly to the custom logger.""" + api_url = httpserver.url_for('/').removesuffix('/') + logger_name = 'custom_logger' logger = logging.getLogger(logger_name) - actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClient(token='mocked_token', api_url=api_url).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): actor_client.call(logger=logger) @@ -388,17 +373,16 @@ def test_actor_call_redirect_logs_to_custom_logger_sync( ) -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs', 'reduce_final_timeout_for_status_message_redirector') async def test_redirect_status_message_async( *, caplog: LogCaptureFixture, - mock_api: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: """Test redirected status and status messages.""" + api_url = httpserver.url_for('/').removesuffix('/') - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID) logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' @@ -413,17 +397,17 @@ async def test_redirect_status_message_async( assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message' -@respx.mock +@pytest.mark.usefixtures('mock_api', 'propagate_stream_logs', 'reduce_final_timeout_for_status_message_redirector') def test_redirect_status_message_sync( *, caplog: LogCaptureFixture, - mock_api: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture + httpserver: HTTPServer, ) -> None: """Test redirected status and status messages.""" - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + api_url = httpserver.url_for('/').removesuffix('/') + + run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID) logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' diff --git a/uv.lock b/uv.lock index f00a008e..f0e88fa6 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.9" resolution-markers = [ "python_full_version >= '3.10'", @@ -42,6 +42,7 @@ dev = [ { name = "pytest" }, { name = "pytest-asyncio" }, { name = "pytest-cov" }, + { name = "pytest-httpserver" }, { name = "pytest-timeout" }, { name = "pytest-xdist" }, { name = "redbaron" }, @@ -69,6 +70,7 @@ dev = [ { name = "pytest", specifier = "~=8.4.0" }, { name = "pytest-asyncio", specifier = "~=1.1.0" }, { name = "pytest-cov", specifier = "~=6.2.0" }, + { name = "pytest-httpserver", specifier = ">=1.1.3" }, { name = "pytest-timeout", specifier = ">=2.4.0" }, { name = "pytest-xdist", specifier = "~=3.8.0" }, { name = "redbaron", specifier = "~=0.9.0" }, @@ -892,6 +894,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bc/16/4ea354101abb1287856baa4af2732be351c7bee728065aed451b678153fd/pytest_cov-6.2.1-py3-none-any.whl", hash = "sha256:f5bc4c23f42f1cdd23c70b1dab1bbaef4fc505ba950d53e0081d0730dd7e86d5", size = 24644, upload-time = "2025-06-12T10:47:45.932Z" }, ] +[[package]] +name = "pytest-httpserver" +version = "1.1.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "werkzeug" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f1/d8/def15ba33bd696dd72dd4562a5287c0cba4d18a591eeb82e0b08ab385afc/pytest_httpserver-1.1.3.tar.gz", hash = "sha256:af819d6b533f84b4680b9416a5b3f67f1df3701f1da54924afd4d6e4ba5917ec", size = 68870, upload-time = "2025-04-10T08:17:15.6Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0d/d2/dfc2f25f3905921c2743c300a48d9494d29032f1389fc142e718d6978fb2/pytest_httpserver-1.1.3-py3-none-any.whl", hash = "sha256:5f84757810233e19e2bb5287f3826a71c97a3740abe3a363af9155c0f82fdbb9", size = 21000, upload-time = "2025-04-10T08:17:13.906Z" }, +] + [[package]] name = "pytest-timeout" version = "2.4.0" @@ -1202,6 +1216,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/33/e8/e40370e6d74ddba47f002a32919d91310d6074130fe4e17dabcafc15cbf1/watchdog-6.0.0-py3-none-win_ia64.whl", hash = "sha256:a1914259fa9e1454315171103c6a30961236f508b9b623eae470268bbcc6a22f", size = 79067, upload-time = "2024-11-01T14:07:11.845Z" }, ] +[[package]] +name = "werkzeug" +version = "3.1.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markupsafe" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9f/69/83029f1f6300c5fb2471d621ab06f6ec6b3324685a2ce0f9777fd4a8b71e/werkzeug-3.1.3.tar.gz", hash = "sha256:60723ce945c19328679790e3282cc758aa4a6040e4bb330f53d30fa546d44746", size = 806925, upload-time = "2024-11-08T15:52:18.093Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/52/24/ab44c871b0f07f491e5d2ad12c9bd7358e527510618cb1b803a88e986db1/werkzeug-3.1.3-py3-none-any.whl", hash = "sha256:54b78bf3716d19a65be4fceccc0d1d7b89e608834989dfae50ea87564639213e", size = 224498, upload-time = "2024-11-08T15:52:16.132Z" }, +] + [[package]] name = "wrapt" version = "1.17.2" From 919d39e5c0b225978a92448462bc75ed586199e3 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 01:07:29 +0000 Subject: [PATCH 2/8] test --- tests/unit/logging_old.py | 437 ------------------------------------- tests/unit/test_logging.py | 11 +- 2 files changed, 9 insertions(+), 439 deletions(-) delete mode 100644 tests/unit/logging_old.py diff --git a/tests/unit/logging_old.py b/tests/unit/logging_old.py deleted file mode 100644 index 19dd6ede..00000000 --- a/tests/unit/logging_old.py +++ /dev/null @@ -1,437 +0,0 @@ -import asyncio -import json -import logging -import threading -import time -from collections.abc import AsyncIterator, Generator, Iterator -from datetime import datetime, timedelta -from unittest.mock import patch - -import httpx -import pytest -import respx -from _pytest.logging import LogCaptureFixture -from apify_shared.consts import ActorJobStatus - -from apify_client import ApifyClient, ApifyClientAsync -from apify_client._logging import RedirectLogFormatter -from apify_client.clients.resource_clients.log import StatusMessageWatcher, StreamedLog - -_MOCKED_API_URL = '/logging' -_MOCKED_RUN_ID = 'mocked_run_id' -_MOCKED_ACTOR_NAME = 'mocked_actor_name' -_MOCKED_ACTOR_ID = 'mocked_actor_id' -_MOCKED_ACTOR_LOGS = ( - b'2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.\n' - b'2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.\n' - b'2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.\n', # Several logs merged into one chunk - b'2025-05-13T07:26:14.132Z [apify] DEBUG \xc3', # Chunked log split in the middle of the multibyte character - b'\xa1\n', # part 2 - b'2025-05-13T07:24:14.132Z [apify] INFO multiline \n log\n', - b'2025-05-13T07:25:14.132Z [apify] WARNING some warning\n', - b'2025-05-13T07:26:14.132Z [apify] DEBUG c\n', - b'2025-05-13T0', # Chunked log that got split in the marker - b'7:26:14.132Z [apify] DEBUG d\n' # part 2 - b'2025-05-13T07:27:14.132Z [apify] DEB', # Chunked log that got split outside of marker - b'UG e\n', # part 2 - # Already redirected message - b'2025-05-13T07:28:14.132Z [apify.redirect-logger runId:4U1oAnKau6jpzjUuA] -> 2025-05-13T07:27:14.132Z ACTOR:...\n', -) -_EXISTING_LOGS_BEFORE_REDIRECT_ATTACH = 3 - -_EXPECTED_MESSAGES_AND_LEVELS = ( - ('2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.', logging.INFO), - ('2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.', logging.INFO), - ('2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', logging.INFO), - ('2025-05-13T07:26:14.132Z [apify] DEBUG á', logging.DEBUG), - ('2025-05-13T07:24:14.132Z [apify] INFO multiline \n log', logging.INFO), - ('2025-05-13T07:25:14.132Z [apify] WARNING some warning', logging.WARNING), - ('2025-05-13T07:26:14.132Z [apify] DEBUG c', logging.DEBUG), - ('2025-05-13T07:26:14.132Z [apify] DEBUG d', logging.DEBUG), - ('2025-05-13T07:27:14.132Z [apify] DEBUG e', logging.DEBUG), - ( - '2025-05-13T07:28:14.132Z [apify.redirect-logger runId:4U1oAnKau6jpzjUuA] -> ' - '2025-05-13T07:27:14.132Z ACTOR:...', - logging.INFO, - ), -) - -_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES = ( - ('Status: RUNNING, Message: Initial message', logging.INFO), - *_EXPECTED_MESSAGES_AND_LEVELS, - ('Status: RUNNING, Message: Another message', logging.INFO), - ('Status: SUCCEEDED, Message: Final message', logging.INFO), -) - - -@pytest.fixture -def mock_api() -> None: - test_server_lock = threading.Lock() - - def get_responses() -> Generator[httpx.Response, None, None]: - """Simulate actor run that changes status 3 times.""" - for _ in range(5): - yield httpx.Response( - content=json.dumps( - { - 'data': { - 'id': _MOCKED_RUN_ID, - 'actId': _MOCKED_ACTOR_ID, - 'status': ActorJobStatus.RUNNING, - 'statusMessage': 'Initial message', - 'isStatusMessageTerminal': False, - } - } - ), - status_code=200, - ) - for _ in range(5): - yield httpx.Response( - content=json.dumps( - { - 'data': { - 'id': _MOCKED_RUN_ID, - 'actId': _MOCKED_ACTOR_ID, - 'status': ActorJobStatus.RUNNING, - 'statusMessage': 'Another message', - 'isStatusMessageTerminal': False, - } - } - ), - status_code=200, - ) - while True: - yield httpx.Response( - content=json.dumps( - { - 'data': { - 'id': _MOCKED_RUN_ID, - 'actId': _MOCKED_ACTOR_ID, - 'status': ActorJobStatus.SUCCEEDED, - 'statusMessage': 'Final message', - 'isStatusMessageTerminal': True, - } - } - ), - status_code=200, - ) - - responses = get_responses() - - def actor_runs_side_effect(_: httpx.Request) -> httpx.Response: - test_server_lock.acquire() - # To avoid multiple threads accessing at the same time and causing `ValueError: generator already executing` - response = next(responses) - test_server_lock.release_lock() - return response - - respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect) - - respx.get(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}').mock( - return_value=httpx.Response(content=json.dumps({'data': {'name': _MOCKED_ACTOR_NAME}}), status_code=200) - ) - - respx.post(url=f'{_MOCKED_API_URL}/v2/acts/{_MOCKED_ACTOR_ID}/runs').mock( - return_value=httpx.Response(content=json.dumps({'data': {'id': _MOCKED_RUN_ID}}), status_code=200) - ) - - -@pytest.fixture -def mock_api_async(mock_api: None) -> None: # noqa: ARG001, fixture - class AsyncByteStream(httpx._types.AsyncByteStream): - async def __aiter__(self) -> AsyncIterator[bytes]: - for i in _MOCKED_ACTOR_LOGS: - yield i - await asyncio.sleep(0.01) - - async def aclose(self) -> None: - pass - - respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1&raw=1').mock( - return_value=httpx.Response(stream=AsyncByteStream(), status_code=200) - ) - - -@pytest.fixture -def mock_api_sync(mock_api: None) -> None: # noqa: ARG001, fixture - class SyncByteStream(httpx._types.SyncByteStream): - def __iter__(self) -> Iterator[bytes]: - for i in _MOCKED_ACTOR_LOGS: - yield i - time.sleep(0.01) - - def close(self) -> None: - pass - - respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}/log?stream=1&raw=1').mock( - return_value=httpx.Response(stream=SyncByteStream(), status_code=200) - ) - - -@pytest.fixture -def propagate_stream_logs() -> None: - # Enable propagation of logs to the caplog fixture - StreamedLog._force_propagate = True - StatusMessageWatcher._force_propagate = True - logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG) - - -@pytest.fixture -def reduce_final_timeout_for_status_message_redirector() -> None: - """Reduce timeout used by the `StatusMessageWatcher` - - This timeout makes sense on the platform, but in tests it is better to reduce it to speed up the tests. - """ - StatusMessageWatcher._final_sleep_time_s = 2 - - -@pytest.mark.parametrize( - ('log_from_start', 'expected_log_count'), - [ - (True, len(_EXPECTED_MESSAGES_AND_LEVELS)), - (False, len(_EXPECTED_MESSAGES_AND_LEVELS) - _EXISTING_LOGS_BEFORE_REDIRECT_ATTACH), - ], -) -@respx.mock -async def test_redirected_logs_async( - *, - caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - log_from_start: bool, - expected_log_count: int, -) -> None: - """Test that redirected logs are formatted correctly.""" - - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) - - with patch('apify_client.clients.resource_clients.log.datetime') as mocked_datetime: - # Mock `now()` so that it has timestamp bigger than the first 3 logs - mocked_datetime.now.return_value = datetime.fromisoformat('2025-05-13T07:24:14.132+00:00') - streamed_log = await run_client.get_streamed_log(from_start=log_from_start) - - # Set `propagate=True` during the tests, so that caplog can see the logs.. - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - - with caplog.at_level(logging.DEBUG, logger=logger_name): - async with streamed_log: - # Do stuff while the log from the other Actor is being redirected to the logs. - await asyncio.sleep(2) - - # Ensure logs are propagated - assert {(record.message, record.levelno) for record in caplog.records} == set( - _EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:] - ) - - -@pytest.mark.parametrize( - ('log_from_start', 'expected_log_count'), - [ - (True, len(_EXPECTED_MESSAGES_AND_LEVELS)), - (False, len(_EXPECTED_MESSAGES_AND_LEVELS) - _EXISTING_LOGS_BEFORE_REDIRECT_ATTACH), - ], -) -@respx.mock -def test_redirected_logs_sync( - *, - caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - log_from_start: bool, - expected_log_count: int, -) -> None: - """Test that redirected logs are formatted correctly.""" - - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) - - with patch('apify_client.clients.resource_clients.log.datetime') as mocked_datetime: - # Mock `now()` so that it has timestamp bigger than the first 3 logs - mocked_datetime.now.return_value = datetime.fromisoformat('2025-05-13T07:24:14.132+00:00') - streamed_log = run_client.get_streamed_log(from_start=log_from_start) - - # Set `propagate=True` during the tests, so that caplog can see the logs.. - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - - with caplog.at_level(logging.DEBUG, logger=logger_name), streamed_log: - # Do stuff while the log from the other Actor is being redirected to the logs. - time.sleep(2) - - # Ensure logs are propagated - assert {(record.message, record.levelno) for record in caplog.records} == set( - _EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:] - ) - - -@respx.mock -async def test_actor_call_redirect_logs_to_default_logger_async( - caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture -) -> None: - """Test that logs are redirected correctly to the default logger. - - Caplog contains logs before formatting, so formatting is not included in the test expectations.""" - logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' - logger = logging.getLogger(logger_name) - actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) - - with caplog.at_level(logging.DEBUG, logger=logger_name): - await actor_client.call() - - # Ensure expected handler and formater - assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter) - assert isinstance(logger.handlers[0], logging.StreamHandler) - - # Ensure logs are propagated - assert {(record.message, record.levelno) for record in caplog.records} == set( - _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES - ) - - -@respx.mock -def test_actor_call_redirect_logs_to_default_logger_sync( - caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture -) -> None: - """Test that logs are redirected correctly to the default logger. - - Caplog contains logs before formatting, so formatting is not included in the test expectations.""" - logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' - logger = logging.getLogger(logger_name) - actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) - - with caplog.at_level(logging.DEBUG, logger=logger_name): - actor_client.call() - - # Ensure expected handler and formater - assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter) - assert isinstance(logger.handlers[0], logging.StreamHandler) - - # Ensure logs are propagated - assert {(record.message, record.levelno) for record in caplog.records} == set( - _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES - ) - - -@respx.mock -async def test_actor_call_no_redirect_logs_async( - caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture -) -> None: - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) - - with caplog.at_level(logging.DEBUG, logger=logger_name): - await actor_client.call(logger=None) - - assert len(caplog.records) == 0 - - -@respx.mock -def test_actor_call_no_redirect_logs_sync( - caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture -) -> None: - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) - - with caplog.at_level(logging.DEBUG, logger=logger_name): - actor_client.call(logger=None) - - assert len(caplog.records) == 0 - - -@respx.mock -async def test_actor_call_redirect_logs_to_custom_logger_async( - caplog: LogCaptureFixture, - mock_api_async: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture -) -> None: - """Test that logs are redirected correctly to the custom logger.""" - logger_name = 'custom_logger' - logger = logging.getLogger(logger_name) - actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) - - with caplog.at_level(logging.DEBUG, logger=logger_name): - await actor_client.call(logger=logger) - - # Ensure logs are propagated - assert {(record.message, record.levelno) for record in caplog.records} == set( - _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES - ) - - -@respx.mock -def test_actor_call_redirect_logs_to_custom_logger_sync( - caplog: LogCaptureFixture, - mock_api_sync: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture -) -> None: - """Test that logs are redirected correctly to the custom logger.""" - logger_name = 'custom_logger' - logger = logging.getLogger(logger_name) - actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) - - with caplog.at_level(logging.DEBUG, logger=logger_name): - actor_client.call(logger=logger) - - # Ensure logs are propagated - assert {(record.message, record.levelno) for record in caplog.records} == set( - _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES - ) - - -@respx.mock -async def test_redirect_status_message_async( - *, - caplog: LogCaptureFixture, - mock_api: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture -) -> None: - """Test redirected status and status messages.""" - - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) - - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - - status_message_redirector = await run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) - with caplog.at_level(logging.DEBUG, logger=logger_name): - async with status_message_redirector: - # Do stuff while the status from the other Actor is being redirected to the logs. - await asyncio.sleep(3) - - assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' - assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' - assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message' - - -@respx.mock -def test_redirect_status_message_sync( - *, - caplog: LogCaptureFixture, - mock_api: None, # noqa: ARG001, fixture - propagate_stream_logs: None, # noqa: ARG001, fixture - reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture -) -> None: - """Test redirected status and status messages.""" - - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) - - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - - status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) - with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector: - # Do stuff while the status from the other Actor is being redirected to the logs. - time.sleep(3) - - assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' - assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' - assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message' diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 52d05f06..b6aff7c5 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -117,9 +117,16 @@ def _streaming_log_handler(_request: Request) -> Response: def generate_logs() -> Iterator[bytes]: for chunk in _MOCKED_ACTOR_LOGS: yield chunk - time.sleep(0.01) + time.sleep(0.05) - return Response(response=generate_logs(), status=200, mimetype='application/octet-stream') + total_size = sum(len(chunk) for chunk in _MOCKED_ACTOR_LOGS) + + return Response( + response=generate_logs(), + status=200, + mimetype='application/octet-stream', + headers={'Content-Length': str(total_size)}, + ) @pytest.fixture From e06f2c2fc7b2909c289e0c3553ff6e23b4bb1abf Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 01:31:29 +0000 Subject: [PATCH 3/8] wincheck --- tests/unit/test_logging.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index b6aff7c5..97694b54 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -3,6 +3,7 @@ import asyncio import json import logging +import sys import time from datetime import datetime, timedelta from typing import TYPE_CHECKING @@ -68,6 +69,11 @@ ) +def _get_windows_sleep_time() -> float: + """Get adjusted sleep time for Windows systems.""" + return 5.0 if sys.platform == 'win32' else 2.0 + + class StatusResponseGenerator: """Generator for actor run status responses to simulate changing status over time.""" @@ -244,7 +250,7 @@ def test_redirected_logs_sync( with caplog.at_level(logging.DEBUG, logger=logger_name), streamed_log: # Do stuff while the log from the other Actor is being redirected to the logs. - time.sleep(2) + time.sleep(_get_windows_sleep_time()) # Ensure logs are propagated assert {(record.message, record.levelno) for record in caplog.records} == set( @@ -421,7 +427,7 @@ def test_redirect_status_message_sync( status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector: # Do stuff while the status from the other Actor is being redirected to the logs. - time.sleep(3) + time.sleep(_get_windows_sleep_time()) assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' From 1b8358465a2b51fe1f3ca3fabe477f74140c2892 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 01:34:39 +0000 Subject: [PATCH 4/8] wincheck --- tests/unit/test_logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 97694b54..42df378d 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -71,7 +71,7 @@ def _get_windows_sleep_time() -> float: """Get adjusted sleep time for Windows systems.""" - return 5.0 if sys.platform == 'win32' else 2.0 + return 10.0 if sys.platform == 'win32' else 2.0 class StatusResponseGenerator: From 09dabbfeb9701abee644d312d324b3dba3bc1309 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 01:53:16 +0000 Subject: [PATCH 5/8] wincheck --- tests/unit/test_logging.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 42df378d..0f5b7dd3 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -69,11 +69,6 @@ ) -def _get_windows_sleep_time() -> float: - """Get adjusted sleep time for Windows systems.""" - return 10.0 if sys.platform == 'win32' else 2.0 - - class StatusResponseGenerator: """Generator for actor run status responses to simulate changing status over time.""" @@ -123,7 +118,7 @@ def _streaming_log_handler(_request: Request) -> Response: def generate_logs() -> Iterator[bytes]: for chunk in _MOCKED_ACTOR_LOGS: yield chunk - time.sleep(0.05) + time.sleep(0.01) total_size = sum(len(chunk) for chunk in _MOCKED_ACTOR_LOGS) @@ -211,7 +206,7 @@ async def test_redirected_logs_async( with caplog.at_level(logging.DEBUG, logger=logger_name): async with streamed_log: # Do stuff while the log from the other Actor is being redirected to the logs. - await asyncio.sleep(2) + await asyncio.sleep(4.0 if sys.platform == 'win32' else 2.0) # Ensure logs are propagated assert {(record.message, record.levelno) for record in caplog.records} == set( @@ -250,7 +245,7 @@ def test_redirected_logs_sync( with caplog.at_level(logging.DEBUG, logger=logger_name), streamed_log: # Do stuff while the log from the other Actor is being redirected to the logs. - time.sleep(_get_windows_sleep_time()) + time.sleep(4.0 if sys.platform == 'win32' else 2.0) # Ensure logs are propagated assert {(record.message, record.levelno) for record in caplog.records} == set( @@ -427,7 +422,7 @@ def test_redirect_status_message_sync( status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector: # Do stuff while the status from the other Actor is being redirected to the logs. - time.sleep(_get_windows_sleep_time()) + time.sleep(3) assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' From 3f15d023b27d2873c4b6a6f538d514879ef6550a Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 14:17:40 +0000 Subject: [PATCH 6/8] without localhost --- tests/unit/conftest.py | 10 ++++++---- tests/unit/test_logging.py | 11 ++++------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index aea69820..747a99bd 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -10,7 +10,7 @@ def make_httpserver() -> Iterable[HTTPServer]: werkzeug_logger = getLogger('werkzeug') werkzeug_logger.disabled = True - server = HTTPServer(threaded=True) + server = HTTPServer(threaded=True, host='127.0.0.1') server.start() yield server server.clear() # type: ignore[no-untyped-call] @@ -18,9 +18,11 @@ def make_httpserver() -> Iterable[HTTPServer]: server.stop() # type: ignore[no-untyped-call] -@pytest.fixture(scope='session') -def httpserver(make_httpserver: HTTPServer) -> HTTPServer: - return make_httpserver +@pytest.fixture +def httpserver(make_httpserver: HTTPServer) -> Iterable[HTTPServer]: + server = make_httpserver + yield server + server.clear() # type: ignore[no-untyped-call] @pytest.fixture diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 0f5b7dd3..c518935b 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import sys import time from datetime import datetime, timedelta from typing import TYPE_CHECKING @@ -133,8 +132,6 @@ def generate_logs() -> Iterator[bytes]: @pytest.fixture def mock_api(httpserver: HTTPServer) -> None: """Set up HTTP server with mocked API endpoints.""" - httpserver.clear() # type: ignore[no-untyped-call] - status_generator = StatusResponseGenerator() # Add actor run status endpoint @@ -206,7 +203,7 @@ async def test_redirected_logs_async( with caplog.at_level(logging.DEBUG, logger=logger_name): async with streamed_log: # Do stuff while the log from the other Actor is being redirected to the logs. - await asyncio.sleep(4.0 if sys.platform == 'win32' else 2.0) + await asyncio.sleep(1) # Ensure logs are propagated assert {(record.message, record.levelno) for record in caplog.records} == set( @@ -245,7 +242,7 @@ def test_redirected_logs_sync( with caplog.at_level(logging.DEBUG, logger=logger_name), streamed_log: # Do stuff while the log from the other Actor is being redirected to the logs. - time.sleep(4.0 if sys.platform == 'win32' else 2.0) + time.sleep(1) # Ensure logs are propagated assert {(record.message, record.levelno) for record in caplog.records} == set( @@ -398,7 +395,7 @@ async def test_redirect_status_message_async( with caplog.at_level(logging.DEBUG, logger=logger_name): async with status_message_redirector: # Do stuff while the status from the other Actor is being redirected to the logs. - await asyncio.sleep(3) + await asyncio.sleep(1) assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' @@ -422,7 +419,7 @@ def test_redirect_status_message_sync( status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector: # Do stuff while the status from the other Actor is being redirected to the logs. - time.sleep(3) + time.sleep(1) assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' From bfdf114ed9dc4527599aca93be0995faf0b533b9 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 6 Aug 2025 20:19:28 +0000 Subject: [PATCH 7/8] add werkzeug in dev dependency --- pyproject.toml | 1 + uv.lock | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5309aa78..8ae54b6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ dev = [ "ruff~=0.12.0", "setuptools", # setuptools are used by pytest but not explicitly required "types-colorama~=0.4.15.20240106", + "werkzeug~=3.0.0", # Werkzeug is used by pytest-httpserver ] [tool.hatch.build.targets.wheel] diff --git a/uv.lock b/uv.lock index f0e88fa6..cddc278d 100644 --- a/uv.lock +++ b/uv.lock @@ -50,6 +50,7 @@ dev = [ { name = "ruff" }, { name = "setuptools" }, { name = "types-colorama" }, + { name = "werkzeug" }, ] [package.metadata] @@ -78,6 +79,7 @@ dev = [ { name = "ruff", specifier = "~=0.12.0" }, { name = "setuptools" }, { name = "types-colorama", specifier = "~=0.4.15.20240106" }, + { name = "werkzeug", specifier = "~=3.0.0" }, ] [[package]] @@ -1218,14 +1220,14 @@ wheels = [ [[package]] name = "werkzeug" -version = "3.1.3" +version = "3.0.6" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "markupsafe" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9f/69/83029f1f6300c5fb2471d621ab06f6ec6b3324685a2ce0f9777fd4a8b71e/werkzeug-3.1.3.tar.gz", hash = "sha256:60723ce945c19328679790e3282cc758aa4a6040e4bb330f53d30fa546d44746", size = 806925, upload-time = "2024-11-08T15:52:18.093Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d4/f9/0ba83eaa0df9b9e9d1efeb2ea351d0677c37d41ee5d0f91e98423c7281c9/werkzeug-3.0.6.tar.gz", hash = "sha256:a8dd59d4de28ca70471a34cba79bed5f7ef2e036a76b3ab0835474246eb41f8d", size = 805170, upload-time = "2024-10-25T18:52:31.688Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/52/24/ab44c871b0f07f491e5d2ad12c9bd7358e527510618cb1b803a88e986db1/werkzeug-3.1.3-py3-none-any.whl", hash = "sha256:54b78bf3716d19a65be4fceccc0d1d7b89e608834989dfae50ea87564639213e", size = 224498, upload-time = "2024-11-08T15:52:16.132Z" }, + { url = "https://files.pythonhosted.org/packages/6c/69/05837f91dfe42109203ffa3e488214ff86a6d68b2ed6c167da6cdc42349b/werkzeug-3.0.6-py3-none-any.whl", hash = "sha256:1bc0c2310d2fbb07b1dd1105eba2f7af72f322e1e455f2f93c993bee8c8a5f17", size = 227979, upload-time = "2024-10-25T18:52:30.129Z" }, ] [[package]] From 8a353156a8e62a17c793d698a95a3977321bbac8 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Fri, 8 Aug 2025 13:48:40 +0000 Subject: [PATCH 8/8] update test for error stream --- tests/unit/test_client_errors.py | 61 ++++++++++++++++---------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/tests/unit/test_client_errors.py b/tests/unit/test_client_errors.py index fc1783e0..ca410ce4 100644 --- a/tests/unit/test_client_errors.py +++ b/tests/unit/test_client_errors.py @@ -1,19 +1,20 @@ from __future__ import annotations import json +import time from typing import TYPE_CHECKING -import httpx import pytest -import respx +from werkzeug import Response from apify_client._errors import ApifyApiError from apify_client._http_client import HTTPClient, HTTPClientAsync if TYPE_CHECKING: - from collections.abc import AsyncIterator, Iterator + from collections.abc import Iterator from pytest_httpserver import HTTPServer + from werkzeug import Request _TEST_PATH = '/errors' _EXPECTED_MESSAGE = 'some_message' @@ -41,6 +42,22 @@ def test_endpoint(httpserver: HTTPServer) -> str: return str(httpserver.url_for(_TEST_PATH)) +def streaming_handler(_request: Request) -> Response: + """Handler for streaming log requests.""" + + def generate_response() -> Iterator[bytes]: + for i in range(len(RAW_ERROR)): + yield RAW_ERROR[i : i + 1] + time.sleep(0.01) + + return Response( + response=(RAW_ERROR[i : i + 1] for i in range(len(RAW_ERROR))), + status=403, + mimetype='application/octet-stream', + headers={'Content-Length': str(len(RAW_ERROR))}, + ) + + def test_client_apify_api_error_with_data(test_endpoint: str) -> None: """Test that client correctly throws ApifyApiError with error data from response.""" client = HTTPClient() @@ -65,51 +82,33 @@ async def test_async_client_apify_api_error_with_data(test_endpoint: str) -> Non assert e.value.data == _EXPECTED_DATA -def test_client_apify_api_error_streamed() -> None: +def test_client_apify_api_error_streamed(httpserver: HTTPServer) -> None: """Test that client correctly throws ApifyApiError when the response has stream.""" error = json.loads(RAW_ERROR.decode()) - class ByteStream(httpx._types.SyncByteStream): - def __iter__(self) -> Iterator[bytes]: - yield RAW_ERROR - - def close(self) -> None: - pass - - stream_url = 'http://some-stream-url.com' - client = HTTPClient() - with respx.mock() as respx_mock: - respx_mock.get(url=stream_url).mock(return_value=httpx.Response(stream=ByteStream(), status_code=403)) - with pytest.raises(ApifyApiError) as e: - client.call(method='GET', url=stream_url, stream=True, parse_response=False) + httpserver.expect_request('/stream_error').respond_with_handler(streaming_handler) + + with pytest.raises(ApifyApiError) as e: + client.call(method='GET', url=httpserver.url_for('/stream_error'), stream=True, parse_response=False) assert e.value.message == error['error']['message'] assert e.value.type == error['error']['type'] -async def test_async_client_apify_api_error_streamed() -> None: +async def test_async_client_apify_api_error_streamed(httpserver: HTTPServer) -> None: """Test that async client correctly throws ApifyApiError when the response has stream.""" error = json.loads(RAW_ERROR.decode()) - class AsyncByteStream(httpx._types.AsyncByteStream): - async def __aiter__(self) -> AsyncIterator[bytes]: - yield RAW_ERROR - - async def aclose(self) -> None: - pass - - stream_url = 'http://some-stream-url.com' - client = HTTPClientAsync() - with respx.mock() as respx_mock: - respx_mock.get(url=stream_url).mock(return_value=httpx.Response(stream=AsyncByteStream(), status_code=403)) - with pytest.raises(ApifyApiError) as e: - await client.call(method='GET', url=stream_url, stream=True, parse_response=False) + httpserver.expect_request('/stream_error').respond_with_handler(streaming_handler) + + with pytest.raises(ApifyApiError) as e: + await client.call(method='GET', url=httpserver.url_for('/stream_error'), stream=True, parse_response=False) assert e.value.message == error['error']['message'] assert e.value.type == error['error']['type']