Skip to content

Commit b00d2b9

Browse files
committed
Use factory in Rq and Dataset
1 parent 1dacd35 commit b00d2b9

File tree

4 files changed

+92
-107
lines changed

4 files changed

+92
-107
lines changed

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 28 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,19 @@
77

88
from typing_extensions import override
99

10+
from apify_client.clients import DatasetClientAsync
1011
from crawlee._utils.byte_size import ByteSize
1112
from crawlee._utils.file import json_dumps
1213
from crawlee.storage_clients._base import DatasetClient
1314
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
1415
from crawlee.storages import Dataset
1516

16-
from ._utils import AliasResolver, create_apify_client
17+
from ._utils import ApiClientFactory
1718

1819
if TYPE_CHECKING:
1920
from collections.abc import AsyncIterator
2021

21-
from apify_client.clients import DatasetClientAsync
22+
from apify_client.clients import DatasetCollectionClientAsync
2223
from crawlee._types import JsonSerializable
2324

2425
from apify import Configuration
@@ -101,66 +102,12 @@ async def open(
101102
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
102103
in the configuration.
103104
"""
104-
if sum(1 for param in [id, name, alias] if param is not None) > 1:
105-
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
106-
107-
apify_client_async = create_apify_client(configuration)
108-
apify_datasets_client = apify_client_async.datasets()
109-
110-
# Normalize unnamed default storage in cases where not defined in `configuration.default_dataset_id` to unnamed
111-
# storage aliased as `__default__`
112-
if not any([alias, name, id, configuration.default_dataset_id]):
113-
alias = '__default__'
114-
115-
if alias:
116-
# Check if there is pre-existing alias mapping in the default KVS.
117-
async with AliasResolver(storage_type=Dataset, alias=alias, configuration=configuration) as _alias:
118-
id = await _alias.resolve_id()
119-
120-
# There was no pre-existing alias in the mapping.
121-
# Create a new unnamed storage and store the mapping.
122-
if id is None:
123-
new_storage_metadata = DatasetMetadata.model_validate(
124-
await apify_datasets_client.get_or_create(),
125-
)
126-
id = new_storage_metadata.id
127-
await _alias.store_mapping(storage_id=id)
128-
129-
# If name is provided, get or create the storage by name.
130-
elif name:
131-
id = DatasetMetadata.model_validate(
132-
await apify_datasets_client.get_or_create(name=name),
133-
).id
134-
135-
# If none are provided, try to get the default storage ID from environment variables.
136-
elif id is None:
137-
id = configuration.default_dataset_id
138-
if not id:
139-
raise ValueError(
140-
'Dataset "id", "name", or "alias" must be specified, '
141-
'or a default dataset ID must be set in the configuration.'
142-
)
143-
144-
# Now create the client for the determined ID
145-
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
146-
147-
# Fetch its metadata.
148-
metadata = await apify_dataset_client.get()
149-
150-
# If metadata is None, it means the storage does not exist, so we create it.
151-
if metadata is None:
152-
id = DatasetMetadata.model_validate(
153-
await apify_datasets_client.get_or_create(),
154-
).id
155-
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
156-
157-
# Verify that the storage exists by fetching its metadata again.
158-
metadata = await apify_dataset_client.get()
159-
if metadata is None:
160-
raise ValueError(f'Opening dataset with id={id}, name={name}, and alias={alias} failed.')
105+
api_client, _ = await DatasetApiClientFactory(
106+
configuration=configuration, alias=alias, name=name, id=id
107+
).get_client_with_metadata()
161108

162109
return cls(
163-
api_client=apify_dataset_client,
110+
api_client=api_client,
164111
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
165112
lock=asyncio.Lock(),
166113
)
@@ -309,3 +256,24 @@ async def _chunk_by_size(self, items: AsyncIterator[str]) -> AsyncIterator[str]:
309256
last_chunk_size = payload_size + ByteSize(2) # Add 2 bytes for [] wrapper.
310257

311258
yield f'[{",".join(current_chunk)}]'
259+
260+
261+
class DatasetApiClientFactory(ApiClientFactory[DatasetClientAsync, DatasetMetadata]):
    """Dataset-specific binding of `ApiClientFactory`.

    Supplies the dataset flavour of every hook the generic factory reads while opening a storage:
    the storage class, the configured default ID, the collection client, the per-dataset client
    and the metadata model.
    """

    @property
    def _storage_type(self) -> type[Dataset]:
        """Storage class this factory is bound to."""
        return Dataset

    @property
    def _default_id(self) -> str | None:
        """Default dataset ID taken from the configuration; may be `None`."""
        configured_id = self._configuration.default_dataset_id
        return configured_id

    @property
    def _collection_client(self) -> DatasetCollectionClientAsync:
        """Collection-level client obtained from the shared API client via `datasets()`."""
        datasets_client = self._api_client.datasets()
        return datasets_client

    def _get_resource_client(self, id: str) -> DatasetClientAsync:
        """Build the client for one dataset.

        Args:
            id: ID of the dataset, forwarded as `dataset_id`.

        Returns:
            The resource client returned by the shared API client.
        """
        dataset_client = self._api_client.dataset(dataset_id=id)
        return dataset_client

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> DatasetMetadata:
        """Validate a raw metadata payload into `DatasetMetadata`.

        The payload is handed to `model_validate` unchanged; `None` is not filtered out here.
        """
        validated = DatasetMetadata.model_validate(raw_metadata)
        return validated

src/apify/storage_clients/_apify/_key_value_store_client.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,6 @@
2929
logger = getLogger(__name__)
3030

3131

32-
class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]):
33-
@property
34-
def _collection_client(self) -> KeyValueStoreCollectionClientAsync:
35-
return self._api_client.key_value_stores()
36-
37-
def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync:
38-
return self._api_client.key_value_store(key_value_store_id=id)
39-
40-
@property
41-
def _default_id(self) -> str | None:
42-
return self._configuration.default_key_value_store_id
43-
44-
@property
45-
def _storage_type(self) -> type[KeyValueStore]:
46-
return KeyValueStore
47-
48-
@staticmethod
49-
def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata:
50-
return ApifyKeyValueStoreMetadata.model_validate(raw_metadata)
51-
52-
5332
class ApifyKeyValueStoreClient(KeyValueStoreClient):
5433
"""An Apify platform implementation of the key-value store client."""
5534

@@ -116,9 +95,9 @@ async def open(
11695
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
11796
in the configuration.
11897
"""
119-
api_client, _ =await KvsApiClientFactory(
120-
configuration=configuration, alias=alias, name=name, id=id
121-
).get_client_with_metadata()
98+
api_client, _ = await KvsApiClientFactory(
99+
configuration=configuration, alias=alias, name=name, id=id
100+
).get_client_with_metadata()
122101
return cls(
123102
api_client=api_client,
124103
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
@@ -203,3 +182,24 @@ async def get_public_url(self, key: str) -> str:
203182
A public URL that can be used to access the value of the given key in the KVS.
204183
"""
205184
return await self._api_client.get_record_public_url(key=key)
185+
186+
187+
class KvsApiClientFactory(ApiClientFactory[KeyValueStoreClientAsync, ApifyKeyValueStoreMetadata]):
    """Key-value-store-specific binding of `ApiClientFactory`.

    Supplies the key-value store flavour of every hook the generic factory reads while opening a
    storage: the storage class, the configured default ID, the collection client, the per-store
    client and the metadata model.
    """

    @property
    def _storage_type(self) -> type[KeyValueStore]:
        """Storage class this factory is bound to."""
        return KeyValueStore

    @property
    def _default_id(self) -> str | None:
        """Default key-value store ID taken from the configuration; may be `None`."""
        configured_id = self._configuration.default_key_value_store_id
        return configured_id

    @property
    def _collection_client(self) -> KeyValueStoreCollectionClientAsync:
        """Collection-level client obtained from the shared API client via `key_value_stores()`."""
        stores_client = self._api_client.key_value_stores()
        return stores_client

    def _get_resource_client(self, id: str) -> KeyValueStoreClientAsync:
        """Build the client for one key-value store.

        Args:
            id: ID of the key-value store, forwarded as `key_value_store_id`.

        Returns:
            The resource client returned by the shared API client.
        """
        store_client = self._api_client.key_value_store(key_value_store_id=id)
        return store_client

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> ApifyKeyValueStoreMetadata:
        """Validate a raw metadata payload into `ApifyKeyValueStoreMetadata`.

        The payload is handed to `model_validate` unchanged; `None` is not filtered out here.
        """
        validated = ApifyKeyValueStoreMetadata.model_validate(raw_metadata)
        return validated

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55

66
from typing_extensions import override
77

8+
from apify_client.clients import RequestQueueClientAsync
89
from crawlee.storage_clients._base import RequestQueueClient
10+
from crawlee.storages import RequestQueue
911

1012
from ._models import ApifyRequestQueueMetadata, RequestQueueStats
1113
from ._request_queue_shared_client import ApifyRequestQueueSharedClient
1214
from ._request_queue_single_client import ApifyRequestQueueSingleClient
15+
from ._utils import ApiClientFactory
1316

1417
if TYPE_CHECKING:
1518
from collections.abc import Sequence
1619

17-
from apify_client.clients import RequestQueueClientAsync
20+
from apify_client.clients import RequestQueueCollectionClientAsync
1821
from crawlee import Request
1922
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
2023

@@ -221,13 +224,11 @@ async def open(
221224
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
222225
in the configuration.
223226
"""
224-
_api_client, metadata =await RqApiClientFactory(
225-
configuration=configuration, alias=alias, name=name, id=id
226-
).get_client_with_metadata()
227+
_api_client, metadata = await RqApiClientFactory(
228+
configuration=configuration, alias=alias, name=name, id=id
229+
).get_client_with_metadata()
227230
return cls(
228-
api_client=await RqApiClientFactory(
229-
configuration=configuration, alias=alias, name=name, id=id
230-
).create_api_client(),
231+
api_client=_api_client,
231232
metadata=metadata,
232233
access=access,
233234
)
@@ -242,3 +243,24 @@ async def purge(self) -> None:
242243
@override
243244
async def drop(self) -> None:
244245
await self._api_client.delete()
246+
247+
248+
class RqApiClientFactory(ApiClientFactory[RequestQueueClientAsync, ApifyRequestQueueMetadata]):
    """Request-queue-specific binding of `ApiClientFactory`.

    Supplies the request queue flavour of every hook the generic factory reads while opening a
    storage: the storage class, the configured default ID, the collection client, the per-queue
    client and the metadata model.
    """

    @property
    def _storage_type(self) -> type[RequestQueue]:
        """Storage class this factory is bound to."""
        return RequestQueue

    @property
    def _default_id(self) -> str | None:
        """Default request queue ID taken from the configuration; may be `None`."""
        configured_id = self._configuration.default_request_queue_id
        return configured_id

    @property
    def _collection_client(self) -> RequestQueueCollectionClientAsync:
        """Collection-level client obtained from the shared API client via `request_queues()`."""
        queues_client = self._api_client.request_queues()
        return queues_client

    def _get_resource_client(self, id: str) -> RequestQueueClientAsync:
        """Build the client for one request queue.

        Args:
            id: ID of the request queue, forwarded as `request_queue_id`.

        Returns:
            The resource client returned by the shared API client.
        """
        queue_client = self._api_client.request_queue(request_queue_id=id)
        return queue_client

    @staticmethod
    def _get_metadata(raw_metadata: dict | None) -> ApifyRequestQueueMetadata:
        """Validate a raw metadata payload into `ApifyRequestQueueMetadata`.

        The payload is handed to `model_validate` unchanged; `None` is not filtered out here.
        """
        validated = ApifyRequestQueueMetadata.model_validate(raw_metadata)
        return validated

src/apify/storage_clients/_apify/_utils.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,8 @@
4242
TResourceClient = TypeVar(
4343
'TResourceClient', bound=KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync
4444
)
45-
TStorageMetadata = TypeVar(
46-
'TStorageMetadata', bound=KeyValueStoreMetadata | RequestQueueMetadata | DatasetMetadata
47-
)
45+
TStorageMetadata = TypeVar('TStorageMetadata', bound=KeyValueStoreMetadata | RequestQueueMetadata | DatasetMetadata)
46+
4847

4948
class AliasResolver:
5049
"""Class for handling aliases.
@@ -245,26 +244,24 @@ def __init__(self, configuration: Configuration, alias: str | None, name: str |
245244
if sum(1 for param in [id, name, alias] if param is not None) > 1:
246245
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
247246

248-
self.alias = alias
249-
self.name = name
250-
self.id = id
247+
self._alias = alias
248+
self._name = name
249+
self._id = id
251250
self._configuration = configuration
252251
self._api_client = create_apify_client(configuration)
253252

254253
async def get_client_with_metadata(self) -> tuple[TResourceClient, TStorageMetadata]:
255-
match (self.alias, self.name, self.id, self._default_id):
254+
match (self._alias, self._name, self._id, self._default_id):
256255
case (None, None, None, None):
257256
# Normalize unnamed default storage in cases where not defined in `self._default_id` to
258257
# unnamed storage aliased as `__default__`. Used only when running locally.
259258
return await self._open_by_alias('__default__')
260259

261260
case (str(), None, None, _):
262-
return await self._open_by_alias(self.alias)
261+
return await self._open_by_alias(self._alias)
263262

264263
case (None, None, None, str()):
265-
# Now create the client for the determined ID
266264
resource_client = self._get_resource_client(id=self._default_id)
267-
# Fetch its metadata.
268265
raw_metadata = await resource_client.get()
269266
metadata = self._get_metadata(raw_metadata)
270267
if not raw_metadata:
@@ -276,22 +273,20 @@ async def get_client_with_metadata(self) -> tuple[TResourceClient, TStorageMetad
276273
return resource_client, metadata
277274

278275
case (None, str(), None, _):
279-
metadata = self._get_metadata(
280-
await self._collection_client.get_or_create(name=self.name))
276+
metadata = self._get_metadata(await self._collection_client.get_or_create(name=self._name))
281277
# Freshly fetched named storage. No need to fetch it again.
282278
return self._get_resource_client(id=metadata.id), metadata
283279

284280
case (None, None, str(), _):
285-
# Now create the client for the determined ID
286-
resource_client = self._get_resource_client(id=self.id)
281+
resource_client = self._get_resource_client(id=self._id)
287282
# Fetch its metadata.
288283
raw_metadata = await resource_client.get()
289284
# If metadata is None, it means the storage does not exist.
290285
if raw_metadata is None:
291-
raise ValueError(f'Opening key-value store with id={self.id} failed.')
286+
raise ValueError(f'Opening {self._storage_type} with id={self._id} failed.')
292287
return resource_client, self._get_metadata(raw_metadata)
293288

294-
raise RuntimeError('Will never happen')
289+
raise RuntimeError('Unreachable code')
295290

296291
@property
297292
@abstractmethod

0 commit comments

Comments
 (0)