Skip to content

Commit 1dacd35

Browse files
committed
Finished KVS, TODO: RQ and Dataset
1 parent 7107caf commit 1dacd35

File tree

3 files changed

+180
-165
lines changed

3 files changed

+180
-165
lines changed

src/apify/storage_clients/_apify/_key_value_store_client.py

Lines changed: 36 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,49 @@
77

88
from typing_extensions import override
99

10+
from apify_client.clients import (
11+
KeyValueStoreClientAsync,
12+
)
1013
from crawlee.storage_clients._base import KeyValueStoreClient
1114
from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata
1215
from crawlee.storages import KeyValueStore
1316

1417
from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
15-
from ._utils import AliasResolver, create_apify_client
18+
from ._utils import ApiClientFactory
1619

1720
if TYPE_CHECKING:
1821
from collections.abc import AsyncIterator
1922

20-
from apify_client.clients import KeyValueStoreClientAsync
23+
from apify_client.clients import (
24+
KeyValueStoreCollectionClientAsync,
25+
)
2126

2227
from apify import Configuration
2328

2429
logger = getLogger(__name__)
2530

2631

32+
class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]):
    """Key-value store specialization of `ApiClientFactory`.

    Supplies the key-value-store-specific pieces: the storage type, the default
    storage ID from the configuration, the collection and single-resource API
    clients, and the metadata model used to validate raw API responses.
    NOTE(review): how the base class consumes these members is not visible
    here - confirm against `ApiClientFactory` in `_utils`.
    """

    @property
    def _storage_type(self) -> type[KeyValueStore]:
        """Storage class this factory is associated with (`KeyValueStore`)."""
        return KeyValueStore

    @property
    def _default_id(self) -> str | None:
        """Default key-value store ID read from the configuration, if any."""
        configured_id = self._configuration.default_key_value_store_id
        return configured_id

    @property
    def _collection_client(self) -> KeyValueStoreCollectionClientAsync:
        """Collection-level client obtained from the underlying API client."""
        stores_client = self._api_client.key_value_stores()
        return stores_client

    def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync:
        """Return the API client bound to the key-value store with the given `id`."""
        store_client = self._api_client.key_value_store(key_value_store_id=id)
        return store_client

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata:
        """Validate `raw_metadata` into an `ApifyKeyValueStoreMetadata` model."""
        validated = ApifyKeyValueStoreMetadata.model_validate(raw_metadata)
        return validated
51+
52+
2753
class ApifyKeyValueStoreClient(KeyValueStoreClient):
2854
"""An Apify platform implementation of the key-value store client."""
2955

