Skip to content

Commit c2c8ca5

Browse files
committed
Update NDU creation logic based on updated Crawlee
1 parent f28fcd7 commit c2c8ca5

File tree

5 files changed

+206
-141
lines changed

5 files changed

+206
-141
lines changed

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
from crawlee._utils.file import json_dumps
1212
from crawlee.storage_clients._base import DatasetClient
1313
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
14+
from crawlee.storages import Dataset
1415

15-
from ._utils import resolve_alias_to_id, store_alias_mapping
16+
from ._utils import _Alias
1617

1718
if TYPE_CHECKING:
1819
from collections.abc import AsyncIterator
@@ -126,19 +127,15 @@ async def open(
126127
# Normalize 'default' alias to None
127128
alias = None if alias == 'default' else alias
128129

129-
# Handle alias resolution
130130
if alias:
131-
# Try to resolve alias to existing storage ID
132-
resolved_id = await resolve_alias_to_id(alias, 'dataset', configuration)
133-
if resolved_id:
134-
id = resolved_id
135-
else:
136-
# Create a new storage and store the alias mapping
137-
new_storage_metadata = DatasetMetadata.model_validate(
138-
await apify_datasets_client.get_or_create(),
139-
)
140-
id = new_storage_metadata.id
141-
await store_alias_mapping(alias, 'dataset', id, configuration)
131+
# Create a new storage and store the alias mapping
132+
new_storage_metadata = DatasetMetadata.model_validate(
133+
await apify_datasets_client.get_or_create(),
134+
)
135+
id = new_storage_metadata.id
136+
await _Alias(storage_type=Dataset, alias=alias, token=token, api_url=api_url).store_mapping_to_apify_kvs(
137+
storage_id=id
138+
)
142139

143140
# If name is provided, get or create the storage by name.
144141
elif name:

src/apify/storage_clients/_apify/_key_value_store_client.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010
from apify_client import ApifyClientAsync
1111
from crawlee.storage_clients._base import KeyValueStoreClient
1212
from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata
13+
from crawlee.storages import KeyValueStore
1314

1415
from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
15-
from ._utils import resolve_alias_to_id, store_alias_mapping
16+
from ._utils import _Alias
1617
from apify._crypto import create_hmac_signature
1718

1819
if TYPE_CHECKING:
@@ -117,19 +118,15 @@ async def open(
117118
# Normalize 'default' alias to None
118119
alias = None if alias == 'default' else alias
119120

120-
# Handle alias resolution
121121
if alias:
122-
# Try to resolve alias to existing storage ID
123-
resolved_id = await resolve_alias_to_id(alias, 'kvs', configuration)
124-
if resolved_id:
125-
id = resolved_id
126-
else:
127-
# Create a new storage and store the alias mapping
128-
new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate(
129-
await apify_kvss_client.get_or_create(),
130-
)
131-
id = new_storage_metadata.id
132-
await store_alias_mapping(alias, 'kvs', id, configuration)
122+
# Create a new storage and store the alias mapping
123+
new_storage_metadata = ApifyKeyValueStoreMetadata.model_validate(
124+
await apify_kvss_client.get_or_create(),
125+
)
126+
id = new_storage_metadata.id
127+
await _Alias(
128+
storage_type=KeyValueStore, alias=alias, token=token, api_url=api_url
129+
).store_mapping_to_apify_kvs(storage_id=id)
133130

134131
# If name is provided, get or create the storage by name.
135132
elif name:

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
from crawlee._utils.crypto import crypto_random_object_id
1717
from crawlee.storage_clients._base import RequestQueueClient
1818
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
19+
from crawlee.storages import RequestQueue
1920

2021
from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead
21-
from ._utils import resolve_alias_to_id, store_alias_mapping
22+
from ._utils import _Alias
2223
from apify import Request
2324

2425
if TYPE_CHECKING:
@@ -192,22 +193,15 @@ async def open(
192193
)
193194
apify_rqs_client = apify_client_async.request_queues()
194195

195-
# Normalize 'default' alias to None
196-
alias = None if alias == 'default' else alias
197-
198-
# Handle alias resolution
199196
if alias:
200-
# Try to resolve alias to existing storage ID
201-
resolved_id = await resolve_alias_to_id(alias, 'rq', configuration)
202-
if resolved_id:
203-
id = resolved_id
204-
else:
205-
# Create a new storage and store the alias mapping
206-
new_storage_metadata = RequestQueueMetadata.model_validate(
207-
await apify_rqs_client.get_or_create(),
208-
)
209-
id = new_storage_metadata.id
210-
await store_alias_mapping(alias, 'rq', id, configuration)
197+
# Create a new storage and store the alias mapping
198+
new_storage_metadata = RequestQueueMetadata.model_validate(
199+
await apify_rqs_client.get_or_create(),
200+
)
201+
id = new_storage_metadata.id
202+
await _Alias(
203+
storage_type=RequestQueue, alias=alias, token=token, api_url=api_url
204+
).store_mapping_to_apify_kvs(storage_id=id)
211205

212206
# If name is provided, get or create the storage by name.
213207
elif name:

src/apify/storage_clients/_apify/_storage_client.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
from __future__ import annotations
22

3+
from asyncio import Lock
34
from typing import TYPE_CHECKING
45

56
from typing_extensions import override
67

8+
from crawlee import service_locator
79
from crawlee.storage_clients._base import StorageClient
810

911
from ._dataset_client import ApifyDatasetClient
1012
from ._key_value_store_client import ApifyKeyValueStoreClient
1113
from ._request_queue_client import ApifyRequestQueueClient
14+
from ._utils import _ALIAS_MAPPING_KEY, _Alias
15+
from apify._configuration import Configuration
1216
from apify._configuration import Configuration as ApifyConfiguration
1317
from apify._utils import docs_group
1418

@@ -22,6 +26,11 @@
2226
class ApifyStorageClient(StorageClient):
2327
"""Apify storage client."""
2428

29+
_alias_storages_initialized = False
30+
"""Flag that indicates whether the pre-existing alias storages were already initialized."""
31+
_alias_init_lock: Lock | None = None
32+
"""Lock for creating alias storages. Only one alias storage can be created at the time."""
33+
2534
# This class breaches Liskov Substitution Principle. It requires specialized Configuration compared to its parent.
2635
_lsp_violation_error_message_template = (
2736
'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.'
@@ -30,7 +39,9 @@ class ApifyStorageClient(StorageClient):
3039
@override
3140
def get_additional_cache_key(self, configuration: CrawleeConfiguration) -> Hashable:
3241
if isinstance(configuration, ApifyConfiguration):
33-
return f'{configuration.api_base_url},{configuration.token}'
42+
if configuration.api_base_url is None or configuration.token is None:
43+
raise ValueError("'Configuration.api_base_url' and 'Configuration.token' must be set.")
44+
return _Alias.get_additional_cache_key(configuration.api_base_url, configuration.token)
3445
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
3546

3647
@override
@@ -44,6 +55,10 @@ async def create_dataset_client(
4455
) -> ApifyDatasetClient:
4556
configuration = configuration or ApifyConfiguration.get_global_configuration()
4657
if isinstance(configuration, ApifyConfiguration):
58+
if alias:
59+
await self._initialize_alias_storages()
60+
async with self.get_alias_init_lock():
61+
return await ApifyDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration)
4762
return await ApifyDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration)
4863

4964
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
@@ -59,6 +74,12 @@ async def create_kvs_client(
5974
) -> ApifyKeyValueStoreClient:
6075
configuration = configuration or ApifyConfiguration.get_global_configuration()
6176
if isinstance(configuration, ApifyConfiguration):
77+
if alias:
78+
await self._initialize_alias_storages()
79+
async with self.get_alias_init_lock():
80+
return await ApifyKeyValueStoreClient.open(
81+
id=id, name=name, alias=alias, configuration=configuration
82+
)
6283
return await ApifyKeyValueStoreClient.open(id=id, name=name, alias=alias, configuration=configuration)
6384

6485
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
@@ -74,6 +95,58 @@ async def create_rq_client(
7495
) -> ApifyRequestQueueClient:
7596
configuration = configuration or ApifyConfiguration.get_global_configuration()
7697
if isinstance(configuration, ApifyConfiguration):
98+
if alias:
99+
await self._initialize_alias_storages()
100+
async with self.get_alias_init_lock():
101+
return await ApifyRequestQueueClient.open(
102+
id=id, name=name, alias=alias, configuration=configuration
103+
)
77104
return await ApifyRequestQueueClient.open(id=id, name=name, alias=alias, configuration=configuration)
78105

79106
raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
107+
108+
@classmethod
109+
def get_alias_init_lock(cls) -> Lock:
110+
if not cls._alias_init_lock:
111+
cls._alias_init_lock = Lock()
112+
return cls._alias_init_lock
113+
114+
@classmethod
115+
async def _initialize_alias_storages(cls) -> None:
116+
"""Initialize alias storages.
117+
118+
This method is called once to populate storage_instance_manager alias related cache. All existing alias
119+
storages are saved in storage_instance_manager cache. If the alias storage is not there, it does not exist yet.
120+
"""
121+
async with cls.get_alias_init_lock():
122+
if cls._alias_storages_initialized:
123+
return
124+
125+
cache = service_locator.storage_instance_manager._cache_by_storage_client[ApifyStorageClient] # noqa: SLF001
126+
127+
default_kvs_client = await _Alias.get_default_kvs_client()
128+
129+
record = await default_kvs_client.get_record(key=_ALIAS_MAPPING_KEY)
130+
131+
if record is not None and 'value' in record:
132+
# get_record can return {key: ..., value: ..., content_type: ...}
133+
alias_export_map = record['value']
134+
135+
for export_key, storage_id in alias_export_map.value.items():
136+
exported_alias = _Alias.from_exported_string(export_key)
137+
138+
# Re-create custom config used to open the storage
139+
custom_config = Configuration()
140+
custom_config.api_base_url = exported_alias.api_url
141+
custom_config.token = exported_alias.token
142+
143+
# Populate the id cache by opening storage by id
144+
storage = await exported_alias.storage_type.open(
145+
id=storage_id, configuration=custom_config, storage_client=ApifyStorageClient()
146+
)
147+
# Populate the alias cache as well
148+
cache.by_alias[exported_alias.storage_type][exported_alias.alias][
149+
exported_alias.additional_cache_key
150+
] = storage
151+
152+
cls._alias_storages_initialized = True

0 commit comments

Comments
 (0)