Skip to content

Commit 2d61f1e

Browse files
committed
Review call changes 2
1 parent 70890e7 commit 2d61f1e

File tree

5 files changed

+123
-197
lines changed

5 files changed

+123
-197
lines changed

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
1414
from crawlee.storages import Dataset
1515

16-
from ._utils import _Alias
16+
from apify.storage_clients._apify._utils import Alias
1717

1818
if TYPE_CHECKING:
1919
from collections.abc import AsyncIterator
@@ -128,14 +128,18 @@ async def open(
128128
alias = None if alias == 'default' else alias
129129

130130
if alias:
131-
# Create a new storage and store the alias mapping
132-
new_storage_metadata = DatasetMetadata.model_validate(
133-
await apify_datasets_client.get_or_create(),
134-
)
135-
id = new_storage_metadata.id
136-
await _Alias(storage_type=Dataset, alias=alias, token=token, api_url=api_url).store_mapping_to_apify_kvs(
137-
storage_id=id
138-
)
131+
# Check if there is pre-existing alias mapping in the default KVS.
132+
async with Alias(storage_type=Dataset, alias=alias, configuration=configuration) as _alias:
133+
id = await _alias.resolve_id()
134+
135+
# There was no pre-existing alias in the mapping.
136+
# Create a new unnamed storage and store the mapping.
137+
if id is None:
138+
new_storage_metadata = DatasetMetadata.model_validate(
139+
await apify_datasets_client.get_or_create(),
140+
)
141+
id = new_storage_metadata.id
142+
await _alias.store_mapping(storage_id=id)
139143

140144
# If name is provided, get or create the storage by name.
141145
elif name:

src/apify/storage_clients/_apify/_key_value_store_client.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from crawlee.storages import KeyValueStore
1414

1515
from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
16-
from ._utils import _Alias
16+
from ._utils import Alias
1717
from apify._crypto import create_hmac_signature
1818

1919
if TYPE_CHECKING:
@@ -119,14 +119,19 @@ async def open(
119119
alias = None if alias == 'default' else alias
120120

121121
if alias:
122-
# Create a new storage and store the alias mapping
123-
new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate(
124-
await apify_kvss_client.get_or_create(),
125-
)
126-
id = new_storage_metadata.id
127-
await _Alias(
128-
storage_type=KeyValueStore, alias=alias, token=token, api_url=api_url
129-
).store_mapping_to_apify_kvs(storage_id=id)
122+
# Check if there is pre-existing alias mapping in the default KVS.
123+
async with Alias(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias:
124+
id = await _alias.resolve_id()
125+
126+
# There was no pre-existing alias in the mapping.
127+
# Create a new unnamed storage and store the mapping.
128+
if id is None:
129+
# Create a new storage and store the alias mapping
130+
new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate(
131+
await apify_kvss_client.get_or_create(),
132+
)
133+
id = new_storage_metadata.id
134+
await _alias.store_mapping(storage_id=id)
130135

131136
# If name is provided, get or create the storage by name.
132137
elif name:

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from crawlee.storages import RequestQueue
2020

2121
from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead
22-
from ._utils import _Alias
22+
from ._utils import Alias
2323
from apify import Request
2424

2525
if TYPE_CHECKING:
@@ -194,14 +194,18 @@ async def open(
194194
apify_rqs_client = apify_client_async.request_queues()
195195

196196
if alias:
197-
# Create a new storage and store the alias mapping
198-
new_storage_metadata = RequestQueueMetadata.model_validate(
199-
await apify_rqs_client.get_or_create(),
200-
)
201-
id = new_storage_metadata.id
202-
await _Alias(
203-
storage_type=RequestQueue, alias=alias, token=token, api_url=api_url
204-
).store_mapping_to_apify_kvs(storage_id=id)
197+
# Check if there is pre-existing alias mapping in the default KVS.
198+
async with Alias(storage_type=RequestQueue, alias=alias, configuration=configuration) as _alias:
199+
id = await _alias.resolve_id()
200+
201+
# There was no pre-existing alias in the mapping.
202+
# Create a new unnamed storage and store the mapping.
203+
if id is None:
204+
new_storage_metadata = RequestQueueMetadata.model_validate(
205+
await apify_rqs_client.get_or_create(),
206+
)
207+
id = new_storage_metadata.id
208+
await _alias.store_mapping(storage_id=id)
205209

206210
# If name is provided, get or create the storage by name.
207211
elif name:
Lines changed: 2 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
from __future__ import annotations
22

3-
import logging
4-
from asyncio import Lock
53
from typing import TYPE_CHECKING
64

75
from typing_extensions import override
86

9-
from crawlee import service_locator
107
from crawlee.storage_clients._base import StorageClient
118

129
from ._dataset_client import ApifyDatasetClient
1310
from ._key_value_store_client import ApifyKeyValueStoreClient
1411
from ._request_queue_client import ApifyRequestQueueClient
15-
from ._utils import _ALIAS_MAPPING_KEY, _Alias
16-
from apify._configuration import Configuration
12+
from ._utils import Alias
1713
from apify._configuration import Configuration as ApifyConfiguration
1814
from apify._utils import docs_group
1915

@@ -27,11 +23,6 @@
2723
class ApifyStorageClient(StorageClient):
2824
"""Apify storage client."""
2925

30-
_alias_storages_initialized = False
31-
"""Flag that indicates whether the pre-existing alias storages were already initialized."""
32-
_alias_init_lock: Lock | None = None
33-
"""Lock for creating alias storages. Only one alias storage can be created at the time."""
34-
3526
# This class breaches Liskov Substitution Principle. It requires specialized Configuration compared to its parent.
3627
_lsp_violation_error_message_template = (
3728
'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.'
@@ -42,7 +33,7 @@ def get_additional_cache_key(self, configuration: CrawleeConfiguration) -> Hasha
4233
if isinstance(configuration, ApifyConfiguration):
4334
if configuration.api_base_url is None or configuration.token is None:
4435
raise ValueError("'Configuration.api_base_url' and 'Configuration.token' must be set.")
45-
return _Alias.get_additional_cache_key(configuration.api_base_url, configuration.token)
36+
return Alias.get_additional_cache_key(configuration)
4637
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
4738

4839
@override
@@ -56,10 +47,6 @@ async def create_dataset_client(
5647
) -> ApifyDatasetClient:
5748
configuration = configuration or ApifyConfiguration.get_global_configuration()
5849
if isinstance(configuration, ApifyConfiguration):
59-
if alias:
60-
await self._initialize_alias_storages()
61-
async with self.get_alias_init_lock():
62-
return await ApifyDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration)
6350
return await ApifyDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration)
6451

6552
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
@@ -75,12 +62,6 @@ async def create_kvs_client(
7562
) -> ApifyKeyValueStoreClient:
7663
configuration = configuration or ApifyConfiguration.get_global_configuration()
7764
if isinstance(configuration, ApifyConfiguration):
78-
if alias:
79-
await self._initialize_alias_storages()
80-
async with self.get_alias_init_lock():
81-
return await ApifyKeyValueStoreClient.open(
82-
id=id, name=name, alias=alias, configuration=configuration
83-
)
8465
return await ApifyKeyValueStoreClient.open(id=id, name=name, alias=alias, configuration=configuration)
8566

8667
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
@@ -96,66 +77,6 @@ async def create_rq_client(
9677
) -> ApifyRequestQueueClient:
9778
configuration = configuration or ApifyConfiguration.get_global_configuration()
9879
if isinstance(configuration, ApifyConfiguration):
99-
if alias:
100-
await self._initialize_alias_storages()
101-
async with self.get_alias_init_lock():
102-
return await ApifyRequestQueueClient.open(
103-
id=id, name=name, alias=alias, configuration=configuration
104-
)
10580
return await ApifyRequestQueueClient.open(id=id, name=name, alias=alias, configuration=configuration)
10681

10782
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
108-
109-
@classmethod
110-
def get_alias_init_lock(cls) -> Lock:
111-
if not cls._alias_init_lock:
112-
cls._alias_init_lock = Lock()
113-
return cls._alias_init_lock
114-
115-
@classmethod
116-
async def _initialize_alias_storages(cls) -> None:
117-
"""Initialize alias storages.
118-
119-
This method is called once to populate storage_instance_manager alias related cache. All existing alias
120-
storages are saved in storage_instance_manager cache. If the alias storage is not there, it does not exist yet.
121-
"""
122-
if not Configuration.get_global_configuration().is_at_home:
123-
logging.getLogger(__name__).warning(
124-
'Alias storage limited retention is only supported on Apify platform. '
125-
'No pre-existing storages are imported.'
126-
)
127-
cls._alias_storages_initialized = True
128-
return
129-
130-
async with cls.get_alias_init_lock():
131-
if cls._alias_storages_initialized:
132-
return
133-
134-
cache = service_locator.storage_instance_manager._cache_by_storage_client[ApifyStorageClient] # noqa: SLF001
135-
136-
default_kvs_client = await _Alias.get_default_kvs_client()
137-
138-
record = await default_kvs_client.get_record(key=_ALIAS_MAPPING_KEY)
139-
140-
if record is not None and 'value' in record:
141-
# get_record can return {key: ..., value: ..., content_type: ...}
142-
alias_export_map = record['value']
143-
144-
for export_key, storage_id in alias_export_map.value.items():
145-
exported_alias = _Alias.from_exported_string(export_key)
146-
147-
# Re-create custom config used to open the storage
148-
custom_config = Configuration()
149-
custom_config.api_base_url = exported_alias.api_url
150-
custom_config.token = exported_alias.token
151-
152-
# Populate the id cache by opening storage by id
153-
storage = await exported_alias.storage_type.open(
154-
id=storage_id, configuration=custom_config, storage_client=ApifyStorageClient()
155-
)
156-
# Populate the alias cache as well
157-
cache.by_alias[exported_alias.storage_type][exported_alias.alias][
158-
exported_alias.additional_cache_key
159-
] = storage
160-
161-
cls._alias_storages_initialized = True

0 commit comments

Comments
 (0)