Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import json
from pathlib import Path

from typing_extensions import override

Expand All @@ -23,9 +25,15 @@ async def purge(self) -> None:
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
"""
kvs_input_key = Configuration.get_global_configuration().input_key

# First try to find the alternative format of the input file and process it if it exists.
for file_path in self.path_to_kvs.glob('*'):
if file_path.name == f'{kvs_input_key}.json':
await self._process_input_json(file_path)

async with self._lock:
for file_path in self.path_to_kvs.glob('*'):
if file_path.name in {METADATA_FILENAME, f'{kvs_input_key}.json'}:
if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}:
continue
if file_path.is_file():
await asyncio.to_thread(file_path.unlink, missing_ok=True)
Expand All @@ -34,3 +42,16 @@ async def purge(self) -> None:
update_accessed_at=True,
update_modified_at=True,
)

async def _process_input_json(self, path: Path) -> None:
    """Convert a plain input JSON file into a regular key-value store record.

    The file is parsed, removed from disk, and its content is then stored through `set_value` under
    the key equal to the file name without its last suffix (for example `INPUT.json` -> key `INPUT`).

    Args:
        path: Path to the JSON file that should be converted.

    Raises:
        OSError: If the file cannot be read.
        ValueError: If the file content is not valid JSON (this includes `json.JSONDecodeError`).
    """
    # Read the whole file in a worker thread with a single call. This keeps blocking file I/O off the
    # event loop and leaves no file handle to clean up, so a failed read surfaces as the original
    # `OSError` rather than as an unbound-variable error raised from a `finally` block.
    raw_input = await asyncio.to_thread(path.read_bytes)

    # Parsing from bytes lets `json` detect the UTF-8/16/32 encoding itself instead of depending on
    # the locale's default text encoding.
    input_data = json.loads(raw_input)

    # The source file is removed only after it has been parsed successfully.
    # NOTE(review): the file is deleted before `set_value` runs, so a failure in `set_value` would
    # lose the input; confirm whether storing first and deleting afterwards is safe here.
    await asyncio.to_thread(path.unlink, missing_ok=True)
    await self.set_value(key=path.stem, value=input_data)
33 changes: 30 additions & 3 deletions tests/unit/storage_clients/test_file_system.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from __future__ import annotations

import asyncio
import json
from typing import TYPE_CHECKING

from crawlee import service_locator
from crawlee._consts import METADATA_FILENAME

from apify import Configuration
from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient
from apify import Actor, Configuration
from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient, ApifyFileSystemStorageClient

if TYPE_CHECKING:
from pathlib import Path


async def test_purge_preserves_input_file_and_metadata() -> None:
Expand All @@ -23,18 +29,21 @@ async def test_purge_preserves_input_file_and_metadata() -> None:
kvs_path = kvs_storage_client.path_to_kvs

# Create various files
input_file = kvs_path / f'{configuration.input_key}.json'
input_file = kvs_path / f'{configuration.input_key}'
input_metadata_file = kvs_path / f'{configuration.input_key}.{METADATA_FILENAME}.json'
metadata_file = kvs_path / METADATA_FILENAME
regular_file1 = kvs_path / 'regular_file1.json'
regular_file2 = kvs_path / 'another_file.txt'

# Write content to files
await asyncio.to_thread(input_file.write_text, '{"test": "input"}')
await asyncio.to_thread(input_metadata_file.write_text, 'some text content')
await asyncio.to_thread(regular_file1.write_text, '{"test": "data1"}')
await asyncio.to_thread(regular_file2.write_text, 'some text content')

# Verify all files exist before purge
assert input_file.exists()
assert input_metadata_file.exists()
assert metadata_file.exists() # Should exist from client creation
assert regular_file1.exists()
assert regular_file2.exists()
Expand All @@ -53,3 +62,21 @@ async def test_purge_preserves_input_file_and_metadata() -> None:
# Verify the input file content is unchanged
input_content = await asyncio.to_thread(input_file.read_text)
assert input_content == '{"test": "input"}'


async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None:
    """Check that an input JSON file written before the Actor starts is returned by `Actor.get_input()`.

    The file is created at `<tmp_path>/key_value_stores/default/<input_key>.json`.
    NOTE(review): this presumably relies on the storage directory being pointed at `tmp_path` by a
    fixture outside this function - confirm.
    """
    expected_input = {'foo': 'bar'}
    input_key = Configuration.get_global_configuration().input_key

    # Prepare the default key-value store directory with a pre-existing INPUT.json file.
    default_kvs_dir = tmp_path / 'key_value_stores' / 'default'
    default_kvs_dir.mkdir(parents=True)
    input_file = default_kvs_dir / f'{input_key}.json'
    input_file.write_text(json.dumps(expected_input))

    # Remove this line after https://github.com/apify/apify-sdk-python/pull/576
    service_locator.set_storage_client(ApifyFileSystemStorageClient())

    async with Actor():
        actual_input = await Actor.get_input()
        assert expected_input == actual_input
Loading