Skip to content

Commit cc5075f

Browse files
authored
fix: Properly process pre-existing Actor input file (#591)
### Description - Properly process pre-existing `Actor` input file in `ApifyFileSystemKeyValueStoreClient` - Ensuring that the `ApifyFileSystemKeyValueStoreClient` is actually used will be done in a separate PR that reworks the way storage is set to the `Actor` #576 ### Issues - Closes: #590
1 parent 9f12c4e commit cc5075f

File tree

2 files changed

+52
-4
lines changed

2 files changed

+52
-4
lines changed

src/apify/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import json
3+
from pathlib import Path
24

35
from typing_extensions import override
46

@@ -23,9 +25,15 @@ async def purge(self) -> None:
2325
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
2426
"""
2527
kvs_input_key = Configuration.get_global_configuration().input_key
28+
29+
# First try to find the alternative format of the input file and process it if it exists.
30+
for file_path in self.path_to_kvs.glob('*'):
31+
if file_path.name == f'{kvs_input_key}.json':
32+
await self._process_input_json(file_path)
33+
2634
async with self._lock:
2735
for file_path in self.path_to_kvs.glob('*'):
28-
if file_path.name in {METADATA_FILENAME, f'{kvs_input_key}.json'}:
36+
if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}:
2937
continue
3038
if file_path.is_file():
3139
await asyncio.to_thread(file_path.unlink, missing_ok=True)
@@ -34,3 +42,16 @@ async def purge(self) -> None:
3442
update_accessed_at=True,
3543
update_modified_at=True,
3644
)
45+
46+
async def _process_input_json(self, path: Path) -> None:
47+
"""Process simple input json file to format expected by the FileSystemKeyValueStoreClient.
48+
49+
For example: INPUT.json -> INPUT, INPUT.json.metadata
50+
"""
51+
try:
52+
f = await asyncio.to_thread(path.open)
53+
input_data = json.load(f)
54+
finally:
55+
f.close()
56+
await asyncio.to_thread(path.unlink, missing_ok=True)
57+
await self.set_value(key=path.stem, value=input_data)

tests/unit/storage_clients/test_file_system.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import json
5+
from typing import TYPE_CHECKING
46

7+
from crawlee import service_locator
58
from crawlee._consts import METADATA_FILENAME
69

7-
from apify import Configuration
8-
from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient
10+
from apify import Actor, Configuration
11+
from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient, ApifyFileSystemStorageClient
12+
13+
if TYPE_CHECKING:
14+
from pathlib import Path
915

1016

1117
async def test_purge_preserves_input_file_and_metadata() -> None:
@@ -23,18 +29,21 @@ async def test_purge_preserves_input_file_and_metadata() -> None:
2329
kvs_path = kvs_storage_client.path_to_kvs
2430

2531
# Create various files
26-
input_file = kvs_path / f'{configuration.input_key}.json'
32+
input_file = kvs_path / f'{configuration.input_key}'
33+
input_metadata_file = kvs_path / f'{configuration.input_key}.{METADATA_FILENAME}.json'
2734
metadata_file = kvs_path / METADATA_FILENAME
2835
regular_file1 = kvs_path / 'regular_file1.json'
2936
regular_file2 = kvs_path / 'another_file.txt'
3037

3138
# Write content to files
3239
await asyncio.to_thread(input_file.write_text, '{"test": "input"}')
40+
await asyncio.to_thread(input_metadata_file.write_text, 'some text content')
3341
await asyncio.to_thread(regular_file1.write_text, '{"test": "data1"}')
3442
await asyncio.to_thread(regular_file2.write_text, 'some text content')
3543

3644
# Verify all files exist before purge
3745
assert input_file.exists()
46+
assert input_metadata_file.exists()
3847
assert metadata_file.exists() # Should exist from client creation
3948
assert regular_file1.exists()
4049
assert regular_file2.exists()
@@ -53,3 +62,21 @@ async def test_purge_preserves_input_file_and_metadata() -> None:
5362
# Verify INPUT.json content is unchanged
5463
input_content = await asyncio.to_thread(input_file.read_text)
5564
assert input_content == '{"test": "input"}'
65+
66+
67+
async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None:
68+
pre_existing_input = {
69+
'foo': 'bar',
70+
}
71+
72+
configuration = Configuration.get_global_configuration()
73+
# Create pre-existing INPUT.json file
74+
path_to_input = tmp_path / 'key_value_stores' / 'default'
75+
path_to_input.mkdir(parents=True)
76+
(path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input))
77+
78+
# Remove this line after https://github.com/apify/apify-sdk-python/pull/576
79+
service_locator.set_storage_client(ApifyFileSystemStorageClient())
80+
81+
async with Actor():
82+
assert pre_existing_input == await Actor.get_input()

0 commit comments

Comments
 (0)