Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions src/apify/storage_clients/_file_system/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from itertools import chain
from pathlib import Path
from typing import Self
from typing import Any, Self

from typing_extensions import override

Expand Down Expand Up @@ -89,10 +89,32 @@ async def purge(self) -> None:

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
if key == self._input_key:
# Potentially point to custom input file name instead
key = self._input_key_filename
return await super().get_value(key=key)
return await super().get_value(key=self._resolve_input_key(key))

@override
async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None:
await super().set_value(key=self._resolve_input_key(key), value=value, content_type=content_type)

@override
async def record_exists(self, *, key: str) -> bool:
return await super().record_exists(key=self._resolve_input_key(key))

@override
async def delete_value(self, *, key: str) -> None:
await super().delete_value(key=self._resolve_input_key(key))

@override
async def get_public_url(self, *, key: str) -> str:
return await super().get_public_url(key=self._resolve_input_key(key))

def _resolve_input_key(self, key: str) -> str:
"""Redirect the logical input key to the actual input file name on disk.

The platform may store the Actor input under a name with an extension (e.g. `INPUT.json`) while the
logical key stays `INPUT`. Redirecting keeps every record operation pointed at that single file, so
e.g. `set_value` overwrites it instead of creating a duplicate that would later be rejected on open.
"""
return self._input_key_filename if key == self._input_key else key

@staticmethod
async def _create_missing_metadata_for_input_file(key: str, record_path: Path) -> None:
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/storage_clients/test_file_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,45 @@ async def test_pre_existing_input_used_by_actor(input_file_name: str) -> None:
path_to_input / input_file_name,
path_to_input / f'{input_file_name}.__metadata__.json',
}


async def test_set_value_with_input_key_targets_existing_input_file() -> None:
"""`set_value` with the input key overwrites the existing `INPUT.json` instead of creating a duplicate."""
configuration = Configuration.get_global_configuration()

# Pre-create a custom-named input file (with extension) before opening the client.
kvs_path = Path(configuration.storage_dir) / 'key_value_stores' / 'default'
kvs_path.mkdir(parents=True)
(kvs_path / 'INPUT.json').write_text(json.dumps({'foo': 'bar'}))

client = await ApifyFileSystemKeyValueStoreClient.open(id=None, name=None, alias=None, configuration=configuration)
await client.set_value(key=configuration.input_key, value={'foo': 'baz'})

# The existing input file is overwritten in place. No second input file (e.g. `INPUT`) is created.
assert set(kvs_path.glob('*')) == {
kvs_path / '__metadata__.json',
kvs_path / 'INPUT.json',
kvs_path / f'INPUT.json.{METADATA_FILENAME}',
}

# Reopening must not raise "Only one input file is allowed", i.e. no duplicate input file was created.
client = await ApifyFileSystemKeyValueStoreClient.open(id=None, name=None, alias=None, configuration=configuration)
record = await client.get_value(key=configuration.input_key)
assert record is not None
assert record.value == {'foo': 'baz'}


async def test_record_exists_and_delete_value_target_existing_input_file() -> None:
"""`record_exists` and `delete_value` with the input key operate on the existing `INPUT.json`."""
configuration = Configuration.get_global_configuration()

kvs_path = Path(configuration.storage_dir) / 'key_value_stores' / 'default'
kvs_path.mkdir(parents=True)
(kvs_path / 'INPUT.json').write_text(json.dumps({'foo': 'bar'}))

client = await ApifyFileSystemKeyValueStoreClient.open(id=None, name=None, alias=None, configuration=configuration)
assert await client.record_exists(key=configuration.input_key) is True

await client.delete_value(key=configuration.input_key)
assert await client.record_exists(key=configuration.input_key) is False
assert not (kvs_path / 'INPUT.json').exists()