diff --git a/src/apify/_configuration.py b/src/apify/_configuration.py index 28158b55..b5ebf442 100644 --- a/src/apify/_configuration.py +++ b/src/apify/_configuration.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta from decimal import Decimal from logging import getLogger +from pathlib import Path from typing import Annotated, Any from pydantic import AliasChoices, BeforeValidator, Field, model_validator @@ -421,6 +422,14 @@ def disable_browser_sandbox_on_platform(self) -> Self: logger.warning('Actor is running on the Apify platform, `disable_browser_sandbox` was changed to True.') return self + @property + def canonical_input_key(self) -> str: + return Path(self.input_key).stem + + @property + def input_key_candidates(self) -> set[str]: + return {self.input_key, self.canonical_input_key, Path(self.canonical_input_key).with_suffix('.json').name} + @classmethod def get_global_configuration(cls) -> Configuration: """Retrieve the global instance of the configuration. diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 5a339982..7d8d1efa 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -1,14 +1,18 @@ import asyncio import json -from pathlib import Path +import logging +from more_itertools import flatten from typing_extensions import override from crawlee._consts import METADATA_FILENAME from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient +from crawlee.storage_clients.models import KeyValueStoreRecord from apify._configuration import Configuration +logger = logging.getLogger(__name__) + class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient): """Apify-specific implementation of the `FileSystemKeyValueStoreClient`. @@ -24,16 +28,18 @@ async def purge(self) -> None: It deletes all files in the key-value store directory, except for the metadata file and the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged. """ - kvs_input_key = Configuration.get_global_configuration().input_key + configuration = Configuration.get_global_configuration() - # First try to find the alternative format of the input file and process it if it exists. - for file_path in self.path_to_kvs.glob('*'): - if file_path.name == f'{kvs_input_key}.json': - await self._process_input_json(file_path) + await self._sanitize_input_json_files() async with self._lock: + files_to_keep = set( + flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates) + ) + files_to_keep.add(METADATA_FILENAME) + for file_path in self.path_to_kvs.glob('*'): - if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}: + if file_path.name in files_to_keep: continue if file_path.is_file(): await asyncio.to_thread(file_path.unlink, missing_ok=True) @@ -43,15 +49,40 @@ async def purge(self) -> None: update_modified_at=True, ) - async def _process_input_json(self, path: Path) -> None: - """Process simple input json file to format expected by the FileSystemKeyValueStoreClient. + async def _sanitize_input_json_files(self) -> None: + """Handle missing metadata for input files.""" + configuration = Configuration.get_global_configuration() + alternative_keys = configuration.input_key_candidates - {configuration.canonical_input_key} - For example: INPUT.json -> INPUT, INPUT.json.metadata - """ - try: - f = await asyncio.to_thread(path.open) - input_data = json.load(f) - finally: - f.close() - await asyncio.to_thread(path.unlink, missing_ok=True) - await self.set_value(key=path.stem, value=input_data) + if (self.path_to_kvs / configuration.canonical_input_key).exists(): + # Handle missing metadata + if not await self.record_exists(key=configuration.canonical_input_key): + input_data = await asyncio.to_thread( + lambda: json.loads((self.path_to_kvs / configuration.canonical_input_key).read_text()) + ) + await self.set_value(key=configuration.canonical_input_key, value=input_data) + + for alternative_key in alternative_keys: + if (alternative_input_file := self.path_to_kvs / alternative_key).exists(): + logger.warning(f'Redundant input file found: {alternative_input_file}') + else: + for alternative_key in alternative_keys: + alternative_input_file = self.path_to_kvs / alternative_key + + # Handle missing metadata + if alternative_input_file.exists() and not await self.record_exists(key=alternative_key): + with alternative_input_file.open() as f: + input_data = await asyncio.to_thread(lambda: json.load(f)) + await self.set_value(key=configuration.canonical_input_key, value=input_data) + + @override + async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: + configuration = Configuration.get_global_configuration() + + if key in configuration.input_key_candidates: + for candidate in configuration.input_key_candidates: + value = await super().get_value(key=candidate) + if value is not None: + return value + + return await super().get_value(key=key) diff --git a/tests/unit/storage_clients/test_file_system.py b/tests/unit/storage_clients/test_file_system.py index 7b938416..f0cc2bbd 100644 --- a/tests/unit/storage_clients/test_file_system.py +++ b/tests/unit/storage_clients/test_file_system.py @@ -4,6 +4,8 @@ import json from typing import TYPE_CHECKING +import pytest + from crawlee._consts import METADATA_FILENAME from apify import Actor, Configuration @@ -61,19 +63,22 @@ async def test_purge_preserves_input_file_and_metadata() -> None: # Verify INPUT.json content is unchanged input_content = await asyncio.to_thread(input_file.read_text) - assert input_content == '{"test": "input"}' + assert json.loads(input_content) == json.loads('{"test": "input"}') -async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None: +@pytest.mark.parametrize('input_file_name', ['INPUT', 'INPUT.json']) +async def test_pre_existing_input_used_by_actor(tmp_path: Path, input_file_name: str) -> None: pre_existing_input = { 'foo': 'bar', } - configuration = Configuration.get_global_configuration() # Create pre-existing INPUT.json file path_to_input = tmp_path / 'key_value_stores' / 'default' path_to_input.mkdir(parents=True) - (path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input)) + (path_to_input / input_file_name).write_text(json.dumps(pre_existing_input)) async with Actor(): assert pre_existing_input == await Actor.get_input() + + # Make sure that the input file doesn't get renamed in the process + assert (path_to_input / input_file_name).exists()