diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index d0b882c8..5a339982 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -1,4 +1,6 @@ import asyncio +import json +from pathlib import Path from typing_extensions import override @@ -23,9 +25,15 @@ async def purge(self) -> None: the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged. """ kvs_input_key = Configuration.get_global_configuration().input_key + + # First try to find the alternative format of the input file and process it if it exists. + for file_path in self.path_to_kvs.glob('*'): + if file_path.name == f'{kvs_input_key}.json': + await self._process_input_json(file_path) + async with self._lock: for file_path in self.path_to_kvs.glob('*'): - if file_path.name in {METADATA_FILENAME, f'{kvs_input_key}.json'}: + if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}: continue if file_path.is_file(): await asyncio.to_thread(file_path.unlink, missing_ok=True) @@ -34,3 +42,16 @@ async def purge(self) -> None: update_accessed_at=True, update_modified_at=True, ) + + async def _process_input_json(self, path: Path) -> None: + """Process simple input json file to format expected by the FileSystemKeyValueStoreClient. + + For example: INPUT.json -> INPUT, INPUT.json.metadata + """ + try: + f = await asyncio.to_thread(path.open) + input_data = json.load(f) + finally: + f.close() + await asyncio.to_thread(path.unlink, missing_ok=True) + await self.set_value(key=path.stem, value=input_data) diff --git a/tests/unit/storage_clients/test_file_system.py b/tests/unit/storage_clients/test_file_system.py index c14e9813..ed7cf413 100644 --- a/tests/unit/storage_clients/test_file_system.py +++ b/tests/unit/storage_clients/test_file_system.py @@ -1,11 +1,17 @@ from __future__ import annotations import asyncio +import json +from typing import TYPE_CHECKING +from crawlee import service_locator from crawlee._consts import METADATA_FILENAME -from apify import Configuration -from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient +from apify import Actor, Configuration +from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient, ApifyFileSystemStorageClient + +if TYPE_CHECKING: + from pathlib import Path async def test_purge_preserves_input_file_and_metadata() -> None: @@ -23,18 +29,21 @@ async def test_purge_preserves_input_file_and_metadata() -> None: kvs_path = kvs_storage_client.path_to_kvs # Create various files - input_file = kvs_path / f'{configuration.input_key}.json' + input_file = kvs_path / f'{configuration.input_key}' + input_metadata_file = kvs_path / f'{configuration.input_key}.{METADATA_FILENAME}.json' metadata_file = kvs_path / METADATA_FILENAME regular_file1 = kvs_path / 'regular_file1.json' regular_file2 = kvs_path / 'another_file.txt' # Write content to files await asyncio.to_thread(input_file.write_text, '{"test": "input"}') + await asyncio.to_thread(input_metadata_file.write_text, 'some text content') await asyncio.to_thread(regular_file1.write_text, '{"test": "data1"}') await asyncio.to_thread(regular_file2.write_text, 'some text content') # Verify all files exist before purge assert input_file.exists() + assert input_metadata_file.exists() assert metadata_file.exists() # Should exist from client creation assert regular_file1.exists() assert regular_file2.exists() @@ -53,3 +62,21 @@ async def test_purge_preserves_input_file_and_metadata() -> None: # Verify INPUT.json content is unchanged input_content = await asyncio.to_thread(input_file.read_text) assert input_content == '{"test": "input"}' + + +async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None: + pre_existing_input = { + 'foo': 'bar', + } + + configuration = Configuration.get_global_configuration() + # Create pre-existing INPUT.json file + path_to_input = tmp_path / 'key_value_stores' / 'default' + path_to_input.mkdir(parents=True) + (path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input)) + + # Remove this line after https://github.com/apify/apify-sdk-python/pull/576 + service_locator.set_storage_client(ApifyFileSystemStorageClient()) + + async with Actor(): + assert pre_existing_input == await Actor.get_input()