Skip to content

Commit 3813ea3

Browse files
committed
Add docs and compute_short_hash for additional cache key
1 parent 2d61f1e commit 3813ea3

File tree

6 files changed

+90
-16
lines changed

6 files changed

+90
-16
lines changed

src/apify/storage_clients/_apify/_storage_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ class ApifyStorageClient(StorageClient):
3131
@override
3232
def get_additional_cache_key(self, configuration: CrawleeConfiguration) -> Hashable:
3333
if isinstance(configuration, ApifyConfiguration):
34-
if configuration.api_base_url is None or configuration.token is None:
35-
raise ValueError("'Configuration.api_base_url' and 'Configuration.token' must be set.")
3634
return Alias.get_additional_cache_key(configuration)
37-
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
35+
36+
config_class = type(configuration)
37+
raise TypeError(
38+
self._lsp_violation_error_message_template.format(f'{config_class.__module__}.{config_class.__name__}')
39+
)
3840

3941
@override
4042
async def create_dataset_client(

src/apify/storage_clients/_apify/_utils.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING, ClassVar
77

88
from apify_client import ApifyClientAsync
9+
from crawlee._utils.crypto import compute_short_hash
910
from crawlee.storages import Dataset, KeyValueStore, RequestQueue
1011

1112
from apify._configuration import Configuration
@@ -25,15 +26,18 @@
2526
class Alias:
2627
"""Class for handling aliases.
2728
28-
It includes helper methods for serialization/deserialization and initialization from kvs.
29+
The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using
30+
default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid
31+
unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on
32+
Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot.
2933
"""
3034

3135
_alias_map: ClassVar[dict[str, str]] = {}
3236
"""Map containing pre-existing alias storages and their ids. Global for all instances."""
3337
_alias_init_lock: Lock | None = None
3438
"""Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances."""
3539

36-
ALIAS_STORAGE_KEY_SEPARATOR = '|'
40+
ALIAS_STORAGE_KEY_SEPARATOR = ','
3741
ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING'
3842

3943
def __init__(self, storage_type: _StorageT, alias: str, configuration: Configuration) -> None:
@@ -55,12 +59,24 @@ async def __aexit__(
5559

5660
@classmethod
5761
async def _get_alias_init_lock(cls) -> Lock:
62+
"""Get lock for controlling the creation of the alias storages.
63+
64+
The lock is shared for all instances of the Alias class.
65+
It is created in async method to ensure that some event loop is already running.
66+
"""
5867
if cls._alias_init_lock is None:
5968
cls._alias_init_lock = Lock()
6069
return cls._alias_init_lock
6170

6271
@classmethod
6372
async def get_alias_map(cls) -> dict[str, str]:
73+
"""Get the aliases and storage ids mapping from the default kvs.
74+
75+
Mapping is loaded from kvs only once and is shared for all instances of the Alias class.
76+
77+
Returns:
78+
Map of aliases and storage ids.
79+
"""
6480
if not cls._alias_map:
6581
default_kvs_client = await get_default_kvs_client()
6682

@@ -79,13 +95,17 @@ async def get_alias_map(cls) -> dict[str, str]:
7995

8096
@classmethod
8197
def get_additional_cache_key(cls, configuration: Configuration) -> str:
82-
"""Get additional cache key based on api_url and token."""
83-
if configuration.api_base_url is None or configuration.token is None:
84-
raise ValueError("'Configuration.api_base_url' and 'Configuration.token' must be set.")
85-
return str((configuration.api_base_url, configuration.token))
98+
"""Get additional cache key based on configuration.
99+
100+
Use only api_public_base_url and token as the relevant for differentiating storages.
101+
"""
102+
if configuration.api_public_base_url is None or configuration.token is None:
103+
raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.")
104+
return compute_short_hash(f'{configuration.api_public_base_url}{configuration.token}'.encode())
86105

87106
@property
88107
def storage_key(self) -> str:
108+
"""Get a unique storage key used for storing the alias in the mapping."""
89109
return self.ALIAS_STORAGE_KEY_SEPARATOR.join(
90110
[
91111
self.storage_type.__name__,
@@ -95,11 +115,19 @@ def storage_key(self) -> str:
95115
)
96116

97117
async def resolve_id(self) -> str | None:
118+
"""Get id of the aliased storage.
119+
120+
Either locate the id in the in-memory mapping or create the new storage.
121+
122+
Returns:
123+
Storage id if it exists, None otherwise.
124+
"""
98125
return (await self.get_alias_map()).get(self.storage_key, None)
99126

100127
async def store_mapping(self, storage_id: str) -> None:
101-
"""Add alias and related storage id to the mapping in default kvs."""
102-
self._alias_map[self.storage_key] = storage_id
128+
"""Add alias and related storage id to the mapping in default kvs and local in-memory mapping."""
129+
# Update in-memory mapping
130+
(await self.get_alias_map())[self.storage_key] = storage_id
103131
if not Configuration.get_global_configuration().is_at_home:
104132
logging.getLogger(__name__).warning(
105133
'Alias storage limited retention is only supported on Apify platform. Storage is not exported.'
@@ -117,7 +145,6 @@ async def store_mapping(self, storage_id: str) -> None:
117145
record = record['value']
118146

119147
# Update or create the record with the new alias mapping
120-
121148
if isinstance(record, dict):
122149
record[self.storage_key] = storage_id
123150
else:
@@ -126,7 +153,7 @@ async def store_mapping(self, storage_id: str) -> None:
126153
# Store the mapping back in the KVS.
127154
await default_kvs_client.set_record(self.ALIAS_MAPPING_KEY, record)
128155
except Exception as exc:
129-
logger.warning(f'Error accessing alias mapping for {self.alias}: {exc}')
156+
logger.warning(f'Error storing alias mapping for {self.alias}: {exc}')
130157

131158

132159
async def get_default_kvs_client() -> KeyValueStoreClientAsync:

tests/integration/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from ._utils import generate_unique_resource_name
2121
from apify import Actor
2222
from apify._models import ActorRun
23+
from apify.storage_clients._apify._utils import Alias
2324

2425
if TYPE_CHECKING:
2526
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping
@@ -60,6 +61,10 @@ def _prepare_test_env() -> None:
6061
service_locator._storage_client = None
6162
service_locator.storage_instance_manager.clear_cache()
6263

64+
# Reset the Alias class state.
65+
Alias._alias_map = {}
66+
Alias._alias_init_lock = None
67+
6368
# Verify that the test environment was set up correctly.
6469
assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path)
6570

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from crawlee import service_locator
6+
from crawlee.storages import Dataset, KeyValueStore, RequestQueue
7+
8+
from apify import Configuration
9+
from apify.storage_clients import ApifyStorageClient
10+
11+
12+
@pytest.mark.parametrize(
13+
'storage',
14+
[Dataset, KeyValueStore, RequestQueue],
15+
)
16+
async def test_alias_concurrent_creation_local(
17+
storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str
18+
) -> None:
19+
"""Test that storages created with same alias are created only once even when created concurrently."""
20+
service_locator.set_configuration(Configuration(token=apify_token))
21+
service_locator.set_storage_client(ApifyStorageClient())
22+
tasks = [asyncio.create_task(storage_type.open(alias='test')) for _ in range(2)]
23+
24+
storages = await asyncio.gather(*tasks)
25+
unique_storage_ids = {storage.id for storage in storages}
26+
try:
27+
# Only one aliased storage should be created.
28+
assert len(unique_storage_ids) == 1
29+
30+
# Clean up
31+
await storages[0].drop()
32+
except AssertionError:
33+
for storage in storages:
34+
await storage.drop()

tests/unit/actor/test_apify_storage.py renamed to tests/unit/actor/test_apify_storages.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
async def test_get_additional_cache_key(
2525
storage: Storage, _storage_client: ApifyDatasetClient | ApifyKeyValueStoreClient | ApifyRequestQueueClient
2626
) -> None:
27-
"""Test that Storages based on `ApifyStorageClient` include `token` and `api_base_url` in additional cache key."""
27+
"""Test that Storages based on `ApifyStorageClient` include `token` and `api_public_base_url` in
28+
additional cache key."""
2829

2930
def create_metadata(id: str) -> StorageMetadata:
3031
now = datetime.now(tz=timezone.utc)
@@ -36,10 +37,10 @@ def create_metadata(id: str) -> StorageMetadata:
3637

3738
config_1 = Configuration(token='a')
3839
config_2 = Configuration(token='b')
39-
config_3 = Configuration(token='a', api_base_url='https://super_custom_api.com')
40+
config_3 = Configuration(token='a', api_public_base_url='https://super_custom_api.com')
4041

4142
config_4 = Configuration(token='a')
42-
config_5 = Configuration(token='a', api_base_url='https://super_custom_api.com')
43+
config_5 = Configuration(token='a', api_public_base_url='https://super_custom_api.com')
4344

4445
mocked_client = AsyncMock(spec=type[_storage_client])
4546
mocked_client.get_metadata = AsyncMock(side_effect=lambda: create_metadata(next(storage_ids)))

tests/unit/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import apify._actor
1919
import apify.log
20+
from apify.storage_clients._apify._utils import Alias
2021

2122
if TYPE_CHECKING:
2223
from collections.abc import Callable, Iterator
@@ -72,6 +73,10 @@ def _prepare_test_env() -> None:
7273
service_locator._storage_client = None
7374
service_locator.storage_instance_manager.clear_cache()
7475

76+
# Reset the Alias class state.
77+
Alias._alias_map = {}
78+
Alias._alias_init_lock = None
79+
7580
# Verify that the test environment was set up correctly.
7681
assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path)
7782

0 commit comments

Comments
 (0)