@@ -90,91 +116,14 @@ async def open(
90116
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
91117
in the configuration.
92118
"""
93-
if sum(1 for param in [id, name, alias] if param is not None) > 1:
94-
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
95-
96-
apify_client_async = create_apify_client(configuration)
97-
apify_kvss_client = apify_client_async.key_value_stores()
98-
99-
# Normalize unnamed default storage in cases where not defined in `configuration.default_key_value_store_id` to
100-
# unnamed storage aliased as `__default__`
101-
if not any([alias, name, id, configuration.default_key_value_store_id]):
102-
alias = '__default__'
103-
104-
if alias:
105-
# Check if there is pre-existing alias mapping in the default KVS.
106-
async with AliasResolver(storage_type=KeyValueStore, alias=alias, configuration=configuration) as _alias:
107-
id = await _alias.resolve_id()
108-
109-
if id:
110-
# An id was resolved, so the storage has to exist; fetch its metadata to confirm it.
111-
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
112-
raw_metadata = await apify_kvs_client.get()
113-
if raw_metadata:
114-
return cls(
115-
api_client=apify_kvs_client,
116-
api_public_base_url='',
117-
# Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
118-
lock=asyncio.Lock(),
119-
)
120-
121-
# There was no pre-existing alias in the mapping or the id did not point to existing storage.
122-
# Create a new unnamed storage and store the alias mapping.
123-
metadata = ApifyKeyValueStoreMetadata.model_validate(
124-
await apify_kvss_client.get_or_create(),
125-
)
126-
await _alias.store_mapping(storage_id=metadata.id)
127-
128-
# Return the client for the newly created storage directly.
129-
# It was just created, no need to refetch it.
130-
return cls(
131-
api_client=apify_client_async.key_value_store(key_value_store_id=metadata.id),
132-
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
133-
lock=asyncio.Lock(),
134-
)
135-
136-
# If name is provided, get or create the storage by name.
137-
elif name:
138-
metadata = ApifyKeyValueStoreMetadata.model_validate(await apify_kvss_client.get_or_create(name=name))
139-
140-
# Freshly fetched named storage. No need to fetch it again.
141-
return cls(
142-
api_client=apify_client_async.key_value_store(key_value_store_id=metadata.id),
143-
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
144-
lock=asyncio.Lock(),
145-
)
146-
# If id is provided, then the storage has to exist.
147-
elif id:
148-
# Now create the client for the determined ID
149-
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
150-
# Fetch its metadata.
151-
raw_metadata = await apify_kvs_client.get()
152-
# If metadata is None, it means the storage does not exist.
153-
if raw_metadata is None:
154-
raise ValueError(f'Opening key-value store with id={id} failed.')
155-
return cls(
156-
api_client=apify_kvs_client,
157-
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
158-
lock=asyncio.Lock(),
159-
)
160-
# Default key-value store ID from configuration
161-
elif configuration.default_key_value_store_id:
162-
# Now create the client for the determined ID
163-
apify_kvs_client = apify_client_async.key_value_store(
164-
key_value_store_id=configuration.default_key_value_store_id
165-
)
166-
# Fetch its metadata.
167-
raw_metadata = await apify_kvs_client.get()
168-
if not raw_metadata:
169-
metadata = ApifyKeyValueStoreMetadata.model_validate(await apify_kvss_client.get_or_create(name=name))
170-
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=metadata.id)
171-
172-
return cls(
173-
api_client=apify_kvs_client,
174-
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
175-
lock=asyncio.Lock(),
176-
)
177-
raise RuntimeError('Will never happen')
119+
api_client, _ =await KvsApiClientFactory(
120+
configuration=configuration, alias=alias, name=name, id=id
121+
).get_client_with_metadata()
122+
return cls(
123+
api_client=api_client,
124+
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
125+
lock=asyncio.Lock(),
126+
)
178127

179128
@override
180129
async def purge(self) -> None:

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 11 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,18 @@
55

66
from typing_extensions import override
77

8-
from crawlee._utils.crypto import crypto_random_object_id
98
from crawlee.storage_clients._base import RequestQueueClient
10-
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
11-
from crawlee.storages import RequestQueue
129

1310
from ._models import ApifyRequestQueueMetadata, RequestQueueStats
1411
from ._request_queue_shared_client import ApifyRequestQueueSharedClient
1512
from ._request_queue_single_client import ApifyRequestQueueSingleClient
16-
from ._utils import AliasResolver, create_apify_client
1713

1814
if TYPE_CHECKING:
1915
from collections.abc import Sequence
2016

2117
from apify_client.clients import RequestQueueClientAsync
2218
from crawlee import Request
19+
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
2320

2421
from apify import Configuration
2522

@@ -224,73 +221,16 @@ async def open(
224221
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
225222
in the configuration.
226223
"""
227-
if sum(1 for param in [id, name, alias] if param is not None) > 1:
228-
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
229-
230-
apify_client_async = create_apify_client(configuration)
231-
apify_rqs_client = apify_client_async.request_queues()
232-
233-
# Normalize unnamed default storage in cases where not defined in `configuration.default_request_queue_id` to
234-
# unnamed storage aliased as `__default__`
235-
if not any([alias, name, id, configuration.default_request_queue_id]):
236-
alias = '__default__'
237-
238-
if alias:
239-
# Check if there is pre-existing alias mapping in the default KVS.
240-
async with AliasResolver(storage_type=RequestQueue, alias=alias, configuration=configuration) as _alias:
241-
id = await _alias.resolve_id()
242-
243-
# There was no pre-existing alias in the mapping.
244-
# Create a new unnamed storage and store the mapping.
245-
if id is None:
246-
new_storage_metadata = RequestQueueMetadata.model_validate(
247-
await apify_rqs_client.get_or_create(),
248-
)
249-
id = new_storage_metadata.id
250-
await _alias.store_mapping(storage_id=id)
251-
252-
# If name is provided, get or create the storage by name.
253-
elif name:
254-
id = RequestQueueMetadata.model_validate(
255-
await apify_rqs_client.get_or_create(name=name),
256-
).id
257-
258-
# If none are provided, try to get the default storage ID from environment variables.
259-
elif id is None:
260-
id = configuration.default_request_queue_id
261-
if not id:
262-
raise ValueError(
263-
'RequestQueue "id", "name", or "alias" must be specified, '
264-
'or a default default_request_queue_id ID must be set in the configuration.'
265-
)
266-
267-
# Use suitable client_key to make `hadMultipleClients` response of Apify API useful.
268-
# It should persist across migrated or resurrected Actor runs on the Apify platform.
269-
_api_max_client_key_length = 32
270-
client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[
271-
:_api_max_client_key_length
272-
]
273-
274-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
275-
276-
# Fetch its metadata.
277-
metadata = await apify_rq_client.get()
278-
279-
# If metadata is None, it means the storage does not exist, so we create it.
280-
if metadata is None:
281-
id = RequestQueueMetadata.model_validate(
282-
await apify_rqs_client.get_or_create(),
283-
).id
284-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
285-
286-
# Verify that the storage exists by fetching its metadata again.
287-
metadata = await apify_rq_client.get()
288-
if metadata is None:
289-
raise ValueError(f'Opening request queue with id={id}, name={name}, and alias={alias} failed.')
290-
291-
metadata_model = RequestQueueMetadata.model_validate(metadata)
292-
293-
return cls(api_client=apify_rq_client, metadata=metadata_model, access=access)
224+
_api_client, metadata =await RqApiClientFactory(
225+
configuration=configuration, alias=alias, name=name, id=id
226+
).get_client_with_metadata()
227+
return cls(
228+
api_client=await RqApiClientFactory(
229+
configuration=configuration, alias=alias, name=name, id=id
230+
).create_api_client(),
231+
metadata=metadata,
232+
access=access,
233+
)
294234

295235
@override
296236
async def purge(self) -> None:

0 commit comments

Comments
 (0)