9 changes: 9 additions & 0 deletions src/apify/_configuration.py
@@ -3,6 +3,7 @@
from datetime import datetime, timedelta
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Annotated, Any

from pydantic import AliasChoices, BeforeValidator, Field, model_validator
@@ -421,6 +422,14 @@ def disable_browser_sandbox_on_platform(self) -> Self:
logger.warning('Actor is running on the Apify platform, `disable_browser_sandbox` was changed to True.')
return self

@property
def canonical_input_key(self) -> str:
return Path(self.input_key).stem

@property
def input_key_candidates(self) -> set[str]:
return {self.input_key, self.canonical_input_key, Path(self.canonical_input_key).with_suffix('.json').name}

@classmethod
def get_global_configuration(cls) -> Configuration:
"""Retrieve the global instance of the configuration.
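For reference, a minimal standalone sketch of what the two new properties resolve to. It assumes the SDK's default `input_key` of `'INPUT'` (an assumption for illustration, not taken from this diff); `'INPUT.json'` is shown for contrast:

```python
from pathlib import Path

# Standalone illustration of canonical_input_key / input_key_candidates.
# The default input_key value of 'INPUT' is assumed for this sketch.
for input_key in ('INPUT', 'INPUT.json'):
    canonical = Path(input_key).stem
    candidates = {input_key, canonical, Path(canonical).with_suffix('.json').name}
    print(f'{input_key!r}: canonical={canonical!r}, candidates={candidates!r}')

# 'INPUT':      canonical='INPUT', candidates={'INPUT', 'INPUT.json'}
# 'INPUT.json': canonical='INPUT', candidates={'INPUT', 'INPUT.json'}
```

Either spelling of the input key therefore normalizes to the same candidate set, which is what the storage client below relies on.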
67 changes: 49 additions & 18 deletions src/apify/storage_clients/_file_system/_key_value_store_client.py
@@ -1,14 +1,18 @@
import asyncio
import json
from pathlib import Path
import logging

from more_itertools import flatten
from typing_extensions import override

from crawlee._consts import METADATA_FILENAME
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreRecord

from apify._configuration import Configuration

logger = logging.getLogger(__name__)


class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
"""Apify-specific implementation of the `FileSystemKeyValueStoreClient`.
@@ -24,16 +28,18 @@ async def purge(self) -> None:
It deletes all files in the key-value store directory, except for the metadata file and
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
"""
kvs_input_key = Configuration.get_global_configuration().input_key
configuration = Configuration.get_global_configuration()

# First try to find the alternative format of the input file and process it if it exists.
for file_path in self.path_to_kvs.glob('*'):
if file_path.name == f'{kvs_input_key}.json':
await self._process_input_json(file_path)
await self._sanitize_input_json_files()

async with self._lock:
files_to_keep = set(
flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates)
)
files_to_keep.add(METADATA_FILENAME)

for file_path in self.path_to_kvs.glob('*'):
if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}:
if file_path.name in files_to_keep:
continue
if file_path.is_file():
await asyncio.to_thread(file_path.unlink, missing_ok=True)
@@ -43,15 +49,40 @@ async def purge(self) -> None:
update_modified_at=True,
)

async def _process_input_json(self, path: Path) -> None:
"""Process simple input json file to format expected by the FileSystemKeyValueStoreClient.
async def _sanitize_input_json_files(self) -> None:
"""Handle missing metadata for input files."""
configuration = Configuration.get_global_configuration()
alternative_keys = configuration.input_key_candidates - {configuration.canonical_input_key}

For example: INPUT.json -> INPUT, INPUT.json.metadata
"""
try:
f = await asyncio.to_thread(path.open)
input_data = json.load(f)
finally:
f.close()
await asyncio.to_thread(path.unlink, missing_ok=True)
await self.set_value(key=path.stem, value=input_data)
if (self.path_to_kvs / configuration.canonical_input_key).exists():
# Handle missing metadata
if not await self.record_exists(key=configuration.canonical_input_key):
input_data = await asyncio.to_thread(
lambda: json.loads((self.path_to_kvs / configuration.canonical_input_key).read_text())
)
await self.set_value(key=configuration.canonical_input_key, value=input_data)

for alternative_key in alternative_keys:
if (alternative_input_file := self.path_to_kvs / alternative_key).exists():
logger.warning(f'Redundant input file found: {alternative_input_file}')
else:
for alternative_key in alternative_keys:
alternative_input_file = self.path_to_kvs / alternative_key

# Handle missing metadata
if alternative_input_file.exists() and not await self.record_exists(key=alternative_key):
with alternative_input_file.open() as f:
input_data = await asyncio.to_thread(lambda: json.load(f))
await self.set_value(key=configuration.canonical_input_key, value=input_data)

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
configuration = Configuration.get_global_configuration()

if key in configuration.input_key_candidates:
for candidate in configuration.input_key_candidates:
value = await super().get_value(key=candidate)
if value is not None:
return value

return await super().get_value(key=key)
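To make the new purge and lookup behaviour concrete, here is a hedged sketch of the `files_to_keep` computation and the candidate-based lookup. It assumes `input_key_candidates` resolves to `{'INPUT', 'INPUT.json'}` and that `METADATA_FILENAME` is `'__metadata__.json'`; the real constant comes from `crawlee._consts` and may differ, and the `lookup` helper is only an in-memory analogy of the `get_value` override, not the actual file-backed implementation:

```python
from more_itertools import flatten

METADATA_FILENAME = '__metadata__.json'  # assumed value; the real constant lives in crawlee._consts
input_key_candidates = {'INPUT', 'INPUT.json'}  # assumed result of Configuration.input_key_candidates

# What purge() keeps: every candidate key, its metadata sidecar, and the store metadata itself.
files_to_keep = set(flatten([key, f'{key}.{METADATA_FILENAME}'] for key in input_key_candidates))
files_to_keep.add(METADATA_FILENAME)
print(sorted(files_to_keep))
# ['INPUT', 'INPUT.__metadata__.json', 'INPUT.json', 'INPUT.json.__metadata__.json', '__metadata__.json']


# The get_value() override follows the same idea: when the requested key is an
# input candidate, try every candidate and return the first hit.
def lookup(store: dict[str, object], key: str) -> object | None:
    if key in input_key_candidates:
        for candidate in input_key_candidates:
            if candidate in store:
                return store[candidate]
    return store.get(key)


print(lookup({'INPUT.json': {'foo': 'bar'}}, 'INPUT'))  # {'foo': 'bar'}
```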
13 changes: 9 additions & 4 deletions tests/unit/storage_clients/test_file_system.py
@@ -4,6 +4,8 @@
import json
from typing import TYPE_CHECKING

import pytest

from crawlee._consts import METADATA_FILENAME

from apify import Actor, Configuration
@@ -61,19 +63,22 @@ async def test_purge_preserves_input_file_and_metadata() -> None:

# Verify INPUT.json content is unchanged
input_content = await asyncio.to_thread(input_file.read_text)
assert input_content == '{"test": "input"}'
assert json.loads(input_content) == json.loads('{"test": "input"}')


async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None:
@pytest.mark.parametrize('input_file_name', ['INPUT', 'INPUT.json'])
async def test_pre_existing_input_used_by_actor(tmp_path: Path, input_file_name: str) -> None:
pre_existing_input = {
'foo': 'bar',
}

configuration = Configuration.get_global_configuration()
# Create pre-existing INPUT.json file
path_to_input = tmp_path / 'key_value_stores' / 'default'
path_to_input.mkdir(parents=True)
(path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input))
(path_to_input / input_file_name).write_text(json.dumps(pre_existing_input))

async with Actor():
assert pre_existing_input == await Actor.get_input()

# Make sure that the input file doesn't get renamed in the process
assert (path_to_input / input_file_name).exists()
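As a side note on the updated assertion in `test_purge_preserves_input_file_and_metadata`: comparing parsed JSON rather than raw strings makes the check robust to formatting differences. A minimal illustration:

```python
import json

raw_a = '{"test": "input"}'
raw_b = '{\n  "test": "input"\n}'  # same document, different formatting

assert raw_a != raw_b                          # raw string comparison is brittle
assert json.loads(raw_a) == json.loads(raw_b)  # structural comparison is stable
```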