Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/apify/_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime, timedelta
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Annotated, Any

from pydantic import AliasChoices, BeforeValidator, Field, model_validator
Expand Down Expand Up @@ -421,6 +422,14 @@ def disable_browser_sandbox_on_platform(self) -> Self:
logger.warning('Actor is running on the Apify platform, `disable_browser_sandbox` was changed to True.')
return self

@property
def canonical_input_key(self) -> str:
return Path(self.input_key).stem

@property
def input_key_candidates(self) -> set[str]:
return {self.input_key, self.canonical_input_key, Path(self.canonical_input_key).with_suffix('.json').name}

@classmethod
def get_global_configuration(cls) -> Configuration:
"""Retrieve the global instance of the configuration.
Expand Down
49 changes: 41 additions & 8 deletions src/apify/storage_clients/_file_system/_key_value_store_client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import asyncio
import json
import logging
from pathlib import Path

from more_itertools import flatten
from typing_extensions import override

from crawlee._consts import METADATA_FILENAME
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreRecord

from apify._configuration import Configuration

logger = logging.getLogger(__name__)


class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
"""Apify-specific implementation of the `FileSystemKeyValueStoreClient`.
Expand All @@ -24,16 +29,21 @@ async def purge(self) -> None:
It deletes all files in the key-value store directory, except for the metadata file and
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
"""
kvs_input_key = Configuration.get_global_configuration().input_key
configuration = Configuration.get_global_configuration()

# First try to find the alternative format of the input file and process it if it exists.
for file_path in self.path_to_kvs.glob('*'):
if file_path.name == f'{kvs_input_key}.json':
await self._process_input_json(file_path)
if file_path.name in configuration.input_key_candidates:
await self._sanitize_input_json(file_path)

async with self._lock:
files_to_keep = set(
flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates)
)
files_to_keep.add(METADATA_FILENAME)

for file_path in self.path_to_kvs.glob('*'):
if file_path.name in {METADATA_FILENAME, kvs_input_key, f'{kvs_input_key}.{METADATA_FILENAME}'}:
if file_path.name in files_to_keep:
continue
if file_path.is_file():
await asyncio.to_thread(file_path.unlink, missing_ok=True)
Expand All @@ -43,15 +53,38 @@ async def purge(self) -> None:
update_modified_at=True,
)

async def _process_input_json(self, path: Path) -> None:
"""Process simple input json file to format expected by the FileSystemKeyValueStoreClient.
async def _sanitize_input_json(self, path: Path) -> None:
"""Transform an input json file to match the naming convention expected by the FileSystemKeyValueStoreClient.

For example: INPUT.json -> INPUT, INPUT.json.metadata
"""
configuration = Configuration.get_global_configuration()

f = None
try:
f = await asyncio.to_thread(path.open)
input_data = json.load(f)
finally:
f.close()
if f is not None:
f.close()

if await self.record_exists(key=configuration.canonical_input_key):
logger.warning(f'Redundant input file found: {path}')
return

logger.info(f'Renaming input file: {path.name} -> {configuration.canonical_input_key}')

await asyncio.to_thread(path.unlink, missing_ok=True)
await self.set_value(key=path.stem, value=input_data)
await self.set_value(key=configuration.canonical_input_key, value=input_data)

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
configuration = Configuration.get_global_configuration()

if key in configuration.input_key_candidates:
for candidate in configuration.input_key_candidates:
value = await super().get_value(key=candidate)
if value is not None:
return value

return await super().get_value(key=key)
10 changes: 6 additions & 4 deletions tests/unit/storage_clients/test_file_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
from typing import TYPE_CHECKING

import pytest

from crawlee._consts import METADATA_FILENAME

from apify import Actor, Configuration
Expand Down Expand Up @@ -61,19 +63,19 @@ async def test_purge_preserves_input_file_and_metadata() -> None:

# Verify INPUT.json content is unchanged
input_content = await asyncio.to_thread(input_file.read_text)
assert input_content == '{"test": "input"}'
assert json.loads(input_content) == json.loads('{"test": "input"}')


async def test_pre_existing_input_used_by_actor(tmp_path: Path) -> None:
@pytest.mark.parametrize('input_file_name', ['INPUT', 'INPUT.json'])
async def test_pre_existing_input_used_by_actor(tmp_path: Path, input_file_name: str) -> None:
pre_existing_input = {
'foo': 'bar',
}

configuration = Configuration.get_global_configuration()
# Create pre-existing INPUT.json file
path_to_input = tmp_path / 'key_value_stores' / 'default'
path_to_input.mkdir(parents=True)
(path_to_input / f'{configuration.input_key}.json').write_text(json.dumps(pre_existing_input))
(path_to_input / input_file_name).write_text(json.dumps(pre_existing_input))

async with Actor():
assert pre_existing_input == await Actor.get_input()