From dd94567ed26587c19934e3888977ad762a7dddc0 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Mon, 6 Oct 2025 22:43:36 +0200 Subject: [PATCH 1/7] fix: Also load input from a file with a .json extension in file system storage --- src/apify/_configuration.py | 9 ++++++ .../_file_system/_key_value_store_client.py | 28 +++++++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/apify/_configuration.py b/src/apify/_configuration.py index 28158b55..b5ebf442 100644 --- a/src/apify/_configuration.py +++ b/src/apify/_configuration.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta from decimal import Decimal from logging import getLogger +from pathlib import Path from typing import Annotated, Any from pydantic import AliasChoices, BeforeValidator, Field, model_validator @@ -421,6 +422,14 @@ def disable_browser_sandbox_on_platform(self) -> Self: logger.warning('Actor is running on the Apify platform, `disable_browser_sandbox` was changed to True.') return self + @property + def canonical_input_key(self) -> str: + return Path(self.input_key).stem + + @property + def input_key_candidates(self) -> set[str]: + return {self.input_key, self.canonical_input_key, Path(self.canonical_input_key).with_suffix('.json').name} + @classmethod def get_global_configuration(cls) -> Configuration: """Retrieve the global instance of the configuration. diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 5a339982..83c4573c 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -2,10 +2,12 @@ import json from pathlib import Path +from more_itertools import flatten from typing_extensions import override from crawlee._consts import METADATA_FILENAME from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient +from crawlee.storage_clients.models import KeyValueStoreRecord from apify._configuration import Configuration @@ -24,16 +26,24 @@ async def purge(self) -> None: It deletes all files in the key-value store directory, except for the metadata file and the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged. """ - kvs_input_key = Configuration.get_global_configuration().input_key + configuration = Configuration.get_global_configuration() # First try to find the alternative format of the input file and process it if it exists. for file_path in self.path_to_kvs.glob('*'): - if file_path.name == f'{kvs_input_key}.json': + if ( + file_path.name in configuration.input_key_candidates + and file_path.name != configuration.canonical_input_key + ): await self._process_input_json(file_path) async with self._lock: + files_to_keep = set( + flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates) + ) + files_to_keep.add(METADATA_FILENAME) + for file_path in self.path_to_kvs.glob('*'): - if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}: + if file_path.name in files_to_keep: continue if file_path.is_file(): await asyncio.to_thread(file_path.unlink, missing_ok=True) @@ -55,3 +65,15 @@ async def _process_input_json(self, path: Path) -> None: f.close() await asyncio.to_thread(path.unlink, missing_ok=True) await self.set_value(key=path.stem, value=input_data) + + @override + async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: + configuration = Configuration.get_global_configuration() + + if key in configuration.input_key_candidates: + for candidate in configuration.input_key_candidates: + value = await super().get_value(key=candidate) + if value is not None: + return value + + return await super().get_value(key=key) From e0d647b9ad0958fdf872d0e94fd7d19fdf05adba Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Mon, 6 Oct 2025 23:00:29 +0200 Subject: [PATCH 2/7] extend test, fix bug --- .../_file_system/_key_value_store_client.py | 5 +---- tests/unit/storage_clients/test_file_system.py | 8 +++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 83c4573c..f47fc368 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -30,10 +30,7 @@ async def purge(self) -> None: # First try to find the alternative format of the input file and process it if it exists. for file_path in self.path_to_kvs.glob('*'): - if ( - file_path.name in configuration.input_key_candidates - and file_path.name != configuration.canonical_input_key - ): + if file_path.name in configuration.input_key_candidates: await self._process_input_json(file_path) async with self._lock: diff --git a/tests/unit/storage_clients/test_file_system.py b/tests/unit/storage_clients/test_file_system.py index 7b938416..10ea6e1b 100644 --- a/tests/unit/storage_clients/test_file_system.py +++ b/tests/unit/storage_clients/test_file_system.py @@ -4,6 +4,8 @@ import json from typing import TYPE_CHECKING +import pytest + from crawlee._consts import METADATA_FILENAME from apify import Actor, Configuration @@ -64,16 +66,16 @@ async def test_purge_preserves_input_file_and_metadata() -> None: assert input_content == '{"test": "input"}' -async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None: +@pytest.mark.parametrize('input_file_name', ['INPUT', 'INPUT.json']) +async def test_pre_existing_input_used_by_actor(tmp_path: Path, input_file_name: str) -> None: pre_existing_input = { 'foo': 'bar', } - configuration = Configuration.get_global_configuration() # Create pre-existing INPUT.json file path_to_input = tmp_path / 'key_value_stores' / 'default' path_to_input.mkdir(parents=True) - (path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input)) + (path_to_input / input_file_name).write_text(json.dumps(pre_existing_input)) async with Actor(): assert pre_existing_input == await Actor.get_input() From 649227d8c1cf7315dd6406668a718bd8aeb53a87 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 7 Oct 2025 09:33:20 +0200 Subject: [PATCH 3/7] relax test --- tests/unit/storage_clients/test_file_system.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/storage_clients/test_file_system.py b/tests/unit/storage_clients/test_file_system.py index 10ea6e1b..e2a77f70 100644 --- a/tests/unit/storage_clients/test_file_system.py +++ b/tests/unit/storage_clients/test_file_system.py @@ -63,7 +63,7 @@ async def test_purge_preserves_input_file_and_metadata() -> None: # Verify INPUT.json content is unchanged input_content = await asyncio.to_thread(input_file.read_text) - assert input_content == '{"test": "input"}' + assert json.loads(input_content) == json.loads('{"test": "input"}') @pytest.mark.parametrize('input_file_name', ['INPUT', 'INPUT.json']) From 5b416ee518ddca0ca0cdf7321e607631970ce6a7 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 7 Oct 2025 09:49:39 +0200 Subject: [PATCH 4/7] Better naming + safety measures --- .../_file_system/_key_value_store_client.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index f47fc368..24013ed3 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -1,5 +1,6 @@ import asyncio import json +import logging from pathlib import Path from more_itertools import flatten @@ -11,6 +12,8 @@ from apify._configuration import Configuration +logger = logging.getLogger(__name__) + class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient): """Apify-specific implementation of the `FileSystemKeyValueStoreClient`. @@ -31,7 +34,7 @@ async def purge(self) -> None: # First try to find the alternative format of the input file and process it if it exists. for file_path in self.path_to_kvs.glob('*'): if file_path.name in configuration.input_key_candidates: - await self._process_input_json(file_path) + await self._sanitize_input_json(file_path) async with self._lock: files_to_keep = set( @@ -50,18 +53,29 @@ async def purge(self) -> None: update_modified_at=True, ) - async def _process_input_json(self, path: Path) -> None: - """Process simple input json file to format expected by the FileSystemKeyValueStoreClient. + async def _sanitize_input_json(self, path: Path) -> None: + """Transform an input json file to match the naming convention expected by the FileSystemKeyValueStoreClient. For example: INPUT.json -> INPUT, INPUT.json.metadata """ + configuration = Configuration.get_global_configuration() + + f = None try: f = await asyncio.to_thread(path.open) input_data = json.load(f) finally: - f.close() + if f is not None: + f.close() + + if await self.record_exists(key=configuration.canonical_input_key): + logger.warning(f'Redundant input file found: {path}') + return + + logger.info(f'Renaming input file: {path.name} -> {configuration.canonical_input_key}') + await asyncio.to_thread(path.unlink, missing_ok=True) - await self.set_value(key=path.stem, value=input_data) + await self.set_value(key=configuration.canonical_input_key, value=input_data) @override async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: From 81d74fa86b9088da102927130ab08fcb7f63bd97 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 7 Oct 2025 10:42:44 +0200 Subject: [PATCH 5/7] fix comment --- .../storage_clients/_file_system/_key_value_store_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 24013ed3..8b27e27b 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -56,7 +56,7 @@ async def purge(self) -> None: async def _sanitize_input_json(self, path: Path) -> None: """Transform an input json file to match the naming convention expected by the FileSystemKeyValueStoreClient. - For example: INPUT.json -> INPUT, INPUT.json.metadata + For example: INPUT.json -> INPUT, INPUT.__metadata__.json """ configuration = Configuration.get_global_configuration() From 1195b2c4ae11d18ba1a649f62120bdf16031345e Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 7 Oct 2025 11:20:07 +0200 Subject: [PATCH 6/7] update expected behavior --- tests/unit/storage_clients/test_file_system.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/storage_clients/test_file_system.py b/tests/unit/storage_clients/test_file_system.py index e2a77f70..f0cc2bbd 100644 --- a/tests/unit/storage_clients/test_file_system.py +++ b/tests/unit/storage_clients/test_file_system.py @@ -79,3 +79,6 @@ async def test_pre_existing_input_used_by_actor(tmp_path: Path, input_file_name: async with Actor(): assert pre_existing_input == await Actor.get_input() + + # Make sure that the input file doesn't get renamed in the process + assert (path_to_input / input_file_name).exists() From 71a9e0c8cb1fa2224c5cc2b74ce2c0a389ab43d1 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 7 Oct 2025 11:42:53 +0200 Subject: [PATCH 7/7] Do not actually rename files --- .../_file_system/_key_value_store_client.py | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 8b27e27b..7d8d1efa 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -1,7 +1,6 @@ import asyncio import json import logging -from pathlib import Path from more_itertools import flatten from typing_extensions import override @@ -31,10 +30,7 @@ async def purge(self) -> None: """ configuration = Configuration.get_global_configuration() - # First try to find the alternative format of the input file and process it if it exists. - for file_path in self.path_to_kvs.glob('*'): - if file_path.name in configuration.input_key_candidates: - await self._sanitize_input_json(file_path) + await self._sanitize_input_json_files() async with self._lock: files_to_keep = set( @@ -53,29 +49,31 @@ async def purge(self) -> None: update_modified_at=True, ) - async def _sanitize_input_json(self, path: Path) -> None: - """Transform an input json file to match the naming convention expected by the FileSystemKeyValueStoreClient. - - For example: INPUT.json -> INPUT, INPUT.__metadata__.json - """ + async def _sanitize_input_json_files(self) -> None: + """Handle missing metadata for input files.""" configuration = Configuration.get_global_configuration() - - f = None - try: - f = await asyncio.to_thread(path.open) - input_data = json.load(f) - finally: - if f is not None: - f.close() - - if await self.record_exists(key=configuration.canonical_input_key): - logger.warning(f'Redundant input file found: {path}') - return - - logger.info(f'Renaming input file: {path.name} -> {configuration.canonical_input_key}') - - await asyncio.to_thread(path.unlink, missing_ok=True) - await self.set_value(key=configuration.canonical_input_key, value=input_data) + alternative_keys = configuration.input_key_candidates - {configuration.canonical_input_key} + + if (self.path_to_kvs / configuration.canonical_input_key).exists(): + # Handle missing metadata + if not await self.record_exists(key=configuration.canonical_input_key): + input_data = await asyncio.to_thread( + lambda: json.loads((self.path_to_kvs / configuration.canonical_input_key).read_text()) + ) + await self.set_value(key=configuration.canonical_input_key, value=input_data) + + for alternative_key in alternative_keys: + if (alternative_input_file := self.path_to_kvs / alternative_key).exists(): + logger.warning(f'Redundant input file found: {alternative_input_file}') + else: + for alternative_key in alternative_keys: + alternative_input_file = self.path_to_kvs / alternative_key + + # Handle missing metadata + if alternative_input_file.exists() and not await self.record_exists(key=alternative_key): + with alternative_input_file.open() as f: + input_data = await asyncio.to_thread(lambda: json.load(f)) + await self.set_value(key=configuration.canonical_input_key, value=input_data) @override async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: