Skip to content

Commit c305634

Browse files
committed
Update public url handling
Refactor storage creation to improve code reuse
1 parent 6e4f55d commit c305634

File tree

6 files changed

+105
-2742
lines changed

6 files changed

+105
-2742
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ keywords = [
3434
"scraping",
3535
]
3636
dependencies = [
37-
"apify-client>=2.0.0,<3.0.0",
37+
"apify-client>=2.1.0,<3.0.0",
3838
"apify-shared>=2.0.0,<3.0.0",
3939
"crawlee==1.0.0rc1",
4040
"cachetools>=5.5.0",

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66

77
from typing_extensions import override
88

9-
from apify_client import ApifyClientAsync
109
from crawlee._utils.byte_size import ByteSize
1110
from crawlee._utils.file import json_dumps
1211
from crawlee.storage_clients._base import DatasetClient
1312
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
1413

14+
from apify.storage_clients._apify._utils import create_apify_client, resolve_storage_id
15+
1516
if TYPE_CHECKING:
1617
from collections.abc import AsyncIterator
1718

@@ -39,7 +40,6 @@ def __init__(
3940
self,
4041
*,
4142
api_client: DatasetClientAsync,
42-
api_public_base_url: str,
4343
lock: asyncio.Lock,
4444
) -> None:
4545
"""Initialize a new instance.
@@ -49,9 +49,6 @@ def __init__(
4949
self._api_client = api_client
5050
"""The Apify dataset client for API operations."""
5151

52-
self._api_public_base_url = api_public_base_url
53-
"""The public base URL for accessing the key-value store records."""
54-
5552
self._lock = lock
5653
"""A lock to ensure that only one operation is performed at a time."""
5754

@@ -90,52 +87,16 @@ async def open(
9087
are provided, or if neither `id` nor `name` is provided and no default storage ID is available in
9188
the configuration.
9289
"""
93-
token = configuration.token
94-
if not token:
95-
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')
96-
97-
api_url = configuration.api_base_url
98-
if not api_url:
99-
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')
100-
101-
api_public_base_url = configuration.api_public_base_url
102-
if not api_public_base_url:
103-
raise ValueError(
104-
'Apify storage client requires a valid API public base URL in Configuration '
105-
f'(api_public_base_url={api_public_base_url}).'
106-
)
107-
108-
# Create Apify client with the provided token and API URL.
109-
apify_client_async = ApifyClientAsync(
110-
token=token,
111-
api_url=api_url,
112-
max_retries=8,
113-
min_delay_between_retries_millis=500,
114-
timeout_secs=360,
115-
)
90+
apify_client_async = create_apify_client(configuration=configuration)
11691
apify_datasets_client = apify_client_async.datasets()
11792

118-
# If both id and name are provided, raise an error.
119-
if id and name:
120-
raise ValueError('Only one of "id" or "name" can be specified, not both.')
121-
122-
# If id is provided, get the storage by ID.
123-
if id and name is None:
124-
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
125-
126-
# If name is provided, get or create the storage by name.
127-
if name and id is None:
128-
id = DatasetMetadata.model_validate(
93+
async def id_getter() -> str:
94+
return DatasetMetadata.model_validate(
12995
await apify_datasets_client.get_or_create(name=name),
13096
).id
131-
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
13297

133-
# If both id and name are None, try to get the default storage ID from environment variables.
134-
# The default storage ID environment variable is set by the Apify platform. It also contains
135-
# a new storage ID after Actor's reboot or migration.
136-
if id is None and name is None:
137-
id = configuration.default_dataset_id
138-
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
98+
id = await resolve_storage_id(id, name, default_id=configuration.default_dataset_id, id_getter=id_getter())
99+
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
139100

140101
# Fetch its metadata.
141102
metadata = await apify_dataset_client.get()
@@ -154,7 +115,6 @@ async def open(
154115

155116
return cls(
156117
api_client=apify_dataset_client,
157-
api_public_base_url=api_public_base_url,
158118
lock=asyncio.Lock(),
159119
)
160120

src/apify/storage_clients/_apify/_key_value_store_client.py

Lines changed: 20 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
from typing_extensions import override
88
from yarl import URL
99

10-
from apify_client import ApifyClientAsync
1110
from crawlee.storage_clients._base import KeyValueStoreClient
1211
from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata
1312

1413
from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
14+
from ._utils import create_apify_client, resolve_storage_id
1515
from apify._crypto import create_hmac_signature
1616

1717
if TYPE_CHECKING:
@@ -31,7 +31,6 @@ def __init__(
3131
self,
3232
*,
3333
api_client: KeyValueStoreClientAsync,
34-
api_public_base_url: str,
3534
lock: asyncio.Lock,
3635
) -> None:
3736
"""Initialize a new instance.
@@ -41,9 +40,6 @@ def __init__(
4140
self._api_client = api_client
4241
"""The Apify KVS client for API operations."""
4342

44-
self._api_public_base_url = api_public_base_url
45-
"""The public base URL for accessing the key-value store records."""
46-
4743
self._lock = lock
4844
"""A lock to ensure that only one operation is performed at a time."""
4945

@@ -82,55 +78,29 @@ async def open(
8278
are provided, or if neither `id` nor `name` is provided and no default storage ID is available
8379
in the configuration.
8480
"""
85-
token = configuration.token
86-
if not token:
87-
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')
88-
89-
api_url = configuration.api_base_url
90-
if not api_url:
91-
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')
92-
93-
api_public_base_url = configuration.api_public_base_url
94-
if not api_public_base_url:
95-
raise ValueError(
96-
'Apify storage client requires a valid API public base URL in Configuration '
97-
f'(api_public_base_url={api_public_base_url}).'
98-
)
99-
100-
# Create Apify client with the provided token and API URL.
101-
apify_client_async = ApifyClientAsync(
102-
token=token,
103-
api_url=api_url,
104-
max_retries=8,
105-
min_delay_between_retries_millis=500,
106-
timeout_secs=360,
107-
)
81+
apify_client_async = create_apify_client(configuration=configuration)
10882
apify_kvss_client = apify_client_async.key_value_stores()
10983

11084
# If both id and name are provided, raise an error.
11185
if id and name:
11286
raise ValueError('Only one of "id" or "name" can be specified, not both.')
11387

114-
# If id is provided, get the storage by ID.
115-
if id and name is None:
116-
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
117-
118-
# If name is provided, get or create the storage by name.
119-
if name and id is None:
120-
id = ApifyKeyValueStoreMetadata.model_validate(
88+
async def id_getter() -> str:
89+
return ApifyKeyValueStoreMetadata.model_validate(
12190
await apify_kvss_client.get_or_create(name=name),
12291
).id
123-
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
12492

125-
# If both id and name are None, try to get the default storage ID from environment variables.
126-
# The default storage ID environment variable is set by the Apify platform. It also contains
127-
# a new storage ID after Actor's reboot or migration.
128-
if id is None and name is None:
129-
id = configuration.default_key_value_store_id
130-
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
93+
def client_creator(id: str) -> KeyValueStoreClientAsync:
94+
return apify_client_async.key_value_store(key_value_store_id=id)
13195

132-
# Fetch its metadata.
133-
metadata = await apify_kvs_client.get()
96+
id = await resolve_storage_id(
97+
id, name, default_id=configuration.default_key_value_store_id, id_getter=id_getter()
98+
)
99+
# Create the resource client
100+
apify_rq_client = client_creator(id=id)
101+
102+
# Verify that the storage exists by fetching its metadata.
103+
metadata = await apify_rq_client.get()
134104

135105
# If metadata is None, it means the storage does not exist, so we create it.
136106
if metadata is None:
@@ -146,7 +116,6 @@ async def open(
146116

147117
return cls(
148118
api_client=apify_kvs_client,
149-
api_public_base_url=api_public_base_url,
150119
lock=asyncio.Lock(),
151120
)
152121

@@ -231,7 +200,12 @@ async def get_public_url(self, key: str) -> str:
231200
raise ValueError('resource_id cannot be None when generating a public URL')
232201

233202
public_url = (
234-
URL(self._api_public_base_url) / 'v2' / 'key-value-stores' / self._api_client.resource_id / 'records' / key
203+
URL(self._api_client.root_client.public_base_url)
204+
/ 'v2'
205+
/ 'key-value-stores'
206+
/ self._api_client.resource_id
207+
/ 'records'
208+
/ key
235209
)
236210
metadata = await self.get_metadata()
237211

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@
1212
from cachetools import LRUCache
1313
from typing_extensions import override
1414

15-
from apify_client import ApifyClientAsync
1615
from crawlee._utils.crypto import crypto_random_object_id
1716
from crawlee.storage_clients._base import RequestQueueClient
1817
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1918

2019
from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead
20+
from ._utils import create_apify_client, resolve_storage_id
2121
from apify import Request
2222

2323
if TYPE_CHECKING:
@@ -66,6 +66,7 @@ def __init__(
6666
self,
6767
*,
6868
api_client: RequestQueueClientAsync,
69+
lock: asyncio.Lock,
6970
metadata: RequestQueueMetadata,
7071
) -> None:
7172
"""Initialize a new instance.
@@ -90,7 +91,7 @@ def __init__(
9091
self._should_check_for_forefront_requests = False
9192
"""Whether to check for forefront requests in the next list_head call."""
9293

93-
self._fetch_lock = asyncio.Lock()
94+
self._fetch_lock = lock
9495
"""Fetch lock to minimize race conditions when communicating with API."""
9596

9697
async def _get_metadata_estimate(self) -> RequestQueueMetadata:
@@ -160,51 +161,17 @@ async def open(
160161
are provided, or if neither `id` nor `name` is provided and no default storage ID is available
161162
in the configuration.
162163
"""
163-
token = configuration.token
164-
if not token:
165-
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')
166-
167-
api_url = configuration.api_base_url
168-
if not api_url:
169-
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')
170-
171-
api_public_base_url = configuration.api_public_base_url
172-
if not api_public_base_url:
173-
raise ValueError(
174-
'Apify storage client requires a valid API public base URL in Configuration '
175-
f'(api_public_base_url={api_public_base_url}).'
176-
)
177-
178-
# Create Apify client with the provided token and API URL.
179-
apify_client_async = ApifyClientAsync(
180-
token=token,
181-
api_url=api_url,
182-
max_retries=8,
183-
min_delay_between_retries_millis=500,
184-
timeout_secs=360,
185-
)
164+
apify_client_async = create_apify_client(configuration=configuration)
186165
apify_rqs_client = apify_client_async.request_queues()
187166

188-
match (id, name):
189-
case (None, None):
190-
# If both id and name are None, try to get the default storage ID from environment variables.
191-
# The default storage ID environment variable is set by the Apify platform. It also contains
192-
# a new storage ID after Actor's reboot or migration.
193-
id = configuration.default_request_queue_id
194-
case (None, name):
195-
# If only name is provided, get or create the storage by name.
196-
id = RequestQueueMetadata.model_validate(
197-
await apify_rqs_client.get_or_create(name=name),
198-
).id
199-
case (_, None):
200-
# If only id is provided, use it.
201-
pass
202-
case (_, _):
203-
# If both id and name are provided, raise an error.
204-
raise ValueError('Only one of "id" or "name" can be specified, not both.')
205-
if id is None:
206-
raise RuntimeError('Unreachable code')
167+
async def id_getter() -> str:
168+
return RequestQueueMetadata.model_validate(await apify_rqs_client.get_or_create(name=name)).id
169+
170+
id = await resolve_storage_id(
171+
id, name, default_id=configuration.default_request_queue_id, id_getter=id_getter()
172+
)
207173

174+
# Create the resource client
208175
# Use suitable client_key to make `hadMultipleClients` response of Apify API useful.
209176
# It should persist across migrated or resurrected Actor runs on the Apify platform.
210177
_api_max_client_key_length = 32
@@ -233,6 +200,7 @@ async def open(
233200

234201
return cls(
235202
api_client=apify_rq_client,
203+
lock=asyncio.Lock(),
236204
metadata=metadata_model,
237205
)
238206

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
from apify_client import ApifyClientAsync
6+
7+
if TYPE_CHECKING:
8+
from collections.abc import Coroutine
9+
10+
from apify import Configuration
11+
12+
13+
def create_apify_client(configuration: Configuration) -> ApifyClientAsync:
14+
"""Create and return an ApifyClientAsync instance using the provided configuration."""
15+
token = configuration.token
16+
if not token:
17+
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')
18+
19+
api_url = configuration.api_base_url
20+
if not api_url:
21+
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')
22+
23+
api_public_base_url = configuration.api_public_base_url
24+
if not api_public_base_url:
25+
raise ValueError(
26+
'Apify storage client requires a valid API public base URL in Configuration '
27+
f'(api_public_base_url={api_public_base_url}).'
28+
)
29+
30+
# Create Apify client with the provided token and API URL.
31+
return ApifyClientAsync(
32+
token=token,
33+
api_url=api_url,
34+
api_public_url=api_public_base_url,
35+
max_retries=8,
36+
min_delay_between_retries_millis=500,
37+
timeout_secs=360,
38+
)
39+
40+
41+
async def resolve_storage_id(
42+
id: str | None, name: str | None, default_id: str, id_getter: Coroutine[None, None, str]
43+
) -> str:
44+
"""Resolve and return the storage ID from arguments."""
45+
try:
46+
match (id, name):
47+
case (None, None):
48+
# If both id and name are None, try to get the default storage ID from environment variables.
49+
# The default storage ID environment variable is set by the Apify platform. It also contains
50+
# a new storage ID after Actor's reboot or migration.
51+
id = default_id
52+
case (None, name):
53+
# If only name is provided, get or create the storage by name.
54+
id = await id_getter
55+
case (_, None):
56+
# If only id is provided, use it.
57+
pass
58+
case (_, _):
59+
# If both id and name are provided, raise an error.
60+
raise ValueError('Only one of "id" or "name" can be specified, not both.')
61+
if id is None:
62+
raise RuntimeError('Unreachable code')
63+
return id
64+
finally:
65+
id_getter.close()

0 commit comments

Comments
 (0)